# Movie Recommendation System

## Import dependencies

In [0]:
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.rdd import RDD
from pyspark.sql import SparkSession

In [0]:
# create spark session
spark = SparkSession.builder.appName("Recommendation System").getOrCreate()

In [0]:
# create and view spark context
sc = spark.sparkContext
sc

## Load dataset

In [0]:
# read movie data
raw_data = sc.textFile("/FileStore/tables/u.data")

In [0]:
raw_data.take(5)

Out[5]: ['196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596']

In [0]:
# read first three columns and turn them into Rating object.
raw_ratings = raw_data.map(lambda line: line.split("\t")[:3])
rating_rdd = raw_ratings.map(lambda raw_rating: Rating(raw_rating[0], raw_rating[1], raw_rating[2]))

In [0]:
rating_rdd.take(5)

Out[7]: [Rating(user=196, product=242, rating=3.0),
 Rating(user=186, product=302, rating=3.0),
 Rating(user=22, product=377, rating=1.0),
 Rating(user=244, product=51, rating=2.0),
 Rating(user=166, product=346, rating=1.0)]

In [0]:
# look at unique users
n_users = rating_rdd.map(lambda entry: entry.user).distinct().count()
n_users

Out[8]: 943

## Split dataset

In [0]:
# split rdd into train and test
train_rdd, test_rdd = rating_rdd.randomSplit(weights = [0.8, 0.2], seed = 101)

## Train the model

In [0]:
# train model
model = ALS.train(train_rdd, rank = 2, iterations = 10, lambda_ = 0.01, seed = 101)

## Predictions

In [0]:
# predict ratings for all user-product combinations in test set
predictions = model.predictAll(test_rdd.map(lambda entry: (entry.user, entry.product)))

In [0]:
# convert test RDD and predicted ratings RDD to dataframe
test_df = test_rdd.map(lambda entry: (entry.user, entry.product, float(entry.rating))).toDF(["user", "product", "rating"])
predictions_df = predictions.toDF(["user", "product", "prediction"])

In [0]:
# join test_df with predictions_df on user and product columns
test_df_with_predictions = test_df.join(predictions_df, ["user", "product"])
# show the first 5 rows of the resulting dataframe
test_df_with_predictions.show(5)

+----+-------+------+------------------+
|user|product|rating|        prediction|
+----+-------+------+------------------+
| 159|     24|   5.0|3.0299217286123508|
| 648|    288|   4.0| 3.082811990556877|
|  56|     96|   5.0| 4.318138371808146|
| 711|    736|   5.0| 4.152057894839537|
| 401|    144|   5.0|3.2610037193199233|
+----+-------+------+------------------+
only showing top 5 rows



## Measure the errors

In [0]:
predictions_and_observations = test_df_with_predictions.select("prediction", "rating").rdd
metrics = RegressionMetrics(predictions_and_observations)
metrics.rootMeanSquaredError, metrics.meanSquaredError

Out[14]: (0.9483490002018126, 0.8993658261837776)

## Tuning the hyperparameters

In [0]:
def compute_rmse(model, data):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda entry: (entry.user, entry.product))) 
    test_df = test_rdd.map(lambda entry: (entry.user, entry.product, float(entry.rating))).toDF(["user", "product", "rating"])
    predictions_df = predictions.toDF(["user", "product", "prediction"])
    test_df_with_predictions = test_df.join(predictions_df, ["user", "product"])
    predictions_and_observations = test_df_with_predictions.select("prediction", "rating").rdd
    metrics = RegressionMetrics(predictions_and_observations)
    return metrics.rootMeanSquaredError

In [0]:
ranks = range(2, 10, 2)
lambdas = [0.1, 0.01, 0.001]
num_iters = [5, 10, 20]
best_model = None
best_rmse = float("inf")
best_rank = 0
best_lambda = -1.0
best_num_iter = -1

In [0]:
# for rank, lmbda, num_iter in zip(ranks, lambdas, numIters):
for rank in ranks:
    for lmbda in lambdas:
        for num_iter in num_iters:
            model = ALS.train(train_rdd, rank, num_iter, lmbda)
            rmse = compute_rmse(model, test_rdd)
            print(f"RMSE = {rmse} for the model trained with rank = {rank}, lambda = {lmbda}, and num_iter = {num_iter}")
            if (rmse < best_rmse):
                best_model = model
                best_rmse = rmse
                best_rank = rank
                best_lambda = lmbda
                best_num_iter = num_iter

RMSE = 0.9362641350733714 for the model trained with rank = 2, lambda = 0.1, and num_iter = 5
RMSE = 0.929223873003072 for the model trained with rank = 2, lambda = 0.1, and num_iter = 10
RMSE = 0.9253272466140455 for the model trained with rank = 2, lambda = 0.1, and num_iter = 20
RMSE = 0.9570409191434277 for the model trained with rank = 2, lambda = 0.01, and num_iter = 5
RMSE = 0.9376240330145003 for the model trained with rank = 2, lambda = 0.01, and num_iter = 10
RMSE = 0.9362910512557954 for the model trained with rank = 2, lambda = 0.01, and num_iter = 20
RMSE = 0.9568129212651045 for the model trained with rank = 2, lambda = 0.001, and num_iter = 5
RMSE = 0.950866996389954 for the model trained with rank = 2, lambda = 0.001, and num_iter = 10
RMSE = 0.9742261882151261 for the model trained with rank = 2, lambda = 0.001, and num_iter = 20
RMSE = 0.9315455144390536 for the model trained with rank = 4, lambda = 0.1, and num_iter = 5
RMSE = 0.918419603448835 for the model trained 

In [0]:
best_rmse, best_rank, best_lambda, best_num_iter

Out[18]: (0.9153671533355986, 4, 0.1, 20)