# ETL with Spark (Local)

In [1]:
from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

# import pyspark.sql.functions as F
import pandas as pd
import glob

In [72]:
# File names of the two GitHub-events JSON dumps.
# NOTE: `data` is rebound to a Spark DataFrame by the later read step.
data, data_2 = "github_events_01.json", "github_events_02.json"

In [75]:
# Get (or create) the Spark session used by every later cell, under the
# application name "ETL".
spark = (
    SparkSession.builder
    .appName("ETL")
    .getOrCreate()
)

In [None]:
# Directory holding the raw GitHub-events JSON files; passed to the Spark JSON reader.
data_folder = "data"

['data/github_events_01.json', 'data/github_events_02.json']

In [None]:
# Load the JSON found under `data_folder` into a single DataFrame, with the
# reader's "multiline" option switched on.
data = (
    spark.read
    .option("multiline", "true")
    .json(data_folder)
)

In [None]:
# Print a preview of the loaded events.
data.show()

In [None]:
# Print the schema Spark derived for the loaded JSON.
data.printSchema()

In [None]:
# Preview only the `id` and `type` columns of the events.
data.select("id", "type").show()

In [None]:
# Expose the DataFrame to Spark SQL as the temp view `staging_events`,
# which the SQL queries below select from.
data.createOrReplaceTempView("staging_events")

In [None]:
# Sanity-check the staging view by previewing a single row.
# DataFrame.show() only prints and returns None, so keep the DataFrame in
# `table` and call show() separately instead of binding `table` to None.
table = spark.sql("""
    select
        *

    from
        staging_events
""")

table.show(1)

In [None]:
# Output directories for the actors table (paths are relative, one level up).
actors_output_csv = "../actors/output_csv"
actors_output_parquet = "../actors/output_parquet"

# Pull the fields of each staged event's `actor` into top-level columns.
# The event `type` is selected as well because the write step partitions on it.
# NOTE(review): there is no DISTINCT, so this yields one row per staged event,
# not one row per actor — confirm that is intended.
table_actors = spark.sql("""
    select
        actor.id
        , actor.login 
        , actor.display_login 
        , actor.gravatar_id 
        , actor.url 
        , actor.avatar_url
        , type as type
    
    from
        staging_events
""")

# Preview five rows of the result.
table_actors.show(5)

In [None]:
# Persist the actors table twice, partitioned by event type: once as CSV and
# once as Parquet. The Parquet directory must be written with .parquet();
# calling .csv() on it would fill the "parquet" directory with CSV files.
table_actors.write.partitionBy("type").mode("overwrite").csv(actors_output_csv)
table_actors.write.partitionBy("type").mode("overwrite").parquet(actors_output_parquet)

In [None]:
# Output directories for the orgs table (paths are relative, one level up).
orgs_output_csv = "../orgs/output_csv"
orgs_output_parquet = "../orgs/output_parquet"

# Pull the fields of each staged event's `org` into top-level columns.
# The event `type` is selected as well because the write step partitions on it.
# NOTE(review): there is no DISTINCT, so this yields one row per staged event,
# not one row per org — confirm that is intended.
table_orgs = spark.sql("""
    select
        org.id
        , org.login
        , org.gravatar_id 
        , org.url 
        , org.avatar_url 
        , type as type
    
    from
        staging_events
""")

# Preview five rows of the result.
table_orgs.show(5)

In [None]:
# Persist the orgs table twice, partitioned by event type: once as CSV and
# once as Parquet. The Parquet directory must be written with .parquet();
# calling .csv() on it would fill the "parquet" directory with CSV files.
table_orgs.write.partitionBy("type").mode("overwrite").csv(orgs_output_csv)
table_orgs.write.partitionBy("type").mode("overwrite").parquet(orgs_output_parquet)

In [None]:
# Output directories for the repos table (paths are relative, one level up).
repos_output_csv = "../repos/output_csv"
repos_output_parquet = "../repos/output_parquet"

# Pull the fields of each staged event's `repo` into top-level columns.
# The event `type` is selected as well because the write step partitions on it.
# NOTE(review): there is no DISTINCT, so this yields one row per staged event,
# not one row per repo — confirm that is intended.
table_repos = spark.sql("""
    select
        repo.id
        , repo.name
        , repo.url
        , type as type
    
    from
        staging_events
""")

# Preview five rows of the result.
table_repos.show(5)

In [None]:
# Persist the repos table twice, partitioned by event type: once as CSV and
# once as Parquet. The Parquet directory must be written with .parquet();
# calling .csv() on it would fill the "parquet" directory with CSV files.
table_repos.write.partitionBy("type").mode("overwrite").csv(repos_output_csv)
table_repos.write.partitionBy("type").mode("overwrite").parquet(repos_output_parquet)

In [None]:
# Output directories for the events table (paths are relative, one level up).
events_output_csv = "../events/output_csv"
events_output_parquet = "../events/output_parquet"

# Build the events table: the event's own id, type and public flag, the ids of
# its repo, org and actor, and `created_at` converted with timestamp() and
# exposed as `datetime`.
table_events = spark.sql("""
    select
        id
        , repo.id as repo_id
        , org.id as org_id
        , actor.id as actor_id 
        , type as type
        , public
        , timestamp(created_at) as datetime

    from
        staging_events
""")

# Preview five rows of the result.
table_events.show(5)

In [None]:
# Persist the events table twice, partitioned by event type: once as CSV and
# once as Parquet. The Parquet directory must be written with .parquet();
# calling .csv() on it would fill the "parquet" directory with CSV files.
table_events.write.partitionBy("type").mode("overwrite").csv(events_output_csv)
table_events.write.partitionBy("type").mode("overwrite").parquet(events_output_parquet)

In [None]:
# Register each derived table as a Spark SQL temp view so later queries can
# join them by name.
for view_name, frame in (
    ("actors", table_actors),
    ("orgs", table_orgs),
    ("repos", table_repos),
    ("events", table_events),
):
    frame.createOrReplaceTempView(view_name)

In [None]:
# List CreateEvent events together with the id of the actor behind each one.
# The original select list ended in a dangling `actors.id as` with no alias,
# which Spark SQL rejects as a parse error; the column is now named `actor_id`
# (assumed name, chosen to match `events.actor_id` — rename if another alias
# was intended). `datetime` is qualified so its source table is explicit.
# NOTE(review): the `actors` view has one row per staged event rather than one
# per actor, so this join can return repeated rows — confirm whether the
# actors table should be de-duplicated first.
df = spark.sql("""
    select
        events.type
        , events.datetime
        , actors.id as actor_id

    from events
    join actors
    on
        actors.type = events.type and actors.id = events.actor_id
    where events.type = 'CreateEvent'
""")

df.show()