### Initialize Spark & Extracting Data

In [26]:
# Import Used Packages
from pyspark.sql import functions  
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

In [1]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("GitHubRepoETL") \
    .config("spark.jars", "/home/jovyan/work/postgresql-42.7.5.jar") \
    .getOrCreate()

In [2]:
# PostgreSQL Connection 
jdbc_url = "jdbc:postgresql://postgres_db:5432/github_data"

properties = {
    "user": "admin",
    "password": "admin123",
    "driver": "org.postgresql.Driver"
}
df = spark.read.jdbc(url=jdbc_url, table="pg_catalog.pg_tables", properties=properties)
df.show()

+----------+--------------------+----------+----------+----------+--------+-----------+-----------+
|schemaname|           tablename|tableowner|tablespace|hasindexes|hasrules|hastriggers|rowsecurity|
+----------+--------------------+----------+----------+----------+--------+-----------+-----------+
|pg_catalog|        pg_statistic|     admin|      NULL|      true|   false|      false|      false|
|pg_catalog|             pg_type|     admin|      NULL|      true|   false|      false|      false|
|pg_catalog|    pg_foreign_table|     admin|      NULL|      true|   false|      false|      false|
|pg_catalog|           pg_authid|     admin| pg_global|      true|   false|      false|      false|
|pg_catalog|pg_statistic_ext_...|     admin|      NULL|      true|   false|      false|      false|
|pg_catalog|     pg_user_mapping|     admin|      NULL|      true|   false|      false|      false|
|pg_catalog|     pg_subscription|     admin| pg_global|      true|   false|      false|      false|


In [3]:
# Extract The Data
import os
json_path = "Data/*.json"

df = spark.read.json(json_path)

### EDA

In [4]:
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- created: string (nullable = true)
 |-- description: string (nullable = true)
 |-- forks: long (nullable = true)
 |-- full_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- language: string (nullable = true)
 |-- open_issues: long (nullable = true)
 |-- repo_name: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- subscribers: long (nullable = true)
 |-- topics: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)
 |-- username: string (nullable = true)



In [11]:
df.show(5)

+---------------+-------------------+--------------------+-----+--------------------+---------+--------+-----------+-----------------+-----+-----------+--------------------+------------+---------+--------------------+---------------+
|_corrupt_record|            created|         description|forks|           full_name|       id|language|open_issues|        repo_name|stars|subscribers|              topics|        type| username|         search_term|relevance_score|
+---------------+-------------------+--------------------+-----+--------------------+---------+--------+-----------+-----------------+-----+-----------+--------------------+------------+---------+--------------------+---------------+
|           NULL|2014-02-25 08:00:08|Apache Spark - A ...|25357|        apache/spark| 17165658|   Scala|        242|            spark|32296|       2080|[python, scala, r...|Organization|   apache|file:///home/jovy...|       74368.94|
|           NULL|2017-08-09 19:39:59|Distributed train...| 2027|

In [12]:
df.count()

27281

In [14]:
df_summary = df.describe()
df_summary.show()

+-------+--------------------+-------------------+--------------------+------------------+--------------------+--------------------+----------+-----------------+------------------+-----------------+------------------+------------+--------------------+--------------------+------------------+
|summary|     _corrupt_record|            created|         description|             forks|           full_name|                  id|  language|      open_issues|         repo_name|            stars|       subscribers|        type|            username|         search_term|   relevance_score|
+-------+--------------------+-------------------+--------------------+------------------+--------------------+--------------------+----------+-----------------+------------------+-----------------+------------------+------------+--------------------+--------------------+------------------+
|  count|                   1|              27273|               26645|             27269|               27279|             

In [19]:
df.select("type").distinct().show()

+------------+
|        type|
+------------+
|Organization|
|        User|
|        NULL|
+------------+



In [20]:
df.groupBy("type").count().show()

+------------+-----+
|        type|count|
+------------+-----+
|        NULL|   10|
|Organization|11276|
|        User|15995|
+------------+-----+



In [22]:
df.groupBy("language").count().orderBy("count", ascending=False).show(10)

+----------------+-----+
|        language|count|
+----------------+-----+
|      JavaScript| 5293|
|          Python| 4861|
|      TypeScript| 2816|
|              Go| 1868|
|            NULL| 1425|
|            Java| 1418|
|Jupyter Notebook| 1291|
|           Scala| 1178|
|             C++|  952|
|            Dart|  855|
+----------------+-----+
only showing top 10 rows



In [23]:
df.select("stars", "forks").stat.corr("stars", "forks")

0.8205940403273477

In [24]:
df.select("full_name", "stars").orderBy(col("stars").desc()).show(10, False)

+-------------------------------+------+
|full_name                      |stars |
+-------------------------------+------+
|kamranahmedse/developer-roadmap|188605|
|facebook/react                 |184156|
|tensorflow/tensorflow          |163254|
|tensorflow/tensorflow          |163249|
|tensorflow/tensorflow          |163247|
|ohmyzsh/ohmyzsh                |141449|
|flutter/flutter                |137350|
|microsoft/vscode               |128959|
|facebook/react-native          |101603|
|facebook/react-native          |101540|
+-------------------------------+------+
only showing top 10 rows



In [25]:
df.groupBy("username").count().orderBy(col("count").desc()).show(10)

+---------------+-----+
|       username|count|
+---------------+-----+
|      microsoft|  186|
|         google|   92|
|     tensorflow|   90|
|         apache|   69|
|    aws-samples|   67|
|  jeromeetienne|   48|
|PacktPublishing|   47|
|       ethereum|   45|
|       jazzband|   40|
|        awslabs|   38|
+---------------+-----+
only showing top 10 rows



In [18]:
df.select(
    mean("stars").alias("Avg Stars"),
    min("stars").alias("Min Stars"),
    max("stars").alias("Max Stars")
).show()

+-----------------+---------+---------+
|        Avg Stars|Min Stars|Max Stars|
+-----------------+---------+---------+
|2069.074883567421|        1|   188605|
+-----------------+---------+---------+



### Data Transformations & Loading in the db

In [6]:
# Transformations
programming_lang_df = df.groupBy("language").count().withColumnRenamed("count", "repo_count")

# Load in db
programming_lang_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "programming_lang") \
    .option("user", "admin") \
    .option("password", "admin123") \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

In [9]:
# Transformtions
org_stars_df = df.filter(df["type"] == "Organization").groupBy("username").agg(F.sum("stars").alias("total_stars"))

# Load in db
org_stars_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "organizations_stars") \
    .option("user", "admin") \
    .option("password", "admin123") \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()


In [10]:
#Transformations
df = df.withColumn("search_term", input_file_name())
df = df.withColumn("relevance_score", 1.5 * df.forks + 1.32 * df.subscribers + 1.04 * df.stars)
search_terms_df = df.select("search_term", "relevance_score")

# Load in db
search_terms_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "search_terms_relevance") \
    .option("user", "admin") \
    .option("password", "admin123") \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()