In [1]:
import os

import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so the
# notebook is not tied to one machine; fall back to the original local install path.
findspark.init(os.environ.get("SPARK_HOME", "/home/ductien/spark-3.3.2-bin-hadoop3"))
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import *
import mysql.connector

In [2]:
# Build (or reuse) the Spark session for this loader, registering the
# MySQL Connector/J jar through the spark.jars setting.
spark = (
    SparkSession.builder
    .appName("MySQL_loader")
    .config("spark.jars", "./mysql-connector-j-8.0.32.jar")
    .getOrCreate()
)

23/04/29 01:11:21 WARN Utils: Your hostname, DT-Kubuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.7 instead (on interface wlp3s0)
23/04/29 01:11:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/04/29 01:11:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Read the cleaned 2017 marks: first row is the header, column types are inferred.
mark_2017_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("./CleanedDatasets/Mark2017")
)

                                                                                

In [18]:
# Read the cleaned 2018-2022 marks: first row is the header, column types are inferred.
mark_18_22_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("./CleanedDatasets/Mark_2018_2022")
)

                                                                                

In [3]:
# Read the cleaned university benchmark data: first row is the header, column types are inferred.
benchmark_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("./CleanedDatasets/cleaned_uni_mark")
)

                                                                                

In [4]:
from pyspark.sql.functions import col

<h1>Spark DataFrame to MySQL database</h1>

In [5]:
import os
from dotenv import load_dotenv

# Read the database settings from the local .env file.
# A variable that is not set comes back as None.
load_dotenv(".env")
dbName, dbUser, password, host, port = (
    os.environ.get(variable)
    for variable in ("DB_NAME", "DB_USER", "DB_PASSWORD", "DB_HOST", "DB_PORT")
)

<h3>Load to province table</h3>

In [7]:
# Expose the 2017 marks to Spark SQL under the view name PROVINCE.
# createOrReplaceTempView (rather than createTempView) keeps this cell re-runnable:
# createTempView raises once a view with the same name already exists in the session.
mark_2017_df.createOrReplaceTempView("PROVINCE")

# Distinct (province_code, province) pairs found in the 2017 marks.
provinceDF = spark.sql("SELECT DISTINCT province_code, province FROM PROVINCE")

In [8]:
# Show the column names and inferred types of the distinct province list.
provinceDF.printSchema()

root
 |-- province_code: integer (nullable = true)
 |-- province: string (nullable = true)



In [9]:
# Order the province list by its numeric code.
provinceDF = provinceDF.orderBy("province_code")

In [10]:
# Number of distinct (province_code, province) pairs found in the 2017 marks.
provinceDF.count()

                                                                                

61

In [11]:
from pyspark.sql.functions import col

In [12]:
# Add a row for province code 39 ('phuyen') to the province list.
# NOTE(review): presumably this province is absent from the 2017 marks - confirm.
provinceDF = provinceDF.union(
    spark.createDataFrame([(39,'phuyen')], provinceDF.columns)
)

In [13]:
# Add a row for province code 45 ('ninhthuan') to the province list.
# NOTE(review): presumably this province is absent from the 2017 marks - confirm.
provinceDF = provinceDF.union(
    spark.createDataFrame([(45,'ninhthuan')], provinceDF.columns)
)

In [14]:
# Re-order by province code so the appended rows fall into their numeric position.
provinceDF = provinceDF.orderBy('province_code')

In [15]:
# Rename both columns before loading.
# NOTE(review): presumably these match the DimProvince column names - confirm against the table DDL.
provinceDF = (
    provinceDF
    .withColumnRenamed('province_code', 'provinceCode')
    .withColumnRenamed('province', 'ProvinceName')
)

In [16]:
# Check the column names and types after the rename.
provinceDF.printSchema()

root
 |-- provinceCode: long (nullable = true)
 |-- ProvinceName: string (nullable = true)



In [17]:
# Row count of the province list after the two extra rows were appended.
provinceDF.count()

                                                                                

63

In [44]:
# Append the province rows to the DimProvince table over JDBC.
# The endpoint is built from the DB_HOST / DB_PORT / DB_NAME values read from .env;
# each one falls back to the value that used to be hard-coded here when it is unset.
# mode('append') inserts the rows again on every run of this cell.
jdbc_url = "jdbc:mysql://{}:{}/{}".format(
    host or 'localhost', port or '3306', dbName or 'equivalent_score'
)
provinceDF.write.format('jdbc').options(
    url = jdbc_url,
    # Connector/J 8.x driver class; the legacy 'com.mysql.jdbc.Driver' name is deprecated.
    driver = 'com.mysql.cj.jdbc.Driver',
    dbtable = "DimProvince",
    user = dbUser,
    password = password
).mode('append').save()

<h3>Load to Year table</h3>

In [40]:
from pyspark.sql import Row

In [41]:
# One row per year, 2018 through 2022, in a single `year` column.
yearDF = spark.createDataFrame([Row(year = y) for y in range(2018, 2023)])

In [42]:
# Preview the year rows before loading them.
yearDF.show()

+----+
|year|
+----+
|2018|
|2019|
|2020|
|2021|
|2022|
+----+



In [45]:
# Append the year rows to the DimYear table over JDBC.
# The endpoint is built from the DB_HOST / DB_PORT / DB_NAME values read from .env;
# each one falls back to the value that used to be hard-coded here when it is unset.
# mode('append') inserts the rows again on every run of this cell.
jdbc_url = "jdbc:mysql://{}:{}/{}".format(
    host or 'localhost', port or '3306', dbName or 'equivalent_score'
)
yearDF.write.format('jdbc').options(
    url = jdbc_url,
    # Connector/J 8.x driver class; the legacy 'com.mysql.jdbc.Driver' name is deprecated.
    driver = 'com.mysql.cj.jdbc.Driver',
    dbtable = "DimYear",
    user = dbUser,
    password = password
).mode('append').save()

<h2>Load to DimUniversity</h2>

In [6]:
# Expose the university benchmark data to Spark SQL under the view name UNI.
# createOrReplaceTempView keeps this cell re-runnable; createTempView raises once
# a view with the same name already exists in the session.
benchmark_df.createOrReplaceTempView('UNI')

In [7]:
# Inspect the column names and inferred types of the university benchmark data.
benchmark_df.printSchema()

root
 |-- uni_code: string (nullable = true)
 |-- uni_name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- major_code: string (nullable = true)
 |-- major_name: string (nullable = true)
 |-- benchmark: double (nullable = true)
 |-- subject_group: string (nullable = true)



In [8]:
# Project every column of the UNI view under a new alias
# (UniCode, UniName, MajorCode, MajorName, SubjectGroup, BenchMark, Year).
# NOTE(review): presumably these aliases match the University table columns - confirm against the DDL.
uniDF = spark.sql("SELECT uni_code AS UniCode, uni_name AS UniName, \
                   major_code AS MajorCode, major_name AS MajorName, \
                   subject_group AS SubjectGroup, benchmark AS BenchMark, year AS Year FROM UNI")

In [9]:
from pyspark.sql.functions import count, when, isnan, isnull

In [10]:
# For each column, count the rows whose value is NULL or NaN.
missing_per_column = [
    count(when(isnull(name) | isnan(name), name)).alias(name)
    for name in uniDF.columns
]
uniDF.select(missing_per_column).show()

[Stage 2:>                                                          (0 + 1) / 1]

+-------+-------+---------+---------+------------+---------+----+
|UniCode|UniName|MajorCode|MajorName|SubjectGroup|BenchMark|Year|
+-------+-------+---------+---------+------------+---------+----+
|      0|      0|       37|        0|           0|        0|   0|
+-------+-------+---------+---------+------------+---------+----+



                                                                                

In [11]:
# Discard every row that has a missing value in any column.
uniDF = uniDF.na.drop(how='any', subset=uniDF.columns)

In [12]:
# Re-count NULL/NaN values per column to verify that none remain.
missing_per_column = [
    count(when(isnull(name) | isnan(name), name)).alias(name)
    for name in uniDF.columns
]
uniDF.select(missing_per_column).show()

+-------+-------+---------+---------+------------+---------+----+
|UniCode|UniName|MajorCode|MajorName|SubjectGroup|BenchMark|Year|
+-------+-------+---------+---------+------------+---------+----+
|      0|      0|        0|        0|           0|        0|   0|
+-------+-------+---------+---------+------------+---------+----+



In [13]:
from pyspark.sql.functions import max

In [14]:
# Keep only the rows whose BenchMark does not exceed 32.75.
# NOTE(review): 32.75 is presumably the highest valid benchmark score - confirm where this cap comes from.
uniDF = uniDF.where(col('BenchMark') <= 32.75)

In [15]:
# Longest major_code, in characters.
# This queries the UNI view (the benchmark data as loaded), not the cleaned uniDF.
spark.sql('SELECT max(length(major_code)) FROM UNI').show()

+-----------------------+
|max(length(major_code))|
+-----------------------+
|                     25|
+-----------------------+



In [16]:
# Number of rows left after dropping missing values and applying the BenchMark filter.
uniDF.count()

34734

In [68]:
# Append the cleaned university/major rows to the University table over JDBC.
# The endpoint is built from the DB_HOST / DB_PORT / DB_NAME values read from .env;
# each one falls back to the value that used to be hard-coded here when it is unset.
# mode('append') inserts the rows again on every run of this cell.
jdbc_url = "jdbc:mysql://{}:{}/{}".format(
    host or 'localhost', port or '3306', dbName or 'equivalent_score'
)
uniDF.write.format('jdbc').options(
    url = jdbc_url,
    # Connector/J 8.x driver class; the legacy 'com.mysql.jdbc.Driver' name is deprecated.
    driver = 'com.mysql.cj.jdbc.Driver',
    dbtable = "University",
    user = dbUser,
    password = password
).mode('append').save()

                                                                                

<h1>Load to FactScore</h1>

In [19]:
# Expose the 2018-2022 marks to Spark SQL under the view name student1822.
# createOrReplaceTempView keeps this cell re-runnable; createTempView raises once
# a view with the same name already exists in the session.
mark_18_22_df.createOrReplaceTempView('student1822')

In [20]:
# Inspect the column names and inferred types of the 2018-2022 marks.
mark_18_22_df.printSchema()

root
 |-- student_id: integer (nullable = true)
 |-- mathematics: double (nullable = true)
 |-- literature: double (nullable = true)
 |-- english: double (nullable = true)
 |-- physics: double (nullable = true)
 |-- chemistry: double (nullable = true)
 |-- biology: double (nullable = true)
 |-- history: double (nullable = true)
 |-- geography: double (nullable = true)
 |-- civic_education: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- province_code: integer (nullable = true)
 |-- combined_natural_sciences: double (nullable = true)
 |-- combined_social_sciences: double (nullable = true)



In [21]:
# Select the student id, the nine subject scores, the province code and the year from the
# student1822 view, renaming several columns on the way. The two combined_* score columns
# of the view are not selected.
# NOTE(review): presumably the aliases match the FactScore column names - confirm against the DDL.
student1822 = spark.sql("SELECT student_id as studentID, literature, mathematics as math, \
                         english, physics, chemistry, biology, history, geography, \
                         civic_education as civil, province_code as ProvinceCode, Year as year FROM student1822")

In [22]:
# Remove rows that are exact duplicates across all selected columns.
student1822 = student1822.dropDuplicates()

In [23]:
# Number of different province codes present in the rows to be loaded.
student1822.select('ProvinceCode').dropDuplicates().count()

                                                                                

63

In [24]:
# Distinct province codes taken from the student1822 view (the 2018-2022 marks as loaded).
distinctProv = spark.sql("SELECT DISTINCT province_code FROM student1822")

In [25]:
# Number of distinct province codes in the student1822 view.
distinctProv.count()

                                                                                

63

In [26]:
# Total number of de-duplicated score rows to be loaded.
student1822.count()

                                                                                

3815783

In [27]:
# For each column, count the rows whose value is NULL or NaN.
missing_per_column = [
    count(when(isnull(name) | isnan(name), name)).alias(name)
    for name in student1822.columns
]
student1822.select(missing_per_column).show()



+---------+----------+----+-------+-------+---------+-------+-------+---------+-------+------------+----+
|studentID|literature|math|english|physics|chemistry|biology|history|geography|  civil|ProvinceCode|year|
+---------+----------+----+-------+-------+---------+-------+-------+---------+-------+------------+----+
|        0|         0|   0|      0|2327577|  2327932|2329854|1450340|  1452611|1459756|           0|   0|
+---------+----------+----+-------+-------+---------+-------+-------+---------+-------+------------+----+



                                                                                

In [28]:
# Highest literature score among the rows to be loaded.
# `max` here is pyspark.sql.functions.max (imported earlier in the notebook), not the Python builtin.
student1822.agg(max(col('literature'))).show()



+---------------+
|max(literature)|
+---------------+
|           10.0|
+---------------+



                                                                                

In [None]:
# Append the de-duplicated 2018-2022 score rows to the FactScore table over JDBC.
# The endpoint is built from the DB_HOST / DB_PORT / DB_NAME values read from .env;
# each one falls back to the value that used to be hard-coded here when it is unset.
# mode('append') inserts the rows again on every run of this cell.
jdbc_url = "jdbc:mysql://{}:{}/{}".format(
    host or 'localhost', port or '3306', dbName or 'equivalent_score'
)
# partitionBy('year') was dropped: it describes a file-system output layout and
# does not apply to a JDBC table write.
student1822.write.format('jdbc').options(
    url = jdbc_url,
    # Connector/J 8.x driver class; the legacy 'com.mysql.jdbc.Driver' name is deprecated.
    driver = 'com.mysql.cj.jdbc.Driver',
    dbtable = "FactScore",
    user = dbUser,
    password = password
).mode('append').save()