In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col

In [4]:
# 初始化Spark会话
spark = SparkSession.builder.appName("ExploreEducationData").getOrCreate()

# 数据文件路径
path_prefix = "data/"  # 需要根据实际情况调整
datasets = {
    "course_chapter": "course_chapter.csv",
    "course_task": "course_task.csv",
    "classroom_member": "classroom_member.csv",
    "classroom_courses": "classroom_courses.csv",
    "log": "log.csv",
    "user_learn_statistics_total": "user_learn_statistics_total.csv",
    "activity_learn_log": "activity_learn_log.csv",
    "testpaper": "testpaper.csv",
    "testpaper_result": "testpaper_result.csv",
    # "教学视频" 对应的数据可能不适合直接以表格形式读取
}

# 读取所有数据表
dataframes = {}
for name, file in datasets.items():
    df = spark.read.csv(f"{path_prefix}{file}", header=True, inferSchema=True)
    dataframes[name] = df

# 例：查看每个表的基本信息
for name, df in dataframes.items():
    print(f"---{name}---")
    df.show(5)  # 显示前5行
    df.printSchema()  # 打印表结构
    print(f"Total rows: {df.count()}\n")  # 显示总行数


---course_chapter---
+---+--------+-------+------+---+------------+-----------+-----------+------+---------+----------+---------------+-------------------+----------------+----------------+---------------+----------------+
| id|courseId|   type|number|seq|       title|createdTime|updatedTime|copyId|   status|isOptional|migrateLessonId|migrateCopyCourseId|migrateRefTaskId|mgrateCopyTaskId|migrate_task_id|published_number|
+---+--------+-------+------+---+------------+-----------+-----------+------+---------+----------+---------------+-------------------+----------------+----------------+---------------+----------------+
| 13|       3| lesson|     2|  4|        引言| 1536659838| 1560909864|     0|published|         0|              0|                  0|               0|               0|              0|               2|
| 14|       3|chapter|     1|  3|机器学习绪论| 1536659876| 1560909861|     0|published|         0|              0|                  0|               0|               0|           

In [5]:
from pyspark.sql import functions as F

In [6]:
# 用户学习统计的基本统计信息
df_user_stats = dataframes["user_learn_statistics_total"]
user_learn_stats_summary = df_user_stats.agg(
    F.countDistinct("userId").alias("unique_users"),
    F.max("learnedSeconds").alias("max_learned_seconds"),
    F.max("finishedTaskNum").alias("max_finished_tasks")
)
user_learn_stats_summary.show()

+------------+-------------------+------------------+
|unique_users|max_learned_seconds|max_finished_tasks|
+------------+-------------------+------------------+
|      142251|           12778285|              1608|
+------------+-------------------+------------------+


In [7]:
# 课程任务的统计信息
df_course_task = dataframes["course_task"]

course_task_summary = df_course_task.agg(
    F.countDistinct("courseId").alias("unique_courses"),
    F.count("id").alias("total_tasks"),
    F.max("length").alias("max_task_length")
)
course_task_summary.show()

+--------------+-----------+---------------+
|unique_courses|total_tasks|max_task_length|
+--------------+-----------+---------------+
|         11468|     259238|          14757|
+--------------+-----------+---------------+


In [8]:
# 试卷成绩的最高排名等统计信息
df_testpaper_result = dataframes["testpaper_result"]
testpaper_result_summary = df_testpaper_result.agg(
    F.max("score").alias("max_score"),
    F.countDistinct("userId").alias("unique_test_takers")
)
testpaper_result_summary.show()

+---------+------------------+
|max_score|unique_test_takers|
+---------+------------------+
|     99.5|             35642|
+---------+------------------+


In [9]:
# 日志记录的统计信息，如最活跃的用户

df_log = dataframes["log"]
log_summary = df_log.groupBy("userId").count().withColumnRenamed("count", "activity_count").orderBy(F.desc("activity_count"))
log_summary.show(5)  # 显示前5个最活跃的用户

+------+--------------+
|userId|activity_count|
+------+--------------+
|155471|         44486|
|165559|         40673|
|     0|         40221|
| 79144|         16257|
|160105|         15907|
+------+--------------+


In [10]:
# 学习日志的统计信息，如最常见的学习行为
df_activity_learn_log = dataframes["activity_learn_log"]
learn_log_summary = df_activity_learn_log.groupBy("event").count().orderBy(F.desc("count"))
learn_log_summary.show()

+--------+-------+
|   event|  count|
+--------+-------+
|   doing|9561272|
|watching|1931242|
|   start| 605537|
|  finish| 397415|
+--------+-------+


1. 课程效果评估
课程效果评估可以通过课程的完成度、学习成绩、用户反馈等多个角度来分析。

In [11]:
from pyspark.sql import functions as F

# 正确的课程参与度分析，包括章节数量、任务数量和参与学生数量
# 首先获取courseId和courseTaskId的对应关系
task_course_mapping = dataframes["course_task"].select("courseId", "id").withColumnRenamed("id", "courseTaskId")

# 将activity_learn_log与task_course_mapping连接，以获取每个学习记录所对应的courseId
activity_course_mapping = dataframes["activity_learn_log"].join(task_course_mapping, "courseTaskId")

# 现在，我们可以根据courseId来聚合，计算每门课程的参与学生数量
course_student_count_correct = activity_course_mapping.groupBy("courseId").agg(F.countDistinct("userId").alias("student_count"))

# 课程的章节数量
course_chapter_count = dataframes["course_chapter"].groupBy("courseId").count().withColumnRenamed("count", "chapter_count")

# 课程的任务数量
course_task_count = dataframes["course_task"].groupBy("courseId").count().withColumnRenamed("count", "task_count")

# 连接上述统计信息以进行分析
course_analysis = course_chapter_count.join(course_task_count, "courseId").join(course_student_count_correct, "courseId")

course_analysis.show()


+--------+-------------+----------+-------------+
|courseId|chapter_count|task_count|student_count|
+--------+-------------+----------+-------------+
|     451|            2|         2|            3|
|     255|            8|         7|            1|
|     368|           78|        70|            1|
|     385|           12|        13|            4|
|     436|           37|        27|            1|
|     412|           60|        50|           18|
|      26|           56|        42|            2|
|     384|          182|       151|            5|
|     271|           76|        69|            6|
|     743|           12|        10|           26|
|     253|           12|        11|            5|
|      12|           19|        15|            1|
|     350|           63|        32|            1|
|     417|           37|        29|           65|
|     548|           18|        22|            2|
|     409|           31|        24|            1|
|     601|           11|        11|           14|


In [20]:
from pyspark.sql import functions as F

# 计算每个课程的完成任务数量
course_completed_tasks = dataframes['course_task'].groupBy('courseId') \
    .agg(F.count('id').alias('completedTasks'))

# 计算每个课程的总任务数量
course_total_tasks = dataframes['course_task'].groupBy('courseId') \
    .agg(F.count('id').alias('totalTasks'))

# 计算完成率
course_completion_rate = course_completed_tasks.join(course_total_tasks, 'courseId') \
    .select('courseId', (F.col('completedTasks') / F.col('totalTasks')).alias('completionRate'))

course_completion_rate.show()


+--------+--------------+
|courseId|completionRate|
+--------+--------------+
|     148|           1.0|
|     463|           1.0|
|     471|           1.0|
|     496|           1.0|
|     833|           1.0|
|     243|           1.0|
|     392|           1.0|
|     540|           1.0|
|     623|           1.0|
|     737|           1.0|
|     858|           1.0|
|     897|           1.0|
|    1025|           1.0|
|     516|           1.0|
|      85|           1.0|
|     137|           1.0|
|     251|           1.0|
|     451|           1.0|
|     580|           1.0|
|     808|           1.0|
+--------+--------------+


2. 教学视频质量分析
分析视频的观看时长、重复观看次数等，可以初步评估视频质量。

方向2: 学生学习行为分析
分析学生的学习行为，例如学习视频的平均时长、完成的任务数量等。

In [12]:
from pyspark.sql.types import IntegerType

# 转换学习时长的数据类型
dataframes["user_learn_statistics_total"] = dataframes["user_learn_statistics_total"].withColumn("learnedSeconds", col("learnedSeconds").cast(IntegerType()))

# 计算每个学生的平均学习时长和完成的任务数量
student_learn_behavior = dataframes["user_learn_statistics_total"].groupBy("userId").agg(
    F.avg("learnedSeconds").alias("avg_learned_seconds"),
    F.sum("finishedTaskNum").alias("total_finished_tasks")
)

student_learn_behavior.show()


+------+-------------------+--------------------+
|userId|avg_learned_seconds|total_finished_tasks|
+------+-------------------+--------------------+
|   148|               17.0|                   0|
|   463|           215584.0|                  21|
|   471|            10914.0|                   8|
|   496|             3400.0|                   0|
|   833|             9665.0|                   1|
|  1088|            22953.0|                   4|
|  1238|             5937.0|                   3|
|  1342|              128.0|                   0|
|  1580|            14666.0|                   3|
|  1591|               41.0|                   1|
|  1645|             5649.0|                   3|
|  1829|               75.0|                   1|
|  1959|              121.0|                   0|
|  2122|             4817.0|                   1|
|  2142|               18.0|                   0|
|  2366|              549.0|                   2|
|  2866|               48.0|                   1|


方向4: 测试结果分析
分析测试结果，例如平均分数、通过率等

In [13]:
# 将分数列转换为浮点数
dataframes["testpaper_result"] = dataframes["testpaper_result"].withColumn("score", col("score").cast("float"))

# 计算每次测试的平均分和通过率
test_result_analysis = dataframes["testpaper_result"].groupBy("testId").agg(
    F.avg("score").alias("avg_score"),
    (F.sum(F.when(col("passedStatus") == "passed", 1).otherwise(0)) / F.count("testId")).alias("pass_rate")
)

test_result_analysis.show()


+------+------------------+-------------------+
|testId|         avg_score|          pass_rate|
+------+------------------+-------------------+
|  1090|               0.0|                0.0|
|  2136|               0.0|                0.0|
|  1159|               0.0|                0.0|
|   125|               0.0|                0.0|
|  2275|18.341463414634145|                1.0|
|  2393|17.391304347826086| 0.9782608695652174|
|   919|               0.0|                0.0|
|  1241|               0.0|0.10526315789473684|
|  1265|               0.0|                0.0|
|   124|               0.0|0.05263157894736842|
|   475|               0.0|                0.0|
|   574|               0.0|                0.0|
|  1695| 35.92307692307692|                0.0|
|  1773|              13.2|                0.7|
|  1445|               0.0|               0.04|
|   205|               0.0|                0.0|
|   334|               0.0|0.10256410256410256|
|   544|               0.0|             

In [14]:
from pyspark.sql.functions import countDistinct

# 使用activity_learn_log与task_course_mapping进行连接，以便获取每个学习活动对应的课程ID
activity_course_mapping = dataframes["activity_learn_log"].join(task_course_mapping, "courseTaskId")

# 根据courseId对结果进行分组，并计算每门课程的参与学生数量
course_student_count = activity_course_mapping.groupBy("courseId").agg(countDistinct("userId").alias("student_count"))

course_student_count.show()


+--------+-------------+
|courseId|student_count|
+--------+-------------+
|   11033|          130|
|   10623|           14|
|   11317|            1|
|   10206|           12|
|   11748|            2|
|   11858|           15|
|   12027|           10|
|    1645|            1|
|   12046|            1|
|    7554|            1|
|    2580|            4|
|   10230|           13|
|   10703|            8|
|    7417|            1|
|    3475|            1|
|   11316|            1|
|   10462|           10|
|    2811|            1|
|   10121|            4|
|   11710|            1|
+--------+-------------+


In [15]:
# 计算每门课程的平均每名学生完成的任务数量
course_participation = course_task_count.join(course_student_count, "courseId").withColumn(
    "avg_tasks_per_student", 
    F.col("task_count") / F.col("student_count")
)

course_participation.show()


+--------+----------+-------------+---------------------+
|courseId|task_count|student_count|avg_tasks_per_student|
+--------+----------+-------------+---------------------+
|   11033|        63|          130|   0.4846153846153846|
|   10623|        67|           14|    4.785714285714286|
|   11317|         7|            1|                  7.0|
|   10206|        20|           12|   1.6666666666666667|
|   11748|        20|            2|                 10.0|
|   11858|         3|           15|                  0.2|
|   12027|        66|           10|                  6.6|
|    1645|        35|            1|                 35.0|
|   12046|        14|            1|                 14.0|
|    7554|         6|            1|                  6.0|
|    2580|        20|            4|                  5.0|
|   10230|        30|           13|   2.3076923076923075|
|   10703|         4|            8|                  0.5|
|    7417|        59|            1|                 59.0|
|    3475|    

In [16]:
# 假设testpaper_result DataFrame已经准备好
# 转换score为浮点数
from pyspark.sql.types import FloatType
testpaper_result = dataframes["testpaper_result"].withColumn("score", col("score").cast(FloatType()))

# 计算每门课程的平均分数
course_avg_score = testpaper_result.groupBy("courseId").agg(F.avg("score").alias("avg_score"))

course_avg_score.show()


+----------+------------------+
|  courseId|         avg_score|
+----------+------------------+
|1576143326|     1.578298752E9|
|       858| 8.678378378378378|
|1578460928|     1.578550144E9|
|       580|               0.0|
|       879|  9.35036496350365|
|       883|              31.5|
|       804|5.0442890442890445|
|       853| 6.170357751277683|
|       857| 8.865131578947368|
|       593|               0.0|
|       683|3.2977099236641223|
|       796| 7.154696132596685|
|       772| 83.21428571428571|
|       756| 4.698080279232112|
|       385|               0.0|
|        81|               0.0|
|1576764152|     1.578300416E9|
|1577261130|     1.578301056E9|
|       830| 7.383838383838384|
|       762| 3.013824884792627|
+----------+------------------+


In [17]:
# 假设course_task_count和course_student_count已准备好
# 结合课程参与度指标
course_participation = course_task_count.join(course_student_count, "courseId")

# 结合学习成果
course_analysis = course_participation.join(course_avg_score, "courseId")

course_analysis.show()


+--------+----------+-------------+------------------+
|courseId|task_count|student_count|         avg_score|
+--------+----------+-------------+------------------+
|    1645|        35|            1| 7.117647058823529|
|    7554|         6|            1|               0.0|
|    2580|        20|            4|               0.0|
|    3475|        13|            1|               0.0|
|    2811|        38|            1| 8.265486725663717|
|   12715|         3|            5| 83.33333333333333|
|   10468|         8|            1|               0.0|
|   10815|        10|           16|               0.0|
|   12210|         1|           54| 78.47169811320755|
|    6934|         9|            1|               0.0|
|   11468|         3|            6|53.333333333333336|
|    2625|         1|            1| 77.55703883495146|
|   12336|         6|           82| 31.52310924369748|
|   11059|         3|           28|29.833333333333332|
|    8003|        17|           14|               0.0|
|    8423|

In [18]:
# 计算平均任务完成率与平均分数之间的相关系数
for col_name in ["task_count", "student_count"]:
    correlation = course_analysis.stat.corr(col_name, "avg_score")
    print(f"The correlation between {col_name} and average score is: {correlation}")


The correlation between task_count and average score is: -0.38467894012801457
The correlation between student_count and average score is: 0.05668126870898684


In [22]:
from pyspark.sql.functions import col, avg

# 计算每个学生的学习活动参与度
student_activity_score = dataframes["activity_learn_log"].groupBy("userId").count().withColumnRenamed("count", "activity_score")

# 计算每个学生的课程完成度得分
student_task_score = dataframes["user_learn_statistics_total"].select("userId", "finishedTaskNum").withColumnRenamed("finishedTaskNum", "task_score")

# 计算每个学生的学习成果得分（平均考试分数）
# 假设testpaper_result已经被转换成了适当的数值类型
student_test_score = dataframes["testpaper_result"].groupBy("userId").agg(avg("score").alias("test_score"))

# 合并这些得分
student_scores = student_activity_score.join(student_task_score, "userId", "outer").join(student_test_score, "userId", "outer")

# 计算综合评分，这里我们简单地取平均值作为综合评分
student_overall_score = student_scores.withColumn(
    "overall_score",
    (col("activity_score") + col("task_score") + col("test_score")) / 3
)

# 选择综合评分最高的前十名学生
top_students = student_overall_score.orderBy(col("overall_score").desc())

top_students.show()


+------+--------------+----------+------------------+------------------+
|userId|activity_score|task_score|        test_score|     overall_score|
+------+--------------+----------+------------------+------------------+
|167157|         31477|       158| 42.27777777777778|10559.092592592593|
|  5183|         29031|       493|               6.5|            9843.5|
|159863|         27987|       276|30.857142857142858| 9431.285714285714|
|145060|         22993|       479| 66.33333333333333|  7846.11111111111|
|164177|         20639|       228|36.285714285714285| 6967.761904761905|
|158920|         18443|       361|12.785714285714286| 6272.261904761905|
|162613|         18263|       199|              37.0| 6166.333333333333|
|169488|         18360|       125|               0.0| 6161.666666666667|
|167698|         17771|       210|            46.875| 6009.291666666667|
|164169|         17293|       344|30.964285714285715| 5889.321428571428|
|168274|         17196|       202|              30.

NameError: name 'dataframes' is not defined