# MovieLens Collaborative Filtering Recommender System

This notebook builds a **production-ready recommendation system** using Spark ALS (Alternating Least Squares) to generate personalized movie recommendations.

## What This Notebook Does

- Load MovieLens ratings and movies from CSV (local or HDFS)
- Clean and filter data (remove sparse users/movies)
- Split data: 80% train, 10% validation, 10% test
- Train ALS model with hyperparameter tuning (rank, regParam)
- Evaluate model performance:
  - **RMSE**: Prediction accuracy on test ratings
  - **Precision@K and Recall@K**: Ranking quality for top-N recommendations
- Generate personalized top-N recommendations for all users
- Handle cold-start users with popularity baseline
- Export precomputed artifacts for API serving:
  - user_topn/: Top-100 recommendations per user with metadata
  - popularity/: Popular movies for cold-start fallback
  - movies_meta/: Movie metadata (movieId, title, genres)
  - item_factors/: ALS latent factors for item similarity

## Expected Runtime

10-30 minutes (depending on dataset size and hardware)

## Outputs

All artifacts are saved to the outputs/ directory in Parquet format so the API can load them efficiently.

## 1. Setup and Configuration

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import os

# Configuration
DATA_DIR = "../data/movielens/32m"
OUTPUT_DIR = "../outputs"
MIN_USER_INTERACTIONS = 20
MIN_MOVIE_INTERACTIONS = 20
N_RECS = 100
K = 10  # for Precision@K and Recall@K

print(f"Data directory: {DATA_DIR}")
print(f"Output directory: {OUTPUT_DIR}")

In [None]:
# Initialize Spark Session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("MovieLens-Recommender") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print(f"Number of cores: {spark.sparkContext.defaultParallelism}")

## 2. Load and Clean Data

In [None]:
# Load ratings
ratings_raw = spark.read.csv(
    f"{DATA_DIR}/ratings.csv",
    header=True,
    inferSchema=True
)

print(f"Total ratings: {ratings_raw.count():,}")
ratings_raw.printSchema()
ratings_raw.show(5)

In [None]:
# Load movies
movies_raw = spark.read.csv(
    f"{DATA_DIR}/movies.csv",
    header=True,
    inferSchema=True
)

print(f"Total movies: {movies_raw.count():,}")
movies_raw.printSchema()
movies_raw.show(5)

In [None]:
# Clean ratings: cast to proper types
ratings = ratings_raw.select(
    F.col("userId").cast(IntegerType()),
    F.col("movieId").cast(IntegerType()),
    F.col("rating").cast(FloatType()),
    F.col("timestamp").cast(IntegerType())
).dropna()

print(f"Cleaned ratings: {ratings.count():,}")

In [None]:
# Filter out sparse users and movies (thresholds are applied once, on the original counts)
user_counts = ratings.groupBy("userId").count()
active_users = user_counts.filter(F.col("count") >= MIN_USER_INTERACTIONS).select("userId")

movie_counts = ratings.groupBy("movieId").count()
# Named frequent_movies to avoid clashing with the popularity baseline built later
frequent_movies = movie_counts.filter(F.col("count") >= MIN_MOVIE_INTERACTIONS).select("movieId")

ratings_filtered = ratings \
    .join(active_users, "userId", "inner") \
    .join(frequent_movies, "movieId", "inner")

print(f"Filtered ratings: {ratings_filtered.count():,}")
print(f"Unique users: {ratings_filtered.select('userId').distinct().count():,}")
print(f"Unique movies: {ratings_filtered.select('movieId').distinct().count():,}")
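
Because both thresholds in the filtering cell are checked against the original counts, a single pass does not guarantee that every remaining user (or movie) still meets the minimum after the join. The toy example below is a plain-Python sketch of that effect; the data, the threshold of 2, and the `toy_*` names are illustrative assumptions, not part of the notebook.

```python
from collections import Counter

MIN_COUNT = 2  # toy threshold standing in for the notebook's 20

# Hypothetical (userId, movieId) pairs.
toy_ratings = [
    ("u1", "m1"), ("u1", "m2"),
    ("u2", "m1"), ("u2", "m3"),  # m3 is rated only once
    ("u3", "m1"), ("u3", "m2"),
]

toy_user_counts = Counter(u for u, _ in toy_ratings)
toy_movie_counts = Counter(m for _, m in toy_ratings)

# Single pass: both thresholds use the ORIGINAL counts.
toy_filtered = [
    (u, m) for u, m in toy_ratings
    if toy_user_counts[u] >= MIN_COUNT and toy_movie_counts[m] >= MIN_COUNT
]

# u2 passed the user filter but loses a rating when m3 is dropped.
remaining_per_user = Counter(u for u, _ in toy_filtered)
print(remaining_per_user)
```

If strict minimums matter, one option is to repeat the filter until the counts stop changing; for ALS on MovieLens the single pass is usually an acceptable approximation.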

## 3. Train/Validation/Test Split

In [None]:
# Split data: 80% train, 10% validation, 10% test
train_val, test = ratings_filtered.randomSplit([0.9, 0.1], seed=42)
train, val = train_val.randomSplit([0.89, 0.11], seed=42)  # 0.9 * 0.89 ≈ 0.8

train.cache()
val.cache()
test.cache()

print(f"Train: {train.count():,}")
print(f"Validation: {val.count():,}")
print(f"Test: {test.count():,}")
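
The two-stage `randomSplit` reaches the 80/10/10 target only approximately. The arithmetic below is a quick check of the expected fractions; the variable names are illustrative. Since `randomSplit` assigns rows at random, the actual counts printed above will deviate slightly from these values.

```python
# Expected fractions for the two-stage split used above.
first_stage = (0.9, 0.1)     # (train+val, test)
second_stage = (0.89, 0.11)  # (train, val) within train+val

train_frac = first_stage[0] * second_stage[0]
val_frac = first_stage[0] * second_stage[1]
test_frac = first_stage[1]

print(f"train={train_frac:.3f}, val={val_frac:.3f}, test={test_frac:.3f}")
# prints: train=0.801, val=0.099, test=0.100
```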

## 4. Hyperparameter Tuning

In [None]:
# Define hyperparameter grid
param_grid = [
    {"rank": 10, "regParam": 0.01},
    {"rank": 10, "regParam": 0.1},
    {"rank": 20, "regParam": 0.01},
    {"rank": 20, "regParam": 0.1},
    {"rank": 50, "regParam": 0.01},
]

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

best_rmse = float("inf")
best_params = None
best_model = None

print("Starting hyperparameter search...\n")

for params in param_grid:
    print(f"Training with rank={params['rank']}, regParam={params['regParam']}")
    
    als = ALS(
        maxIter=10,
        rank=params["rank"],
        regParam=params["regParam"],
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        coldStartStrategy="drop",
        seed=42
    )
    
    model = als.fit(train)
    predictions = model.transform(val)
    rmse = evaluator.evaluate(predictions)
    
    print(f"  Validation RMSE: {rmse:.4f}")
    
    if rmse < best_rmse:
        best_rmse = rmse
        best_params = params
        best_model = model
        print(f"  *** New best model! ***")
    
    print()

print(f"Best parameters: {best_params}")
print(f"Best validation RMSE: {best_rmse:.4f}")
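
The hand-written grid above covers five of the six combinations of the three ranks and two regularization values (rank=50 with regParam=0.1 is absent). If a full Cartesian grid is wanted, one possible way to build it is with `itertools.product`. This is a sketch; `ranks`, `reg_params`, and `full_grid` are hypothetical names, and the resulting list has the same shape as `param_grid`.

```python
from itertools import product

ranks = [10, 20, 50]
reg_params = [0.01, 0.1]

# One dict per (rank, regParam) combination, same shape as param_grid.
full_grid = [
    {"rank": rank, "regParam": reg}
    for rank, reg in product(ranks, reg_params)
]

for params in full_grid:
    print(params)
```

Each extra combination adds one full ALS fit, so the larger grid trades runtime for coverage.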

## 5. Final Model Training and Evaluation

In [None]:
# Train final model on train+val using best parameters
als_final = ALS(
    maxIter=10,
    rank=best_params["rank"],
    regParam=best_params["regParam"],
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42
)

model_final = als_final.fit(train_val)

print("Final model trained on train+validation data")

In [None]:
# Evaluate on test set
test_predictions = model_final.transform(test)
rmse_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

test_rmse = rmse_evaluator.evaluate(test_predictions)
print(f"Test RMSE: {test_rmse:.4f}")
print(f"\nInterpretation: typical prediction error is about {test_rmse:.2f} stars (RMSE weights large misses more heavily than a plain average)")
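
RMSE squares each error before averaging, so it is always at least as large as the mean absolute error (MAE) and is pulled up by a few large misses. The sketch below compares the two on toy values; the ratings are made up for illustration and are not notebook output.

```python
import math

# Toy (actual, predicted) ratings; illustrative values only.
pairs = [(4.0, 3.5), (3.0, 3.0), (5.0, 3.0), (2.0, 2.5)]

errors = [pred - actual for actual, pred in pairs]
toy_mae = sum(abs(e) for e in errors) / len(errors)
toy_rmse = math.sqrt(sum(e * e for e in errors) / len(errors))

print(f"MAE:  {toy_mae:.4f}")
print(f"RMSE: {toy_rmse:.4f}")
```

Here the single 2-star miss lifts RMSE well above MAE. In Spark, `RegressionEvaluator` also accepts `metricName="mae"` if both numbers are wanted.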

## 6. Precision@K and Recall@K Evaluation

In [None]:
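# Generate top-K recommendations for test users.
# Note: ALS does not exclude movies a user already rated in training, so some
# top-K slots may go to items that cannot appear in the test set (conservative metrics).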
test_users = test.select("userId").distinct()
user_recs = model_final.recommendForUserSubset(test_users, K)

# Explode recommendations
user_recs_exploded = user_recs.select(
    "userId",
    F.explode("recommendations").alias("rec")
).select(
    "userId",
    F.col("rec.movieId").alias("movieId"),
    F.col("rec.rating").alias("score")
)

# Get relevant items (movies rated >= 4.0 in test set)
relevant_items = test.filter(F.col("rating") >= 4.0).select("userId", "movieId")

# Calculate precision and recall
recommended_items = user_recs_exploded.select("userId", "movieId")

# True positives: recommended AND relevant
tp = recommended_items.join(relevant_items, ["userId", "movieId"], "inner") \
    .groupBy("userId").count().withColumnRenamed("count", "tp")

# Total recommended per user
total_rec = recommended_items.groupBy("userId").count().withColumnRenamed("count", "total_rec")

# Total relevant per user
total_rel = relevant_items.groupBy("userId").count().withColumnRenamed("count", "total_rel")

# Join and calculate metrics
metrics_df = total_rec.join(total_rel, "userId", "inner").join(tp, "userId", "left_outer").fillna(0)

metrics_df = metrics_df.withColumn(
    "precision", F.col("tp") / F.col("total_rec")
).withColumn(
    "recall", F.col("tp") / F.col("total_rel")
)

avg_precision = metrics_df.agg(F.avg("precision")).collect()[0][0]
avg_recall = metrics_df.agg(F.avg("recall")).collect()[0][0]

print(f"Precision@{K}: {avg_precision:.4f}")
print(f"Recall@{K}: {avg_recall:.4f}")
print(f"\nInterpretation:")
print(f"  - {avg_precision*100:.1f}% of top-{K} recommendations are relevant (rated ≥4.0)")
print(f"  - We capture {avg_recall*100:.1f}% of all relevant items in top-{K}")
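
The per-user quantities computed above (`tp / total_rec` and `tp / total_rel`) can be checked by hand on a small case. The following plain-Python sketch does this for a single user; the function name and the movie IDs are hypothetical.

```python
def precision_recall_at_k(recommended, relevant, k):
    """Return (precision@k, recall@k) for one user."""
    top_k = recommended[:k]
    hits = sum(1 for item in top_k if item in relevant)
    precision = hits / len(top_k) if top_k else 0.0
    recall = hits / len(relevant) if relevant else 0.0
    return precision, recall

# Toy data with hypothetical movie IDs.
recommended = [10, 20, 30, 40, 50]  # ranked model output
relevant = {20, 50, 60, 70}         # test-set items rated >= 4.0

precision, recall = precision_recall_at_k(recommended, relevant, k=5)
print(precision, recall)  # prints: 0.4 0.5
```

Two of the five recommendations are relevant (precision 0.4), and two of the four relevant items are recovered (recall 0.5). The notebook then averages these per-user values over all users with at least one relevant test item.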

## 7. Generate Recommendations for All Users

In [None]:
# Generate top-N recommendations for all users
all_users = ratings_filtered.select("userId").distinct()
all_user_recs = model_final.recommendForUserSubset(all_users, N_RECS)

print(f"Generated top-{N_RECS} recommendations for {all_user_recs.count():,} users")

In [None]:
# Prepare recommendations with movie metadata
user_recs_ranked = all_user_recs.select(
    "userId",
    F.explode("recommendations").alias("rec")
).select(
    "userId",
    F.col("rec.movieId").alias("movieId"),
    F.col("rec.rating").alias("score")
)

# Add rank within user
window_spec = Window.partitionBy("userId").orderBy(F.col("score").desc())
user_recs_ranked = user_recs_ranked.withColumn("rank", F.row_number().over(window_spec))

# Join with movie titles
movies_clean = movies_raw.select(
    F.col("movieId").cast(IntegerType()),
    F.col("title"),
    F.col("genres")
)

user_recs_with_titles = user_recs_ranked.join(movies_clean, "movieId", "left")

print("Sample recommendations with metadata:")
user_recs_with_titles.filter(F.col("userId") == 1).orderBy("rank").show(10, truncate=False)

## 8. Popularity Baseline for Cold Start

In [None]:
# Calculate popularity baseline (most-rated movies)
popularity = ratings_filtered.groupBy("movieId").agg(
    F.count("rating").alias("interaction_count"),
    F.avg("rating").alias("avg_rating")
).join(movies_clean, "movieId", "inner")

# Filter for high-quality popular movies
popularity_threshold = 100
popular_movies = popularity \
    .filter(F.col("interaction_count") >= popularity_threshold) \
    .filter(F.col("avg_rating") >= 3.5) \
    .orderBy(F.col("interaction_count").desc()) \
    .limit(N_RECS)

print(f"Top {N_RECS} popular movies for cold-start fallback:")
popular_movies.show(10, truncate=False)
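
At serving time, the popularity list is intended as a fallback for users who have no precomputed recommendations. The sketch below shows one possible shape of that lookup in plain Python; the `recommend` function and the in-memory dictionaries are assumptions standing in for whatever the API actually loads from the exported artifacts.

```python
def recommend(user_id, user_topn, popular, n=10):
    """Return up to n movie IDs: personalized if available, else popular."""
    personalized = user_topn.get(user_id)
    if personalized:
        return personalized[:n]
    return popular[:n]

# Hypothetical in-memory stand-ins for the exported artifacts.
user_topn = {1: [318, 858, 50], 2: [296, 593, 2571]}
popular = [356, 318, 296, 593]

print(recommend(1, user_topn, popular, n=2))    # known user: [318, 858]
print(recommend(999, user_topn, popular, n=2))  # unseen user: [356, 318]
```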

## 9. Export Artifacts for Production API

In [None]:
# Create output directory
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Define output paths
output_path_user_recs = f"{OUTPUT_DIR}/user_topn"
output_path_popular = f"{OUTPUT_DIR}/popularity"
output_path_movies = f"{OUTPUT_DIR}/movies_meta"
output_path_item_factors = f"{OUTPUT_DIR}/item_factors"

print(f"Export paths:")
print(f"  - User recommendations: {output_path_user_recs}")
print(f"  - Popular movies: {output_path_popular}")
print(f"  - Movie metadata: {output_path_movies}")
print(f"  - Item factors: {output_path_item_factors}")

In [None]:
# Export user recommendations ("overwrite" replaces stale output; "ignore" would silently skip the write if the path exists)
user_recs_with_titles.coalesce(1).write.mode("overwrite").parquet(output_path_user_recs)
print(f"✓ Exported user recommendations to {output_path_user_recs}")

In [None]:
# Export popularity baseline (overwrite so re-runs do not leave stale files)
popular_movies.coalesce(1).write.mode("overwrite").parquet(output_path_popular)
print(f"✓ Exported popularity baseline to {output_path_popular}")

In [None]:
# Export movie metadata (overwrite so re-runs do not leave stale files)
movies_clean.coalesce(1).write.mode("overwrite").parquet(output_path_movies)
print(f"✓ Exported movie metadata to {output_path_movies}")

In [None]:
# Export item factors for similarity computation (overwrite so re-runs do not leave stale files)
item_factors = model_final.itemFactors
item_factors.coalesce(1).write.mode("overwrite").parquet(output_path_item_factors)
print(f"✓ Exported item factors to {output_path_item_factors}")
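
The exported item factors can back a "similar movies" feature by comparing latent vectors. Below is a minimal plain-Python sketch using cosine similarity, assuming the factors have been loaded into a dict keyed by movie ID (in Spark, `itemFactors` exposes an `id` column and a `features` array column). The function names and the 3-dimensional toy vectors are hypothetical.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)

def most_similar(item_id, factors, top_n=2):
    """Rank all other items by cosine similarity to item_id."""
    target = factors[item_id]
    scored = [
        (other, cosine_similarity(target, vec))
        for other, vec in factors.items()
        if other != item_id
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_n]

# Hypothetical latent factors keyed by movie ID.
factors = {
    1: [1.0, 0.0, 0.0],
    2: [0.9, 0.1, 0.0],
    3: [0.0, 1.0, 0.0],
}

neighbors = most_similar(1, factors)
print(neighbors)
```

Movie 2 ranks first because its vector points in nearly the same direction as movie 1's, while movie 3 is orthogonal. For a full catalog, a vectorized or approximate nearest-neighbor approach would likely be preferable to this pairwise loop.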

## 10. Verify Exports and Summary

In [None]:
# Verify exports by reading them back
print("Verifying exports...\n")

user_recs_check = spark.read.parquet(output_path_user_recs)
print(f"User recommendations: {user_recs_check.count():,} records")

popular_check = spark.read.parquet(output_path_popular)
print(f"Popular movies: {popular_check.count():,} records")

movies_check = spark.read.parquet(output_path_movies)
print(f"Movie metadata: {movies_check.count():,} records")

factors_check = spark.read.parquet(output_path_item_factors)
print(f"Item factors: {factors_check.count():,} records")

print("\n✓ All artifacts successfully exported!")

In [None]:
# Summary
print("="*60)
print("RECOMMENDATION SYSTEM SUMMARY")
print("="*60)
print(f"\nModel Configuration:")
print(f"  - Algorithm: ALS (Alternating Least Squares)")
print(f"  - Rank: {best_params['rank']}")
print(f"  - Regularization: {best_params['regParam']}")
print(f"\nPerformance Metrics:")
print(f"  - Test RMSE: {test_rmse:.4f}")
print(f"  - Precision@{K}: {avg_precision:.4f}")
print(f"  - Recall@{K}: {avg_recall:.4f}")
print(f"\nDataset Statistics:")
print(f"  - Total ratings: {ratings_filtered.count():,}")
print(f"  - Unique users: {ratings_filtered.select('userId').distinct().count():,}")
print(f"  - Unique movies: {ratings_filtered.select('movieId').distinct().count():,}")
print(f"\nGenerated Artifacts:")
print(f"  - Top-{N_RECS} recommendations per user")
print(f"  - {popular_check.count()} popular movies for cold-start")
print(f"  - Movie metadata and item factors")
print("\n" + "="*60)
print("Ready for production deployment via FastAPI!")
print("="*60)

In [None]:
# Stop Spark session
# spark.stop()