In [1]:
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.functions import lit, desc
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
import pandas as pd

In [2]:
spark = SparkSession.builder.appName("GitHubRepos").getOrCreate()

In [3]:
json_files = [f for f in os.listdir("./repos-jsons")]
df_combined = None
largest_schema = None

In [4]:
for file in json_files:
    file_path = f"./repos-jsons/{file}"
    df = spark.read.json(file_path)
    if largest_schema is None or len(df.columns) > len(largest_schema.fields):
        largest_schema = df.schema

In [5]:
for file in json_files:
    file_path = f"./repos-jsons/{file}"
    file_name = file.split('.')[0]

    df_batch = spark.read.schema(largest_schema).json(file_path)
    df_batch = df_batch.withColumn("file_name", lit(file_name))

    if df_combined is None:
        df_combined = df_batch
    else:
        df_combined = df_combined.unionByName(df_batch, allowMissingColumns=True)

In [6]:
df_combined.show(truncate=False)

+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------------------------------------------+---------+----------+-----------+---------------------------------+-----+-----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------+---------+
|created            |description                                                                                                                                                                                                           |forks|full_name                                     |id       |language  |open_issues|repo_name                        |

In [7]:
programming_lang_df = (
    df_combined
    .groupBy("language")
    .count()
    .withColumnRenamed("count", "repo_count")
    .orderBy(desc("repo_count"))
)

programming_lang_df.show()

+----------------+----------+
|        language|repo_count|
+----------------+----------+
|      JavaScript|      5293|
|          Python|      4861|
|      TypeScript|      2816|
|              Go|      1868|
|            NULL|      1425|
|            Java|      1418|
|Jupyter Notebook|      1291|
|           Scala|      1178|
|             C++|       952|
|            Dart|       855|
|          Kotlin|       830|
|           Julia|       806|
|            HTML|       524|
|           Shell|       520|
|              C#|       336|
|               C|       321|
|             CSS|       227|
|     Objective-C|       192|
|            Ruby|       174|
|            Rust|       165|
+----------------+----------+
only showing top 20 rows



In [8]:
organizations_stars_df = (
    df_combined
    .filter(df_combined.type == "Organization")
    .groupBy("username")
    .agg(F.sum("stars").alias("total_stars"))
    .orderBy(desc('total_stars'))
    .withColumnRenamed("username", "organization_name")
)

# Show the results
organizations_stars_df.show()

+-----------------+-----------+
|organization_name|total_stars|
+-----------------+-----------+
|        microsoft|    1011553|
|       tensorflow|     859765|
|         facebook|     627455|
|           apache|     433004|
|          angular|     352533|
|      huggingface|     306537|
|           google|     289449|
|          pytorch|     255202|
|      gothinkster|     218946|
|      storybookjs|     208451|
|       kubernetes|     206820|
|       keras-team|     195019|
|           vercel|     180974|
|          alibaba|     172746|
|          Tencent|     159319|
|          flutter|     159010|
|           nodejs|     144071|
|           docker|     142690|
|          ohmyzsh|     141449|
|             dmlc|     139827|
+-----------------+-----------+
only showing top 20 rows



In [9]:
search_terms_relevance_df = (
    df_combined
    .filter(df_combined.file_name.isNotNull())
    .withColumn("relevance_score", 
                1.5 * F.col("forks") + 
                1.32 * F.col("subscribers") + 
                1.04 * F.col("stars"))
    .groupBy("file_name")
    .agg(F.round(F.sum("relevance_score"),2).alias("total_relevance_score"))
    .orderBy(desc('total_relevance_score'))
    .withColumnRenamed('file_name','search_term')
    .withColumnRenamed('total_relevance_score','relevance_score')
)

search_terms_relevance_df.show()

+----------------+---------------+
|     search_term|relevance_score|
+----------------+---------------+
|               R|  1.187633824E7|
|Machine-Learning|      6925617.8|
|   Deep-Learning|     6559100.02|
|          Golang|     5407407.26|
|      Typescript|     4909697.76|
|          Docker|     4403817.56|
|             Cpp|     4377811.44|
|          NodeJS|     4331014.68|
|      Tensorflow|     3918987.26|
|      Kubernetes|     3540183.02|
|         PyTorch|     3357329.08|
|    React-Native|     3097924.46|
|         Angular|     2888727.66|
|          Kotlin|      1936763.9|
|          Django|     1488257.16|
|      serverless|     1312038.98|
|           Scala|     1287463.68|
|        ethereum|     1157287.26|
|            Dart|     1028218.32|
|          Scikit|      897061.24|
+----------------+---------------+
only showing top 20 rows



In [10]:
programming_lang_df = programming_lang_df.toPandas()
organizations_stars_df = organizations_stars_df.toPandas()
search_terms_relevance_df = search_terms_relevance_df.toPandas()

In [11]:
dwh_url ='postgresql://postgres:DWH123@localhost:5450/spark_project'

engine = create_engine(dwh_url)

In [12]:
try:
    with engine.connect() as connection:
        #test connection
        connection.execute('SELECT 1;')  
        print("Connection successful!")
except OperationalError as e:
    print(f"Connection failed: {e}")

Connection successful!


In [13]:
try:
    with engine.connect() as connection:
        dataframes = [programming_lang_df, organizations_stars_df, search_terms_relevance_df]  
        table_names = ['programming_languages', 'organizations_stars', 'search_term_relevance']  

        # Truncate and insert data
        for df, table_name in zip(dataframes, table_names):
            connection.execute(f"truncate table {table_name} CASCADE")
            df.to_sql(table_name, engine, if_exists='append', index=False)
        print('Data Loaded Successfully')
        
except OperationalError as e:
    print(f"Loading Data Failed: {e}")

Data Loaded Successfully
