In [5]:
from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

# import pyspark.sql.functions as F

In [6]:
import pandas as pd
import glob

In [7]:
spark = SparkSession.builder \
    .appName("etl") \
    .getOrCreate()

In [10]:
data_folder = "work/data"

In [12]:
data = spark.read.option("multiline", "true").json(data_folder)
data.show(3)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|       type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...| WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|  PushEvent|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
only showing top 3 rows



In [13]:
data.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- author_association: string (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- html_url: string (nullable 

In [14]:
data.select("id", "type").show()

+-----------+--------------------+
|         id|                type|
+-----------+--------------------+
|23487963576|          WatchEvent|
|23487963624|         CreateEvent|
|23487963529|           PushEvent|
|23487963558|   IssueCommentEvent|
|23487963581|    PullRequestEvent|
|23487963532|           PushEvent|
|23487963524|           PushEvent|
|23487963526|           PushEvent|
|23487963492|           PushEvent|
|23487963504|         DeleteEvent|
|23487963536|PullRequestReview...|
|23487963495|         CreateEvent|
|23487963522|           PushEvent|
|23487963444|           PushEvent|
|23487963462|    PullRequestEvent|
|23487963480|         IssuesEvent|
|23487963457|           PushEvent|
|23487963413|           PushEvent|
|23487963429|           PushEvent|
|23487963448|           PushEvent|
+-----------+--------------------+
only showing top 20 rows



In [15]:
data.createOrReplaceTempView("staging_events")

In [16]:
table = spark.sql("""
    select
        *
        
    from
        staging_events
""").show()

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+--------------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|                type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+--------------------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...|          WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|         CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|           PushEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963558|{https://avatars....|{created, null, {...|  true|{226399669, CM

In [33]:
table_events = spark.sql("""
    select
        id
        , type
        , created_at
        , day(created_at) as day
        , month(created_at) as month
        , year(created_at) as year
        , date(created_at) as date
        , actor.id as actor_id
        , repo.id as repo_id
        , public

    from
        staging_events
""")

In [34]:
table_events.show()

+-----------+--------------------+--------------------+---+-----+----+----------+---------+---------+------+
|         id|                type|          created_at|day|month|year|      date| actor_id|  repo_id|public|
+-----------+--------------------+--------------------+---+-----+----+----------+---------+---------+------+
|23487963576|          WatchEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17| 44167236|  6296790|  true|
|23487963624|         CreateEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|111333037|525860969|  true|
|23487963529|           PushEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17| 65502770|350706029|  true|
|23487963558|   IssueCommentEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17| 99458559|226399669|  true|
|23487963581|    PullRequestEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17|  3787410|521980272|  true|
|23487963532|           PushEvent|2022-08-17T15:52:40Z| 17|    8|2022|2022-08-17| 11596368|438998478|  true|
|23487963524|      

In [37]:
destination = "events"
#table.write.partitionBy("year", "month", "day").mode("overwrite").csv(destination)
table_events.write.partitionBy("date").mode("overwrite").csv(destination)

In [38]:
table_actors = spark.sql("""
    select
        actor.id
        , actor.login as actor_login
        , actor.display_login as actor_display_login
        , actor.gravatar_id as actor_gravatar_id
        , actor.url as actor_url
        , actor.avatar_url as actor_avatar_url
        , day(created_at) as day
        , month(created_at) as month
        , year(created_at) as year
        , date(created_at) as date

    from
        staging_events
""")

In [39]:
table_actors.show()

+---------+--------------+-------------------+-----------------+--------------------+--------------------+---+-----+----+----------+
|       id|   actor_login|actor_display_login|actor_gravatar_id|           actor_url|    actor_avatar_url|day|month|year|      date|
+---------+--------------+-------------------+-----------------+--------------------+--------------------+---+-----+----+----------+
| 44167236|    evilgaoshu|         evilgaoshu|                 |https://api.githu...|https://avatars.g...| 17|    8|2022|2022-08-17|
|111333037|      gurram47|           gurram47|                 |https://api.githu...|https://avatars.g...| 17|    8|2022|2022-08-17|
| 65502770|    afbeltranr|         afbeltranr|                 |https://api.githu...|https://avatars.g...| 17|    8|2022|2022-08-17|
| 99458559|      karla-vm|           karla-vm|                 |https://api.githu...|https://avatars.g...| 17|    8|2022|2022-08-17|
|  3787410|       hsluoyz|            hsluoyz|                 |https

In [43]:
destination = "actors"
#table.write.partitionBy("year", "month", "day").mode("overwrite").csv(destination)
table_actors.write.partitionBy("date").mode("overwrite").csv(destination)

In [41]:
table_repo = spark.sql("""
    select
        repo.id as repo_id
        , repo.name as repo_name
        , repo.url as repo_url
        , day(created_at) as day
        , month(created_at) as month
        , year(created_at) as year
        , date(created_at) as date

    from
        staging_events
""")

In [42]:
table_repo.show()

+---------+--------------------+--------------------+---+-----+----+----------+
|  repo_id|           repo_name|            repo_url|day|month|year|      date|
+---------+--------------------+--------------------+---+-----+----+----------+
|  6296790|spring-projects/s...|https://api.githu...| 17|    8|2022|2022-08-17|
|525860969|gurram47/AP201100...|https://api.githu...| 17|    8|2022|2022-08-17|
|350706029| afbeltranr/Agrilab2|https://api.githu...| 17|    8|2022|2022-08-17|
|226399669|CMSgov/cms-carts-...|https://api.githu...| 17|    8|2022|2022-08-17|
|521980272|casdoor/casdoor-c...|https://api.githu...| 17|    8|2022|2022-08-17|
|438998478|    mnw1020/obsidian|https://api.githu...| 17|    8|2022|2022-08-17|
|525362201|ikjo93/Data-Struc...|https://api.githu...| 17|    8|2022|2022-08-17|
|500139646|Gabe616/ObbyCreat...|https://api.githu...| 17|    8|2022|2022-08-17|
|268298723|    BadProfessor/INL|https://api.githu...| 17|    8|2022|2022-08-17|
|399546191|ALMA-FUNDEGUA/vac...|https://

In [44]:
destination = "repo"
#table.write.partitionBy("year", "month", "day").mode("overwrite").csv(destination)
table_repo.write.partitionBy("date").mode("overwrite").csv(destination)

In [None]:
table_events = spark.sql("""
    select
        id
        , type
        , created_at
        , day(created_at) as day
        , month(created_at) as month
        , year(created_at) as year
        , date(created_at) as date
        , actor.id as actor_id
        , actor.login as actor_login
        , actor.display_login as actor_display_login
        , actor.gravatar_id as actor_gravatar_id
        , actor.url as actor_url
        , actor.avatar_url as actor_avatar_url
        , repo.id as repo_id
        , repo.name as repo_name
        , repo.url as repo_url
        , public

    from
        staging_events
""")