In [1]:
# import findspark
# findspark.init('/home/ductien/yspark-3.3.2-bin-hadoop3')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import *
import mysql.connector

In [2]:
# Start (or reuse) the Spark session; the MySQL Connector/J jar is put on the
# classpath so the JDBC writes further down can load the MySQL driver.
spark = (
    SparkSession.builder
    .appName("MySQL_loader")
    .config("spark.jars", "./mysql-connector-java-8.0.29.jar")
    .getOrCreate()
)

In [3]:
mark_2017_df = spark.read.csv("./CleanedDatasets/Mark2017", header= True, inferSchema= True)

In [4]:
mark_18_22_df = spark.read.csv("./CleanedDatasets/Mark_2018_2022", header= True, inferSchema= True)

<h1>Spark DataFrame to MySQL database</h1>

In [5]:
import os
from dotenv import load_dotenv

# Database connection settings come from the local .env file so that credentials
# stay out of the notebook. Host, port and database name default to the local
# database this notebook originally targeted.
load_dotenv(".env")
dbName = os.getenv("DB_NAME", "equivalent_score")
dbUser = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
host = os.getenv("DB_HOST", "localhost")
port = os.getenv("DB_PORT", "3306")

# Fail fast here instead of with an opaque JDBC error at the first write.
# `is None` (not falsiness) so an intentionally empty password is still accepted.
missingSettings = [name for name, value in (("DB_USER", dbUser), ("DB_PASSWORD", password))
                   if value is None]
if missingSettings:
    raise RuntimeError(f"Missing database settings in .env: {', '.join(missingSettings)}")

<h3>Load to province table (DimProvince)</h3>

In [None]:
mark_2017_df.createTempView("PROVINCE")

In [None]:
provinceDF = spark.sql("SELECT DSTINCT province_code, province_name FROM PROVINCE")

In [None]:
provinceDF.printSchema()

In [None]:
provinceDF = provinceDF.sort("province_code")

In [None]:
provinceDF.count()

In [None]:
from pyspark.sql.functions import col

In [None]:
newRow = spark.createDataFrame([(39,'phuyen')], provinceDF.columns)
provinceDF = provinceDF.union(newRow)

In [None]:
newRow = spark.createDataFrame([(45,'ninhthuan')], provinceDF.columns)
provinceDF = provinceDF.union(newRow)

In [None]:
provinceDF = provinceDF.sort('province_code')

In [None]:
provinceDF = provinceDF.withColumnRenamed('province', 'ProvinceName') \
                        .withColumnRenamed('province_code', 'provinceCode')

In [None]:
provinceDF.printSchema()

In [None]:
provinceDF.count()

In [None]:
# Append the province dimension to MySQL. The connection target comes from the
# .env settings (host / port / dbName), falling back to the local
# `equivalent_score` database that was previously hard-coded.
jdbcUrl = f"jdbc:mysql://{host or 'localhost'}:{port or '3306'}/{dbName or 'equivalent_score'}"
provinceDF.write.format('jdbc').options(
    url = jdbcUrl,
    # com.mysql.jdbc.Driver is the deprecated Connector/J 5 name; 8.0 uses com.mysql.cj.jdbc.Driver.
    driver = 'com.mysql.cj.jdbc.Driver',
    dbtable = "DimProvince",
    user = dbUser,
    password = password
).mode('append').save()

<h3>Load to year table (DimYear)</h3>

In [None]:
from pyspark.sql import Row

In [None]:
yearDF = spark.createDataFrame([
    Row(year = 2018),
    Row(year = 2019),
    Row(year = 2020),
    Row(year = 2021),
    Row(year = 2022)
])

In [None]:
yearDF.show()

In [None]:
# Append the year dimension to MySQL. The connection target comes from the
# .env settings (host / port / dbName), falling back to the local
# `equivalent_score` database that was previously hard-coded.
jdbcUrl = f"jdbc:mysql://{host or 'localhost'}:{port or '3306'}/{dbName or 'equivalent_score'}"
yearDF.write.format('jdbc').options(
    url = jdbcUrl,
    # com.mysql.jdbc.Driver is the deprecated Connector/J 5 name; 8.0 uses com.mysql.cj.jdbc.Driver.
    driver = 'com.mysql.cj.jdbc.Driver',
    dbtable = "DimYear",
    user = dbUser,
    password = password
).mode('append').save()

<h2>Load to University table</h2>

In [None]:
benchmark_df.createTempView('UNI')

In [None]:
benchmark_df.printSchema()

In [None]:
uniDF = spark.sql("SELECT uni_code AS UniCode, uni_name AS UniName, \
                   major_code AS MajorCode, major_name AS MajorName, \
                   subject_group AS SubjectGroup, benchmark AS BenchMark, year AS Year FROM UNI")

In [None]:
from pyspark.sql.functions import count, when, isnan, isnull

In [None]:
uniDF.select([count(when(isnan(c) | isnull(c), c)).alias(c) for c in uniDF.columns]).show()

In [None]:
uniDF = uniDF.dropna(subset= uniDF.columns)

In [None]:
uniDF.select([count(when(isnan(c) | isnull(c), c)).alias(c) for c in uniDF.columns]).show()

In [None]:
from pyspark.sql.functions import max

In [None]:
uniDF = uniDF.filter(col('BenchMark') <= 32.75)

In [None]:
spark.sql('SELECT max(length(major_code)) FROM UNI').show()

In [None]:
uniDF.count()

In [None]:
# Append the cleaned university/major benchmarks to MySQL. The connection target
# comes from the .env settings (host / port / dbName), falling back to the local
# `equivalent_score` database that was previously hard-coded.
jdbcUrl = f"jdbc:mysql://{host or 'localhost'}:{port or '3306'}/{dbName or 'equivalent_score'}"
uniDF.write.format('jdbc').options(
    url = jdbcUrl,
    # com.mysql.jdbc.Driver is the deprecated Connector/J 5 name; 8.0 uses com.mysql.cj.jdbc.Driver.
    driver = 'com.mysql.cj.jdbc.Driver',
    dbtable = "University",
    user = dbUser,
    password = password
).mode('append').save()

<h1>Load to FactScore table</h1>

In [None]:
mark_18_22_df.createTempView('student1822')

In [None]:
mark_18_22_df.printSchema()

In [None]:
student1822 = spark.sql("SELECT student_id as studentID, literature, mathematics as math, \
                         english, physics, chemistry, biology, history, geography, \
                         civic_education as civil, province_code as ProvinceCode, Year as year FROM student1822")

In [None]:
student1822 = student1822.distinct()

In [None]:
student1822.select('ProvinceCode').distinct().count()

In [None]:
distinctProv = spark.sql("SELECT DISTINCT province_code FROM student1822")

In [None]:
distinctProv.count()

In [None]:
student1822.count()

In [None]:
student1822.select([count(when(isnan(c) | isnull(c), c)).alias(c) for c in student1822.columns]).show()

In [None]:
student1822.select(max(col('literature'))).show()

In [None]:
# Append the score facts to MySQL. The JDBC data source does not partition its
# output, so the former .partitionBy('year') had no effect and is dropped; write
# parallelism follows the DataFrame's own partitions. The connection target
# comes from the .env settings (host / port / dbName), falling back to the local
# `equivalent_score` database that was previously hard-coded.
jdbcUrl = f"jdbc:mysql://{host or 'localhost'}:{port or '3306'}/{dbName or 'equivalent_score'}"
student1822.write.format('jdbc').options(
    url = jdbcUrl,
    # com.mysql.jdbc.Driver is the deprecated Connector/J 5 name; 8.0 uses com.mysql.cj.jdbc.Driver.
    driver = 'com.mysql.cj.jdbc.Driver',
    dbtable = "FactScore",
    user = dbUser,
    password = password
).mode('append').save()