In [None]:
!pip install polars

In [None]:
import polars as pl
from datetime import datetime
import pandas as pd
import numpy as np

In [None]:
user_filtered_course_df = pl.read_parquet("/kaggle/input/user-video-filtered-course/user_filtered_course.parquet")
user_filtered_course_df.head()

In [None]:
uv_filtered_course_df = pl.read_parquet("/kaggle/input/user-video-filtered-course/uv_filtered_course.parquet")
uv_filtered_course_df[:2, :]

In [None]:
course_info_limit = pl.read_parquet("/kaggle/input/user-video-filtered-course/course_info_limit.parquet")
course_info_limit[:2, :]

In [None]:
uv_filtered_course_df = uv_filtered_course_df.drop('seq')

In [None]:
# TẠO CỘT LOCAL_START_TIME
uv_filtered_course_df = uv_filtered_course_df.with_columns(
    pl.col("segments_list").list.eval(  # Xử lý list ngoài
        pl.element().list.eval(         # Xử lý list bên trong
            pl.element().struct.field("local_start_time").cast(pl.Int64)
        )
    ).alias("local_start_time")
)

# Chuyển sang datetime format
uv_filtered_course_df = uv_filtered_course_df.with_columns(
    pl.col("local_start_time").list.eval(
        pl.element().list.eval(  # Xử lý list con
            pl.from_epoch(pl.element()).dt.strftime("%Y-%m-%d %H:%M:%S")
        )
    )
)

uv_filtered_course_df[:2, :]

In [None]:
# KIỂM TRA VIỆC XEM CÁC SEGMENT CỦA 1 VIDEO CÓ CHÊNH NHAU QUÁ 1 NGÀY KHÔNG
def check_gap_exceeds_one_day(row):
    for session in row:  # mỗi session là 1 mảng thời gian xem video
        if len(session) < 2:
            continue
        times = [datetime.strptime(t, "%Y-%m-%d %H:%M:%S") for t in session]
        times.sort()  # sắp xếp cho chắc ăn
        for i in range(1, len(times)):
            if (times[i] - times[i - 1]).days > 0:
                return True  # có cách nhau quá 1 ngày
    return False  # không vi phạm


In [None]:
# Giả sử uv_filtered_course_df là pl.DataFrame
df_check = uv_filtered_course_df.to_pandas()

df_check["has_big_gap"] = df_check["local_start_time"].apply(check_gap_exceeds_one_day)

# Đếm bao nhiêu hàng có mảng vi phạm
num_violate = df_check["has_big_gap"].sum()
print(f"Số hàng có timestamp cách nhau quá 14 ngày trong cùng session: {num_violate}")


In [None]:
from datetime import datetime

def count_sessions_with_large_gap(row):
    count = 0
    for session in row:  # session là 1 list các timestamp dạng chuỗi
        if len(session) < 2:
            continue
        times = [datetime.strptime(t, "%Y-%m-%d %H:%M:%S") for t in session]
        times.sort()
        for i in range(1, len(times)):
            if (times[i] - times[i - 1]).days > 0:
                count += 1
                break  # chỉ cần 1 cặp vi phạm là tính mảng đó
    return count

df_check = uv_filtered_course_df.to_pandas()

df_check["num_sessions_violate"] = df_check["local_start_time"].apply(count_sessions_with_large_gap)

df_check[["local_start_time", "num_sessions_violate"]].head()

In [None]:
total_exploded_rows = uv_filtered_course_df.select(
    pl.col("local_start_time").list.len().sum()
).item()


In [None]:
total_exploded_rows

In [None]:
cols_to_explode = [col for col in uv_filtered_course_df.columns if col != "user_id"]
df_exploded = uv_filtered_course_df.explode(cols_to_explode)

In [None]:
df_exploded.head()

In [None]:
df_exploded = df_exploded.drop('avg_duration_seg', 'num_seg_repeat', 'num_move_seg', 'watch_time', 'perc_miss', 'user_video_ccids', 'seg_intervals', 'p_seg', 'ent_seg')
df_exploded.head()

In [None]:

# GIỮ CÁC PHẦN TỬ CÓ LOCAL_START_TIME NHỎ TỎNG VÒNG 14 NAGFY TỪ NGÀY ĐĂNG KÍ
from datetime import datetime, timedelta

def filter_phase1_fixed(row):
    enroll_time = pd.to_datetime(row["enroll_time"])
    local_start_times = pd.to_datetime(row["local_start_time"])
    
    keep_idx = [
        i for i, t in enumerate(local_start_times)
        if enroll_time <= t <= enroll_time + timedelta(days=14)
    ]

    if not keep_idx:
        return None  # không giữ lại dòng nào

    # Cột giữ nguyên
    preserved = {
        "user_id": row["user_id"],
        "ccid": row["ccid"],
        "course_of_watched_video": row["course_of_watched_video"],
        "enroll_time": row["enroll_time"],
        "video_length": row["video_length"]
    }

    # Cột cần lọc
    list_columns = [
        "local_start_time", "duration_seg", "segments_list", "start_points",
        "end_points", "speed", "watch_time_seg"
    ]

    for col in list_columns:
        try:
            preserved[col] = [row[col][i] for i in keep_idx]
        except Exception as e:
            print(f"Lỗi khi xử lý cột {col}: {e}")
            preserved[col] = None

    return preserved

df_phase1_pandas = pd.DataFrame(df_exploded.to_pandas().apply(filter_phase1_fixed, axis=1).dropna().to_list())
df_phase1 = pl.from_pandas(df_phase1_pandas)

df_phase1.head()

In [None]:
df_phase1.shape

In [None]:
# GỘP LẠI THEO USER_ID

# Xác định các cột cần gom list
cols_to_group = [col for col in df_phase1.columns if col not in ["user_id", "course_of_watched_video"]]

# Group theo user_id và course_of_watched_video, gom list cho các cột còn lại
df_grouped_phase1 = df_phase1.group_by(["user_id", "course_of_watched_video"]).agg([
    pl.col(col) for col in cols_to_group
])

df_grouped_phase1.head()

In [None]:
df_grouped_phase1.shape

In [None]:
# Tạo cột video_watch_count_1 là cột đếm số lượng video đã xem trong phase 1
# bằng cách đếm số phần tử ccid của cột ccid
df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.col("ccid").list.unique().list.len().alias("video_watch_count_1")
])

df_grouped_phase1.head()

In [None]:
course_dict = (
    course_info_limit
    .select([
        pl.col("clean_course_id"),
        pl.col("ccids").list.len().alias("video_count")
    ])
    .to_dict(as_series=False)
)

# Chuyển thành dict: key là course id, value là số lượng video
course_video_count = dict(zip(course_dict["clean_course_id"], course_dict["video_count"]))


In [None]:
#Tạo cột unique_course
# course_of_watched_video là 1 giá trị duy nhất (str), nên không cần set hay list
df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.col("course_of_watched_video").alias("unique_course")
])

#  Tính tổng số video theo unique_course dùng course_video_count
# Hàm xử lý khi unique_course là 1 str duy nhất
def total_video_count(course: str) -> int:
    return course_video_count.get(course, 0)

df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.col("unique_course").map_elements(total_video_count, return_dtype=pl.Int64).alias("total_videos_for_user")
])

df_grouped_phase1 = df_grouped_phase1.with_columns([
    (pl.col("video_watch_count_1") * 100 / pl.col("total_videos_for_user")).alias("video_watched_percentage_1")
])


df_grouped_phase1.head()

In [None]:
# Cột video_watch_time_1 tổng các thời gian của duration_seg

df_grouped_phase1 = df_grouped_phase1.with_columns([

    # Tính max cho mỗi sublist trong duration_seg
    pl.col("duration_seg").map_elements(
        lambda nested: [max(d) if len(d) > 0 else 0 for d in nested],
        return_dtype=pl.List(pl.Float64)
    ).alias("max_watch_per_video")
])

# Tính phần trăm xem trên từng video: max_duration / video_length
df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.struct(["max_watch_per_video", "video_length"]).map_elements(
        lambda row: [
            (watched / length) * 100 if length > 0 else 0
            for watched, length in zip(row["max_watch_per_video"], row["video_length"])
        ],
        return_dtype=pl.List(pl.Float64)
    ).alias("watch_percentages")
])

# Lấy trung bình các phần trăm để tính video_percentage_watch_time và giới hạn ở 100
df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.col("watch_percentages")
      .list.mean()
      .clip(upper_bound=100)
      .alias("video_percentage_watch_time_1")
])

df_grouped_phase1 = df_grouped_phase1.drop(['max_watch_per_video', 'watch_percentages'])
# Xem thử kết quả
df_grouped_phase1.head()

In [None]:
# tính số lần ngắt quãng (pause) mà user đã có trong phase 1
df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.col("start_points")
      .map_elements(lambda list_of_lists: sum(len(sublist) for sublist in list_of_lists), return_dtype=pl.Int64)
      .alias("video_pause_count_1")
])

# tính trung bình số lần ngắt quãng đợt 1 là cột video_pause_avg_1, 
# tính độ lệch chuẩn số lần ngắt quãng đợt 1 là cột video_pause_std_1,
df_grouped_phase1 = df_grouped_phase1.with_columns([
    # Trung bình số lần ngắt quãng mỗi video
    pl.col("start_points")
      .map_elements(lambda list_of_lists: (
          sum(len(sublist) for sublist in list_of_lists) / len(list_of_lists)
          if len(list_of_lists) > 0 else 0
      ), return_dtype=pl.Float64)
      .alias("video_pause_avg_1"),

    # Độ lệch chuẩn số lần ngắt quãng mỗi video, thay null bằng 0.0
    pl.col("start_points")
      .map_elements(lambda list_of_lists: (
          float(pl.Series([len(sublist) for sublist in list_of_lists]).std())
          if len(list_of_lists) > 1 else 0.0  # ít nhất phải có 2 video mới tính std được
      ), return_dtype=pl.Float64)
      .alias("video_pause_std_1"),
])

df_grouped_phase1.head()

In [None]:
# TÍNH TRUNG BÌNH, STD SỐ LẦN XEM LẠI
def rewatch_counts(start_lists, end_lists):
    counts = []
    for starts, ends in zip(start_lists, end_lists):
        count = 0
        for i in range(1, len(starts)):
            if starts[i] < ends[i - 1]:
                count += 1
        counts.append(count)
    return counts

df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.struct(["start_points", "end_points"])
      .map_elements(lambda x: rewatch_counts(x["start_points"], x["end_points"]))
      .alias("video_rewatch_count_1")
])

df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.col("video_rewatch_count_1")
      .list.eval(pl.element().mean())
      .list.first()
      .alias("video_rewatch_avg_1"),

    pl.col("video_rewatch_count_1")
      .list.eval(pl.element().std(ddof=1))
      .list.first()
      .fill_null(0.0)
      .alias("video_rewatch_std_1"),
])


df_grouped_phase1.head()

In [None]:
df_grouped_phase1.head()

In [None]:
# TÍNH THỜI GIAN NGẮT QUÃNG GIỮA CÁC LẦN XEM, TRUNG BÌNH VÀ STD
#
df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.struct(["start_points", "end_points"]).map_elements(
        lambda row: [
            [s_next - e_curr for s_next, e_curr in zip(s[1:], e[:-1])] if len(s) > 1 else [0.0]
            for s, e in zip(row["start_points"], row["end_points"])
        ]
    ).alias("video_time_between_views_1")
])

df_grouped_phase1 = df_grouped_phase1.with_columns([
    # Tính trung bình
    pl.col("video_time_between_views_1").map_elements(
        lambda outer: float(sum([sum(inner) for inner in outer], 0.0)) / max(sum([len(inner) for inner in outer]), 1)
    ).alias("video_time_between_views_avg_1"),

    # Tính độ lệch chuẩn
    pl.col("video_time_between_views_1").map_elements(
        lambda outer: (
            (lambda flat: float(np.std(flat, ddof=1)) if len(flat) > 1 else 0.0)(
                [x for inner in outer for x in inner]
            )
        )
    ).alias("video_time_between_views_std_1")
])

df_grouped_phase1.head()

In [None]:
# TÍNH TRUNG BÌNH TỐC ĐỘ XEM

df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.col("speed").map_elements(
        lambda outer: float(
            sum([sum(inner) / len(inner) if len(inner) > 0 else 0.0 for inner in outer])
        ) / max(len(outer), 1)
    ).alias("video_speed_avg_1")
])

df_grouped_phase1.head()

In [None]:
# TÍNH ENTROPY CỦA KHOẢNG THỜI GIAN XEM

def entropy_single(duration_list):
    total = sum(duration_list)
    if total == 0 or len(duration_list) == 0:
        return 0.0
    probs = [d / total for d in duration_list]
    return sum([-p * np.log2(p) if p > 0 else 0.0 for p in probs])

# Áp dụng cho từng video trong danh sách duration_seg
df_grouped_phase1 = df_grouped_phase1.with_columns([
    pl.col("duration_seg").map_elements(
        lambda video_durations: [entropy_single(seg) for seg in video_durations]
    ).alias("ent_seg")  # đây là entropy của từng video
])

# Tính trung bình tất cả entropy từ các list trong ent_seg và thêm làm cột entropy_time_1
# Tính trung bình của các list trong ent_seg
df_grouped_phase1 = df_grouped_phase1.with_columns(
    pl.col("ent_seg").list.mean().alias("entropy_time_1")
)

df_grouped_phase1.head()

In [None]:
df_grouped_phase1.write_parquet("user_video_phase.parquet") 
#course_info_limit_df.write_parquet("course_info_limit.parquet") 