# ETL with Spark (Local)

In [None]:
from pyspark.sql import SparkSession


In [None]:
import pandas as pd
import glob

In [None]:
# Obtain the Spark session for this notebook (an existing session is reused
# if one is already running), registered under the application name "ETL".
spark = (
    SparkSession.builder
    .appName("ETL")
    .getOrCreate()
)

In [None]:
# Relative path of the folder holding the raw JSON event files to ingest.
data_folder: str = "data"

In [None]:
# Read every JSON file under `data_folder` into a single DataFrame.
# Multi-line mode lets a single JSON record span several lines of a file.
data = spark.read.json(data_folder, multiLine=True)

In [None]:
# Peek at the first ten raw records.
data.show(n=10)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|             type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...|       WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|      CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|        PushEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963558|{https://avatars....|{created, null, {...|  true|{226399669, CMSgo...|IssueCommen

In [None]:
# Print the schema Spark inferred for the loaded JSON as an indented tree
# (nested struct fields such as `actor`, `org` and `payload` are expanded).
data.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- author_association: string (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- html_url: string (nullable 

In [None]:
# Preview only the top-level identifying columns of the first ten events.
preview_columns = ["id", "type", "created_at"]
data.select(*preview_columns).show(n=10)

+-----------+-----------------+--------------------+
|         id|             type|          created_at|
+-----------+-----------------+--------------------+
|23487963576|       WatchEvent|2022-08-17T15:52:40Z|
|23487963624|      CreateEvent|2022-08-17T15:52:40Z|
|23487963529|        PushEvent|2022-08-17T15:52:40Z|
|23487963558|IssueCommentEvent|2022-08-17T15:52:40Z|
|23487963581| PullRequestEvent|2022-08-17T15:52:40Z|
|23487963532|        PushEvent|2022-08-17T15:52:40Z|
|23487963524|        PushEvent|2022-08-17T15:52:40Z|
|23487963526|        PushEvent|2022-08-17T15:52:40Z|
|23487963492|        PushEvent|2022-08-17T15:52:40Z|
|23487963504|      DeleteEvent|2022-08-17T15:52:40Z|
+-----------+-----------------+--------------------+
only showing top 10 rows



In [None]:
# Expose the raw DataFrame to Spark SQL as the temporary view
# `staging_events` (any existing view with that name is replaced).
data.createOrReplaceTempView(name="staging_events")

In [None]:
# Flatten the staging view: keep the event id/type/timestamp, derive the
# calendar date and year from `created_at`, and pull the actor login/url and
# repo name/url out of their nested structs.
staging_events_query = """
    select
        id
        , type
        , created_at
        , to_date(created_at) as date
        , year(created_at) as year
        , actor.login
        , actor.url as actor_url
        , repo.name as repo_name
        , repo.url as repo_url
        
    from
        staging_events
"""
table_staging_events = spark.sql(staging_events_query)

# Show the first ten flattened rows.
table_staging_events.show(n=10)

+-----------+-----------------+--------------------+----------+----+------------+--------------------+--------------------+--------------------+
|         id|             type|          created_at|      date|year|       login|           actor_url|           repo_name|            repo_url|
+-----------+-----------------+--------------------+----------+----+------------+--------------------+--------------------+--------------------+
|23487963576|       WatchEvent|2022-08-17T15:52:40Z|2022-08-17|2022|  evilgaoshu|https://api.githu...|spring-projects/s...|https://api.githu...|
|23487963624|      CreateEvent|2022-08-17T15:52:40Z|2022-08-17|2022|    gurram47|https://api.githu...|gurram47/AP201100...|https://api.githu...|
|23487963529|        PushEvent|2022-08-17T15:52:40Z|2022-08-17|2022|  afbeltranr|https://api.githu...| afbeltranr/Agrilab2|https://api.githu...|
|23487963558|IssueCommentEvent|2022-08-17T15:52:40Z|2022-08-17|2022|    karla-vm|https://api.githu...|CMSgov/cms-carts-...|https:/

In [None]:
# Write the flattened staging table as CSV, one sub-directory per `year`,
# replacing whatever is already stored at the target path.
output_csv = "output_csv"
(
    table_staging_events.write
    .mode("overwrite")
    .partitionBy("year")
    .csv(output_csv)
)

In [None]:
# Build the `actors` table: actor id, display login and url taken from the
# nested `actor` struct, together with the event timestamp and the date/year
# derived from it.
# NOTE(review): the query emits one row per staging event with no
# de-duplication, so an actor with several events appears several times.
actors_query = """
    select
        actor.id
        , actor.display_login
        , actor.url
        , created_at
        , to_date(created_at) as date
        , year(created_at) as year
        
    from
        staging_events
"""
table_actor = spark.sql(actors_query)

# Register the result so later SQL can refer to it as `actors`.
table_actor.createOrReplaceTempView(name="actors")

In [None]:
# Output directory for the actors CSV export.
destination: str = "actors"

In [None]:
# Export the actors table as CSV partitioned by `year`, overwriting any
# previous export at `destination`.
actor_writer = table_actor.write.partitionBy("year")
actor_writer.csv(destination, mode="overwrite")

In [None]:
# Inspect the first ten actor rows.
table_actor.show(n=10)

+---------+-------------+--------------------+--------------------+----------+----+
|       id|display_login|                 url|          created_at|      date|year|
+---------+-------------+--------------------+--------------------+----------+----+
| 44167236|   evilgaoshu|https://api.githu...|2022-08-17T15:52:40Z|2022-08-17|2022|
|111333037|     gurram47|https://api.githu...|2022-08-17T15:52:40Z|2022-08-17|2022|
| 65502770|   afbeltranr|https://api.githu...|2022-08-17T15:52:40Z|2022-08-17|2022|
| 99458559|     karla-vm|https://api.githu...|2022-08-17T15:52:40Z|2022-08-17|2022|
|  3787410|      hsluoyz|https://api.githu...|2022-08-17T15:52:40Z|2022-08-17|2022|
| 11596368|      mnw1020|https://api.githu...|2022-08-17T15:52:40Z|2022-08-17|2022|
| 82401504|       ikjo93|https://api.githu...|2022-08-17T15:52:40Z|2022-08-17|2022|
| 60316309|      Gabe616|https://api.githu...|2022-08-17T15:52:40Z|2022-08-17|2022|
| 37953029| BadProfessor|https://api.githu...|2022-08-17T15:52:40Z|2022-08-1

In [None]:
# Build the `events` table: event id, type and timestamp, plus the day,
# month, year and calendar date derived from `created_at`.
events_query = """
    select
        id
        , type
        , created_at
        , day(created_at) as day
        , month(created_at) as month
        , year(created_at) as year
        , date(created_at) as date
    from
        staging_events
"""
table_event = spark.sql(events_query)

# Register the result so later SQL can refer to it as `events`.
table_event.createOrReplaceTempView(name="events")

In [None]:
# Output directory for the events CSV export.
destination1: str = "events"

In [None]:
# Export the events table as CSV partitioned by `month`, overwriting any
# previous export at `destination1`.
# NOTE(review): partitioning uses `month` alone (the staging and actors
# exports use `year`), so rows from the same month of different years would
# land in the same partition directory — confirm this layout is intended.
(
    table_event.write
    .mode("overwrite")
    .partitionBy("month")
    .csv(destination1)
)

In [None]:
# Inspect the first ten event rows.
table_event.show(n=10)

+-----------+-----------------+--------------------+---+-----+----+----------+
|         id|             type|          created_at|day|month|year|      date|
+-----------+-----------------+--------------------+---+-----+----+----------+
|23487963576|       WatchEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963624|      CreateEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963529|        PushEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963558|IssueCommentEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963581| PullRequestEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963532|        PushEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963524|        PushEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963526|        PushEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963492|        PushEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|23487963504|      DeleteEvent|2022-08-17T15:52:40Z|

In [None]:
# Build the `repos` table: repo id, name and url taken from the nested `repo`
# struct, together with the event timestamp and the day, month, year and
# calendar date derived from it.
repos_query = """
    select
        repo.id
        , repo.name
        , repo.url
        , created_at
        , day(created_at) as day
        , month(created_at) as month
        , year(created_at) as year
        , date(created_at) as date
    from
        staging_events
"""
table_repo = spark.sql(repos_query)

# Register the result so later SQL can refer to it as `repos`.
table_repo.createOrReplaceTempView(name="repos")

In [None]:
# Output directory for the repos CSV export.
destination2: str = "repos"

In [None]:
# Export the repos table as CSV partitioned by `month`, overwriting any
# previous export at `destination2`.
# NOTE(review): partitioning uses `month` alone (the staging and actors
# exports use `year`), so rows from the same month of different years would
# land in the same partition directory — confirm this layout is intended.
repo_writer = table_repo.write.format("csv").mode("overwrite")
repo_writer.partitionBy("month").save(destination2)

In [None]:
# Inspect the first ten repo rows.
table_repo.show(n=10)

+---------+--------------------+--------------------+--------------------+---+-----+----+----------+
|       id|                name|                 url|          created_at|day|month|year|      date|
+---------+--------------------+--------------------+--------------------+---+-----+----+----------+
|  6296790|spring-projects/s...|https://api.githu...|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|525860969|gurram47/AP201100...|https://api.githu...|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|350706029| afbeltranr/Agrilab2|https://api.githu...|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|226399669|CMSgov/cms-carts-...|https://api.githu...|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|521980272|casdoor/casdoor-c...|https://api.githu...|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|438998478|    mnw1020/obsidian|https://api.githu...|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|
|525362201|ikjo93/Data-Struc...|https://api.githu...|2022-08-17T15:52:40Z| 17|    8|2022|20