In [1]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; } </style>"))

In [2]:
from utils import *
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
import config

if __name__ == "__main__":

    ## glopal variables
    spark_master = "spark://spark-service:7077"
    app_name = "extratct json data"
    memory = "4g"
    data_path = "/home/data/archive/"
    
    spark = start_spark_session(spark_master,app_name,memory)

    repos_schema = T.StructType([T.StructField('id', T.LongType(), True),
                                 T.StructField('repo_name', T.StringType(), True),
                                 T.StructField('full_name', T.StringType(), True),
                                 T.StructField('description', T.StringType(), True),
                                 T.StructField('created', T.StringType(), True), 
                                 T.StructField('language', T.StringType(), True), 
                                 T.StructField('type', T.StringType(), True),
                                 T.StructField('username', T.StringType(), True),
                                 T.StructField('stars', T.LongType(), True),
                                 T.StructField('forks', T.LongType(), True), 
                                 T.StructField('subscribers', T.LongType(), True),
                                 T.StructField('open_issues', T.LongType(), True), 
                                 T.StructField('topics', T.ArrayType(T.StringType(), True), True)
                                 ])
    df_source_repos = spark.read.json("/home/data/archive/", schema = repos_schema)

    """ Create a table for programming languages called "programming_lang" which has two columns, 
        the programming language name and the number of repos using it."""
    
    programming_lang_df = df_source_repos.groupby("language")\
                                         .agg(F.count("*").alias("repo_count"))
    write_sdf_to_postgres_db(programming_lang_df, config.POSTGRS_CREDENTIALS,"programming_lang", mode = "overwrite")
    
    """ Create a table for the organization-type accounts called "organizations_stars" which has two columns, 
        the organization name and the total number of stars across all of its repos in all the files."""
    
    organizations_stars_df = df_source_repos.filter(F.col("type")=="Organization")\
                                            .groupby("username").agg(F.sum("stars").alias("stars_count"))

    write_sdf_to_postgres_db(organizations_stars_df, config.POSTGRS_CREDENTIALS,"organizations_stars", mode = "overwrite")


    """ Create a table for the search terms called "search_terms_relevance" which has two columns,
        the search term - a.k.a. the file name - and the relevance score for all the repos for this search term. 
        We use a self-defined formular for calculating the relevance 
        where relevance score = 1.5 * forks + 1.32 * subscribers + 1.04 * stars.
    """
    
    relevence_calc = 1.5 * F.col("forks") + 1.32 * F.col("subscribers") +  1.04 * F.col("stars")
    
    search_terms_relevance_df = df_source_repos.withColumn("relevance_score", 
                                                           F.round(relevence_calc / 100) )\
                                                           .select("repo_name","relevance_score")

    
    write_sdf_to_postgres_db(search_terms_relevance_df, config.POSTGRS_CREDENTIALS,"search_terms_relevance", mode = "overwrite")

    


                                                                                

In [48]:
search_terms_relevance_df.show()

+--------------------+---------------+
|           repo_name|relevance_score|
+--------------------+---------------+
|               spark|          744.0|
|             horovod|          162.0|
|      SparkInternals|           84.0|
|               delta|           61.0|
|   TensorFlowOnSpark|           57.0|
|              koalas|           40.0|
|     spark-jobserver|           47.0|
|       analytics-zoo|           38.0|
|            ballista|           27.0|
|               deequ|           29.0|
|       TransmogrifAI|           29.0|
|                vega|           25.0|
| spark-deep-learning|           29.0|
|spark-on-k8s-oper...|           35.0|
|                oryx|           28.0|
|               spark|           23.0|
|        docker-spark|           28.0|
|          elassandra|           21.0|
|  spark-py-notebooks|           30.0|
|          carbondata|           26.0|
+--------------------+---------------+
only showing top 20 rows

