# ETL with Spark (Local)

In [40]:
from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

# import pyspark.sql.functions as F

In [41]:
import pandas as pd
import glob

In [42]:
# File name of the first sample GitHub events dump.
# NOTE(review): `data` is rebound to a Spark DataFrame by the later
# `spark.read...json(data_folder)` cell, so no visible code uses this string.
data = "github_events_01.json"

In [43]:
# File name of the second sample GitHub events dump. In the visible notebook
# it is only referenced by a commented-out single-file read.
data_2 = "github_events_02.json"

In [44]:
# Create the SparkSession used by every cell below, or reuse one that is
# already running in this kernel. The application is named "ETL".
spark = (
    SparkSession.builder
    .appName("ETL")
    .getOrCreate()
)

In [45]:
# Folder (relative to the notebook's working directory) that is passed to
# `spark.read...json(...)` as the input location.
data_folder = "data"

In [46]:
# Read the JSON input found at `data_folder` into a single DataFrame.
# The "multiline" option is enabled so a JSON record may span several lines.
data = (
    spark.read
    .option("multiline", "true")
    .json(data_folder)
)

In [47]:
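# Alternative kept for reference: read only the single file named by `data_2`
# instead of the whole `data_folder`.
# data = spark.read.option("multiline", "true").json(data_2)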

In [48]:
# Preview the first five raw events (nested structs are truncated in the output).
data.show(n=5)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|             type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...|       WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|      CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|        PushEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963558|{https://avatars....|{created, null, {...|  true|{226399669, CMSgo...|IssueCommen

In [49]:
# Print the schema Spark inferred from the JSON, including the nested
# `actor`, `org` and `payload` structs.
data.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- author_association: string (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- html_url: string (nullable 

In [50]:
# Quick look at just the event id and event type for the first five rows.
data.select("id", "type").show(n=5)

+-----------+-----------------+
|         id|             type|
+-----------+-----------------+
|23487963576|       WatchEvent|
|23487963624|      CreateEvent|
|23487963529|        PushEvent|
|23487963558|IssueCommentEvent|
|23487963581| PullRequestEvent|
+-----------+-----------------+
only showing top 5 rows



In [51]:
# Register the raw events DataFrame as the temp view `staging_events`
# (replacing any existing view of that name) so the cells below can query it
# with `spark.sql`.
data.createOrReplaceTempView("staging_events")

In [53]:
# Sanity-check the `staging_events` view by selecting every column via Spark SQL.
# Keep the DataFrame in `table` and display it in a separate statement:
# `DataFrame.show()` only prints and returns None, so chaining it onto the
# assignment would leave `table` bound to None instead of the query result.
table = spark.sql("""
    select
        *
        
    from
        staging_events
""")
table.show(5)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|             type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...|       WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|      CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|        PushEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963558|{https://avatars....|{created, null, {...|  true|{226399669, CMSgo...|IssueCommen

In [55]:
# Flatten each event into one wide row: the event id, type and raw
# `created_at` string, a `date` and `year` derived from `created_at`, and the
# login/name and URL pulled out of the nested `actor`, `repo` and `org` structs.
# `actor.login` and `repo.name` are not aliased, so they surface as `login`
# and `name`. Events without an org show null in `org_name` / `org_url`.
table = spark.sql("""
    select
        id
        , type
        , created_at
        , to_date(created_at) as date
        , year(created_at) as year
        , actor.login
        , actor.url as actor_url
        , repo.name
        , repo.url as repo_url
        , org.login as org_name
        , org.url as org_url
        
    from
        staging_events
""")

In [57]:
# Preview the first five rows of the flattened events table.
table.show(n=5)

+-----------+-----------------+--------------------+----------+----+----------+--------------------+--------------------+--------------------+---------------+--------------------+
|         id|             type|          created_at|      date|year|     login|           actor_url|                name|            repo_url|       org_name|             org_url|
+-----------+-----------------+--------------------+----------+----+----------+--------------------+--------------------+--------------------+---------------+--------------------+
|23487963576|       WatchEvent|2022-08-17T15:52:40Z|2022-08-17|2022|evilgaoshu|https://api.githu...|spring-projects/s...|https://api.githu...|spring-projects|https://api.githu...|
|23487963624|      CreateEvent|2022-08-17T15:52:40Z|2022-08-17|2022|  gurram47|https://api.githu...|gurram47/AP201100...|https://api.githu...|           null|                null|
|23487963529|        PushEvent|2022-08-17T15:52:40Z|2022-08-17|2022|afbeltranr|https://api.githu...|

In [58]:
# Output directories for the flattened table, one per file format. Both are
# relative paths that resolve one level above the working directory.
output_csv, output_parquet = "../output_csv", "../output_parquet"

In [59]:
# Write the flattened table as CSV, partitioned on disk by `year`.
# "overwrite" mode replaces any output already present at `output_csv`.
table.write.partitionBy("year").csv(output_csv, mode="overwrite")

In [60]:
# Write the same table as Parquet, also partitioned by `year`.
# "overwrite" mode replaces any output already present at `output_parquet`.
table.write.parquet(output_parquet, mode="overwrite", partitionBy="year")

In [64]:
# Events table: one row per event with its id, type and raw `created_at`
# string, plus day / month / year / date columns derived from `created_at`.
# The derived columns are what the write cells below partition on.
table_events = spark.sql("""
    select
        id
        , type
        , created_at
        , day(created_at) as day
        , month(created_at) as month
        , year(created_at) as year
        , date(created_at) as date
    from
        staging_events
""")

In [65]:
# Preview the first five rows of the events table with its derived date parts.
table_events.show(n=5)

+-----------+-----------------+--------------------+---+-----+----+----------+
|         id|             type|          created_at|day|month|year|      date|
+-----------+-----------------+--------------------+---+-----+----+----------+
|23487963576|       WatchEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963624|      CreateEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963529|        PushEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963558|IssueCommentEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963581| PullRequestEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
+-----------+-----------------+--------------------+---+-----+----+----------+
only showing top 5 rows



In [66]:
# Output directory for the events table.
# NOTE(review): `destination` is reassigned by the actors / repos / orgs cells
# further down, so its value depends on which cell ran last.
destination = "../events"

In [67]:
# Write the events as CSV partitioned by year / month / day.
# This layout goes to a sibling directory ("<destination>_by_ymd") rather than
# to `destination` itself: the following cell writes a date-partitioned copy
# to `destination` in "overwrite" mode, which (with Spark's default static
# partition-overwrite behaviour) would delete everything written here.
table_events.write.partitionBy("year", "month", "day").mode("overwrite").csv(
    destination + "_by_ymd"
)

In [68]:
# Write the events as CSV partitioned by the derived `date` column.
# "overwrite" mode replaces any output already present at `destination`.
table_events.write.partitionBy("date").csv(destination, mode="overwrite")

In [69]:
# Actors table: for every event, the actor's login and URL, keyed by the
# event id (exposed as `event_id`).
table_actors = spark.sql("""
    select
        actor.login
        , id as event_id
        , actor.url as actor_url
    from
        staging_events
""")

# Persist as unpartitioned CSV under ../actors, replacing any earlier output.
destination = "../actors"
table_actors.write.csv(destination, mode="overwrite")

In [72]:
# Preview the first five actor rows.
table_actors.show(n=5)

+----------+-----------+--------------------+
|     login|   event_id|           actor_url|
+----------+-----------+--------------------+
|evilgaoshu|23487963576|https://api.githu...|
|  gurram47|23487963624|https://api.githu...|
|afbeltranr|23487963529|https://api.githu...|
|  karla-vm|23487963558|https://api.githu...|
|   hsluoyz|23487963581|https://api.githu...|
+----------+-----------+--------------------+
only showing top 5 rows



In [70]:
# Repos table: for every event, the repository name and URL, keyed by the
# event id (exposed as `event_id`).
table_repos = spark.sql("""
    select
        repo.name
        , id as event_id
        , repo.url as repo_url
        
    from
        staging_events
""")

# Persist as unpartitioned CSV under ../repos, replacing any earlier output.
destination = "../repos"
table_repos.write.csv(destination, mode="overwrite")

In [73]:
# Preview the first five repository rows.
table_repos.show(n=5)

+--------------------+-----------+--------------------+
|                name|   event_id|            repo_url|
+--------------------+-----------+--------------------+
|spring-projects/s...|23487963576|https://api.githu...|
|gurram47/AP201100...|23487963624|https://api.githu...|
| afbeltranr/Agrilab2|23487963529|https://api.githu...|
|CMSgov/cms-carts-...|23487963558|https://api.githu...|
|casdoor/casdoor-c...|23487963581|https://api.githu...|
+--------------------+-----------+--------------------+
only showing top 5 rows



In [71]:
# Orgs table: for every event, the organisation login (as `org_name`) and URL,
# keyed by the event id (exposed as `event_id`). Events that carry no org
# produce null in both org columns.
table_orgs = spark.sql("""
    select
        org.login as org_name
        , id as event_id
        , org.url as org_url
        
    from
        staging_events
""")

# Persist as unpartitioned CSV under ../orgs, replacing any earlier output.
destination = "../orgs"
table_orgs.write.csv(destination, mode="overwrite")

In [74]:
# Preview the first five organisation rows (null where the event has no org).
table_orgs.show(n=5)

+---------------+-----------+--------------------+
|       org_name|   event_id|             org_url|
+---------------+-----------+--------------------+
|spring-projects|23487963576|https://api.githu...|
|           null|23487963624|                null|
|           null|23487963529|                null|
|         CMSgov|23487963558|https://api.githu...|
|        casdoor|23487963581|https://api.githu...|
+---------------+-----------+--------------------+
only showing top 5 rows

