# 0. 환경

In [0]:
%pip install optuna

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import sum as _sum
from pyspark.sql.window import Window
from pyspark.ml.functions import vector_to_array
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType, IntegerType
from xgboost.spark import SparkXGBClassifier
import mlflow
# import optuna

In [0]:
# Reuse (or start) the notebook's Spark session.
spark = (
    SparkSession.builder
    .appName("sparkXGBoost")
    .getOrCreate()
)

# Turn MLflow autologging off (generic and Spark-specific); runs are logged
# explicitly further down with mlflow.start_run / log_params / log_metric.
mlflow.autolog(disable=True)
mlflow.spark.autolog(disable=True)

# 1. 데이터로드

### 데이터 로드

In [0]:
# Load the pre-split train / validation / test tables from Unity Catalog.
catalog = "1dt_team8_databricks"
schema = "`final`"
path = f"{catalog}.{schema}"

try:
    train = spark.read.table(f"{path}.train_df")
    validation = spark.read.table(f"{path}.validation_df")
    test = spark.read.table(f"{path}.test_df")
except Exception as e:
    print(f"Error loading data from Unity Catalog Volume: {e}")
    # Every later cell needs these three DataFrames. Re-raise so the real cause
    # surfaces here instead of as a confusing NameError further down.
    raise
# display(train)
# display(validation)
# display(test)

# Binary target: a rating of 4 or higher is a positive example.
train = train.withColumn("label", when(train["rating"] >= 4, 1).otherwise(0))
validation = validation.withColumn("label", when(validation["rating"] >= 4, 1).otherwise(0))
test = test.withColumn("label", when(test["rating"] >= 4, 1).otherwise(0))


# Distinct movieIds per split ...
movies_train = train.select("movieId").distinct()
movies_validation = validation.select("movieId").distinct()
movies_test = test.select("movieId").distinct()

# ... combined into one distinct movieId list across all splits.
all_movies = movies_train.union(movies_validation).union(movies_test).distinct()

### IMDB 로드

In [0]:
# Load the IMDb ratings and title metadata tables from Unity Catalog.
catalog = "1dt_team8_databricks"
schema = "`imdb`"
imdb_path = f"{catalog}.{schema}"

try:
    imdb_ratings = spark.read.table(f"{imdb_path}.title_ratings")
    title_basics = spark.read.table(f"{imdb_path}.title_basics")
    print("Data loaded successfully from Unity Catalog Volume.")
except Exception as e:
    print(f"Error loading data from Unity Catalog Volume: {e}")
    # The code reads tables, not CSV files; name the tables that are expected.
    print(f"Please ensure tables title_ratings and title_basics exist in {imdb_path}")
    # imdb_ratings / title_basics are used by the following cells; fail here
    # rather than with a NameError later.
    raise

In [0]:
# MovieLens links table (movieId <-> imdbId).
catalog = "1dt_team8_databricks"
schema = "`movielens-32m`"
volume_path = f"{catalog}.{schema}"

links = spark.read.table(f"{volume_path}.links")

catalog = "1dt_team8_databricks"
schema = "`imdb`"
imdb_path = f"{catalog}.{schema}"

# 1. Convert MovieLens imdbId to IMDb's tconst key: "tt" + zero-padded id.
#    format_string("%07d") pads to at least 7 digits but, unlike
#    lpad(..., 7, "0"), never truncates longer ids, so 8-digit IMDb ids keep
#    their correct tconst. Null ids stay null (no match in the joins below).
imdb_id = col("imdbId").cast("int")
links_with_tconst = links.withColumn(
    "tconst", when(imdb_id.isNotNull(), format_string("tt%07d", imdb_id))
)

# 2. Keep only the columns needed for the year back-fill.
title_years = title_basics.select("tconst", "startYear")

# 3. Back-fill missing release years from IMDb title_basics.startYear.
def fill_year_from_title_basics(df, links_df, title_basics_df):
    """Return ``df`` with null ``year`` values replaced by IMDb ``startYear``.

    ``df`` is linked to IMDb through ``links_df`` (movieId -> tconst) and then to
    ``title_basics_df`` (tconst -> startYear), both via left joins. Non-null
    years are kept. The helper columns ``tconst`` and ``startYear`` are dropped
    before returning.

    Args:
        df: DataFrame with ``movieId`` and ``year`` columns.
        links_df: DataFrame with ``movieId`` and ``tconst`` columns.
        title_basics_df: DataFrame with ``tconst`` and ``startYear`` columns.
    """
    id_map = links_df.select("movieId", "tconst")
    release_years = title_basics_df.select("tconst", "startYear")

    enriched = (
        df.join(id_map, on="movieId", how="left")
        .join(release_years, on="tconst", how="left")
    )
    # coalesce keeps the existing year and falls back to startYear only when it is null.
    enriched = enriched.withColumn("year", coalesce(col("year"), col("startYear")))
    return enriched.drop("startYear", "tconst")

# 4. Apply the year back-fill to every split (lazy; no Spark job runs yet).
train, validation, test = (
    fill_year_from_title_basics(split_df, links_with_tconst, title_years)
    for split_df in (train, validation, test)
)

# 5. Count the rows whose year is still missing after the back-fill.
for split_label, split_df in (
    ("Train 결측치 수:", train),
    ("Val 결측치 수:", validation),
    ("Test 결측치 수:", test),
):
    print(split_label, split_df.filter(col("year").isNull()).count())

# Store year as an integer column in every split.
train, validation, test = (
    split_df.withColumn("year", col("year").cast(IntegerType()))
    for split_df in (train, validation, test)
)

In [0]:
# 1. Rebuild the movieId -> tconst mapping for the IMDb ratings join.
#    format_string("%07d") zero-pads to at least 7 digits without truncating,
#    whereas lpad(..., 7, "0") cuts 8-digit IMDb ids down to 7 characters and
#    produces a wrong tconst. Null ids stay null.
imdb_id = col("imdbId").cast("int")
links_with_tconst = links.withColumn(
    "tconst", when(imdb_id.isNotNull(), format_string("tt%07d", imdb_id))
).select("movieId", "tconst")

# 2. Keep only the IMDb rating columns that become model features.
imdb_ratings_selected = imdb_ratings.select("tconst", "averageRating", "numVotes")

# 3. Attach IMDb averageRating / numVotes to a split via links (movieId -> tconst).
def join_imdb_ratings(df):
    """Return ``df`` enriched with IMDb ``averageRating`` and ``numVotes``.

    Uses the module-level ``links_with_tconst`` and ``imdb_ratings_selected``
    DataFrames (left joins on movieId, then tconst). Null ratings/votes (e.g. no
    IMDb match) become ``0.0`` / ``0``. The columns are cast to double / int,
    and the helper ``tconst`` column is dropped.
    """
    missing_defaults = {"averageRating": 0.0, "numVotes": 0}
    return (
        df.join(links_with_tconst, on="movieId", how="left")
        .join(imdb_ratings_selected, on="tconst", how="left")
        .fillna(missing_defaults)
        .withColumn("averageRating", col("averageRating").cast("double"))
        .withColumn("numVotes", col("numVotes").cast("int"))
        .drop("tconst")
    )

train, validation, test = (
    join_imdb_ratings(split_df) for split_df in (train, validation, test)
)

# Sanity check: peek at a few joined rows.
train.select("movieId", "averageRating", "numVotes").show(5)

# 2. 데이터분리

## 파이프라인

In [0]:
from pyspark.ml.feature import MinMaxScaler

# Categorical ids -> numeric indices. handleInvalid="keep" maps ids unseen
# during fitting to one extra index instead of failing.
user_indexer = StringIndexer(inputCol="userId", outputCol="userIndex", handleInvalid="keep")
movie_indexer = StringIndexer(inputCol="movieId", outputCol="movieIndex", handleInvalid="keep")

# "|"-separated genres -> token list -> count vector.
tokenizer = RegexTokenizer(inputCol="genres", outputCol="genres_tokens", pattern="\\|")
vectorizer = CountVectorizer(inputCol="genres_tokens", outputCol="genres_vec")

# MinMaxScaler works on vector columns, so wrap numVotes in a 1-element vector first.
assembler_numvotes = VectorAssembler(inputCols=["numVotes"], outputCol="numVotes_vec")
scaler = MinMaxScaler(inputCol="numVotes_vec", outputCol="numVotes_scaled")

# Final feature vector.
# NOTE(review): VectorAssembler's default handleInvalid="error" fails on null
# inputs; "year" can still be null after the back-fill (its null counts are
# printed above) — confirm they are zero.
assembler_all = VectorAssembler(
    inputCols=["genres_vec", "userIndex", "movieIndex", "year", "averageRating", "numVotes_scaled"],
    outputCol="features"
)

pipeline = Pipeline(stages=[
    tokenizer, vectorizer, assembler_numvotes, scaler,
    user_indexer, movie_indexer, assembler_all,
])

pipeline_model = pipeline.fit(train)

# Reuse the indexers fitted inside the pipeline instead of fitting them a second
# time. This saves two passes over train, and the standalone models are exactly
# the ones that produced the indices inside "features".
# Stage order: tokenizer, vectorizer, assembler_numvotes, scaler, user, movie, assembler_all.
user_indexer_model = pipeline_model.stages[4]
movie_indexer_model = pipeline_model.stages[5]

train_transformed = pipeline_model.transform(train)
validation_transformed = pipeline_model.transform(validation)
test_transformed = pipeline_model.transform(test)

# Cache the transformed splits; they are reused by every model / metric cell below.
train_transformed.cache()
validation_transformed.cache()
test_transformed.cache()

# 3. 모델 설계 및 평가

## XgbClassifier

In [0]:
# 1. Baseline classifier with fixed hyper-parameters.
xgb_model = SparkXGBClassifier(
    max_depth=6,
    # Number of boosting rounds. SparkXGBClassifier takes xgboost.XGBClassifier
    # parameter names, so this is n_estimators. "num_round" is not one of them:
    # it would only be forwarded to the booster, which ignores it.
    n_estimators=100,
    eta=0.1,
    eval_metric='logloss',
    # missing=0.0 tells XGBoost to treat feature values equal to 0.0 as missing
    # (this is not a NaN setting).
    # NOTE(review): that includes a userIndex/movieIndex of 0 (StringIndexer's most
    # frequent id) and zero genre counts — confirm this is intended.
    missing=0.0,
    features_col="features",
    label_col="label",
    prediction_col="prediction",
    probability_col="probability",
    seed=0
)

# 2. Fit on the transformed training split.
xgb_model_fitted = xgb_model.fit(train_transformed)

In [0]:
# Score the validation split with the baseline model.
raw_predictions = xgb_model_fitted.transform(validation_transformed)

# Keep P(label == 1) (element 1 of the probability vector) plus the columns
# the ranking metrics need.
predictions = (
    raw_predictions
    .withColumn("y_pred_proba", vector_to_array(col("probability"))[1])
    .select("userIndex", "movieIndex", col("label"), "prediction", "y_pred_proba")
)

predictions.show(truncate=False)

### 평가지표

In [0]:
# 1. Top-K: rank each user's validation items by predicted positive probability.
k = 10
window_spec = Window.partitionBy("userIndex").orderBy(col("y_pred_proba").desc())

topk_df = (
    predictions
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") <= k)
)

In [0]:
# 2. Precision@K per user = positives among the top-K / k, averaged over users.
#    This divides by k even for users with fewer than k scored items.
precision_df = (
    topk_df.groupBy("userIndex")
    .agg(_sum(col("label")).alias("true_positives"))
    .withColumn("precision_at_k", col("true_positives") / k)
)

mean_precision = precision_df.agg(avg("precision_at_k")).first()[0]
print(f"Precision@{k}: {mean_precision:.4f}")

In [0]:
# 3. Recall@K per user = positives in the top-K / all positive validation items of that user.
total_relevant_df = predictions.groupBy("userIndex") \
    .agg(_sum(col("label")).alias("total_relevant"))

# Users without any positive item would divide by zero (null, or an error under
# ANSI mode). Score them as 0.0, the same convention as the test-set Recall@K cell.
recall_df = precision_df.join(total_relevant_df, on="userIndex") \
    .withColumn(
        "recall_at_k",
        when(col("total_relevant") == 0, 0.0)
        .otherwise(col("true_positives") / col("total_relevant")),
    )

mean_recall = recall_df.agg(avg("recall_at_k")).first()[0]
print(f"Recall@{k}: {mean_recall:.4f}")

### MLFlow + Optuna

In [0]:
def objective(trial):
    """Optuna objective: fit one SparkXGBClassifier and score it on validation.

    Samples ``max_depth``, ``eta`` and the number of boosting rounds, fits on
    ``train_transformed`` and computes the mean per-user precision over each
    user's top-``k`` items of ``validation_transformed`` (``k`` is the
    module-level cut-off). Nothing is logged to MLflow.

    Returns:
        The negated mean precision@k (0.0 if it cannot be computed), so that a
        ``direction="minimize"`` study maximizes precision.
    """
    param = {
        "max_depth": trial.suggest_int("max_depth", 3, 10),
        "eta": trial.suggest_float("eta", 0.01, 0.3),
        "num_round": trial.suggest_int("num_round", 50, 100),
        "eval_metric": "logloss",
    }

    model = SparkXGBClassifier(
        features_col="features",
        label_col="label",
        prediction_col="prediction",
        probability_col="probability",
        seed=0,
        # Use xgboost.XGBClassifier parameter names. camelCase names such as
        # maxDepth / numRound / evalMetric are not estimator params: they are
        # passed through to the booster, which ignores them (it only warns).
        # As a result, only eta was actually being tuned.
        max_depth=param["max_depth"],
        eta=param["eta"],
        n_estimators=param["num_round"],
        eval_metric=param["eval_metric"],
        # Same missing-value setting as the baseline and final models, so the
        # trials are evaluated under the configuration that gets trained at the end.
        missing=0.0,
    )

    model_fitted = model.fit(train_transformed)

    pred_test = model_fitted.transform(validation_transformed)
    pred_test = pred_test.withColumn("y_pred_prob", vector_to_array(col("probability"))[1])

    window_spec = Window.partitionBy("userIndex").orderBy(col("y_pred_prob").desc())
    top_k = pred_test.withColumn("rank", row_number().over(window_spec)) \
                     .filter(col("rank") <= k)

    # Per-user precision = mean label over that user's top-k rows.
    precision_df = top_k.groupBy("userIndex").agg(avg("label").alias("user_precision"))
    result = precision_df.select(avg("user_precision").alias("precision_at_k")).collect()

    avg_precision_at_k = result[0]["precision_at_k"] if result and result[0]["precision_at_k"] is not None else 0.0

    # Negate: the study minimizes, we want to maximize precision@k.
    return -avg_precision_at_k

In [0]:
# optuna is installed with %pip at the top of the notebook; import it where it is used.
import optuna

# Run the Optuna study. objective() returns -precision@k, so minimizing it
# maximizes precision@k. A seeded sampler makes the trial sequence reproducible,
# matching seed=0 on the models.
study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=0),
)
study.optimize(objective, n_trials=10)

In [0]:
best_params = study.best_params

# Train the final model with the best trial's parameters and record it in MLflow.
with mlflow.start_run(run_name="Best_SparkXGBoost_Model") as run:
    mlflow.log_params(best_params)

    final_model = SparkXGBClassifier(
        features_col="features",
        label_col="label",
        prediction_col="prediction",
        probability_col="probability",
        missing=0.0,
        seed=0,
        # xgboost.XGBClassifier parameter names. camelCase names
        # (maxDepth / numRound / evalMetric) would be passed through to the
        # booster and ignored, so the tuned values would never be applied.
        max_depth=best_params["max_depth"],
        eta=best_params["eta"],
        n_estimators=best_params["num_round"],
        eval_metric="logloss",
    )

    fitted_model = final_model.fit(train_transformed)

    # Log under "final_model": the model-loading cell reads runs:/<run_id>/final_model.
    mlflow.spark.log_model(fitted_model, "final_model")
    print(f"Logged model URI: runs:/{run.info.run_id}/final_model")

    # Evaluate the final model on validation: mean per-user precision@k.
    pred = fitted_model.transform(validation_transformed)
    pred = pred.withColumn("y_pred_prob", vector_to_array(col("probability"))[1])

    window_spec = Window.partitionBy("userIndex").orderBy(col("y_pred_prob").desc())
    top_k = pred.withColumn("rank", row_number().over(window_spec)) \
                .filter(col("rank") <= k)

    precision_df = top_k.groupBy("userIndex").agg(avg("label").alias("user_precision"))
    avg_precision = precision_df.select(avg("user_precision").alias("precision_at_k")).collect()[0]["precision_at_k"]

    # avg over an empty frame is null; MLflow cannot log a None metric.
    if avg_precision is not None:
        mlflow.log_metric("precision_at_k", avg_precision)

    pred.select("userIndex", "movieIndex", "prediction").show()

# 4. 결과

## 모델 로드

In [0]:
K = 10

# 1. Load the tuned model from MLflow. The run id is hard-coded: point it at the
#    run that logged the model you want to evaluate.
model_uri = "runs:/d425b9e818124a1dbcc3fb2a6e2741f6/final_model"
final_model = mlflow.spark.load_model(model_uri)

# 2. Score the test split and keep P(label == 1) as y_pred_proba.
pred_test = (
    final_model.transform(test_transformed)
    .withColumn("y_pred_proba", vector_to_array(col("probability"))[1])
)

## 평가지표

In [0]:
K = 10

# Keep each user's K highest-scored test items.
window_spec = Window.partitionBy("userIndex").orderBy(col("y_pred_proba").desc())
ranked_test = pred_test.withColumn("rank", row_number().over(window_spec))
top_k_pred = ranked_test.filter(col("rank") <= K)

In [0]:
# Per-user Precision@K = mean label over the user's top-K rows (users with fewer
# than K items are divided by their own item count), then averaged over users.
precision_df = top_k_pred.groupBy("userIndex").agg(avg("label").alias("precision_at_k"))

mean_precision_at_k = (
    precision_df
    .select(avg("precision_at_k").alias("mean_precision_at_k"))
    .first()["mean_precision_at_k"]
)

print(f"[Test Set] Precision@{K}: {mean_precision_at_k:.4f}")

In [0]:
# Recall@K = positives captured in the top-K / all positives of the user (test split).
total_relevant_df = (
    pred_test.groupBy("userIndex").agg(_sum("label").alias("total_relevant"))
)
true_positives_df = (
    top_k_pred.groupBy("userIndex").agg(_sum("label").alias("true_positives"))
)

# Users with no positive test items get recall 0.0 instead of a division by zero.
safe_recall = (
    when(col("total_relevant") == 0, 0.0)
    .otherwise(col("true_positives") / col("total_relevant"))
)
recall_df = true_positives_df.join(total_relevant_df, on="userIndex") \
    .withColumn("recall_at_k", safe_recall)

mean_recall_at_k = (
    recall_df
    .select(avg("recall_at_k").alias("mean_recall_at_k"))
    .first()["mean_recall_at_k"]
)

print(f"[Test Set] Recall@{K}: {mean_recall_at_k:.4f}")

## 사용자별 추천 영화(시청 영화 제외)

In [0]:
# 1. (userId, movieId) pairs each user already rated in train or validation.
seen_pairs = (
    train.select("userId", "movieId")
    .union(validation.select("userId", "movieId"))
    .distinct()
)

# 2. Drop test predictions for movies the user has already seen. Matching on the
#    raw ids avoids StringIndexer's shared "unknown" index (handleInvalid="keep"):
#    every id unseen while fitting gets the same index, which would make unrelated
#    movies look identical.
pred_filtered = pred_test.join(seen_pairs, on=["userId", "movieId"], how="left_anti")

# 3. Keep each user's K highest-probability remaining movies.
rec_window = Window.partitionBy("userIndex").orderBy(col("y_pred_proba").desc())
top_k_filtered = (
    pred_filtered
    .withColumn("rank", row_number().over(rec_window))
    .filter(col("rank") <= K)
)

# 4. Package each recommendation. The prediction rows already carry movieId and
#    title from the test split, so no movieIndex -> movieId lookup is needed.
#    Such a lookup would also add a second, ambiguous movieId column.
top_k_with_title = top_k_filtered.withColumn(
    "recommendation",
    struct(
        col("movieId").alias("movieId"),
        col("title").alias("title"),
        col("y_pred_proba").alias("pred_rating"),
    ),
)

# 5. One row per user with their recommendations in rank order. collect_list alone
#    gives no ordering guarantee after the shuffle, so collect (rank, rec) pairs,
#    sort them by rank, then keep only the recommendation struct.
recommendations = (
    top_k_with_title.groupBy("userIndex")
    .agg(
        transform(
            sort_array(collect_list(struct(col("rank"), col("recommendation")))),
            lambda ranked: ranked["recommendation"],
        ).alias("recommendations")
    )
    .orderBy("userIndex")
)

recommendations.show(truncate=False)

## 시각화

추천 배열을 flatten(explode)하여 장르별 시각화용 데이터로 변환

In [0]:
# 1. One row per (user, recommended movie).
recommendations_exploded = recommendations.select(
    "userIndex",
    explode(col("recommendations")).alias("rec")
)

# 2. Pull the struct fields out into top-level columns.
recommendations_flat = recommendations_exploded.select(
    "userIndex",
    col("rec.movieId").alias("movieId"),
    col("rec.title").alias("title"),
    col("rec.pred_rating").alias("pred_rating")
)

# Genre lookup: only movieId + genres, one row per movie. Including title here
# would create a second, ambiguous "title" column after the join.
movie_genres = test.select("movieId", "genres").dropDuplicates(["movieId"])

# 3. Attach genres by movieId.
rec_with_genres = recommendations_flat.join(
    movie_genres,
    on="movieId",
    how="left"
)

# 4. Split genres on '|' and explode to one row per (user, movie, genre).
recs_vis = rec_with_genres.withColumn("genre", explode(split(col("genres"), "\\|")))
display(recs_vis)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

## 4-2. 신규 유저 처리: 어떤 영화를 추천할지

1. 신규 유저가 본 영화와 기존 유저들의 시청 목록 간 Jaccard 유사도를 계산하여 가장 유사한 기존 유저를 식별함.
2. 신규 유저의 userIndex를 가장 유사한 기존 유저의 userIndex로 대체한 뒤, 신규 유저가 보지 않은 영화들에 대해 추천 모델로 예측을 수행함.
3. 예측 결과 중 상위 K개의 영화를 추출하여 신규 유저에게 추천함.



In [0]:
# Step 1. movieIds the new user has already watched.
new_user_seen_movies = [1, 10, 50, 300]
new_user_set = {*new_user_seen_movies}

# Step 2. Set of watched movieIds per existing user (train split).
user_movie_sets = (
    train.groupBy("userId")
    .agg(collect_set("movieId").alias("movie_set"))
)

# Step 3. Jaccard similarity between two collections of movieIds.
def jaccard_similarity(set1, set2):
    """Return |A ∩ B| / |A ∪ B| for the distinct elements of both inputs.

    Both arguments may be any iterables; duplicates are ignored. Returns 0.0
    when both are empty.
    """
    left, right = set(set1), set(set2)
    union_size = len(left | right)
    if not union_size:
        return 0.0
    return len(left & right) / union_size

# Similarity of every existing user's watch set to the new user's set.
jaccard_udf = udf(lambda x: jaccard_similarity(new_user_set, x), DoubleType())

# Most similar existing user; ties are broken by userId so the choice is deterministic.
similar_users = user_movie_sets.withColumn("jaccard_sim", jaccard_udf(col("movie_set"))) \
                               .orderBy(desc("jaccard_sim"), col("userId")) \
                               .limit(1)

# Step 4. userId of the most similar user and its index under the fitted user StringIndexer.
top_user_id = similar_users.select("userId").first()["userId"]
top_user_index = user_indexer_model.transform(
    spark.createDataFrame([(top_user_id,)], ["userId"])
).select("userIndex").first()["userIndex"]

# Step 5. Candidate movies (from train + test) that the new user has not watched.
all_movies = train.select("movieId").union(test.select("movieId")).dropDuplicates()
unseen_movies = all_movies.filter(~col("movieId").isin(new_user_seen_movies))

# Movie metadata from every split: title/genres plus the IMDb columns that
# pipeline_model's VectorAssembler stages read (averageRating, numVotes), so the
# candidates can be passed through pipeline_model.transform in the next cell.
meta_cols = ["movieId", "title", "genres", "averageRating", "numVotes"]
movies_meta = (
    train.select(*meta_cols)
    .union(validation.select(*meta_cols))
    .union(test.select(*meta_cols))
    .dropDuplicates(["movieId"])
    .select(
        col("movieId").alias("movieId_meta"),
        col("title").alias("title_meta"),
        col("genres").alias("genres_meta"),
        col("averageRating"),
        col("numVotes"),
    )
)

unseen_movies = unseen_movies.join(
    movies_meta,
    unseen_movies.movieId == movies_meta.movieId_meta,
    how="left"
).select(
    unseen_movies.movieId,
    movies_meta.title_meta.alias("title"),
    movies_meta.genres_meta.alias("genres"),
    movies_meta.averageRating,
    movies_meta.numVotes,
)

# One non-null (already back-filled) year per movie, taken from every split so
# that test-only movies also get a year.
movie_year_df = (
    train.select("movieId", "year")
    .union(validation.select("movieId", "year"))
    .union(test.select("movieId", "year"))
    .dropna()
    .dropDuplicates(["movieId"])
)

# Attach year to the candidates.
unseen_movies = unseen_movies.join(movie_year_df, on="movieId", how="left")

In [0]:
# Step 6. Pretend the most similar existing user is the one watching every candidate.
unseen_movies = unseen_movies.withColumn("userId", lit(top_user_id))

# Step 7. Build model features with the fitted preprocessing pipeline.
unseen_with_features = pipeline_model.transform(unseen_movies)

# Step 8. Stamp the similar user's userIndex on every row. "features" was already
# assembled in step 7, so overwriting this column does not change the vector.
unseen_with_features = unseen_with_features.withColumn("userIndex", lit(top_user_index))

# Step 9. One row per candidate movie.
unseen_unique = unseen_with_features.dropDuplicates(["movieId"])

# Steps 10-11. Score the candidates and keep P(label == 1).
predicted_df = (
    final_model.transform(unseen_unique)
    .withColumn("prediction_proba", vector_to_array(col("probability"))[1])
)

# Step 12. Top-K candidates by predicted probability.
K = 10
top_k_df = predicted_df.orderBy(col("prediction_proba").desc()).limit(K)

# Step 13. Show the recommendations.
top_k_df.select("movieId", "title", "prediction_proba").show()