In [3]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder \
    .appName("ETL") \
    .getOrCreate()

In [7]:
data1 = "github_events_01.json"

In [8]:
data_2 = "github_events_02.json"

In [9]:
data_folder = "data"

In [11]:
data = spark.read.option("multiline", "true").json(data_folder)

In [12]:
data.show(5)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|             type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...|       WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|      CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|        PushEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963558|{https://avatars....|{created, null, {...|  true|{226399669, CMSgo...|IssueCommen

In [13]:
data.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- author_association: string (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- html_url: string (nullable 

In [14]:
data.createOrReplaceTempView("staging_events")

In [15]:
table = spark.sql("""
    select
        *
        
    from
        staging_events
""").show(10)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|             type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...|       WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|      CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|        PushEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963558|{https://avatars....|{created, null, {...|  true|{226399669, CMSgo...|IssueCommen

In [17]:
#Create a actor table
table_actors = spark.sql("""
    select
        actor.id as actor_id
        , actor.login
        , actor.url
        , actor.display_login as display_name
    from
        staging_events
""")
destination = "actors"
table_actors.write.mode("overwrite").csv(destination)

In [19]:
table_actors.createOrReplaceTempView("table_actors")

In [21]:
table_actors.show(5)

+---------+----------+--------------------+------------+
| actor_id|     login|                 url|display_name|
+---------+----------+--------------------+------------+
| 44167236|evilgaoshu|https://api.githu...|  evilgaoshu|
|111333037|  gurram47|https://api.githu...|    gurram47|
| 65502770|afbeltranr|https://api.githu...|  afbeltranr|
| 99458559|  karla-vm|https://api.githu...|    karla-vm|
|  3787410|   hsluoyz|https://api.githu...|     hsluoyz|
+---------+----------+--------------------+------------+
only showing top 5 rows



In [22]:
#Create a repo table
table_repo = spark.sql("""
    select
        repo.id as repo_id
        , repo.name
        , repo.url
    from
        staging_events
""")
destination = "repo"
table_repo.write.mode("overwrite").csv(destination)

In [23]:
table_repo.createOrReplaceTempView("table_repo")

In [24]:
table_repo.show(5)

+---------+--------------------+--------------------+
|  repo_id|                name|                 url|
+---------+--------------------+--------------------+
|  6296790|spring-projects/s...|https://api.githu...|
|525860969|gurram47/AP201100...|https://api.githu...|
|350706029| afbeltranr/Agrilab2|https://api.githu...|
|226399669|CMSgov/cms-carts-...|https://api.githu...|
|521980272|casdoor/casdoor-c...|https://api.githu...|
+---------+--------------------+--------------------+
only showing top 5 rows



In [25]:
# Create table 'orgs'

table_org = spark.sql("""
    select
        org.id as org_id
        , org.login as org_name
        , org.url
    from
        staging_events
""")

destination = "org"
table_org.write.mode("overwrite").csv(destination)

In [26]:
table_org.createOrReplaceTempView("table_org")

In [27]:
table_org.show(5)

+--------+---------------+--------------------+
|  org_id|       org_name|                 url|
+--------+---------------+--------------------+
|  317776|spring-projects|https://api.githu...|
|    null|           null|                null|
|    null|           null|                null|
| 3209407|         CMSgov|https://api.githu...|
|72992104|        casdoor|https://api.githu...|
+--------+---------------+--------------------+
only showing top 5 rows



In [29]:
# Create table 'events'

table_events = spark.sql("""
    select
        id
        , type
        , public
        , created_at
        , actor.id as actor_id
        , repo.id as repo_id
        , org.id as org_id
    from
        staging_events
""")

destination = "events"
table_events.write.mode("overwrite").csv(destination)

In [30]:
table_events.createOrReplaceTempView("table_events")

In [32]:
table_events.show(5)

+-----------+-----------------+------+--------------------+---------+---------+--------+
|         id|             type|public|          created_at| actor_id|  repo_id|  org_id|
+-----------+-----------------+------+--------------------+---------+---------+--------+
|23487963576|       WatchEvent|  true|2022-08-17T15:52:40Z| 44167236|  6296790|  317776|
|23487963624|      CreateEvent|  true|2022-08-17T15:52:40Z|111333037|525860969|    null|
|23487963529|        PushEvent|  true|2022-08-17T15:52:40Z| 65502770|350706029|    null|
|23487963558|IssueCommentEvent|  true|2022-08-17T15:52:40Z| 99458559|226399669| 3209407|
|23487963581| PullRequestEvent|  true|2022-08-17T15:52:40Z|  3787410|521980272|72992104|
+-----------+-----------------+------+--------------------+---------+---------+--------+
only showing top 5 rows



In [35]:
spark.sql("""
    select
        table_events.created_at
        ,table_events.public
        , table_actors.display_name
        , table_repo.name
        , table_repo.url
        , table_org.org_name
    from table_events
    inner join table_actors
        on table_actors.actor_id = table_events.actor_id
    inner join table_repo
        on table_repo.repo_id = table_events.repo_id
    left join table_org
        on table_org.org_id = table_events.org_id

""").show()

+--------------------+------+--------------+--------------------+--------------------+---------------+
|          created_at|public|  display_name|                name|                 url|       org_name|
+--------------------+------+--------------+--------------------+--------------------+---------------+
|2022-08-17T15:52:40Z|  true|    evilgaoshu|spring-projects/s...|https://api.githu...|spring-projects|
|2022-08-17T15:52:40Z|  true|      gurram47|gurram47/AP201100...|https://api.githu...|           null|
|2022-08-17T15:52:40Z|  true|    afbeltranr| afbeltranr/Agrilab2|https://api.githu...|           null|
|2022-08-17T15:52:40Z|  true|      karla-vm|CMSgov/cms-carts-...|https://api.githu...|         CMSgov|
|2022-08-17T15:52:40Z|  true|       hsluoyz|casdoor/casdoor-c...|https://api.githu...|        casdoor|
|2022-08-17T15:52:40Z|  true|       mnw1020|    mnw1020/obsidian|https://api.githu...|           null|
|2022-08-17T15:52:40Z|  true|        ikjo93|ikjo93/Data-Struc...|https://