In [33]:
#creating spark session and importing libraries
import findspark
findspark.init("/opt/spark")  
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as fn
import os
from dotenv import load_dotenv


spark = SparkSession.builder \
    .appName("MySQL Integration") \
    .config("spark.jars", "/home/abanoub/Downloads/mysql-connector-j-8.0.33.jar") \
    .config("spark.driver.extraClassPath", "/home/abanoub/Downloads/mysql-connector-j-8.0.33.jar") \
    .config("spark.executor.extraClassPath", "/home/abanoub/Downloads/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()



In [34]:
#reading all files from the archive 
df_total=spark.read.json('/home/abanoub/Downloads/archive/*.json')

In [35]:
#testing the reading 
df_total.show()

+-------------------+--------------------+-----+--------------------+---------+----------------+-----------+--------------------+-----+-----------+--------------------+------------+-------------------+
|            created|         description|forks|           full_name|       id|        language|open_issues|           repo_name|stars|subscribers|              topics|        type|           username|
+-------------------+--------------------+-----+--------------------+---------+----------------+-----------+--------------------+-----+-----------+--------------------+------------+-------------------+
|2014-02-25 08:00:08|Apache Spark - A ...|25357|        apache/spark| 17165658|           Scala|        242|               spark|32296|       2080|[python, scala, r...|Organization|             apache|
|2017-08-09 19:39:59|Distributed train...| 2027|     horovod/horovod| 99846383|          Python|        298|             horovod|12219|        334|[tensorflow, uber...|Organization|           

In [36]:
df_total.select('language').distinct().show(136)

+-----------------+
|         language|
+-----------------+
|               C#|
|            Terra|
|             Hack|
|         Makefile|
|              VBA|
|             Cuda|
|         Markdown|
|              Arc|
|             LLVM|
|            Metal|
|             Haxe|
|             Rust|
|       JavaScript|
|       Emacs Lisp|
|             TSQL|
|             Perl|
|           Puppet|
|           Erlang|
|          Crystal|
|            Jinja|
|              Nim|
|              C++|
|               F#|
|           Groovy|
|              TeX|
|            OCaml|
|             Dart|
|             MQL5|
|            Cirru|
|              Elm|
|              Vue|
|              Coq|
|       Dockerfile|
|     ActionScript|
|            Julia|
| Jupyter Notebook|
|             XSLT|
|       Vim script|
|    Objective-C++|
|                C|
|      Objective-C|
|          Verilog|
|       ApacheConf|
|            Frege|
|            Swift|
|       TypeScript|
|             NASL|


In [37]:
#creating view for first requistion (programming_lang) 
df_total.createOrReplaceTempView('lang')
programming_lang=spark.sql('Select language ,count(*) as number_of_repos From lang GROUP BY language')


In [38]:
#test 1st df 
programming_lang.show()
programming_lang.printSchema()

+----------+---------------+
|  language|number_of_repos|
+----------+---------------+
|        C#|            336|
|     Terra|              1|
|      Hack|              1|
|  Makefile|             36|
|       VBA|              3|
|      Cuda|             12|
|  Markdown|              3|
|       Arc|              1|
|      LLVM|              2|
|     Metal|              1|
|      Haxe|              4|
|      Rust|            165|
|JavaScript|           5293|
|Emacs Lisp|             18|
|      TSQL|              9|
|      Perl|             14|
|    Puppet|              4|
|    Erlang|              6|
|      NULL|           1424|
|   Crystal|              2|
+----------+---------------+
only showing top 20 rows

root
 |-- language: string (nullable = true)
 |-- number_of_repos: long (nullable = false)



In [39]:
#test2 for 1st df 
org=spark.sql('Select username ,stars From lang WHERE username=="facebookincubator"').show(n=10000)

+-----------------+-----+
|         username|stars|
+-----------------+-----+
|facebookincubator|  375|
|facebookincubator| 9305|
|facebookincubator|  998|
|facebookincubator| 9304|
+-----------------+-----+



In [40]:
load_dotenv()
db_password = os.getenv("mysql_pass")

In [41]:
# writing 1st df into mysql db 
programming_lang.write \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/Github_repos") \
        .option("dbtable", "programming_lang_database") \
        .option("user", "root") \
        .option("password", db_password) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .mode("append") \
        .save()


In [46]:
#creating 2nd df 
organizations_stars=spark.sql('Select username ,sum(stars) as total_number_of_stars From lang GROUP BY username')

In [None]:
#test for 2nd df 
organizations_stars.show()

In [48]:
print(organizations_stars)

DataFrame[username: string, total_number_of_stars: bigint]


In [49]:
#writing 2nd requistion into mysql db 
organizations_stars.write \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/Github_repos") \
        .option("dbtable", "organizations_stars_database") \
        .option("user", "root") \
        .option("password",db_password) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .mode("append") \
        .save()


                                                                                

In [50]:
#getting all directories for the json files 
import os 
main_directory='/home/abanoub/Downloads/archive'
files=os.listdir(main_directory)
list_of_directories=[]
for file in files:
    directory_of_file=os.path.join(main_directory, file)
    list_of_directories.append(directory_of_file)

print(list_of_directories)

['/home/abanoub/Downloads/archive/Angular.json', '/home/abanoub/Downloads/archive/Django.json', '/home/abanoub/Downloads/archive/Docker.json', '/home/abanoub/Downloads/archive/Gatsby.json', '/home/abanoub/Downloads/archive/React-Native.json', '/home/abanoub/Downloads/archive/Tensorflow.json', '/home/abanoub/Downloads/archive/Cpp.json', '/home/abanoub/Downloads/archive/Scala.json', '/home/abanoub/Downloads/archive/Machine-Learning.json', '/home/abanoub/Downloads/archive/Kubernetes.json', '/home/abanoub/Downloads/archive/ethereum.json', '/home/abanoub/Downloads/archive/PyTorch.json', '/home/abanoub/Downloads/archive/serverless.json', '/home/abanoub/Downloads/archive/Scikit.json', '/home/abanoub/Downloads/archive/NextJS.json', '/home/abanoub/Downloads/archive/Deep-Learning.json', '/home/abanoub/Downloads/archive/Kotlin.json', '/home/abanoub/Downloads/archive/React-JS.json', '/home/abanoub/Downloads/archive/Julia.json', '/home/abanoub/Downloads/archive/NodeJS.json', '/home/abanoub/Download

In [51]:
# getting names for json files 
df_names = [os.path.splitext(file)[0] for file in files]
print(df_names)

['Angular', 'Django', 'Docker', 'Gatsby', 'React-Native', 'Tensorflow', 'Cpp', 'Scala', 'Machine-Learning', 'Kubernetes', 'ethereum', 'PyTorch', 'serverless', 'Scikit', 'NextJS', 'Deep-Learning', 'Kotlin', 'React-JS', 'Julia', 'NodeJS', 'Golang', 'Threejs', 'R', 'Typescript', 'Dart', 'Flask', 'Spark', 'OOPs', 'Hadoop']


In [52]:
#creation for dfs for every file
df_total_search=[]
for i, directory in enumerate(list_of_directories):
    search_term=df_names[i]
    df=spark.read.json(directory)
    df.createOrReplaceTempView('view_for_df')
    relevance_score= spark.sql(f'SELECT "{search_term}" as search_term, sum(forks)*1.5 + sum(subscribers)*1.32 + sum(stars)*1.04 as relevance_score FROM view_for_df')
    df_total_search.append(relevance_score)

In [53]:
df_total_search[0].show()

+-----------+---------------+
|search_term|relevance_score|
+-----------+---------------+
|    Angular|     2888727.66|
+-----------+---------------+



In [54]:
#merging all dfs and names in one df 
merged_df = df_total_search[0]
for df in df_total_search[1:]: 
    merged_df = merged_df.union(df)

merged_df.show(30)

+----------------+---------------+
|     search_term|relevance_score|
+----------------+---------------+
|         Angular|     2888727.66|
|          Django|     1488257.16|
|          Docker|     4403817.56|
|          Gatsby|      361724.48|
|    React-Native|     3097924.46|
|      Tensorflow|     3918987.26|
|             Cpp|     4377811.44|
|           Scala|     1287463.68|
|Machine-Learning|     6925617.80|
|      Kubernetes|     3540183.02|
|        ethereum|     1157287.26|
|         PyTorch|     3357329.08|
|      serverless|     1312038.98|
|          Scikit|      897061.24|
|          NextJS|      631042.74|
|   Deep-Learning|     6560501.06|
|          Kotlin|     1936763.90|
|        React-JS|      870068.36|
|           Julia|      384773.28|
|          NodeJS|     4331014.68|
|          Golang|     5407407.26|
|         Threejs|      360260.50|
|               R|    11876338.24|
|      Typescript|     4909697.76|
|            Dart|     1028218.32|
|           Flask|  

                                                                                

In [55]:
#writing all dfs 3rd requistion into mysql db 
merged_df.write \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/Github_repos") \
        .option("dbtable", "Search_terms_relevance_database") \
        .option("user", "root") \
        .option("password", db_password) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .mode("append") \
        .save()


                                                                                