<a href="https://colab.research.google.com/github/Azure06072005/DS317-Data-Mining-for-Enterprise/blob/main/merge_reply_user_video.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [35]:
# Colab only: mount Google Drive, since every data path below lives under /content/drive.
from google.colab import drive

drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Import necessary libraries


In [36]:
import pyspark
from pyspark.sql import SparkSession
import warnings
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
import seaborn as sns
import matplotlib.pyplot as plt
from os import truncate
from pyspark.sql.functions import explode, col, sum, from_unixtime, hour, dayofweek, count, round, when, isnull, split, avg
warnings.filterwarnings("ignore")

# Local Spark session using every available core (local[*]).
# NOTE(review): "spark.some.config.option" looks like the placeholder from the
# Spark getting-started example; it is kept so the session config is unchanged.
spark = (
    SparkSession.builder
    .appName("reply_user_video_analysis")
    .master("local[*]")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [37]:
# All data locations derive from one shared Drive folder, so relocating the
# project only requires changing GROUP_DIR. The resulting strings are the same
# as the previously hard-coded absolute paths.
GROUP_DIR = '/content/drive/MyDrive/DS317.Q11 nhóm 8'

# Root of the MOOC dataset (video.json, user-video.json, course.json, ...).
BASE_DIR = GROUP_DIR + '/DataMOOCDS317'
# Scratch output directory for this analysis.
MY_TEST_DIR = GROUP_DIR + '/Bài tập trên lớp/Bài tập phân tích Dataset/Data/test'
# Pre-built user-course CSV (read with a header row further down).
df_user_course_path = BASE_DIR + '/user_course_final_ver3/user_course_v4.csv'

# User-video

In [38]:
# Raw text load: one row per line, exposed in a single `value` column.
df_video_id_ccid = spark.read.text(f"{BASE_DIR}/video_id-ccid.txt")

In [39]:
# Split each line on the tab once; field 0 becomes video_id, field 1 becomes ccid.
id_ccid_fields = split(col("value"), "\t")
df_video_id_ccid = df_video_id_ccid.select(
    id_ccid_fields.getItem(0).alias("video_id"),
    id_ccid_fields.getItem(1).alias("ccid"),
)

In [40]:
# Video metadata; the schema is inferred from the JSON records.
df_video = spark.read.json(f"{BASE_DIR}/video.json")

In [41]:
from pyspark.sql.functions import array_max

# video_duration is the largest value in the `end` array of each video row.
df_video = df_video.withColumn("video_duration", array_max(col("end")))

In [42]:
# Keep only the join key (ccid) and the derived duration.
df_video_duration = df_video.select(col("ccid"), col("video_duration"))

In [43]:
# Attach video_id to every (ccid, video_duration) row; durations without a
# matching id are kept (left join) with a NULL video_id.
df_video = df_video_duration.join(df_video_id_ccid, on="ccid", how="left")

In [44]:
# Flatten user-video.json into one row per watched segment:
#   1. explode `seq` so each video sequence of a user becomes a row,
#   2. explode the nested `segment` array of that sequence,
#   3. keep the playback speed and the rounded segment length
#      (end_point - start_point) as watched_duration.
df_user_video = (
    spark.read.json(f"{BASE_DIR}/user-video.json")
    .select("user_id", explode(col("seq")).alias("video_sequence"))
    .select(
        "user_id",
        col("video_sequence").getField("video_id").alias("video_id"),
        explode(col("video_sequence").getField("segment")).alias("segment_details"),
    )
    .select(
        "user_id",
        "video_id",
        col("segment_details").getField("speed").alias("speed"),
        round(
            col("segment_details").getField("end_point")
            - col("segment_details").getField("start_point")
        ).alias("watched_duration"),
    )
)

In [46]:
# Enrich every watched segment with its video's ccid and duration; segments
# whose video_id has no match keep NULLs (left join).
df_uv_v = df_user_video.join(df_video, on="video_id", how="left")

In [47]:
# Collapse segments to one row per (user, video): total watched time, mean
# playback speed, and one ccid for the video. video_duration is part of the
# grouping key so it is carried through to the result.
df_user_video_grouped = df_uv_v.groupBy(
    F.col("user_id"), F.col("video_id"), F.col("video_duration")
).agg(
    F.sum(F.col("watched_duration")).alias("watched_duration_total"),
    F.avg(F.col("speed")).alias("avg_speed_per_video"),
    F.first(F.col("ccid")).alias("ccid"),
)

In [49]:
# Per-user viewing statistics built from the per-(user, video) aggregates.
#
#   avg_speed              watch-time-weighted mean of avg_speed_per_video (2 dp)
#   total_watched_duration sum of watched_duration_total over all videos
#   avg_completion_rate    watched time / video length, over videos whose
#                          duration is known (3 dp)
#   *_list                 distinct video ids / ccids / durations (collect_set,
#                          so the three lists are deduplicated independently
#                          and are not position-aligned)
#
# Fix: Spark's sum() skips NULLs, so the old ratio
# sum(watched_duration_total) / sum(video_duration) divided watch time from
# *all* videos by the length of only the videos with a known duration, which
# overstated the rate. The numerator is now limited to the same rows that
# contribute to the denominator. Users with no known duration still get NULL.
# NOTE(review): segments are summed without de-duplicating re-watched ranges,
# so the rate can presumably exceed 1 — confirm whether it should be capped.
df_user_stats = (
    df_user_video_grouped
    .groupBy("user_id")
    .agg(
        F.round(
            F.sum(F.col("watched_duration_total") * F.col("avg_speed_per_video")) /
            F.sum("watched_duration_total"), 2
        ).alias("avg_speed"),
        F.sum("watched_duration_total").alias("total_watched_duration"),
        F.round(
            F.sum(
                # when() without otherwise() yields NULL for unmatched rows,
                # which sum() then ignores.
                F.when(
                    F.col("video_duration").isNotNull(),
                    F.col("watched_duration_total"),
                )
            ) / F.sum("video_duration"), 3
        ).alias("avg_completion_rate"),
        F.collect_set("video_id").alias("video_list"),
        F.collect_set("ccid").alias("ccid_list"),
        F.collect_set("video_duration").alias("video_duration_list")
    )
)

In [50]:
# Preview the per-user statistics (show() defaults spelled out: 20 rows, truncated cells).
df_user_stats.show(n=20, truncate=True)

+----------+---------+----------------------+-------------------+--------------------+--------------------+--------------------+
|   user_id|avg_speed|total_watched_duration|avg_completion_rate|          video_list|           ccid_list| video_duration_list|
+----------+---------+----------------------+-------------------+--------------------+--------------------+--------------------+
|U_10001181|      2.0|                2878.0|               NULL|         [V_7386535]|                  []|                  []|
|U_10001587|     1.89|                8184.0|               NULL|[V_6012902, V_604...|                  []|                  []|
| U_1000290|      1.0|                2311.0|              0.619|[V_6280777, V_628...|[5D058BEE09644CCD...|[1174.279, 859.22...|
| U_1000454|      1.0|                 495.0|              0.718|[V_6181150, V_739...|[05F564395C35232E...|   [226.92, 462.142]|
|  U_100066|      1.0|                  20.0|               NULL|         [V_6335195]|           

In [52]:
# User-course table: first line is the header, column types are inferred.
df_user_course = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(df_user_course_path)
)

In [53]:
# Attach per-user viewing statistics to every user-course row; users without
# any viewing record keep NULL statistics (left join).
df_uc_us = df_user_course.join(df_user_stats, on="user_id", how="left")
df_uc_us.show(n=20, truncate=False)

+----------+---------+-----------+----------+----------+-------------+-------------------+----------+---------+----------------------+-------------------+----------+---------+-------------------+
|user_id   |course_id|certificate|start_date|end_date  |duration_days|enroll_time        |remain_day|avg_speed|total_watched_duration|avg_completion_rate|video_list|ccid_list|video_duration_list|
+----------+---------+-----------+----------+----------+-------------+-------------------+----------+---------+----------------------+-------------------+----------+---------+-------------------+
|U_1000663 |C_2328510|0.0        |2020-11-18|2021-02-28|102.0        |2020-11-27 22:41:36|93.0      |NULL     |NULL                  |NULL               |NULL      |NULL     |NULL               |
|U_1000868 |C_947149 |1.0        |2019-12-30|2020-04-19|111.0        |2020-03-02 15:38:45|48.0      |NULL     |NULL                  |NULL               |NULL      |NULL     |NULL               |
|U_1000883 |C_947149

In [None]:
# Row count of the user-course table after attaching viewing statistics.
# Fix: the cell referenced df_uc_uv, which is never assigned; the join result
# produced in the preceding cell is df_uc_us.
df_uc_us.count()

In [None]:
# Number of NULL values per column of the joined user-course/statistics frame.
# Fix: the cell referenced df_uc_uv, which is never assigned; the join result
# built above is df_uc_us.
df_uc_us.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in df_uc_us.columns]
).show()

In [None]:
# Course metadata; preview the first five rows with truncated cells.
df_course = spark.read.json(f"{BASE_DIR}/course.json")
df_course.show(n=5, truncate=True)

In [None]:
# Preview the first five flattened watch segments.
df_user_video.show(n=5)

In [None]:
# Attach ccid and video_duration to every watched segment.
# Fix: df_video_duration only carries (ccid, video_duration), so a join on
# video_id cannot resolve the key on its side. df_video is the lookup that
# also has video_id (it is df_video_duration joined with the id/ccid map).
# NOTE: this produces the same columns as df_uv_v, built earlier in the notebook.
df_user_video = df_user_video.join(df_video, on=['video_id'], how='left')
df_user_video.show(5)

In [None]:
# NOTE(review): df_uc_uv is not assigned in any cell visible above (the
# user-course join result is named df_uc_us), and df_video_duration was
# selected with only ccid and video_duration, so it has no video_id column to
# join on. TODO confirm which frame this cell was meant to enrich before
# running it.
df_uc_uv = df_uc_uv.join(df_video_duration, on = ["video_id"], how = 'left')
df_uc_uv.show(5)

In [None]:
from pyspark.sql.functions import col, isnan, when, count

# Data-quality check on the joined user-course/statistics frame.
# Fixes:
#   * df_uc_uv is never assigned in the visible cells; the join result is df_uc_us.
#   * a bare count() in the middle of a cell is evaluated but not displayed,
#     so the row count is printed explicitly.
print(df_uc_us.count())
df_uc_us.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in df_uc_us.columns]
).show()

In [None]:
# Preview the first five user-course rows.
df_user_course.show(n=5)

In [None]:
# Disabled export of the merged frame to MY_TEST_DIR as Parquet.
# NOTE(review): df_uc_uv is not assigned in the visible cells; the user-course
# join result is named df_uc_us — confirm the frame before re-enabling.
# df_uc_uv.write.mode('overwrite').parquet(MY_TEST_DIR)