# DS/CMPSC 410 Fall 2024
# Instructor: Professor John Yen
# TA: Jin Peng and Al Lawati, Ali Hussain Mohsin
# Lab 6: Movie Recommendations Using Alternating Least Squares
## The goals of this lab are for you to be able to
### - Use Alternating Least Squares (ALS) for recommending movies based on reviews of users
### - Be able to understand the rationale for splitting data into training, validation, and testing.
### - Be able to use MLlib to implement an ALS-based movie recommendation system.
### - Be able to use RDD transformations to calculate training error, validation error, and prediction error of a model.
### - Be able to tune hyper-parameters of the ALS model using a small dataset (in local mode)
### - Be able to store the results of evaluating hyper-parameters using Pandas dataframe.
### - Be able to automatically identify the best hyper-parameters and evaluate the chosen model with testing data.
### - Be able to use checkpointing to improve the efficiency of iterative processing in Spark.
### - Be able to debug (in local mode) using Restart Kernel if needed.
### - Note: This lab only requires running Spark in local mode (using a small review dataset). Lab 7 will require running ALS-based recommendation on a large review dataset in cluster mode.

## Exercises: 
- Exercise 1: 5 points
- Exercise 2A: 5 points
- Exercise 2B: 10 points
- Exercise 3: 10 points
- Exercise 4: 10 points
- Exercise 5: 10 points
- Exercise 7: 25 points
- Exercise 8: 15 points
- Exercise 9: 10 points
## Total Points: 100 points

# Due: midnight, Oct 7th, 2024


## The first thing we need to do in each Jupyter Notebook running PySpark is to import pyspark.

In [1]:
import pyspark
import pandas as pd
import numpy as np
import math

### Once we import pyspark, we need to import "SparkContext". Every Spark program needs a SparkContext object.
### In order to use Spark SQL on DataFrames, we also need to import SparkSession from ``pyspark.sql``.
### In addition, we import ``ALS`` from the ``pyspark.mllib.recommendation`` module.

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, FloatType
from pyspark.sql.functions import col, column
from pyspark.sql.functions import expr
from pyspark.sql.functions import split
from pyspark.sql import Row
from pyspark.mllib.recommendation import ALS

## We then create a SparkSession variable (rather than a SparkContext) in order to use DataFrames.
- Note: We temporarily use "local" as the parameter for master in this notebook so that we can complete and debug the code using Jupyter Server. After we export it to a .py file for execution in the cluster, however, we need to REMOVE .master("local") in the .py file so that it runs in cluster (Standalone) mode in the ICDS cluster when we execute ``pbs-spark-submit``.

In [3]:
ss=SparkSession.builder.master("local").appName("Lab6 ALS-based Recommendation Systems").getOrCreate()



In [4]:
ss.sparkContext.setLogLevel("WARN")

In [5]:
import os
ss.sparkContext.setCheckpointDir(os.path.expanduser("~/scratch"))  # Spark does not expand "~" itself

## Exercise 1 (5 points) (a) Add your name below AND (b) replace the path below with the path of your home directory.
## Answer for Exercise 1
- a: Your Name: Daniel Woodford

In [6]:
rating_schema = StructType([ StructField("UserID", IntegerType(), False ), \
                            StructField("MovieID", IntegerType(), True), \
                            StructField("Rating", FloatType(), True ), \
                            StructField("RatingID", IntegerType(), True ), \
                           ])

In [7]:
ratings_DF = ss.read.csv("ratings_samples.csv", schema=rating_schema, header=True, inferSchema=False)

In [8]:
ratings_DF.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Rating: float (nullable = true)
 |-- RatingID: integer (nullable = true)



In [9]:
ratings2_DF = ratings_DF.select("UserID","MovieID","Rating")

In [10]:
ratings2_DF.first()

Row(UserID=1, MovieID=31, Rating=2.5)

# Notice that each row of a DataFrame is a ``Row`` object.
## A ``Row`` object lets us access the column values of a row by column name, as we shall see later.

# Converting a PySpark DataFrame into an RDD
- Just use ``<DataFrame>.rdd``, which returns an RDD representation of the ``<DataFrame>``.
- Notice also that the elements of the RDD are ``Row`` objects. As we will see later, you can access the contents of each ``Row`` in three ways.


In [11]:
ratings2_RDD = ratings2_DF.rdd
ratings2_RDD.take(3)

[Row(UserID=1, MovieID=31, Rating=2.5),
 Row(UserID=1, MovieID=1029, Rating=3.0),
 Row(UserID=1, MovieID=1061, Rating=3.0)]

# 5.1 Split Data into Three Sets: Training Data, Validation Data, and Testing Data

# Exercise 2A (5 points)
## Complete the code below to split `ratings2_RDD` into three groups: 60% training, 20% validation, and 20% testing.
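## ``randomSplit`` is an RDD method that takes a list of weights (percentages) used to split the input RDD; Spark normalizes the weights if they do not sum to 1, and because each record is assigned at random, the resulting sizes are only approximately proportional to the weights. The second parameter is a seed for the random number generator (typically a prime number); fixing the seed makes the split reproducible.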
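To see the idea behind ``randomSplit``, here is a plain-Python sketch (an illustration, not Spark's actual implementation): the weights are normalized into cumulative boundaries, each item draws one uniform random number, and the item goes to the bucket whose interval contains that number. The function name ``weighted_random_split`` is hypothetical.

```python
import random

def weighted_random_split(items, weights, seed):
    """Assign each item to exactly one bucket with probability proportional to its weight.

    Hypothetical plain-Python illustration of the idea behind RDD.randomSplit.
    """
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.6, 0.8, 1.0] for weights [.6, .2, .2]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)  # fixed seed -> reproducible split
    buckets = [[] for _ in weights]
    for item in items:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                buckets[i].append(item)
                break
        else:
            # Guard against floating-point round-off in the last bound
            buckets[-1].append(item)
    return buckets

train, val, test = weighted_random_split(range(10000), [.6, .2, .2], seed=19)
print(len(train), len(val), len(test))  # roughly 6000, 2000, 2000
```

Because every item is assigned independently, the three sizes vary slightly around 60/20/20, which is also why the splits produced by ``randomSplit`` are not exactly proportional.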

In [12]:
training_RDD, validation_RDD, test_RDD = ratings2_RDD.randomSplit([.6, .2, .2], 19)

# Recommendation Systems
The recommendation system in this lab generates a predicted "rating" for a given user and a given movie. Therefore, the input of the system is a pair: a UserID and a MovieID.
## Prepare input (UserID, MovieID) for training, validation and for testing data
Each entry of the ``ratings2_RDD`` is a ``Row`` object containing three elements: UserID, MovieID, and Rating.
### We extract the first two elements of each Row in ``training_RDD``, ``validation_RDD``, and ``test_RDD`` to create the input for training, validation, and testing, respectively.
### We will use these RDDs to evaluate the model later.
### We will first construct an ALS recommendation model using training_RDD, which includes both the input (UserID, MovieID) and the desired prediction output (Rating).

In [13]:
training_input_RDD = training_RDD.map(lambda x: (x[0], x[1]) )
validation_input_RDD = validation_RDD.map(lambda x: (x[0], x[1]) ) 
testing_input_RDD = test_RDD.map(lambda x: (x[0], x[1]) )

# 5.2 Construct a Movie Recommendation Model 
## We use ``ALS`` from the `pyspark.mllib.recommendation` module and the training data to train a recommendation model.
## Exercise 2B (10 points)
## Choose a rank between 3 and 6, a random number for the seed, 30 iterations, and a regularization parameter of 0.1.
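The rank is the number of latent factors. In MLlib's explicit-feedback ALS, the model learns a rank-length factor vector for every user and every movie by alternating between two regularized least-squares problems (solve for user vectors with movie vectors fixed, then for movie vectors with user vectors fixed); ``lambda_`` penalizes large factor values. A predicted rating is the dot product of a user's vector and a movie's vector. The sketch below uses made-up rank-4 factor values (hypothetical, not from the trained model) to show that computation.

```python
# Hypothetical rank-4 factor vectors; a trained MatrixFactorizationModel
# stores one such vector per user (userFeatures) and per movie (productFeatures).
user_factors = {1: [0.9, 0.1, 0.4, 0.2]}
movie_factors = {31: [1.2, 0.5, 2.0, 1.0]}

def predict(user_id, movie_id):
    """Predicted rating = dot product of the user's and the movie's factor vectors."""
    u = user_factors[user_id]
    m = movie_factors[movie_id]
    return sum(a * b for a, b in zip(u, m))

print(predict(1, 31))  # approximately 2.13
```

A larger rank gives each user and movie more factors to describe preferences, which can lower training error but also makes overfitting easier; this is one reason the rank is tuned later in the lab.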

In [14]:
model = ALS.train(training_RDD, 4, seed=17, iterations=30, lambda_=0.1)

# 5.2.1 Compute Training Error of the ALS recommendation model
### We use the training_input_RDD prepared earlier to generate predicted ratings for the ``(<userID>, <movieID>)`` pairs in the training dataset, so that we can compare the predicted ratings with the actual ratings and calculate the training error.

In [15]:
training_prediction_RDD = model.predictAll(training_input_RDD)

In [16]:
training_prediction_RDD.take(4)

[Rating(user=253, product=37739, rating=3.4122745865216526),
 Rating(user=402, product=69069, rating=3.904311960189694),
 Rating(user=213, product=44828, rating=0.4832854965056512),
 Rating(user=428, product=5618, rating=4.406297204322684)]

# Notice: The output of ``predictAll`` has predefined field names: ``user``, ``product``, and ``rating``, which differ from the column names of the training data.
## Write down these column names.  You will need them in Exercise 3.
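In PySpark, each element returned by ``predictAll`` is a ``Rating``, which is a namedtuple with the fields ``user``, ``product``, and ``rating``. The stdlib sketch below defines an equivalent namedtuple locally (so it runs without Spark) and applies the reshaping into ``((user, product), rating)``; the sample values are copied from the output above.

```python
from collections import namedtuple

# Local stand-in with the same fields as pyspark.mllib.recommendation.Rating
Rating = namedtuple("Rating", ["user", "product", "rating"])

predictions = [
    Rating(user=253, product=37739, rating=3.4122745865216526),
    Rating(user=402, product=69069, rating=3.904311960189694),
]

# Reshape into ((user, product), rating) so the pair can serve as a join key
keyed = [((r.user, r.product), r.rating) for r in predictions]
print(keyed[0])  # ((253, 37739), 3.4122745865216526)
```

On an RDD, the same reshaping is a single ``map`` with the same lambda body.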

# Joining two RDDs on keys
Two key-value RDDs can be joined on their keys so that entries of the two RDDs with common keys are combined in the RDD generated by the join operation.
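As a rough plain-Python model of what ``<RDD>.join`` computes (an inner join): only keys present in both RDDs survive, and each output element has the form ``(key, (left_value, right_value))``. If a key appears several times on either side, every combination is emitted. The helper ``inner_join`` is hypothetical and only illustrates the semantics; Spark computes the join in a distributed way by shuffling both RDDs by key. The ratings below are illustrative values.

```python
from collections import defaultdict

def inner_join(left, right):
    """Mimic the output of RDD.join on two lists of (key, value) pairs."""
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    result = []
    for k, lv in left:
        # .get does not insert missing keys into the defaultdict
        for rv in right_by_key.get(k, []):
            result.append((k, (lv, rv)))
    return result

actual = [((1, 31), 2.5), ((1, 1263), 2.0), ((2, 52), 3.0)]
predicted = [((1, 31), 2.57), ((2, 52), 3.52), ((9, 99), 4.0)]
print(inner_join(actual, predicted))
# [((1, 31), (2.5, 2.57)), ((2, 52), (3.0, 3.52))]
```

Note that ``(1, 1263)`` and ``(9, 99)`` disappear because each appears on only one side. Unlike this list-based sketch, the order of elements in a joined RDD is not guaranteed.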
# Joining two RDDs on keys for Evaluating Recommendation Systems
To evaluate a recommendation system, we need to combine, for each ``(<user>, <movie>)`` pair, its predicted rating and its actual rating.
This can be achieved in four steps:
- First, convert training_RDD (or validation_RDD) from the format ``(<user>, <movie>, <actual_rating>)`` into the key-value format ``( (<user>, <movie>), <actual_rating> )``. Notice that the key is now a tuple of two elements: ``<user>`` and ``<movie>``.
- Second, convert training_prediction_RDD from the format ``(<user>, <movie>, <predicted_rating>)`` into the same key-value format: ``( (<user>, <movie>), <predicted_rating> )``.
- Third, join the two RDDs on their keys, producing a new key-value RDD in the format ``( (<user>, <movie>), (<actual_rating>, <predicted_rating>) )``.
- Fourth, calculate the errors between predicted ratings and actual ratings across all users and movies.
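The four steps can be traced on a handful of records in plain Python. This sketch assumes each (user, movie) pair appears at most once, so a dict can stand in for a key-value RDD; the ratings are illustrative values, not results from this lab.

```python
import math

# Rows as (user, movie, rating) tuples (illustrative values)
actual_rows = [(1, 31, 2.5), (1, 1263, 2.0), (1, 1343, 2.0)]
predicted_rows = [(1, 31, 2.57), (1, 1263, 2.25), (1, 1343, 2.19)]

# Steps 1 and 2: re-key both datasets by (user, movie)
actual = {(u, m): r for u, m, r in actual_rows}
predicted = {(u, m): r for u, m, r in predicted_rows}

# Step 3: inner join on the shared keys -> {(user, movie): (actual, predicted)}
joined = {k: (actual[k], predicted[k]) for k in actual if k in predicted}

# Step 4: root mean square error over all joined pairs
squared_errors = [(a - p) ** 2 for a, p in joined.values()]
rmse = math.sqrt(sum(squared_errors) / len(squared_errors))
print(round(rmse, 4))
```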

# Three Ways to Access Elements of a 'Row' object in an RDD (generated from Spark DataFrame)
Before we perform the four steps above, we will first learn three methods for accessing elements of a Row object in an RDD generated from a Spark DataFrame. Mastering these methods will enable you not only to implement the steps above, but also to transform the structure of RDDs in a wide range of other situations (e.g., integrating data or evaluating models).

For example, you will later use these methods to implement steps 1 and 2 above, and to calculate model prediction errors after step 3.
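To experiment with the three access methods outside Spark, the sketch below defines a small hypothetical ``MiniRow`` class (not part of PySpark) that, like ``pyspark.sql.Row``, is a tuple whose fields can also be reached by name. It supports all three syntaxes introduced below.

```python
class MiniRow(tuple):
    """Hypothetical stand-in for pyspark.sql.Row: a tuple with named fields."""

    def __new__(cls, **fields):
        row = super().__new__(cls, fields.values())
        row._fields = tuple(fields.keys())  # keyword order is preserved
        return row

    def __getitem__(self, item):
        # Method 1: row['ColumnName']
        if isinstance(item, str):
            try:
                idx = self._fields.index(item)
            except ValueError:
                raise KeyError(item) from None
            return super().__getitem__(idx)
        # Method 3: row[index]
        return super().__getitem__(item)

    def __getattr__(self, name):
        # Method 2: row.ColumnName (called only when normal lookup fails)
        if name.startswith("_"):
            raise AttributeError(name)
        try:
            return super().__getitem__(self._fields.index(name))
        except ValueError:
            raise AttributeError(name) from None

row = MiniRow(UserID=1, MovieID=31, Rating=2.5)
print(row['UserID'], row.MovieID, row[2])  # 1 31 2.5
```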

In [17]:
training_RDD.take(3)

[Row(UserID=1, MovieID=31, Rating=2.5),
 Row(UserID=1, MovieID=1263, Rating=2.0),
 Row(UserID=1, MovieID=1343, Rating=2.0)]

## Method 1: Access elements of a row by the column name of the DataFrame (from which the RDD came) using the syntax ``<Row variable>['<ColumnName>']``

In [18]:
training_target_output_RDD = training_RDD.map(lambda x: ( (x['UserID'], x['MovieID']), x['Rating'] ) )

In [19]:
training_target_output_RDD.take(3)

[((1, 31), 2.5), ((1, 1263), 2.0), ((1, 1343), 2.0)]

### Notice the transformed RDD above has ``(<userID>, <movieID>)`` as keys, and their ratings as values.

## Method 2: Access elements of a row by the column name in the DataFrame schema (from which the RDD came) using the syntax ``<Row variable>.<ColumnName>``; this works only when the column name is a valid Python identifier (e.g., it contains no spaces)

In [20]:
training_target_output2_RDD = training_RDD.map(lambda x: ( ( x.UserID, x.MovieID ), x.Rating ) )

In [21]:
training_target_output2_RDD.take(3)

[((1, 31), 2.5), ((1, 1263), 2.0), ((1, 1343), 2.0)]

### Notice the transformed RDD using this second method has content identical to that generated by the first method.

## Method 3: Access elements of a row by position using the syntax ``<Row variable>[<index>]``, where ``<index>`` is an integer indicating the position of the element in the row (starting with 0 for the first element).

In [22]:
training_target_output3_RDD = training_RDD.map(lambda x: ( (x[0], x[1]), x[2] ) )

In [23]:
training_target_output3_RDD.take(3)

[((1, 31), 2.5), ((1, 1263), 2.0), ((1, 1343), 2.0)]

### Is the result of the transformed RDD using this method similar to the two above?
Yes, the result is identical.
### What is the benefit of the first two methods?
Accessing a field by its column name is more readable than using a positional index, and the code keeps working even if the order of the columns changes.

# Exercise 3 (10 points)
## Use method 2 (for accessing the content of an RDD) to complete the code below to transform the ALS model prediction of training data into the format `( (<user>, <product>), <rating> )` so that we can later join it with the training target output RDD to compute the Root Mean Square Error of predictions.
### Notice: The 

In [24]:
training_prediction2_RDD = training_prediction_RDD.map(lambda r: ( (r.user, r.product), r.rating ))

In [25]:
training_prediction2_RDD.take(3)

[((253, 37739), 3.4122745865216526),
 ((402, 69069), 3.904311960189694),
 ((213, 44828), 0.4832854965056512)]

In [26]:
training_evaluation_RDD = training_target_output_RDD.join(training_prediction2_RDD)

In [27]:
training_evaluation_RDD.take(3)

[((1, 31), (2.5, 2.5728871712524892)),
 ((1, 1263), (2.0, 2.2544346894707754)),
 ((1, 1343), (2.0, 2.1853108164581343))]

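# Because the elements of the joined RDD are nested tuples rather than Row objects, you access their content using the third method above (positional indexing), e.g., ``z[1][0]`` for the actual rating and ``z[1][1]`` for the predicted rating.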
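Python 3 does not allow tuple unpacking in a lambda's parameter list, which is why positional indexing such as ``z[1][0]`` is the usual choice inside ``map``. An equivalent alternative is a small named function that unpacks the nested structure explicitly; the name ``squared_error`` is hypothetical, and the sample element is copied from the joined output above.

```python
def squared_error(joined_element):
    """Squared error for one element of the form ((user, movie), (actual, predicted))."""
    (_user, _movie), (actual, predicted) = joined_element
    return (actual - predicted) ** 2

z = ((1, 31), (2.5, 2.5728871712524892))
# The unpacking version and the positional-index version compute the same value
print(squared_error(z) == (z[1][0] - z[1][1]) ** 2)  # True
```

With Spark, such a function can be passed directly, e.g. ``training_evaluation_RDD.map(squared_error)``.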

# For an RDD containing numbers, we can use the following actions to calculate their min, max, sum, and mean:
- ``<RDD>.min()``: returns the minimum value in the RDD.
- ``<RDD>.max()``: returns the maximum value in the RDD.
- ``<RDD>.sum()``: returns the sum of all values in the RDD.
- ``<RDD>.mean()``: returns the mean of all values in the RDD.
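The training and validation errors computed below are root mean square errors (RMSE). Over the set $P$ of joined (user, movie) pairs, with actual rating $r_{um}$ and predicted rating $\hat{r}_{um}$:

$$
\mathrm{RMSE} = \sqrt{\frac{1}{|P|} \sum_{(u,m) \in P} \left( r_{um} - \hat{r}_{um} \right)^2 }
$$

In the code, the ``map`` computes each squared term, ``mean()`` divides their sum by $|P|$, and ``math.sqrt`` takes the square root.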

In [28]:
training_error = math.sqrt(training_evaluation_RDD.map(lambda z: (z[1][0] - z[1][1])**2).mean())

In [29]:
print(training_error)

0.6486012428396611


# 5.2.2 Compute Validation Errors
In a similar way, we can calculate the validation error.

In [30]:
validation_prediction_RDD = model.predictAll(validation_input_RDD).map(lambda x: ( (x.user, x.product), x.rating ) )

In [31]:
validation_prediction_RDD.take(3)

[((48, 44828), 0.40229039645299036),
 ((331, 5618), 4.3199181947106755),
 ((577, 5618), 4.926704876628129)]

# Exercise 4 (10 points)
## Complete the code below to transform `validation_RDD` into the same key-value pair format as in Exercise 3, then join it with `validation_prediction_RDD` to prepare for the RMS error calculation.

In [32]:
validation_target_RDD = validation_RDD.map(lambda y: ( (y.UserID, y.MovieID), y.Rating ) )

In [33]:
validation_evaluation_RDD = validation_target_RDD.join(validation_prediction_RDD)

In [34]:
validation_evaluation_RDD.take(3)

[((1, 1129), (2.0, 2.0247077674322007)),
 ((1, 1287), (2.0, 2.241996889286977)),
 ((2, 52), (3.0, 3.5191739609321457))]

# Exercise 5 (10 points)
## Complete the code below to calculate RMS error for validation data.

In [35]:
validation_error_RDD = validation_evaluation_RDD.map(lambda z: (z[1][0] - z[1][1])**2)

In [36]:
validation_error_RDD.take(3)

[0.0006104737714837181, 0.05856249442457332, 0.2695416017099732]

In [37]:
validation_RMS_error = math.sqrt(validation_error_RDD.mean())

In [38]:
print(validation_RMS_error)

0.9393572722599206


## How does the validation error compare to the training error?
The validation error (about 0.939) is much larger than the training error (about 0.649).
### Is this what you expected?
To an extent: validation error tends to be higher because the validation data was not used to train the model, whereas the training error measures how well the model fits data it has already seen.

# 5.3 Hyperparameter Tuning
## Iterate through all possible combinations of a set of values for three hyperparameters of the ALS recommendation model:
- rank (k)
- regularization
- iterations
## Each hyperparameter value combination is used to construct an ALS recommendation model using the training data, and that model is evaluated using the validation data.
## The evaluation results are saved in the Pandas DataFrame ``hyperparams_eval_df``.
## The best hyperparameter value combination and its validation error are stored in four variables: ``best_k``, ``best_regularization``, ``best_iterations``, and ``lowest_validation_error``.

# Exercise 7 (25 points) 
## Complete the code below (using your answers to Exercises 3, 4, and 5) to iterate through a set of hyperparameters (rank k, regularization parameter, and number of iterations), creating and evaluating ALS recommendation models to find the best model among all those created.

In [44]:
## Initialize a Pandas DataFrame to store evaluation results of all combinations of hyper-parameter settings
hyperparams_eval_df = pd.DataFrame( columns = ['k', 'regularization', 'iterations', 'validation RMS', 'testing RMS'] )
# initialize the row index of hyperparams_eval_df to 0
index =0 
# initialize lowest_error
lowest_validation_error = float('inf')
# Set up the possible hyperparameter values to be evaluated
iterations_list = [15, 30]
regularization_list = [0.1, 0.2, 0.3]
rank_list = [4, 7, 10, 13]
for k in rank_list:
    for regularization in regularization_list:
        for iterations in iterations_list:
            seed = 37
            # Construct a recommendation model using a set of hyper-parameter values and training data
            model = ALS.train(training_RDD, rank=k, seed=seed, iterations=iterations, lambda_=regularization)
            # Evaluate the model using validation data
            # map the output into ( (user, product), rating ) so that we can join with actual evaluation data
            # using (userID, movieID) as keys.
            validation_prediction_RDD= model.predictAll(validation_input_RDD).map(lambda x: (  (x.user, x.product), x.rating )    )
            validation_evaluation_RDD = validation_RDD.map(lambda y: ( (y.UserID, y.MovieID), y.Rating ) ).join(validation_prediction_RDD)
            # Calculate RMS error between the actual rating and predicted rating for (userID, movieID) pairs in validation dataset
            validation_error = math.sqrt(validation_evaluation_RDD.map(lambda z: (z[1][0] - z[1][1])**2).mean())
            # Save the error as a row in a pandas DataFrame
            hyperparams_eval_df.loc[index] = [k, regularization, iterations, validation_error, float('inf')]
            index = index + 1
            # Check whether the current error is the lowest
            if validation_error < lowest_validation_error:
                best_k = k
                best_regularization = regularization
                best_iterations = iterations
                best_index = index - 1
                lowest_validation_error = validation_error
print('The best rank k is ', best_k, ', regularization = ', best_regularization, ', iterations = ',\
      best_iterations, '. Validation Error =', lowest_validation_error)



The best rank k is  10 , regularization =  0.2 , iterations =  15 . Validation Error = 0.9191682688219361

# Use Testing Data to Evaluate the Model built using the Best Hyperparameters                

# 5.4 Evaluate the best hyperparameter combination using testing data
## If the error between predicted and actual ratings for (userID, movieID) pairs in the testing data is comparable to the error for the validation data, our model passes the test.

# Exercise 8 (15 points)
- (a) Complete the code below to evaluate the best hyperparameter combination (saved in variables such as ``best_k`` in the previous exercise) using testing data. (10 points)
- (b) Is your model's performance on the testing data comparable to its performance on the validation data? Explain your answer in the Markdown cell below. (5 points)

In [48]:

model = ALS.train(training_RDD, best_k, seed= 19, iterations=best_iterations, lambda_=best_regularization)
testing_prediction_RDD=model.predictAll(testing_input_RDD).map(lambda x: ((x.user, x.product), x.rating) )
testing_evaluation_RDD= test_RDD.map(lambda x: ((x.UserID, x.MovieID), x.Rating)).join(testing_prediction_RDD)
testing_error = math.sqrt(testing_evaluation_RDD.map(lambda x: (x[1][0]-x[1][1])**2).mean())
print('The Testing Error for rank k =', best_k, ' regularization = ', best_regularization, ', iterations = ', \
      best_iterations, ' is : ', testing_error)



The Testing Error for rank k = 10  regularization =  0.2 , iterations =  15  is :  0.9217817274717887

# Answer to Exercise 8: 
- (b) Yes. The testing RMS error (about 0.922) is very close to the validation RMS error of the best model (about 0.919), so the model performs about as well on unseen testing data as the validation results predicted.

In [49]:
print(best_index)

14


In [50]:
# Store the Testing RMS in the DataFrame
hyperparams_eval_df.loc[best_index]=[best_k, best_regularization, best_iterations, lowest_validation_error, testing_error]

In [51]:
schema3= StructType([ StructField("k", FloatType(), True), \
                      StructField("regularization", FloatType(), True ), \
                      StructField("iterations", FloatType(), True), \
                      StructField("Validation RMS", FloatType(), True), \
                      StructField("Testing RMS", FloatType(), True) \
                    ])

## Convert the pandas DataFrame that stores the validation errors of all hyperparameter combinations (and the testing error of the best model) to a Spark DataFrame, so that it can be written out in cluster mode.

In [52]:
HyperParams_RMS_DF = ss.createDataFrame(hyperparams_eval_df, schema3)

# Exercise 9 (10 points)
## Modify the output path so that your output results can be saved in a directory.

In [53]:
output_path = "Lab6ALSHyperParamsTuning9_22"
HyperParams_RMS_DF.write.option("header", True).csv(output_path)

In [54]:
ss.stop()