# SparkSession

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [None]:
# Reuse the active SparkSession if one exists; otherwise create one under
# this job's application name.
session_builder = SparkSession.builder.appName("data_preprocessing_gold")
spark = session_builder.getOrCreate()

In [None]:
# Three-level table names are built as <catalog>.<schema>.<table>.
# The schema names are stored already wrapped in backticks.
catalog = "1dt_team8_databricks"
movielens_schema = "`movielens-32m`"
final_schema = "`final`"

# Common "<catalog>.<schema>" prefix shared by all silver-layer inputs.
silver_namespace = f"{catalog}.{movielens_schema}"

# Silver-layer inputs consumed by the gold-layer preprocessing below.
movies_with_ratings_silver = spark.table(f"{silver_namespace}.movies_with_ratings_silver")
rating_counts_df_silver = spark.table(f"{silver_namespace}.rating_counts_df_silver")
user_counts_silver = spark.table(f"{silver_namespace}.user_counts_silver")

# Gold

# 유저별 리뷰수 20개 이상 필터링

In [None]:
# Keep only users whose review_count is at least 20.
valid_users = user_counts_silver.filter(col("review_count") >= 20).select("userId")

# Inner-join the ratings against the qualifying users to build the filtered dataset.
filtered_df = movies_with_ratings_silver.join(valid_users, on="userId", how="inner")

# NOTE: per-user row numbering ("row_num") and per-user totals ("total_count")
# are intentionally not computed here. The review-level split cell derives
# them itself right before using them, so computing them in this cell as well
# only added a redundant window and join to the query plan.

# train/val/test split

유저별로 6:2:2로 나눔

Warm start (review-level split: each user's own reviews are divided across train/val/test)

In [None]:
# 1) Review-level (warm-start) split: shuffle each user's reviews, number them,
#    then cut each user's reviews 60% / 20% / 20% into train / val / test.
#
# The shuffle key is a seeded hash of the row's own values instead of rand().
# rand() is documented as non-deterministic in the general case, and this lazy
# plan is re-evaluated once per downstream action (one write each for train,
# val and test). With rand() a review could therefore receive a different
# row_num in each evaluation and end up in two splits or in none. A hash of
# the row content yields the same ordering on every evaluation.
warm_split_seed = 0
# Exclude any pre-existing "row_num" so the key depends only on the source data.
# Assumes the remaining columns are hashable by xxhash64 (e.g. no map types) —
# TODO confirm against the silver table schema.
warm_hash_inputs = [col(name) for name in filtered_df.columns if name != "row_num"]
warm_shuffle_key = xxhash64(*warm_hash_inputs, lit(warm_split_seed))

window = Window.partitionBy("userId").orderBy(warm_shuffle_key)
filtered_df = filtered_df.withColumn("row_num", row_number().over(window))

# Per-user review totals, joined back so each row knows its user's total.
user_total_counts = filtered_df.groupBy("userId").count().withColumnRenamed("count", "total_count")
df_with_counts = filtered_df.join(user_total_counts, on="userId")

# First floor(60%) of a user's shuffled rows -> train, up to floor(80%) -> val,
# the remainder -> test.
df_split = df_with_counts.withColumn(
    "dataset",
    when(col("row_num") <= floor(col("total_count") * 0.6), "train")
    .when(col("row_num") <= floor(col("total_count") * 0.8), "val")
    .otherwise("test")
)

# Strip the helper columns so the outputs keep the input schema.
train_df = df_split.filter(col("dataset") == "train").drop("row_num", "total_count", "dataset")
val_df = df_split.filter(col("dataset") == "val").drop("row_num", "total_count", "dataset")
test_df = df_split.filter(col("dataset") == "test").drop("row_num", "total_count", "dataset")

Cold start (user-level split: each user's reviews all go to exactly one of train/val/test)

In [None]:
# 2) User-level (cold-start) split: assign every user, with all of their
#    reviews, to exactly one of train / val / test (roughly 60% / 20% / 20%).
#
# The assignment is derived from a seeded hash of userId instead of rand().
# rand() is documented as non-deterministic in the general case, and this lazy
# plan is re-evaluated for each of the three writes, so with rand() the same
# user could be placed in different splits by different write jobs, leaking
# users across splits. A hash of userId always maps a user to the same bucket.
cold_split_seed = 42
cold_bucket_count = 10000

users = filtered_df.select("userId").distinct()
cold_user_hash = xxhash64(col("userId"), lit(cold_split_seed))
# The remainder of a negative hash is negative; shift it into [0, cold_bucket_count).
users = users.withColumn(
    "bucket",
    ((cold_user_hash % cold_bucket_count) + cold_bucket_count) % cold_bucket_count,
)
users_split = users.withColumn(
    "dataset",
    when(col("bucket") < 6000, "train")   # buckets 0..5999    -> ~60%
    .when(col("bucket") < 8000, "val")    # buckets 6000..7999 -> ~20%
    .otherwise("test")                    # buckets 8000..9999 -> ~20%
).select("userId", "dataset")

df_with_dataset = filtered_df.join(users_split, on="userId")

# Drop helper columns; drop() ignores names that are not present.
train_data = df_with_dataset.filter(col("dataset") == "train").drop("dataset", "row_num", "total_count")
val_data = df_with_dataset.filter(col("dataset") == "val").drop("dataset", "row_num", "total_count")
test_data = df_with_dataset.filter(col("dataset") == "test").drop("dataset", "row_num", "total_count")

In [None]:
# Persist both split families to the final schema as Delta tables,
# overwriting any existing table of the same name. Order: the review-level
# splits first, then the user-level splits.
gold_outputs = [
    (train_df, "train_df"),
    (val_df, "validation_df"),
    (test_df, "test_df"),
    (train_data, "train_data"),
    (val_data, "validation_data"),
    (test_data, "test_data"),
]

for split_frame, table_name in gold_outputs:
    target_table = f"{catalog}.{final_schema}.{table_name}"
    split_frame.write.format("delta").mode("overwrite").saveAsTable(target_table)

print("골드 레이어에 리뷰 단위 분할 및 유저 단위 분할 데이터셋이 저장되었습니다.")