In [1]:
import polars as pl
from tqdm import tqdm
import os
import pandas as pd
import json
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import scipy

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, explode, count, split, substring, size, when
spark = SparkSession.builder.appName('group_problem_course').getOrCreate()

In [4]:
from pyspark.sql.types import StructType, StructField, StringType

custom_schema = StructType([
    StructField("No", StringType(), True),
    StructField("course_id", StringType(), True),
    StructField("grouped_problem_id", StringType(), True)
])


grouped_problems = spark.read.csv("course-problem.csv", header=True, schema=custom_schema)

In [5]:
grouped_problems_with_array = grouped_problems.withColumn("grouped_problem_id_array", split(grouped_problems["grouped_problem_id"], ", "))

# Sử dụng hàm explode để tách các phần tử trong mảng thành các hàng riêng biệt
exploded_grouped_problems = grouped_problems_with_array.select(explode("grouped_problem_id_array").alias("problem_id"), "course_id")

# Hiển thị kết quả
exploded_grouped_problems.show()

+-------------+---------+
|   problem_id|course_id|
+-------------+---------+
|['Pm_2017603'|C_1017355|
| 'Pm_2017604'|C_1017355|
| 'Pm_2017605'|C_1017355|
| 'Pm_2017606'|C_1017355|
| 'Pm_2017607'|C_1017355|
| 'Pm_2017608'|C_1017355|
| 'Pm_2017609'|C_1017355|
| 'Pm_2017610'|C_1017355|
| 'Pm_2017611'|C_1017355|
| 'Pm_2017612'|C_1017355|
| 'Pm_2017613'|C_1017355|
| 'Pm_2017614'|C_1017355|
| 'Pm_2017615'|C_1017355|
| 'Pm_2017616'|C_1017355|
| 'Pm_2017617'|C_1017355|
| 'Pm_2017618'|C_1017355|
| 'Pm_2017619'|C_1017355|
| 'Pm_2017620'|C_1017355|
| 'Pm_2017621'|C_1017355|
| 'Pm_2017622'|C_1017355|
+-------------+---------+
only showing top 20 rows



In [6]:
from pyspark.sql.functions import regexp_replace

cleaned_exploded_grouped_problems = exploded_grouped_problems.withColumn("problem_id", regexp_replace(exploded_grouped_problems["problem_id"], "[\\[']", ""))

# Hiển thị kết quả
cleaned_exploded_grouped_problems.show()

+----------+---------+
|problem_id|course_id|
+----------+---------+
|Pm_2017603|C_1017355|
|Pm_2017604|C_1017355|
|Pm_2017605|C_1017355|
|Pm_2017606|C_1017355|
|Pm_2017607|C_1017355|
|Pm_2017608|C_1017355|
|Pm_2017609|C_1017355|
|Pm_2017610|C_1017355|
|Pm_2017611|C_1017355|
|Pm_2017612|C_1017355|
|Pm_2017613|C_1017355|
|Pm_2017614|C_1017355|
|Pm_2017615|C_1017355|
|Pm_2017616|C_1017355|
|Pm_2017617|C_1017355|
|Pm_2017618|C_1017355|
|Pm_2017619|C_1017355|
|Pm_2017620|C_1017355|
|Pm_2017621|C_1017355|
|Pm_2017622|C_1017355|
+----------+---------+
only showing top 20 rows



# 2. Chọn những course_id trùng với problem_id của file user-problem

In [7]:
df = spark.read.json("D:\\Data\\relations\\user-problem.json")

In [8]:
df.show()

+--------+----------+----------------+----------+-----+-------------------+----------+
|attempts|is_correct|          log_id|problem_id|score|        submit_time|   user_id|
+--------+----------+----------------+----------+-----+-------------------+----------+
|       1|         0|   10000_6906522|Pm_6906522| NULL|2020-10-27 10:11:56|   U_10000|
|       1|         0|   10000_6906523|Pm_6906523| NULL|2020-10-27 10:12:13|   U_10000|
|       1|         1|   10000_6906524|Pm_6906524| NULL|2020-10-27 10:12:28|   U_10000|
|       1|         0|   10000_6906525|Pm_6906525| NULL|2020-10-27 10:14:56|   U_10000|
|       1|         0|   10000_6906526|Pm_6906526| NULL|2020-10-27 10:15:18|   U_10000|
|       1|         0|   10000_6906527|Pm_6906527| NULL|2020-10-27 10:15:41|   U_10000|
|       1|         0|   10000_6906528|Pm_6906528| NULL|2020-10-27 10:16:21|   U_10000|
|       3|         0|10000130_3624759|Pm_3624759| -1.0|2020-05-19 16:57:44|U_10000130|
|       2|         0|10000130_3624760|Pm_36

In [9]:
joined_user_problem = df.join(cleaned_exploded_grouped_problems, df["problem_id"] == cleaned_exploded_grouped_problems["problem_id"], "inner")

In [10]:
# Chọn cột course_id từ bảng exploded_grouped_problems để thêm vào bảng user-problem
joined_user_problem_with_course_id = joined_user_problem.select(df["*"], cleaned_exploded_grouped_problems["course_id"].alias("course_id"))

# Hiển thị kết quả
joined_user_problem_with_course_id.show()

+--------+----------+---------------+----------+-----+-------------------+---------+---------+
|attempts|is_correct|         log_id|problem_id|score|        submit_time|  user_id|course_id|
+--------+----------+---------------+----------+-----+-------------------+---------+---------+
|       1|         0|  10000_6906522|Pm_6906522| NULL|2020-10-27 10:11:56|  U_10000|C_2033958|
|       1|         0|  10000_6906523|Pm_6906523| NULL|2020-10-27 10:12:13|  U_10000|C_2033958|
|       1|         1|  10000_6906524|Pm_6906524| NULL|2020-10-27 10:12:28|  U_10000|C_2033958|
|       1|         0|  10000_6906525|Pm_6906525| NULL|2020-10-27 10:14:56|  U_10000|C_2033958|
|       1|         0|  10000_6906526|Pm_6906526| NULL|2020-10-27 10:15:18|  U_10000|C_2033958|
|       1|         0|  10000_6906527|Pm_6906527| NULL|2020-10-27 10:15:41|  U_10000|C_2033958|
|       1|         0|  10000_6906528|Pm_6906528| NULL|2020-10-27 10:16:21|  U_10000|C_2033958|
|       1|         1|1000454_7309497|Pm_7309497| N

# 3. Tính tỉ lệ giải bài tập đối với mỗi course

In [11]:
from pyspark.sql import functions as F

result_df = joined_user_problem_with_course_id.groupBy("problem_id", "course_id").agg(
    F.sum("attempts").alias("total_attempts"),
    F.sum("is_correct").alias("total_is_correct")
)

result_df = result_df.withColumn("completion_rate", F.col("total_is_correct") / F.col("total_attempts"))

result_df.show()


+----------+---------+--------------+----------------+------------------+
|problem_id|course_id|total_attempts|total_is_correct|   completion_rate|
+----------+---------+--------------+----------------+------------------+
|Pm_3331697|C_1756056|           117|             110|0.9401709401709402|
|Pm_3331707|C_1756056|           117|              95| 0.811965811965812|
|Pm_3331800|C_1756056|           100|              87|              0.87|
|Pm_3331869|C_1756056|            92|              80|0.8695652173913043|
|Pm_3332003|C_1756056|            82|              80| 0.975609756097561|
|Pm_7677475|C_1756056|            84|              66|0.7857142857142857|
| Pm_687745| C_697684|            30|              14|0.4666666666666667|
| Pm_687861| C_697684|            12|              12|               1.0|
|Pm_3946681|C_1822804|         17252|           12979| 0.752318571759796|
|Pm_4077470|C_1822804|         13102|           12099|0.9234468020149595|
|Pm_7839516|C_2256608|           273| 

In [12]:
average_completion_rate_df = result_df.groupBy("course_id").agg(
    F.avg("completion_rate").alias("average_completion_rate")
)
average_completion_rate_df.show()


+---------+-----------------------+
|course_id|average_completion_rate|
+---------+-----------------------+
| C_676937|                 0.5625|
|C_1898449|     0.7058823529411765|
|C_1907852|     0.7840896985192287|
|C_1767581|     0.9384384384384384|
|C_1771164|                  0.675|
|C_2308099|                    0.5|
| C_707083|     0.6880846532405905|
|C_1750851|     0.7908552615191857|
| C_948389|     0.8433933877231129|
|C_1775020|     0.7816917464660283|
|C_1886685|     0.4487179487179487|
|C_2342491|     0.5666666666666667|
| C_948109|     0.8764020342129146|
|C_1712435|     0.7773124078290778|
| C_949542|     0.7464897392197078|
|C_1909583|     0.7083333333333334|
|C_2059282|     0.7259800242110365|
| C_697188|                    0.5|
| C_682556|     0.5533909962586433|
|C_2244921|     0.7416666666666667|
+---------+-----------------------+
only showing top 20 rows



In [13]:
average_completion_rate_df =average_completion_rate_df.toPandas().to_csv('course_completion_rate.csv')

In [14]:
result_df.toPandas().to_csv('problem_completion_rate.csv')