# ETL with Spark (Local)

In [1]:
from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

# import pyspark.sql.functions as F

In [2]:
spark = SparkSession.builder \
    .appName("ETL") \
    .getOrCreate()

In [3]:
data = "github_events_01.json"

In [4]:
data_2 = "github_events_02.json"

In [5]:
data_folder = "data"

In [6]:
data = spark.read.option("multiline", "true").json(data_folder)

In [7]:
data.show(3)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|       type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...| WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|  PushEvent|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
only showing top 3 rows



In [8]:
data.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- author_association: string (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- html_url: string (nullable 

In [9]:
data.select("id", "type").show()

+-----------+--------------------+
|         id|                type|
+-----------+--------------------+
|23487963576|          WatchEvent|
|23487963624|         CreateEvent|
|23487963529|           PushEvent|
|23487963558|   IssueCommentEvent|
|23487963581|    PullRequestEvent|
|23487963532|           PushEvent|
|23487963524|           PushEvent|
|23487963526|           PushEvent|
|23487963492|           PushEvent|
|23487963504|         DeleteEvent|
|23487963536|PullRequestReview...|
|23487963495|         CreateEvent|
|23487963522|           PushEvent|
|23487963444|           PushEvent|
|23487963462|    PullRequestEvent|
|23487963480|         IssuesEvent|
|23487963457|           PushEvent|
|23487963413|           PushEvent|
|23487963429|           PushEvent|
|23487963448|           PushEvent|
+-----------+--------------------+
only showing top 20 rows



In [10]:
data.createOrReplaceTempView("staging_events")

In [11]:
table = spark.sql("""
    select
        *
        
    from
        staging_events
""").show(3)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|       type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...| WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|  PushEvent|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
only showing top 3 rows



In [12]:
#Create a actor table
table_actor = spark.sql("""
    select
        actor.id as actor_id
        , actor.login
        , actor.display_login as display_name
        , actor.url
        , actor.avatar_url
    from
        staging_events
""")
destination = "actors"
table_actor.write.mode("overwrite").csv(destination)

In [13]:
table_actor.createOrReplaceTempView("table_actor")

In [14]:
table_actor.show(3)

+---------+----------+------------+--------------------+--------------------+
| actor_id|     login|display_name|                 url|          avatar_url|
+---------+----------+------------+--------------------+--------------------+
| 44167236|evilgaoshu|  evilgaoshu|https://api.githu...|https://avatars.g...|
|111333037|  gurram47|    gurram47|https://api.githu...|https://avatars.g...|
| 65502770|afbeltranr|  afbeltranr|https://api.githu...|https://avatars.g...|
+---------+----------+------------+--------------------+--------------------+
only showing top 3 rows



In [15]:
#Create a repo table
table_repo = spark.sql("""
    select
        repo.id as repo_id
        , repo.name
        , repo.url
    from
        staging_events
""")
destination = "repo"
table_repo.write.mode("overwrite").csv(destination)

In [16]:
table_repo.createOrReplaceTempView("table_repo")

In [17]:
table_repo.show(3)

+---------+--------------------+--------------------+
|  repo_id|                name|                 url|
+---------+--------------------+--------------------+
|  6296790|spring-projects/s...|https://api.githu...|
|525860969|gurram47/AP201100...|https://api.githu...|
|350706029| afbeltranr/Agrilab2|https://api.githu...|
+---------+--------------------+--------------------+
only showing top 3 rows



In [18]:
# Create table 'orgs'

table_org = spark.sql("""
    select
        org.id as org_id
        , org.avatar_url
        , org.gravatar_id
        , org.login as organize_name
        , org.url
    from
        staging_events
""")

destination = "org"
table_org.write.mode("overwrite").csv(destination)

In [19]:
table_org.createOrReplaceTempView("table_org")

In [20]:
table_org.show(3)

+------+--------------------+-----------+---------------+--------------------+
|org_id|          avatar_url|gravatar_id|  organize_name|                 url|
+------+--------------------+-----------+---------------+--------------------+
|317776|https://avatars.g...|           |spring-projects|https://api.githu...|
|  null|                null|       null|           null|                null|
|  null|                null|       null|           null|                null|
+------+--------------------+-----------+---------------+--------------------+
only showing top 3 rows



In [21]:
# Create table 'events'

table_event = spark.sql("""
    select
        id
        , created_at
        , public
        , type
        , actor.id as id_actor
        , repo.id as id_repo
        , org.id as id_org
    from
        staging_events
""")

destination = "event"
table_event.write.mode("overwrite").csv(destination)

In [22]:
table_event.createOrReplaceTempView("table_event")

In [23]:
table_event.show(3)

+-----------+--------------------+------+-----------+---------+---------+------+
|         id|          created_at|public|       type| id_actor|  id_repo|id_org|
+-----------+--------------------+------+-----------+---------+---------+------+
|23487963576|2022-08-17T15:52:40Z|  true| WatchEvent| 44167236|  6296790|317776|
|23487963624|2022-08-17T15:52:40Z|  true|CreateEvent|111333037|525860969|  null|
|23487963529|2022-08-17T15:52:40Z|  true|  PushEvent| 65502770|350706029|  null|
+-----------+--------------------+------+-----------+---------+---------+------+
only showing top 3 rows



In [24]:
spark.sql("""
    select
        table_event.created_at
        , table_actor.display_name
        , table_repo.name
        , table_repo.url
        , table_org.organize_name
    from table_event
    inner join table_actor
        on table_actor.actor_id = table_event.id_actor
    inner join table_repo
        on table_repo.repo_id = table_event.id_repo
    left join table_org
        on table_org.org_id = table_event.id_org

""").show()

+--------------------+--------------+--------------------+--------------------+---------------+
|          created_at|  display_name|                name|                 url|  organize_name|
+--------------------+--------------+--------------------+--------------------+---------------+
|2022-08-17T15:52:40Z|    evilgaoshu|spring-projects/s...|https://api.githu...|spring-projects|
|2022-08-17T15:52:40Z|      gurram47|gurram47/AP201100...|https://api.githu...|           null|
|2022-08-17T15:52:40Z|    afbeltranr| afbeltranr/Agrilab2|https://api.githu...|           null|
|2022-08-17T15:52:40Z|      karla-vm|CMSgov/cms-carts-...|https://api.githu...|         CMSgov|
|2022-08-17T15:52:40Z|       hsluoyz|casdoor/casdoor-c...|https://api.githu...|        casdoor|
|2022-08-17T15:52:40Z|       mnw1020|    mnw1020/obsidian|https://api.githu...|           null|
|2022-08-17T15:52:40Z|        ikjo93|ikjo93/Data-Struc...|https://api.githu...|           null|
|2022-08-17T15:52:40Z|       Gabe616|Gab