In [0]:
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, lit, desc

# Both MovieLens 20M CSV files live in the same Databricks dataset directory.
ratings_20m_path, movies_20m_path = (
    "/databricks-datasets/cs110x/ml-20m/data-001/" + file_name
    for file_name in ("ratings.csv", "movies.csv")
)

In [0]:
# Explicit schemas, so Spark uses these column names and types for the CSVs
# instead of inferring them.
ratings_schema_20m = (StructType()
    .add("userId", IntegerType())
    .add("movieId", IntegerType())
    .add("rating", FloatType())
    .add("timestamp", LongType()))

movies_schema_20m = (StructType()
    .add("movieId", IntegerType())
    .add("title", StringType())
    .add("genres", StringType()))

# Both files start with a header row; the schemas above supply names and types.
df_ratings_20m = spark.read.csv(ratings_20m_path, schema=ratings_schema_20m, header=True)
df_movies_20m = spark.read.csv(movies_20m_path, schema=movies_schema_20m, header=True)

display(df_ratings_20m)
display(df_movies_20m)

In [0]:
# 80/20 train/test split with a fixed seed so the split is reproducible.
# The grid search below fits and evaluates many ALS models on these two splits;
# caching them avoids re-reading and re-parsing the ratings CSV for every fit.
df_ratings_train, df_ratings_test = (
    split.cache()
    for split in df_ratings_20m.randomSplit([0.8, 0.2], seed=42)
)

# count() materializes both caches and shows the split sizes.
df_ratings_train.count(), df_ratings_test.count()

Grid search: train and evaluate ALS models for various values of:
- rank,
- regularization parameter (`regParam`),
- number of iterations (`maxIter`).

In [0]:
# Grid search over ALS hyperparameters, scoring each model by RMSE on df_ratings_test.
# NOTE(review): hyperparameters are chosen on the same split they are scored on,
# so the best RMSE is an optimistic estimate; a separate validation split
# (or CrossValidator) would give an unbiased one.
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="rating",
                                predictionCol="prediction")

ranks = [5, 10, 20, 40]
regParams = [0.01, 0.05, 0.1, 0.2]
iters = [5, 10, 15]

results = []

for rank in ranks:
    for reg in regParams:
        for numIter in iters:
            als = ALS(
                rank=rank,
                maxIter=numIter,
                regParam=reg,
                userCol="userId",
                itemCol="movieId",
                ratingCol="rating",
                # Drop rows whose user/movie is unseen in training (NaN predictions),
                # so the RMSE is not NaN.
                coldStartStrategy="drop",
                # Fixed seed (same as the train/test split) so factor
                # initialisation is reproducible across runs.
                seed=42,
            )
            model = als.fit(df_ratings_train)
            preds = model.transform(df_ratings_test)
            rmse = evaluator.evaluate(preds)
            print(f"rank={rank}, reg={reg}, iter={numIter}, RMSE={rmse}")
            results.append((rank, reg, numIter, rmse))

results_df = spark.createDataFrame(results, ["rank", "regParam", "maxIter", "rmse"])
display(results_df.orderBy("rmse"))

Visualizations showing how the test RMSE depends on each tested hyperparameter, with the other two hyperparameters held fixed

In [0]:
# RMSE as a function of rank. regParam and maxIter are held at the values of the
# lowest-RMSE run, read from results_df (not hard-coded) so the slice always
# matches the actual grid-search winner.
best_row = results_df.orderBy("rmse").first()
best_reg = best_row["regParam"]
best_iter = best_row["maxIter"]

rmse_vs_rank = (results_df
    .filter((col("regParam") == best_reg) & (col("maxIter") == best_iter))
    .orderBy("rank"))
display(rmse_vs_rank)

Databricks visualization. Run in Databricks to view.

In [0]:
# RMSE as a function of regParam. rank and maxIter are held at the values of the
# lowest-RMSE run, read from results_df (not hard-coded) so the slice always
# matches the actual grid-search winner.
best_row = results_df.orderBy("rmse").first()
best_rank = best_row["rank"]
best_iter = best_row["maxIter"]

rmse_vs_reg = (results_df
    .filter((col("rank") == best_rank) & (col("maxIter") == best_iter))
    .orderBy("regParam"))

display(rmse_vs_reg)

Databricks visualization. Run in Databricks to view.

In [0]:
# RMSE as a function of maxIter. rank and regParam are held at the values of the
# lowest-RMSE run, read from results_df (not hard-coded) so the slice always
# matches the actual grid-search winner.
best_row = results_df.orderBy("rmse").first()
best_rank = best_row["rank"]
best_reg = best_row["regParam"]

rmse_vs_iter = (results_df
    .filter((col("rank") == best_rank) & (col("regParam") == best_reg))
    .orderBy("maxIter"))

display(rmse_vs_iter)

Databricks visualization. Run in Databricks to view.

Select the best model: the hyperparameter combination with the lowest test RMSE, refit on the training data

In [0]:
# Hyperparameters of the lowest-RMSE run from the grid search.
best = results_df.orderBy("rmse").first()
best_rank = best["rank"]
best_reg  = best["regParam"]
best_iter = best["maxIter"]

# Refit ALS on the training split with the selected hyperparameters.
# NOTE(review): best_model is not referenced by the later cells visible here;
# the recommendation cell fits its own model on train + custom ratings.
als_best = ALS(
    rank=best_rank,
    maxIter=best_iter,
    regParam=best_reg,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    # Fixed seed (same as the train/test split) for reproducible factors.
    seed=42,
)
best_model = als_best.fit(df_ratings_train)

A table of my own ratings for at least 11 movies that I selected

In [0]:
# User id for my own ratings; it must not collide with an existing user in the data.
my_user_id = 0
assert df_ratings_20m.filter(col("userId") == my_user_id).limit(1).count() == 0, \
    f"userId {my_user_id} already has ratings in the dataset; choose another id"

# (userId, movieId, rating) triples for the movies I selected.
my_rated_movies_20m = [
    (my_user_id, 1, 5.0),
    (my_user_id, 39, 4.0),
    (my_user_id, 73, 5.0),
    (my_user_id, 293, 5.0),
    (my_user_id, 296, 5.0),
    (my_user_id, 356, 4.0),
    (my_user_id, 586, 4.0),
    (my_user_id, 588, 4.0),
    (my_user_id, 902, 5.0),
    (my_user_id, 920, 5.0),
    (my_user_id, 1029, 5.0)
]

# Build with the same schema as the MovieLens ratings so the later union with the
# training data keeps identical column names and int/float/long types. The ALS
# models only read userId/movieId/rating, so timestamp 0 is just a placeholder.
df_custom_ratings_20m = spark.createDataFrame(
    [(user_id, movie_id, rating, 0) for user_id, movie_id, rating in my_rated_movies_20m],
    schema=ratings_schema_20m,
)

# Join ratings with movie titles
df_my_ratings_with_titles = (df_custom_ratings_20m
    .join(df_movies_20m.select("movieId", "title", "genres"), on="movieId", how="left"))

# A null title after the left join means a selected movieId is not in the movies file.
assert df_my_ratings_with_titles.filter(col("title").isNull()).limit(1).count() == 0, \
    "some selected movieIds were not found in df_movies_20m"

display(df_my_ratings_with_titles.select("userId", "movieId", "title", "rating", "genres"))

The top 20 movies that the model recommends for me to watch (excluding movies I have already rated)

In [0]:
# Add my ratings to the training data so ALS learns a factor vector for my_user_id.
# unionByName matches columns by name instead of position, so a column-order
# difference in df_custom_ratings_20m cannot silently mis-assign values.
# NOTE(review): only the training split is used here; refitting on all of
# df_ratings_20m may give better final recommendations.
df_all_ratings_20m = df_ratings_train.unionByName(df_custom_ratings_20m)

als_custom = ALS(
    rank=best_rank,
    maxIter=best_iter,
    regParam=best_reg,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    # Fixed seed (same as the train/test split) for reproducible recommendations.
    seed=42,
)
custom_model_20m = als_custom.fit(df_all_ratings_20m)

# Candidate movies: every movie I have not rated myself.
df_movies_unrated_20m = df_movies_20m.join(
    df_custom_ratings_20m.select("movieId"),
    on="movieId",
    how="left_anti"
)

# Score every candidate movie for my user id.
df_for_prediction_20m = df_movies_unrated_20m.withColumn("userId", lit(my_user_id))
df_predictions_20m = custom_model_20m.transform(df_for_prediction_20m)

# coldStartStrategy="drop" already removes movies the model has no factors for;
# the null filter is an extra guard before ranking.
df_recommendations_20m = (df_predictions_20m
    .filter(col("prediction").isNotNull())
    .orderBy(desc("prediction")))

display(df_recommendations_20m.select("title", "genres", "prediction").limit(20))