# Spark Context

In [12]:
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master("local")
         .appName("spark-sql")
         .getOrCreate())
sc = spark.sparkContext

In [2]:
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("/home/jovyan/data/movies.csv")
)

In [3]:
df.summary()

DataFrame[summary: string, movieId: string, title: string, genres: string]

In [21]:
df.show(10, False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck (1995)               |Adventure|Children                         |
|9      |Sudden Death

In [4]:
df.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [18]:
df.createOrReplaceTempView("movies")

In [19]:
n_df = spark.sql("SELECT DISTINCT(genres) FROM movies")
n_df.show(30, False)

+-------------------------------------------------+
|genres                                           |
+-------------------------------------------------+
|Comedy|Horror|Thriller                           |
|Adventure|Sci-Fi|Thriller                        |
|Action|Adventure|Drama|Fantasy                   |
|Action|Drama|Horror                              |
|Action|Animation|Comedy|Sci-Fi                   |
|Animation|Children|Drama|Musical|Romance         |
|Action|Adventure|Drama                           |
|Adventure|Sci-Fi                                 |
|Documentary|Musical|IMAX                         |
|Adventure|Children|Fantasy|Sci-Fi|Thriller       |
|Adventure|Animation                              |
|Musical|Romance|War                              |
|Action|Adventure|Fantasy|Romance                 |
|Adventure|Children|Drama|Fantasy|IMAX            |
|Comedy|Crime|Horror|Thriller                     |
|Crime|Drama|Fantasy|Horror|Thriller              |
|Comedy|Myst

In [22]:
spark.sql("SELECT title FROM movies WHERE movieId > 100").show(10, False)

+------------------------------------------+
|title                                     |
+------------------------------------------+
|Bottle Rocket (1996)                      |
|Mr. Wrong (1996)                          |
|Unforgettable (1996)                      |
|Happy Gilmore (1996)                      |
|Bridges of Madison County, The (1995)     |
|Nobody Loves Me (Keiner liebt mich) (1994)|
|Muppet Treasure Island (1996)             |
|Catwalk (1996)                            |
|Braveheart (1995)                         |
|Taxi Driver (1976)                        |
+------------------------------------------+
only showing top 10 rows



In [30]:
spark.sql("""
    SELECT movieId, genres
    FROM movies
    GROUP BY genres, movieId"""
    ).where("movieId > 10"
    ).count()



                                                                                

9732

## RDD with partitions

In [9]:
rdd1 = sc.textFile("../data/movies.csv")
rdd2 = sc.textFile("../data/ratings.csv")

In [16]:

print(f"defaultMinPartitions: {sc.defaultMinPartitions}")
print(f"name: rdd1, count: {rdd1.count}, partitions: {rdd1.getNumPartitions()}")
print(f"name: rdd2, count: {rdd2.count}, partitions: {rdd2.getNumPartitions()}")


defaultMinPartitions: 1
name: rdd1, count: <bound method RDD.count of ../data/movies.csv MapPartitionsRDD[32] at textFile at NativeMethodAccessorImpl.java:0>, partitions: 1
name: rdd2, count: <bound method RDD.count of ../data/ratings.csv MapPartitionsRDD[34] at textFile at NativeMethodAccessorImpl.java:0>, partitions: 1


## Spark SQL

In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType , DateType, TimestampType, LongType ,BooleanType

schema = StructType([
      StructField("actor", StringType(), True),
      StructField("created_at", DateType(), True),
      StructField("id", LongType(), False),
      StructField("org", StringType(), True),
      StructField("payload", StringType(), True),
      StructField("public", BooleanType(), True),
      StructField("repo", StringType(), True),
      StructField("type", StringType(), True)
  ])

github = spark.read.json("../data/2024-06-01-17.json" , schema = schema)
github.printSchema()


root
 |-- actor: string (nullable = true)
 |-- created_at: date (nullable = true)
 |-- id: long (nullable = true)
 |-- org: string (nullable = true)
 |-- payload: string (nullable = true)
 |-- public: boolean (nullable = true)
 |-- repo: string (nullable = true)
 |-- type: string (nullable = true)



In [7]:
github.show(10)

                                                                                

+--------------------+----------+----+--------------------+--------------------+------+--------------------+-----------+
|               actor|created_at|  id|                 org|             payload|public|                repo|       type|
+--------------------+----------+----+--------------------+--------------------+------+--------------------+-----------+
|{"id":120438850,"...|2024-06-01|null|                null|{"repository_id":...|  true|{"id":583676719,"...|  PushEvent|
|{"id":85358022,"l...|2024-06-01|null|                null|{"repository_id":...|  true|{"id":807004394,"...|  PushEvent|
|{"id":168934284,"...|2024-06-01|null|                null|{"ref":null,"ref_...|  true|{"id":809092693,"...|CreateEvent|
|{"id":143586580,"...|2024-06-01|null|                null|{"ref":"pullshark...|  true|{"id":808472583,"...|DeleteEvent|
|{"id":167545029,"...|2024-06-01|null|                null|{"ref":"main","re...|  true|{"id":809092689,"...|CreateEvent|
|{"id":11133742,"l...|2024-06-01

In [10]:
github.show(1,False) # nested 한 내부 확인
                     # 내부 구조에 대해서 각각 스키마작성

                                                                                

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+--------------------------------------------------------------------------------------+---------+
|actor                                                                                                                                                                                       |created_at|i

In [14]:
github.select("payload").show(1,False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|payload                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
+-------------------------------

In [37]:
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import from_json, col

actor_schema = StructType([
    StructField('login', StringType(), True),
    StructField('url', StringType(), True)
])

payload_schema = StructType([
    StructField('repository_id', LongType(), True),
    StructField('size', LongType(), True),
    StructField('distinct_size', LongType(), True),
    StructField('message', StringType(), True)
])

repo_schema = StructType([
    StructField('name', StringType(), True),
    StructField('url', StringType(), True)
])

# * 컬럼에서 *_schema를 따라 json 파일을 추출
new_df = github.withColumn('actor_json', from_json('actor', schema = actor_schema)) \
               .select(col('created_at'), col('id'), col('payload'), col('type'), col('actor_json.*'), col('repo'))

new_df = new_df.withColumn('payload_json', from_json('payload', schema = payload_schema)) \
               .select(col('login'), col('url'), col('created_at'), col('id'), col('payload_json.*'), col('type'), col('repo'))

new_df = new_df.withColumn('repo_json', from_json('repo', schema = repo_schema)) \
               .select(col('login'), col('url'), col('created_at'), col('id'), col('repository_id'), 
                       col('size'), col('distinct_size'), col('message'), col('type'), col('repo_json.*'))
new_df.show(10)

                                                                                

+---------------+--------------------+----------+----+-------------+----+-------------+-------+-----------+--------------------+--------------------+
|          login|                 url|created_at|  id|repository_id|size|distinct_size|message|       type|                name|                 url|
+---------------+--------------------+----------+----+-------------+----+-------------+-------+-----------+--------------------+--------------------+
|        1992513|https://api.githu...|2024-06-01|null|    583676719|   1|            1|   null|  PushEvent|         1992513/djy|https://api.githu...|
|         dds256|https://api.githu...|2024-06-01|null|    807004394|   1|            1|   null|  PushEvent|     dds256/MusicBot|https://api.githu...|
|      Avamcotec|https://api.githu...|2024-06-01|null|         null|null|         null|   null|CreateEvent|        Avamcotec/db|https://api.githu...|
|  lukebewitched|https://api.githu...|2024-06-01|null|         null|null|         null|   null|Delet


|login     |url    |created_at    |id   |repository_id|size|distinct_size|message|type  |
| --- | ---| ---| --- | --- | --- | --- | --- | --- |
| String | String | DateTime | Long | Long | Int | Int | String | String |

Top 50 Push repositories
Top 50 Commit repositories



In [55]:
new_df.printSchema()

root
 |-- login: string (nullable = true)
 |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- repository_id: long (nullable = true)
 |-- size: long (nullable = true)
 |-- distinct_size: long (nullable = true)
 |-- message: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- url: string (nullable = true)



In [38]:
# filter github action bot
# created_at to datetime
new_df = new_df.filter(col("login") != "github-actions[bot]")
new_df = new_df.withColumn('created_at', F.trim(F.regexp_replace(new_df.created_at, "[TZ]", " ")))
new_df = new_df.withColumn('created_dt', F.to_timestamp(new_df.created_at, 'yyyy-MM-dd HH:mm:ss'))
new_df.show(10, False)

                                                                                

+---------------+--------------------------------------------+----------+----+-------------+----+-------------+-------+-----------+------------------------------------------+-----------------------------------------------------------------------+----------+
|login          |url                                         |created_at|id  |repository_id|size|distinct_size|message|type       |name                                      |url                                                                    |created_dt|
+---------------+--------------------------------------------+----------+----+-------------+----+-------------+-------+-----------+------------------------------------------+-----------------------------------------------------------------------+----------+
|1992513        |https://api.github.com/users/1992513        |2024-06-01|null|583676719    |1   |1            |null   |PushEvent  |1992513/djy                               |https://api.github.com/repos/1992513/djy            

In [73]:
new_df.printSchema()

root
 |-- login: string (nullable = true)
 |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- repository_id: long (nullable = true)
 |-- size: long (nullable = true)
 |-- distinct_size: long (nullable = true)
 |-- message: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- url: string (nullable = true)
 |-- created_dt: timestamp (nullable = true)



In [64]:
# name 컬럼 정제
def check_repo_name(name):
    sp = name.split("/")
    if not sp:
        return name
    else:
        return sp[-1]
    
udf_check_repo_name = F.udf(check_repo_name, StringType())

In [None]:
# @F.udf(returnType=StringType())
# def check_repo_name(val):
#     sp = name.split("/")
#     if not sp:
#         return name
#     else:
#         return sp[-1]

In [39]:
udf_check_repo_name = F.udf(lambda name : name.split("/")[1], StringType())

In [40]:
new_df = new_df.withColumn('repo_name', udf_check_repo_name(F.col('name')))
new_df.show(10, False)

[Stage 21:>                                                         (0 + 1) / 1]

+---------------+--------------------------------------------+----------+----+-------------+----+-------------+-------+-----------+------------------------------------------+-----------------------------------------------------------------------+----------+---------------------------+
|login          |url                                         |created_at|id  |repository_id|size|distinct_size|message|type       |name                                      |url                                                                    |created_dt|repo_name                  |
+---------------+--------------------------------------------+----------+----+-------------+----+-------------+-------+-----------+------------------------------------------+-----------------------------------------------------------------------+----------+---------------------------+
|1992513        |https://api.github.com/users/1992513        |2024-06-01|null|583676719    |1   |1            |null   |PushEvent  |1992513/djy


                                                                                

In [85]:
new_df.count()

                                                                                

171662

In [84]:
new_df.agg(F.countDistinct('repo_name')).show(5, False)



+----------------+
|count(repo_name)|
+----------------+
|50439           |
+----------------+




                                                                                

In [88]:
# Window
# 중복을 제거하고 각 repo 가 가장 최근에 작업한 내용만 가져오고싶다는 needs
# ==> window 이용가능
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

w = Window.partitionBy("repo_name").orderBy(F.desc(col("size")))
new_df.withColumn("row", row_number().over(w)) \
      .filter(col("row") == 1).drop("row") \
      .count()

                                                                                

50439

## Data Model

1. Top 10 Repo
- id
- @timestamp
- repo_url
- repo_name
- push_count
- commit_count
- pr_count
- fork_count
- issue_count
- watch_count

2. Top 10 User
- id
- @timestamp
- user_name
- push_count
- commit_count
- pr_count
- issue_count
- issue_comment_count

3. Daily Stats
- id
- @timestamp
- distinct_user_cnt
- distinct_repo_cnt
- push_count
- commit_count
- pr_count
- issue_count
- issue_comment_count
- release_count
- language

4. Monthly Stats

5. Weekly Stats

In [89]:
new_df.select("type").distinct().show()

                                                                                

+--------------------+
|                type|
+--------------------+
|PullRequestReview...|
|           PushEvent|
|         GollumEvent|
|        ReleaseEvent|
|  CommitCommentEvent|
|         CreateEvent|
|PullRequestReview...|
|   IssueCommentEvent|
|         DeleteEvent|
|         IssuesEvent|
|           ForkEvent|
|         PublicEvent|
|         MemberEvent|
|          WatchEvent|
|    PullRequestEvent|
+--------------------+

