# ALS on MovieLens 20M (Spark ML)

This notebook:
- loads the **MovieLens 20M** dataset from Databricks datasets
- trains multiple **ALS** models with different hyperparameters
- compares them using **RMSE**
- visualizes how RMSE changes with parameters
- adds **>=10 custom ratings** for a new user and shows **top‑20 recommendations**.


In [0]:
# List the raw MovieLens 20M files shipped with the Databricks sample datasets.
dataset_files = dbutils.fs.ls('/databricks-datasets/cs110x/ml-20m/data-001/')
display(dataset_files)

In [0]:
# Paths (Databricks datasets).
# The dataset directory is defined once so the file paths and the listing below
# cannot drift apart.
DATA_DIR = "/databricks-datasets/cs110x/ml-20m/data-001/"
MOVIES_PATH = DATA_DIR + "movies.csv"
RATINGS_PATH = DATA_DIR + "ratings.csv"

display(dbutils.fs.ls(DATA_DIR))

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, LongType

# Explicit schemas for the two CSV files, so Spark does not have to infer column types.
# Every column is declared nullable.
movies_schema = (
    StructType()
    .add("movieId", IntegerType(), True)
    .add("title", StringType(), True)
    .add("genres", StringType(), True)
)

ratings_schema = (
    StructType()
    .add("userId", IntegerType(), True)
    .add("movieId", IntegerType(), True)
    .add("rating", FloatType(), True)
    .add("timestamp", LongType(), True)
)

In [0]:
from pyspark.sql import functions as F

# Read both CSV files with their header row and the explicit schemas.
df_movies = spark.read.csv(MOVIES_PATH, schema=movies_schema, header=True)

# ALS only needs (user, item, rating); the timestamp column is dropped right away.
ratings_with_ts = spark.read.csv(RATINGS_PATH, schema=ratings_schema, header=True)
df_ratings = ratings_with_ts.select("userId", "movieId", "rating")

# Row counts followed by a small preview of each table.
print("movies:", df_movies.count())
print("ratings:", df_ratings.count())
display(df_movies.limit(5))
display(df_ratings.limit(5))

In [0]:
# Quick sanity checks: summary statistics of the rating column,
# then the number of distinct users and movies present in the ratings.
df_ratings.describe("rating").show()

distinct_counts = df_ratings.agg(
    F.countDistinct("userId").alias("n_users"),
    F.countDistinct("movieId").alias("n_movies"),
)
distinct_counts.show()

In [0]:
# 80/20 train/test split with a fixed seed so the split is repeatable.
splits = df_ratings.randomSplit([0.8, 0.2], seed=42)
df_train = splits[0]
df_test = splits[1]

# Note: caching the two splits for the repeated fits was removed due to a
# serverless limitation, so they are intentionally left uncached here.

print("train:", df_train.count())
print("test :", df_test.count())

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Shared evaluator: RMSE between the true "rating" and the model's "prediction" column.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

def train_eval(rank: int, regParam: float, maxIter: int, seed: int = 42) -> float:
    """Fit one ALS model on ``df_train`` and return its RMSE on ``df_test``.

    Args:
        rank: Number of latent factors.
        regParam: Regularization strength.
        maxIter: Number of ALS iterations.
        seed: Seed passed to ALS so that repeated runs of the same
            configuration are comparable. Without an explicit seed the
            factor initialization is not pinned by this notebook.

    Returns:
        RMSE of the fitted model on the held-out test split.
    """
    als = ALS(
        rank=rank,
        regParam=regParam,
        maxIter=maxIter,
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        coldStartStrategy="drop",  # drop NaN predictions at eval time
        seed=seed,
    )
    model = als.fit(df_train)
    preds = model.transform(df_test)
    return float(evaluator.evaluate(preds))

## Hyperparameter search
We try multiple values of:
- `rank`
- `regParam` (regularization)
- `maxIter` (iterations)

Lower RMSE is better.

In [0]:
# Hyperparameter grids (shrink them if the cluster is small)
ranks     = [10, 20, 30, 40]
regParams = [0.01, 0.05, 0.1, 0.2]
maxIters  = [5, 10, 15]

# Full Cartesian product, in the same nesting order: rank -> regParam -> maxIter.
param_grid = [(r, reg, it) for r in ranks for reg in regParams for it in maxIters]

# Train and score one model per combination, logging progress as we go.
results = []
for r, reg, it in param_grid:
    rmse = train_eval(rank=r, regParam=reg, maxIter=it)
    results.append((r, reg, it, float(rmse)))
    print(f"rank={r:>2}  regParam={reg:<5}  maxIter={it:>2}  -> rmse={rmse}")

# Collect the scores into a Spark DataFrame and show them best-first.
result_columns = ["rank", "regParam", "maxIter", "rmse"]
results_df = spark.createDataFrame(results, result_columns)
display(results_df.orderBy("rmse"))

In [0]:
# The row with the smallest RMSE holds the winning hyperparameters.
best_row = results_df.sort("rmse").first()
best_rank, best_reg, best_iter, best_rmse = (
    best_row[field] for field in ("rank", "regParam", "maxIter", "rmse")
)
print("BEST:", best_row)

## Visualizations
We’ll plot RMSE as a function of the tested values.

In [0]:
# The results table is tiny (one row per grid point), so bring it to the driver
# as a pandas DataFrame for plotting and preview the first rows.
pdf = results_df.toPandas()
pdf.head(5)

In [0]:
import matplotlib.pyplot as plt

# 1) One RMSE-vs-rank curve per regParam, with maxIter held at the best value found
fixed_iter = int(best_iter)
sub = pdf.loc[pdf["maxIter"] == fixed_iter].copy()

fig, ax = plt.subplots()
for reg_value in sorted(sub["regParam"].unique()):
    # Points of a single curve, ordered left-to-right by rank.
    curve = sub.loc[sub["regParam"] == reg_value].sort_values("rank")
    ax.plot(curve["rank"], curve["rmse"], marker="o", label=f"reg={reg_value}")

ax.set_xlabel("rank")
ax.set_ylabel("RMSE")
ax.set_title(f"RMSE vs rank (maxIter={fixed_iter})")
ax.legend()
plt.show()

In [0]:
# 2) Heatmap of RMSE over the (regParam, rank) grid at the best maxIter
sub = pdf.loc[pdf["maxIter"] == fixed_iter].copy()
# Rows are regParam values, columns are rank values, cells hold the RMSE.
pivot = sub.pivot(index="regParam", columns="rank", values="rmse").sort_index()

fig, ax = plt.subplots()
image = ax.imshow(pivot.values, aspect="auto")

# Label the tick positions with the actual parameter values.
ax.set_xticks(range(len(pivot.columns)))
ax.set_xticklabels(pivot.columns)
ax.set_yticks(range(len(pivot.index)))
ax.set_yticklabels(pivot.index)

ax.set_xlabel("rank")
ax.set_ylabel("regParam")
ax.set_title(f"RMSE heatmap (maxIter={fixed_iter})")
fig.colorbar(image, ax=ax, label="RMSE")
plt.show()

In [0]:
# 3) How RMSE evolves with maxIter for the winning rank / regParam pair
is_best_config = (pdf["rank"] == best_rank) & (pdf["regParam"] == best_reg)
sub = pdf.loc[is_best_config].sort_values("maxIter")

fig, ax = plt.subplots()
ax.plot(sub["maxIter"], sub["rmse"], marker="o")
ax.set_xlabel("maxIter")
ax.set_ylabel("RMSE")
ax.set_title(f"RMSE vs maxIter (rank={best_rank}, regParam={best_reg})")
plt.show()

## Train best ALS on full ratings + your custom ratings
We add a new user with **at least 10 ratings** and ask for top‑20 recommendations.

In [0]:
# The new user gets the first id above the largest one present in the ratings.
max_user_row = df_ratings.select(F.max("userId").alias("max_user_id")).first()
new_user_id = max_user_row["max_user_id"] + 1
print("new_user_id:", new_user_id)

In [0]:
# Pick at least 10 movies (MovieLens movieId is consistent across datasets)
my_rated_movies = [
    (new_user_id, 318, 5.0),   # Shawshank Redemption
    (new_user_id, 858, 5.0),   # Godfather
    (new_user_id, 296, 5.0),   # Pulp Fiction
    (new_user_id, 260, 4.5),   # Star Wars: Episode IV
    (new_user_id, 593, 4.0),   # Silence of the Lambs
    (new_user_id, 527, 4.5),   # Schindler's List
    (new_user_id, 50,  4.0),   # Usual Suspects
    (new_user_id, 2571,4.0),   # Matrix
    (new_user_id, 110, 4.0),   # Braveheart
    (new_user_id, 1196,4.5),   # Star Wars: Episode V
    (new_user_id, 1210,4.0),   # Star Wars: Episode VI
]

# Schema inference on Python ints/floats yields bigint/double columns, which do
# not match the int/int/float columns of df_ratings. Cast explicitly so a later
# union with df_ratings does not widen the column types of the full ratings table.
df_my_ratings = (
    spark.createDataFrame(my_rated_movies, ["userId", "movieId", "rating"])
    .select(
        F.col("userId").cast("int").alias("userId"),
        F.col("movieId").cast("int").alias("movieId"),
        F.col("rating").cast("float").alias("rating"),
    )
)

# Show the "my ratings" table with titles
df_my_ratings_named = (df_my_ratings
    .join(df_movies, on="movieId", how="left")
    .select("movieId","title","genres","rating")
    .orderBy(F.desc("rating"), "title")
)
display(df_my_ratings_named)

In [0]:
# Fit the best ALS on ALL ratings + my ratings.
# Align the custom ratings with the column types of df_ratings before the union;
# otherwise the union widens the columns of the whole ratings table to the
# wider of the two types.
df_my_ratings_typed = df_my_ratings.select(
    *[
        F.col(name).cast(df_ratings.schema[name].dataType).alias(name)
        for name in df_ratings.columns
    ]
)
df_all = df_ratings.unionByName(df_my_ratings_typed)

als_best = ALS(
    rank=int(best_rank),
    regParam=float(best_reg),
    maxIter=int(best_iter),
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,  # fixed seed so the final model (and its recommendations) is repeatable
)

best_model = als_best.fit(df_all)

In [0]:
from pyspark.sql import functions as F

# Build top-20 recommendations for the new user.
new_user_id_int = int(new_user_id)

# Candidate set: every movie in the catalogue paired with the new user
# (one row per movie, so this stays small).
df_candidates = (
    df_movies
    .select(F.col("movieId").cast("int").alias("movieId"))
    .distinct()
    .withColumn("userId", F.lit(new_user_id_int))
)

# Predict ratings for all candidate movies
df_pred = best_model.transform(df_candidates)

# Movies the new user has already rated; the cast keeps the join key type
# identical to the candidates' movieId regardless of how df_my_ratings was built.
df_already_rated = (
    df_my_ratings
    .select(F.col("movieId").cast("int").alias("movieId"))
    .distinct()
)

# Remove already-rated movies (anti join) and any null/NaN predictions
df_pred_clean = (
    df_pred
    .join(df_already_rated, on="movieId", how="left_anti")
    .filter(F.col("prediction").isNotNull() & ~F.isnan("prediction"))
)

# Attach movie names and pick the Top-20.
# Rank by the raw prediction and round only for display: sorting on the rounded
# value would order movies that tie after rounding arbitrarily.
recs_named = (
    df_pred_clean
    .join(df_movies, on="movieId", how="left")
    .orderBy(F.desc("prediction"))
    .limit(20)
    .select("movieId", "title", "genres", F.round("prediction", 3).alias("pred_rating"))
)

display(recs_named)


## Notes
- If runtime is too long, reduce the grids (`ranks`, `regParams`, `maxIters`) or sample ratings before training.
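- `coldStartStrategy='drop'` drops rows whose user or movie was not seen during training, so they cannot produce NaN predictions (which would make the RMSE NaN) during evaluation.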
- RMSE is computed on the held‑out test set.
