## GitHub Most Popular Repos


In [1]:
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session running locally on all available cores,
# with the PostgreSQL JDBC driver jar on the classpath for the later table writes.
spark = (
    SparkSession.builder
    .appName("SimpleApp")
    .master("local[*]")
    .config("spark.jars", "/Drivers/SQL_Sever/jdbc/postgresql-42.7.3.jar")
    .getOrCreate()
)

# Handle to the underlying SparkContext of the session.
sc = spark.sparkContext

**Data:** a set of JSON files containing data on the top-starred GitHub repos for different search terms.

## Loading Data

In [90]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, ArrayType

# Explicit schema applied when reading the repo JSON files.
# Every field is nullable (the default for StructType.add / StructField).
schema = (
    StructType()
    .add("created", TimestampType())
    .add("description", StringType())
    .add("forks", IntegerType())
    .add("full_name", StringType())
    .add("id", IntegerType())
    .add("language", StringType())
    .add("open_issues", IntegerType())
    .add("repo_name", StringType())
    .add("stars", IntegerType())
    .add("subscribers", IntegerType())
    .add("topics", ArrayType(StringType()))
    .add("type", StringType())
    .add("username", StringType())
)

# Load every JSON file in the data directory into one DataFrame.
df = spark.read.schema(schema).json('work/data/*.json')

In [89]:
# Print the column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- created: timestamp (nullable = true)
 |-- description: string (nullable = true)
 |-- forks: integer (nullable = true)
 |-- full_name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- open_issues: integer (nullable = true)
 |-- repo_name: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- subscribers: integer (nullable = true)
 |-- topics: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)
 |-- username: string (nullable = true)



## Extracting file names into a column

In [91]:
from pyspark.sql.functions import input_file_name, regexp_replace, substring_index

# Tag each row with the path of the file it was read from.
df_file_names = df.withColumn("search_term", input_file_name())

# Reduce the path to the search term: keep the last path segment and strip the
# extension. The pattern is escaped and anchored because regexp_replace takes a
# regular expression: an unescaped "." matches any character, so ".json" could
# also remove text such as "-json" from the middle of a file name.
df_search_terms = df_file_names.withColumn(
    "search_term",
    regexp_replace(substring_index("search_term", "/", -1), r"\.json$", ""),
)

## Checking the number of nulls in each column

In [92]:
from pyspark.sql.functions import col, count, when, isnull

# Count the nulls per column: when() yields a non-null marker only for null
# cells, and count() counts just those markers. Only the columns of the
# originally loaded frame (df) are checked, so "search_term" is not included.
df_search_terms.select(
    *(
        count(when(col(name).isNull(), name)).alias(name)
        for name in df.columns
    )
).show()

+-------+-----------+-----+---------+---+--------+-----------+---------+-----+-----------+------+----+--------+
|created|description|forks|full_name| id|language|open_issues|repo_name|stars|subscribers|topics|type|username|
+-------+-----------+-----+---------+---+--------+-----------+---------+-----+-----------+------+----+--------+
|      7|        666|   11|        1|  0|    1466|         13|        0|   11|         12|    13|   9|       9|
+-------+-----------+-----+---------+---+--------+-----------+---------+-----+-----------+------+----+--------+



#### Investigating the empty repo name

In [93]:
# Show the rows that have no full_name.
df_search_terms.where(col("full_name").isNull()).show()

+-------+-----------+-----+---------+--------+--------+-----------+----------------+-----+-----------+------+----+--------+-----------+
|created|description|forks|full_name|      id|language|open_issues|       repo_name|stars|subscribers|topics|type|username|search_term|
+-------+-----------+-----+---------+--------+--------+-----------+----------------+-----+-----------+------+----+--------+-----------+
|   NULL|       NULL| NULL|     NULL|63600221|    NULL|       NULL|docker-spark-arm| NULL|       NULL|  NULL|NULL|    NULL|      Spark|
+-------+-----------+-----+---------+--------+--------+-----------+----------------+-----+-----------+------+----+--------+-----------+



#### Filling the missing repo name

In [94]:
# Replace null full_name values with the known owner/repo string.
# NOTE: this fills *every* null full_name with the same value, so it is only
# correct while exactly one row (the one inspected above) is missing it.
df_filled_full_name = df_search_terms.fillna(
    "afritzler/docker-spark-arm",
    subset=["full_name"],
)

## Using Github API to get missing repo details

In [95]:
import requests
from pyspark.sql import Row

def fill_missing_values(df, col_name, github_col_name):
    """Fill nulls in ``col_name`` with values fetched from the GitHub REST API.

    For every distinct ``full_name`` whose ``col_name`` is null, the repository
    is requested from ``https://api.github.com/repos/{full_name}`` and the value
    under ``github_col_name`` is used to fill the gap. For the ``type`` and
    ``username`` columns the value is read from the nested ``owner`` object of
    the response. Repositories that do not return HTTP 200, or whose response
    lacks the key, keep their null.

    Args:
        df: DataFrame with a ``full_name`` column and the column to fill.
        col_name: Name of the DataFrame column whose nulls should be filled.
        github_col_name: Key in the GitHub response holding the replacement.

    Returns:
        A DataFrame with the same columns as ``df`` and ``col_name`` filled
        where a value could be fetched. ``col_name`` keeps its original data
        type. If nothing is missing, ``df`` is returned unchanged.

    Raises:
        requests.RequestException: If a request fails or times out.
    """
    import os  # local import: only this helper needs it

    # Attributes of the repository owner live under the nested "owner" object.
    owner_columns = ("type", "username")

    # One lookup per distinct repo; rows without a full_name cannot be queried.
    missing_rows = (
        df.filter(col(col_name).isNull() & col("full_name").isNotNull())
        .select("full_name")
        .distinct()
        .collect()
    )
    if not missing_rows:
        return df

    # Read the token from the environment instead of hard-coding it; without a
    # token the request is sent unauthenticated rather than with an empty one.
    token = os.environ.get("GITHUB_TOKEN", "")
    headers = {"Authorization": f"token {token}"} if token else {}

    fetched = []
    for row in missing_rows:
        full_name = row["full_name"]
        repo_url = f"https://api.github.com/repos/{full_name}"
        response = requests.get(repo_url, headers=headers, timeout=30)

        value = None
        if response.status_code == 200:
            payload = response.json()
            if col_name in owner_columns:
                source = payload.get("owner") or {}
            else:
                source = payload
            value = source.get(github_col_name)

        # Values are carried as strings and cast back below, so the helper frame
        # has a fixed schema even when every lookup failed (all None).
        fetched.append((full_name, None if value is None else str(value)))

    repo_df = spark.createDataFrame(
        fetched, schema="full_name string, col_filled string"
    )

    # Cast to the column's own type so filling never changes its data type
    # (e.g. "created" stays a timestamp instead of being coerced to string).
    target_type = df.schema[col_name].dataType

    res = (
        df.join(repo_df, on="full_name", how="left")
        .withColumn(
            col_name,
            when(col(col_name).isNull(), col("col_filled").cast(target_type))
            .otherwise(col(col_name)),
        )
        .drop("col_filled")
    )

    return res

In [96]:
# Fill the missing columns one at a time; each step takes the previous step's
# result, pairing a DataFrame column with its key in the GitHub API response.
df_filled_created = fill_missing_values(
    df_filled_full_name, col_name="created", github_col_name="created_at"
)
df_filled_stars = fill_missing_values(
    df_filled_created, col_name="stars", github_col_name="stargazers_count"
)
df_filled_issues = fill_missing_values(
    df_filled_stars, col_name="open_issues", github_col_name="open_issues_count"
)
df_filled_subscribers = fill_missing_values(
    df_filled_issues, col_name="subscribers", github_col_name="subscribers_count"
)
df_filled_type = fill_missing_values(
    df_filled_subscribers, col_name="type", github_col_name="type"
)
df_filled_forks = fill_missing_values(
    df_filled_type, col_name="forks", github_col_name="forks_count"
)
df_filled = fill_missing_values(
    df_filled_forks, col_name="username", github_col_name="login"
)

In [97]:
# Re-count the nulls per column after filling, over the same columns as the
# originally loaded frame (df), to compare with the counts before the fill.
df_filled.select(
    *(
        count(when(col(name).isNull(), name)).alias(name)
        for name in df.columns
    )
).show()

+-------+-----------+-----+---------+---+--------+-----------+---------+-----+-----------+------+----+--------+
|created|description|forks|full_name| id|language|open_issues|repo_name|stars|subscribers|topics|type|username|
+-------+-----------+-----+---------+---+--------+-----------+---------+-----+-----------+------+----+--------+
|      0|        666|    0|        0|  0|    1466|          0|        0|    0|          0|    13|   0|       0|
+-------+-----------+-----+---------+---+--------+-----------+---------+-----+-----------+------+----+--------+



## Answering requirements:

**1. Create a table for programming languages called "programming_lang" which has two columns, the programming language name and the number of repos using it.**

Some repos have missing programming languages, this is due to some repos not containing code for example repos of popular books. Or other repos having many programming languages with no dominant language, in this case github doesn't detect a programming language for the repo. To make this more understandable we replace nulls with "No Language Detected".

Also, the MATLAB language is stored once in lowercase and once in uppercase, so the language-name column should be standardized.

In [98]:
from pyspark.sql.functions import lower, col

# Give repos without a detected language an explicit label instead of null.
df_filled_languages = df_filled.fillna("No Language Detected", subset=["language"])

# Lower-case the language names so differently-cased spellings group together
# (this also lower-cases the placeholder label set above).
df_lower = df_filled_languages.withColumn(
    "language", lower(df_filled_languages["language"])
)

# Number of repos per language.
df_langs = df_lower.groupBy("language").count()

Loading Data to Postgres:

In [99]:
# Write the per-language repo counts to Postgres, replacing the table if it
# already exists. The table name is "programming_lang", as required by the task
# statement (it was previously misspelled "programmin_lang").
df_langs.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/github_repo") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "programming_lang") \
    .option("user", "postgres") \
    .option("password", "password") \
    .mode("overwrite") \
    .save()

**2. Create a table for the organization-type accounts called "organizations_stars" which has two columns, the organization name and the total number of stars across all of its repos in all the files.**

In [100]:
# Total stars per organization account: keep only rows whose owner type is
# "Organization", sum the stars per username, and give the sum a readable name.
df_orgs = (
    df_filled
    .where(col("type") == "Organization")
    .groupBy("username")
    .sum("stars")
    .withColumnRenamed("sum(stars)", "total_stars")
)

In [101]:
# Write the organization star totals to the "organizations_stars" table in
# Postgres over JDBC, replacing the table if it already exists.
(
    df_orgs.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/github_repo")
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "organizations_stars")
    .option("user", "postgres")
    .option("password", "password")
    .mode("overwrite")
    .save()
)

**3. Create a table for the search terms called "search_terms_relevance" which has two columns, the search term - a.k.a. the file name - and the relevance score for all the repos for this search term. We use a self-defined formula for calculating the relevance where relevance score = 1.5 * forks + 1.32 * subscribers + 1.04 * stars**

In [103]:
# Relevance per search term. The task asks for two columns (search term and the
# relevance score of all its repos), so the per-repo score
#   1.5 * forks + 1.32 * subscribers + 1.04 * stars
# is computed first and then summed over every repo of the same search term.
df_relevance_scores = (
    df_filled
    .selectExpr(
        "search_term",
        "1.5 * forks + 1.32 * subscribers + 1.04 * stars as relevance_score",
    )
    .groupBy("search_term")
    .sum("relevance_score")
    .withColumnRenamed("sum(relevance_score)", "relevance_score")
)

In [105]:
# Write the relevance scores to the "search_terms_relevance" table in Postgres
# over JDBC, replacing the table if it already exists.
(
    df_relevance_scores.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/github_repo")
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "search_terms_relevance")
    .option("user", "postgres")
    .option("password", "password")
    .mode("overwrite")
    .save()
)