#### <b>Import libraries and datasets</b>

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 28 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 14.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=fe98637f2edc612f52a804925dc5e27e2e9f3c9fc6f56d5dd0365c0a91e71f5f
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master("local").appName("DataPoints").getOrCreate()

In [4]:
spark

In [5]:
import pandas as pd
!pip install openpyxl

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
# Convert .xlsx files to .csv to make the datasets readable to spark
pandas_df = pd.read_excel('diem_f1_md.xlsx', engine = 'openpyxl')
pandas_df.to_csv("f1.csv")
df1 = spark.read.csv("./f1.csv", inferSchema=True, header=True)

pd.read_excel('diem_f2_md.xlsx', engine = 'openpyxl').to_csv("f2.csv")
df2 = spark.read.csv("./f2.csv", inferSchema=True, header=True)

In [12]:
df = df1.union(df2)
df.count()

1000000

In [13]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- NAMHOC: integer (nullable = true)
 |-- TENHK: integer (nullable = true)
 |-- NHHK: integer (nullable = true)
 |-- F_MAMH: string (nullable = true)
 |-- F_TENMHVN: string (nullable = true)
 |-- F_DVHT: double (nullable = true)
 |-- F_PHTRAMKT: integer (nullable = true)
 |-- F_PHTRAMTH: integer (nullable = true)
 |-- F_MANH: string (nullable = true)
 |-- F_TO: string (nullable = true)
 |-- STT: string (nullable = true)
 |-- F_MAKH: string (nullable = true)
 |-- F_TENLOP: string (nullable = true)
 |-- F_KHOI: string (nullable = true)
 |-- F_MANG: string (nullable = true)
 |-- F_TENNGVN: string (nullable = true)
 |-- MASV1: integer (nullable = true)
 |-- KT: double (nullable = true)
 |-- TILEKT: double (nullable = true)
 |-- BT: double (nullable = true)
 |-- TILEBT: double (nullable = true)
 |-- BTLDA: double (nullable = true)
 |-- TILEBTLDA: double (nullable = true)
 |-- TN: double (nullable = true)
 |-- TILETN: double (nullable = true)
 |-- T

In [15]:
# create a dataset of courses (môn học) which are taught to CS students ('F_MAKH' == 'MT') from the year 2015 ('NAMHOC' >= 2015)
df_subject = df.filter((df['F_MAKH'] == 'MT') & (df['NAMHOC'] >= 2015)) \
    .groupBy('F_TENMHVN').count().orderBy('F_TENMHVN')
    
df_subject.count()

178

In [16]:
df_subject.show(df_subject.count(), truncate = False)

+------------------------+-----+
|F_TENMHVN               |count|
+------------------------+-----+
|Anh văn 1               |440  |
|Anh văn 2               |517  |
|Anh văn 3               |606  |
|Anh văn 4               |502  |
|Anh văn cơ bản          |95   |
|Anh văn tăng cường 1    |5    |
|Anh văn tăng cường 2    |41   |
|Anh văn tăng cường 3    |73   |
|Anh văn tăng cường 4    |25   |
|App of wind&solar energy|1    |
|Bảomật hệ thống thôngtin|83   |
|C/nghệ phần mềm nâng cao|156  |
|CS dữ liệu &các hệ thtin|1    |
|CSDL phân tán, hướng đtg|12   |
|Con người và môi trường |69   |
|Ctrúc dữliệu & giảithuật|1084 |
|Công nghệ phần mềm      |1130 |
|Công nghệ vật liệu đcươg|1    |
|Cơ học vật rắn & sóng cơ|1    |
|Cơ học ứng dụng         |145  |
|Cơ lý thuyết            |49   |
|Cơ lưu chất             |46   |
|Cơ sở kt điện-máy tính  |2    |
|Cấu trúc dữliệu & Gthuật|179  |
|Cấutrúc rời rạc cho KHMT|1102 |
|Giaotiếp trong kinhdoanh|1    |
|Giáo dục quốc phòng     |1    |
|Giáo dục 

#### <b>Data cleaning and filtering</b>

In [17]:
# clear the grade value of 13, 21, RT
def cleanSubjectPointDF(dataframe):
    res = dataframe.filter(((dataframe['TKET'] >= 0) &(dataframe['TKET'] <= 10)))
    return res

def createSubjectPointDF(SubjectName):
    res = df.filter((df['F_TENMHVN'] == SubjectName) & (df['F_MAKH'] == 'MT') & (df['NAMHOC'] >= 2015)) \
    .select(df.columns[3:4] + df.columns[12:13] + df.columns[16:29])
    cleaned_res = cleanSubjectPointDF(res)
    return cleaned_res

In [18]:
df_HTS = createSubjectPointDF('Hệ thống số')

In [19]:
df_HTS.count()

1107

In [20]:
df_NMDT = createSubjectPointDF('Nhập môn điện toán')

In [21]:
df_NMDT.count()

1082

In [22]:
df_CTRR = createSubjectPointDF('Cấutrúc rời rạc cho KHMT')

In [23]:
df_CTRR.count()

1100

In [24]:
df_KTLT = createSubjectPointDF('Kỹ thuật lập trình')

In [25]:
df_KTLT.count()

1306

In [26]:
df_CTDLGT1 = createSubjectPointDF('Ctrúc dữliệu & giảithuật')
df_CTDLGT2 = createSubjectPointDF('Cấu trúc dữliệu & Gthuật')
df_CTDLGT = df_CTDLGT1.union(df_CTDLGT2)

In [27]:
df_CTDLGT.count()

1261

In [28]:
df_LTHDT1 = createSubjectPointDF('Lậptrình hướng đối tượng')
df_LTHDT2 = createSubjectPointDF('Lập trình hướng đốitượng')
df_LTHDT = df_LTHDT1.union(df_LTHDT2)

In [29]:
df_LTHDT.count()

840

In [30]:
df_KTMT = createSubjectPointDF('Kiến trúc máy tính')

In [31]:
df_KTMT.count()

1070

In [32]:
df_HTS.createOrReplaceTempView("HTS")
df_NMDT.createOrReplaceTempView("NMDT")
df_CTRR.createOrReplaceTempView("CTRR")
df_KTLT.createOrReplaceTempView("KTLT")

In [33]:
df_CTDLGT.createOrReplaceTempView("CTDLGT")
df_KTMT.createOrReplaceTempView("KTMT")
df_LTHDT.createOrReplaceTempView("LTHDT")

In [34]:
df_MHH = createSubjectPointDF('Mô hình hóa toán học')
df_HDH = createSubjectPointDF('Hệ điều hành')
df_HCSDL = createSubjectPointDF('Hệ cơ sở dữ liệu')

df_CNPM = createSubjectPointDF('Công nghệ phần mềm')
df_MMT = createSubjectPointDF('Mạng máy tính')

df_NLNNLT1 = createSubjectPointDF('Ng/lý ngôn ngữ lập trình')
df_NLNNLT2 = createSubjectPointDF('N/ly Ngon Ngu Lap Trinh')
df_NLNNLT = df_NLNNLT1.union(df_NLNNLT2)

df_PTTKGT = createSubjectPointDF('Phân tích và thiết kế gt').union(createSubjectPointDF('PT & Thiết kế giải thuật'))

In [35]:
df_MHH.createOrReplaceTempView("MHH")
df_HDH.createOrReplaceTempView("HDH")
df_HCSDL.createOrReplaceTempView("HCSDL")

df_CNPM.createOrReplaceTempView("CNPM")
df_MMT.createOrReplaceTempView("MMT")
df_NLNNLT.createOrReplaceTempView("NLNNLT")

df_PTTKGT.createOrReplaceTempView("PTTKGT")

In [43]:
diemTK_nam3 = spark.sql("select \
          HTS.MASV1 as MSSV, NMDT.TKET as NMDT, CTRR.TKET as CTRR, HTS.TKET as HTS, KTLT.TKET as KTLT, \
          CTDLGT.TKET as CTDLGT, KTMT.TKET as KTMT, LTHDT.TKET as LTHDT, \
          MHH.TKET as MHH, HCSDL.TKET as HCSDL, HDH.TKET as HDH, \
          MMT.TKET as MMT, CNPM.TKET as CNPM, NLNNLT.TKET as NLNNLT, PTTKGT.TKET as PTTKGT \
          from HTS, NMDT, KTLT, CTRR, CTDLGT, KTMT, LTHDT, MHH, HCSDL, HDH, MMT, CNPM, NLNNLT, PTTKGT \
          where HTS.MASV1 = NMDT.MASV1 and NMDT.MASV1 = CTRR.MASV1 and CTRR.MASV1 = KTLT.MASV1 \
          and KTLT.MASV1 = CTDLGT.MASV1 and CTDLGT.MASV1 = KTMT.MASV1 and KTMT.MASV1 = LTHDT.MASV1 \
          and LTHDT.MASV1 = MHH.MASV1 and MHH.MASV1 = HCSDL.MASV1 and HCSDL.MASV1 = HDH.MASV1 \
          and HDH.MASV1 = MMT.MASV1 and MMT.MASV1 = CNPM.MASV1 and CNPM.MASV1 = NLNNLT.MASV1 \
          and NLNNLT.MASV1 = PTTKGT.MASV1")

In [44]:
diemTK_nam3.count()

440

In [45]:
diemTK_nam3.createOrReplaceTempView('diemTK_nam3')

In [46]:
maxdiemTK_nam3 = spark.sql("select MSSV, max(NMDT) as NMDT, max(CTRR) as CTRR, max(HTS) as HTS, max(KTLT) as KTLT, \
          max(CTDLGT) as CTDLGT, max(KTMT) as KTMT, max(LTHDT) as LTHDT, \
          max(MHH) as MHH, max(HCSDL) as HCSDL, max(HDH) as HDH, \
          max(MMT) as MMT, max(CNPM) as CNPM, max(NLNNLT) as NLNNLT, max(PTTKGT) as PTTKGT \
          from diemTK_nam3 group by MSSV")

In [47]:
maxdiemTK_nam3.count()

122

In [51]:
maxdiemTK_nam3.show()

+--------+----+----+---+----+------+----+-----+---+-----+---+---+----+------+------+
|    MSSV|NMDT|CTRR|HTS|KTLT|CTDLGT|KTMT|LTHDT|MHH|HCSDL|HDH|MMT|CNPM|NLNNLT|PTTKGT|
+--------+----+----+---+----+------+----+-----+---+-----+---+---+----+------+------+
|   75321| 6.5| 6.5|  5|   6|   5.5| 6.5|  7.5|  7|    8|6.5|8.0| 7.0|   5.5|   9.0|
| 2683782| 7.5|   8|6.5|   9|   8.5| 7.5|    7|8.5|    8|  7|8.5| 8.5|   7.0|   7.0|
| 4045426| 8.5| 8.5|  6|   8|   7.5| 8.5|    9|8.5|  7.5|8.5|8.5| 6.5|   7.5|   8.5|
| 4536867|   8|   8|  9| 7.5|     9| 9.5|  9.5|9.5|    9|9.5|9.5| 8.0|   9.5|  10.0|
| 5763862|   7|   6|  8|   6|   7.5|   7|  6.5|8.5|  7.5|8.5|7.5| 6.5|   5.0|   7.0|
| 6024723| 7.5| 7.5|7.5|   8|   8.5| 7.5|    8|  8|    7|  8|7.5| 7.0|   8.5|   9.0|
| 6136739| 8.5|   7|  7| 8.5|   5.0| 5.5|  8.5|6.5|  7.5|8.0|6.5| 6.0|   5.0|   9.0|
| 7675525| 8.5| 7.5|  9|   8|   8.5| 8.5|    9|9.5|    9|8.5|9.0| 8.0|   7.0|   8.5|
|10372040| 6.5|   6|4.5| 2.5|   4.5|   6|  6.5|  6|  6.0|5.0|6.5|

In [48]:
maxdiemTK_nam3.write.csv('diemTK_nam3.csv', header = True)

In [49]:
diemTK_nam2 = spark.sql("select \
          HTS.MASV1 as MSSV, NMDT.TKET as NMDT, CTRR.TKET as CTRR, HTS.TKET as HTS, KTLT.TKET as KTLT, \
          CTDLGT.TKET as CTDLGT, KTMT.TKET as KTMT, LTHDT.TKET as LTHDT, \
          MHH.TKET as MHH, HCSDL.TKET as HCSDL, HDH.TKET as HDH \
          from HTS, NMDT, KTLT, CTRR, CTDLGT, KTMT, LTHDT, MHH, HCSDL, HDH \
          where HTS.MASV1 = NMDT.MASV1 and NMDT.MASV1 = CTRR.MASV1 and CTRR.MASV1 = KTLT.MASV1 \
          and KTLT.MASV1 = CTDLGT.MASV1 and CTDLGT.MASV1 = KTMT.MASV1 and KTMT.MASV1 = LTHDT.MASV1 \
          and LTHDT.MASV1 = MHH.MASV1 and MHH.MASV1 = HCSDL.MASV1 and HCSDL.MASV1 = HDH.MASV1")

In [50]:
diemTK_nam2.createOrReplaceTempView('diemTK_nam2')

In [55]:
maxdiemTK_nam2 = spark.sql("select MSSV, max(NMDT) as NMDT, max(CTRR) as CTRR, max(HTS) as HTS, max(KTLT) as KTLT, \
          max(CTDLGT) as CTDLGT, max(KTMT) as KTMT, max(LTHDT) as LTHDT, \
          max(MHH) as MHH, max(HCSDL) as HCSDL, max(HDH) as HDH \
          from diemTK_nam2 group by MSSV")

In [56]:
maxdiemTK_nam2.count()

359

In [57]:
maxdiemTK_nam2.show()

+-------+----+----+---+----+------+----+-----+----+-----+---+
|   MSSV|NMDT|CTRR|HTS|KTLT|CTDLGT|KTMT|LTHDT| MHH|HCSDL|HDH|
+-------+----+----+---+----+------+----+-----+----+-----+---+
|  75321| 6.5| 6.5|  5|   6|   5.5| 6.5|  7.5|   7|    8|6.5|
|  81657| 7.5|   7|5.5| 7.0|   7.0| 7.5|  7.5| 6.0|  7.5|7.5|
| 209312|   7| 6.5|5.5| 7.5|   4.5| 7.0|  0.0|   7|    6|  8|
| 583833| 6.5|   7|  6| 5.5|   6.5|   7|    8|   7|    6|  7|
|1509861| 7.5|   5|  5| 9.0|   3.5| 6.5|  6.0| 7.0|  7.5|6.5|
|2047807| 6.5| 6.5|7.5| 9.5|   8.5| 6.5|  8.5| 7.5|  8.5|8.5|
|2223555|   9|   8|9.5|10.0|   9.0| 8.0|  8.0|10.0|  8.0|8.5|
|2240989| 8.5|   7|  7| 7.0|   6.0| 8.0|  7.0| 6.5|  7.5|8.0|
|2260878| 7.5| 8.5|8.5|10.0|   8.5| 8.0|  8.0| 8.5|  0.0|9.0|
|2274996| 5.5|   7|  7| 7.5|   7.5|   6|    8|   7|  7.5|7.5|
|2343848|   8|   7|  6| 7.5|   5.0| 7.0|  6.5| 6.5|  0.0|5.5|
|2683782| 7.5|   8|6.5|   9|   8.5| 7.5|    7| 8.5|    8|  7|
|3095962|   7| 6.5|  6| 6.5|   7.5|   6|  7.5| 5.5|  7.5|6.5|
|3419145

In [58]:
maxdiemTK_nam2.createOrReplaceTempView('maxdiemTK_nam2')

In [59]:
df_NMAI = createSubjectPointDF('Nhập môn trítuệ nhân tạo')
df_DHMT = createSubjectPointDF('Đồ họa máy tính')
df_MMANM = createSubjectPointDF('Mật mã & an ninh mạng').union(createSubjectPointDF('Mật mã và an ninh mạng'))

In [60]:
df_NMAI.createOrReplaceTempView("NMAI")
df_DHMT.createOrReplaceTempView("DHMT")
df_MMANM.createOrReplaceTempView("MMANM")

In [61]:
res_NMAI = spark.sql("select NMAI.MASV1 as MSSV, NMAI.TKET as NMTTNT from NMAI, maxdiemTK_nam2 where NMAI.MASV1 = maxdiemTK_nam2.MSSV")
res_NMAI.count()

159

In [62]:
res_DHMT = spark.sql("select DHMT.MASV1 as MSSV, DHMT.TKET as DHMT from DHMT, maxdiemTK_nam2 where DHMT.MASV1 = maxdiemTK_nam2.MSSV")
res_DHMT.count()

120

In [63]:
res_MMANM = spark.sql("select MMANM.MASV1 as MSSV, MMANM.TKET as MMANM from MMANM, maxdiemTK_nam2 where MMANM.MASV1 = maxdiemTK_nam2.MSSV")
res_MMANM.count()

92

In [64]:
res_NMAI.createOrReplaceTempView('res_NMAI')
res_DHMT.createOrReplaceTempView('res_DHMT')
res_MMANM.createOrReplaceTempView('res_MMANM')

In [65]:
diemTK_BB_TC = spark.sql("select maxdiemTK_nam2.MSSV, NMDT,CTRR,HTS,KTLT,CTDLGT,KTMT,LTHDT,MHH,HCSDL,HDH, NMTTNT, DHMT, MMANM \
          from maxdiemTK_nam2 left join res_NMAI on maxdiemTK_nam2.MSSV = res_NMAI.MSSV \
          left join res_DHMT on maxdiemTK_nam2.MSSV = res_DHMT.MSSV \
          left join res_MMANM on maxdiemTK_nam2.MSSV = res_MMANM.MSSV")

In [66]:
diemTK_BB_TC.show()

+-------+----+----+---+----+------+----+-----+----+-----+---+------+----+-----+
|   MSSV|NMDT|CTRR|HTS|KTLT|CTDLGT|KTMT|LTHDT| MHH|HCSDL|HDH|NMTTNT|DHMT|MMANM|
+-------+----+----+---+----+------+----+-----+----+-----+---+------+----+-----+
|  75321| 6.5| 6.5|  5|   6|   5.5| 6.5|  7.5|   7|    8|6.5|   5.0|null|  7.0|
|  81657| 7.5|   7|5.5| 7.0|   7.0| 7.5|  7.5| 6.0|  7.5|7.5|   5.0|null| null|
| 209312|   7| 6.5|5.5| 7.5|   4.5| 7.0|  0.0|   7|    6|  8|   4.0| 0.0| null|
| 583833| 6.5|   7|  6| 5.5|   6.5|   7|    8|   7|    6|  7|   8.5| 6.0|  7.5|
|1509861| 7.5|   5|  5| 9.0|   3.5| 6.5|  6.0| 7.0|  7.5|6.5|  null|null| null|
|2047807| 6.5| 6.5|7.5| 9.5|   8.5| 6.5|  8.5| 7.5|  8.5|8.5|   9.0| 8.5| null|
|2223555|   9|   8|9.5|10.0|   9.0| 8.0|  8.0|10.0|  8.0|8.5|  null|null| null|
|2240989| 8.5|   7|  7| 7.0|   6.0| 8.0|  7.0| 6.5|  7.5|8.0|  null|null| null|
|2260878| 7.5| 8.5|8.5|10.0|   8.5| 8.0|  8.0| 8.5|  0.0|9.0|  null|null| null|
|2274996| 5.5|   7|  7| 7.5|   7.5|   6|

In [67]:
from pyspark.sql.functions import when

ndf = diemTK_BB_TC.withColumn("NMTTNT", \
              when(diemTK_BB_TC["NMTTNT"] >= 0, 1).otherwise(0))
ndf = ndf.withColumn("DHMT", when(ndf["DHMT"] >= 0, 1).otherwise(0))
ndf = ndf.withColumn("MMANM", when(ndf["MMANM"] >= 0, 1).otherwise(0))

In [68]:
ndf.count()

359

In [69]:
ndf.show(359)

+--------+----+----+---+----+------+----+-----+----+-----+---+------+----+-----+
|    MSSV|NMDT|CTRR|HTS|KTLT|CTDLGT|KTMT|LTHDT| MHH|HCSDL|HDH|NMTTNT|DHMT|MMANM|
+--------+----+----+---+----+------+----+-----+----+-----+---+------+----+-----+
|   75321| 6.5| 6.5|  5|   6|   5.5| 6.5|  7.5|   7|    8|6.5|     1|   0|    1|
|   81657| 7.5|   7|5.5| 7.0|   7.0| 7.5|  7.5| 6.0|  7.5|7.5|     1|   0|    0|
|  209312|   7| 6.5|5.5| 7.5|   4.5| 7.0|  0.0|   7|    6|  8|     1|   1|    0|
|  583833| 6.5|   7|  6| 5.5|   6.5|   7|    8|   7|    6|  7|     1|   1|    1|
| 1509861| 7.5|   5|  5| 9.0|   3.5| 6.5|  6.0| 7.0|  7.5|6.5|     0|   0|    0|
| 2047807| 6.5| 6.5|7.5| 9.5|   8.5| 6.5|  8.5| 7.5|  8.5|8.5|     1|   1|    0|
| 2223555|   9|   8|9.5|10.0|   9.0| 8.0|  8.0|10.0|  8.0|8.5|     0|   0|    0|
| 2240989| 8.5|   7|  7| 7.0|   6.0| 8.0|  7.0| 6.5|  7.5|8.0|     0|   0|    0|
| 2260878| 7.5| 8.5|8.5|10.0|   8.5| 8.0|  8.0| 8.5|  0.0|9.0|     0|   0|    0|
| 2274996| 5.5|   7|  7| 7.5

In [70]:
ndf.write.csv('input.csv', header = True)