Author: Gabriella Sussman

Due Date: 11/28/2021 Sunday 6:00pm 

# Option 1: Recommender System

MovieLens is a dataset collected by the GroupLens Research Project at the University of
Minnesota, which makes rating data sets from the MovieLens web site publicly available.
Download and unzip the MovieLens 100K Dataset (ml-100k.zip).


u.data is the dataset for this assignment.

The full dataset contains 100,000 ratings by 943 users on 1682 items. Each user has rated at least 20 movies. Users and items are numbered consecutively from 1. The data is randomly ordered. The format is:

user_id <tab> item_id <tab> rating <tab> timestamp.

The timestamps are Unix seconds since 1/1/1970 UTC.
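
As a quick illustration of this record format, the sketch below parses one tab-separated line and converts the timestamp to a UTC datetime. The `Rating` type and `parse_rating_line` helper are hypothetical names introduced only for this example; the notebook itself loads the file with Spark.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class Rating(NamedTuple):
    user_id: int
    item_id: int
    rating: int
    timestamp: datetime


def parse_rating_line(line: str) -> Rating:
    """Parse one tab-separated u.data line into typed fields."""
    user_id, item_id, rating, unix_seconds = line.rstrip("\n").split("\t")
    return Rating(
        int(user_id),
        int(item_id),
        int(rating),
        # Unix seconds are interpreted relative to 1/1/1970 UTC
        datetime.fromtimestamp(int(unix_seconds), tz=timezone.utc),
    )


# First row of u.data as displayed in this notebook
example = parse_rating_line("196\t242\t3\t881250949")
print(example.user_id, example.item_id, example.rating, example.timestamp.isoformat())
```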


## Spark Setup

In [1]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Change directory
%cd /content/drive/My Drive/IDS561_HW4

/content/drive/My Drive/IDS561_HW4


In [3]:
# Download Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [4]:
# Get Spark installer (check the path on spark.apache.org)
#!wget -v https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [5]:
# Untar the Spark installer
!tar -xvf spark-3.1.2-bin-hadoop3.2.tgz

spark-3.1.2-bin-hadoop3.2/
spark-3.1.2-bin-hadoop3.2/R/
spark-3.1.2-bin-hadoop3.2/R/lib/
spark-3.1.2-bin-hadoop3.2/R/lib/sparkr.zip
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/worker/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/worker/worker.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/worker/daemon.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/tests/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/tests/testthat/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/tests/testthat/test_basic.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/profile/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/profile/shell.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/profile/general.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/doc/
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/doc/sparkr-vignettes.html
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/doc/sparkr-vignettes.Rmd
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/doc/sparkr-vignettes.R
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/doc/index.html
spark-3.1.2-bin-hadoop3.2/R/lib/SparkR/R/
spark-3.1.2-

In [6]:
# Install findspark - a python library to find Spark
!pip install -q findspark

In [7]:
# Set environment variables - set Java and Spark home based on the location where they are stored
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/drive/My Drive/IDS561_HW4/spark-3.1.2-bin-hadoop3.2"

# Create a local Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
spark = SparkSession.builder.master("local[*]").getOrCreate()

# load packages
import numpy as np
from pyspark import SparkContext

# Import functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import explode, col

sc = SparkContext.getOrCreate()

## 1. Import the MovieLens dataset.


In [8]:
# Read the tab-separated u.data file into a Spark DataFrame
raw_ratings_DF = spark.read.options(inferSchema='True',delimiter='\t').csv("/content/drive/My Drive/IDS561_HW4/u.data", header=False)

raw_ratings_DF.show()

+---+----+---+---------+
|_c0| _c1|_c2|      _c3|
+---+----+---+---------+
|196| 242|  3|881250949|
|186| 302|  3|891717742|
| 22| 377|  1|878887116|
|244|  51|  2|880606923|
|166| 346|  1|886397596|
|298| 474|  4|884182806|
|115| 265|  2|881171488|
|253| 465|  5|891628467|
|305| 451|  3|886324817|
|  6|  86|  3|883603013|
| 62| 257|  2|879372434|
|286|1014|  5|879781125|
|200| 222|  5|876042340|
|210|  40|  3|891035994|
|224|  29|  3|888104457|
|303| 785|  3|879485318|
|122| 387|  5|879270459|
|194| 274|  2|879539794|
|291|1042|  4|874834944|
|234|1184|  2|892079237|
+---+----+---+---------+
only showing top 20 rows



In [9]:
# Rename column headers
raw_ratings_DF = raw_ratings_DF.withColumnRenamed("_c0", "user_id")
raw_ratings_DF = raw_ratings_DF.withColumnRenamed("_c1", "item_id")
raw_ratings_DF = raw_ratings_DF.withColumnRenamed("_c2", "rating")
raw_ratings_DF = raw_ratings_DF.withColumnRenamed("_c3", "timestamp")

# Convert column values to int
raw_ratings_DF = raw_ratings_DF.withColumn("user_id", raw_ratings_DF["user_id"].cast("int"))
raw_ratings_DF = raw_ratings_DF.withColumn("item_id", raw_ratings_DF["item_id"].cast("int"))
raw_ratings_DF = raw_ratings_DF.withColumn("rating", raw_ratings_DF["rating"].cast("int"))
raw_ratings_DF = raw_ratings_DF.withColumn("timestamp", raw_ratings_DF["timestamp"].cast("int"))

# Drop timestamp column as it is not needed
ratingsDF = raw_ratings_DF.drop("timestamp")

ratingsDF.show()

+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|    196|    242|     3|
|    186|    302|     3|
|     22|    377|     1|
|    244|     51|     2|
|    166|    346|     1|
|    298|    474|     4|
|    115|    265|     2|
|    253|    465|     5|
|    305|    451|     3|
|      6|     86|     3|
|     62|    257|     2|
|    286|   1014|     5|
|    200|    222|     5|
|    210|     40|     3|
|    224|     29|     3|
|    303|    785|     3|
|    122|    387|     5|
|    194|    274|     2|
|    291|   1042|     4|
|    234|   1184|     2|
+-------+-------+------+
only showing top 20 rows



In [10]:
# Split the data into training and testing data
(train_data, test_data) = ratingsDF.randomSplit([0.8, 0.2], seed=13)

In [11]:
# Store training and testing data in cache 
train_data.cache()
test_data.cache()

DataFrame[user_id: int, item_id: int, rating: int]

## 2. Build a recommendation model using Alternating Least Squares.

In [12]:
# Build the recommendation model using ALS on the training data
als_original = ALS(userCol="user_id",
                   itemCol="item_id",
                   ratingCol="rating",
                   nonnegative=True,
                   implicitPrefs = False)

# Fit als to training data
als_original_model = als_original.fit(train_data)

# Get predictions for the model
als_original_predictions = als_original_model.transform(test_data)

In [13]:
# Set the settings for the evaluator
evaluator = RegressionEvaluator(metricName="mse",
                                labelCol="rating",
                                predictionCol="prediction")

In [14]:
# Evaluate the model by computing the MSE on the testing data
als_original_mse = evaluator.evaluate(als_original_predictions)
print("MSE for Original ALS Model (without setting the coldStartStrategy) = " + str(als_original_mse))

MSE for Original ALS Model (without setting the coldStartStrategy) = nan


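The default coldStartStrategy is "nan", so ALS returns a NaN prediction for any test row whose user or item did not appear in the training split, and a single NaN prediction makes the MSE nan. Setting the coldStartStrategy parameter to "drop" removes those rows from the DataFrame of predictions before evaluation, which gives a finite MSE.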
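
The effect of a NaN prediction on the metric can be reproduced without Spark. The sketch below uses made-up (rating, prediction) pairs, where the last pair stands in for a test row whose user or item was never seen during training; filtering such rows is only an approximation of what coldStartStrategy="drop" does inside Spark.

```python
import math


def mse(pairs):
    """Mean squared error over (label, prediction) pairs."""
    return sum((label - pred) ** 2 for label, pred in pairs) / len(pairs)


# Hypothetical (rating, prediction) pairs; the last prediction is NaN
scored = [(3, 3.4), (5, 4.1), (2, 2.5), (4, float("nan"))]

# One NaN term is enough to turn the whole average into NaN
mse_with_nan = mse(scored)

# Discard rows with NaN predictions before averaging
kept = [(label, pred) for label, pred in scored if not math.isnan(pred)]
mse_after_drop = mse(kept)

print(mse_with_nan, mse_after_drop)
```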

In [15]:
# Build the recommendation model using ALS on the training data
# Set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(userCol="user_id",
          itemCol="item_id",
          ratingCol="rating",
          nonnegative=True,
          implicitPrefs = False,
          coldStartStrategy="drop")

# Fit als to training data
als_model = als.fit(train_data)

# Get predictions for the model
als_predictions = als_model.transform(test_data)

## 3. Report the original performance (Mean Squared Error)

In [16]:
# Evaluate the model by computing the MSE on the testing data
als_mse = evaluator.evaluate(als_predictions)
print("MSE for Original ALS Model = " + str(als_mse))

MSE for Original ALS Model = 0.844817767274214


With coldStartStrategy set to "drop", rows with NaN predictions are removed from the DataFrame of predictions (the ratings data itself is unchanged), so the evaluator returns the finite MSE shown above instead of nan.

## 4. Try to improve the performance of the original model using 10-fold cross validation and solve the cold-start problem.

In [17]:
# Add hyperparameters and their respective values using ParamGridBuilder
param_grid = ParamGridBuilder().addGrid(als.regParam, [0.1]).build()

# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=10,
                    seed=13)

# Fit cross validator to the training data
cv_model = cv.fit(train_data)

# Get predictions on the testing data
cv_test_predictions = cv_model.transform(test_data)

cv_MSE = evaluator.evaluate(cv_test_predictions)

print("MSE for CV Model = " + str(cv_MSE))

MSE for CV Model = 0.844817767274214


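Using 10-fold cross validation alone does not improve the model: the grid contains only regParam = 0.1, which is also the ALS default, so the same model is refit and the test MSE is unchanged. To improve performance, we need to search over several values of parameters such as rank and regParam.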
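
For intuition about what the 10 folds look like, here is a plain-Python sketch of k-fold splitting. It is not how Spark's CrossValidator assigns rows internally; `k_fold_indices` and the row count of 100 are assumptions made for the illustration.

```python
import random


def k_fold_indices(n_rows, k, seed):
    """Yield (train_idx, validation_idx) pairs for k roughly equal folds."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    # Deal the shuffled indices into k folds, round-robin
    folds = [indices[i::k] for i in range(k)]
    for held_out in range(k):
        validation = folds[held_out]
        train = [i for f, fold in enumerate(folds) if f != held_out for i in fold]
        yield train, validation


# Each row is held out exactly once; every candidate model is fit k times
splits = list(k_fold_indices(n_rows=100, k=10, seed=13))
print(len(splits), len(splits[0][0]), len(splits[0][1]))
```

The metric for a candidate is then averaged over the k held-out folds, which is why a grid with a single candidate cannot select anything different from the original model.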

## 5. Optimize the model based on step 4 and report the improvement of performance.

In [17]:
# Add hyperparameters and their respective values using ParamGridBuilder. We will try all 9 combinations of values and determine best model using the evaluator
param_grid_optimized = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100]) \
            .addGrid(als.regParam, [0.05, 0.1, 0.15]) \
            .build()

# Build cross validation using CrossValidator
cv_optimized = CrossValidator(estimator=als,
                              estimatorParamMaps=param_grid_optimized,
                              evaluator=evaluator,
                              numFolds=10,
                              seed=13)

# Fit cross validator to the training data
model_optimized = cv_optimized.fit(train_data)

# Extract best model from the cv model above
best_model = model_optimized.bestModel

# Evaluate the model using the testing data
test_predictions_optimized = best_model.transform(test_data)

MSE_optimized = evaluator.evaluate(test_predictions_optimized)

print("MSE for Optimized CV Model = " + str(MSE_optimized))

print("Best Model:")

# Get the parameters for the best model
print("Rank:", best_model._java_obj.parent().getRank())
print("RegParam:", best_model._java_obj.parent().getRegParam())

MSE for Optimized CV Model = 0.838989834844714
Best Model:
Rank: 100
RegParam: 0.1
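
To make the cost of this search concrete, the sketch below enumerates the same rank and regParam values in plain Python and counts the model fits implied by 10 folds. The dictionary is only an illustrative stand-in for the ParamGridBuilder output.

```python
from itertools import product

# Values copied from the grid above; the dictionary itself is illustrative
grid = {"rank": [10, 50, 100], "regParam": [0.05, 0.1, 0.15]}
num_folds = 10

# Cartesian product of all parameter values, one dict per candidate
candidates = [dict(zip(grid, values)) for values in product(*grid.values())]
fits_during_cv = len(candidates) * num_folds

print(len(candidates), fits_during_cv)
```

Each candidate is trained once per fold, so larger grids or higher ranks increase the running time quickly.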


In [18]:
# Add hyperparameters and their respective values using ParamGridBuilder. We will try all 3 rank values with regParam = 0.1 to determine which rank value is better, and then we will determine best model using the evaluator
param_grid_optimized2 = ParamGridBuilder() \
            .addGrid(als.rank, [100,150,250]) \
            .addGrid(als.regParam, [0.1]) \
            .build()

# Build cross validation using CrossValidator
cv_optimized2 = CrossValidator(estimator=als,
                               estimatorParamMaps=param_grid_optimized2,
                               evaluator=evaluator,
                               numFolds=10,
                               seed=13)

# Fit cross validator to the training data
model_optimized2 = cv_optimized2.fit(train_data)

# Extract best model from the cv model above
best_model2 = model_optimized2.bestModel

# Evaluate the model using the testing data
test_predictions_optimized2 = best_model2.transform(test_data)

MSE_optimized2 = evaluator.evaluate(test_predictions_optimized2)

print("MSE for Optimized CV Model - Second Round = " + str(MSE_optimized2))

print("Best Model - Second Round:")

# Get the parameters for the best model
print("Rank:", best_model2._java_obj.parent().getRank())
print("RegParam:", best_model2._java_obj.parent().getRegParam())

MSE for Optimized CV Model - Second Round = 0.8354403750646043
Best Model - Second Round:
Rank: 150
RegParam: 0.1


As the output shows, cross-validation now selects a rank of 150 over ranks of 100 and 250, and the test MSE improves slightly over the first round (0.8354 versus 0.8390). The resulting best model has an MSE of 0.8354403750646043, compared with 0.844817767274214 for the original ALS model without cross validation, a reduction of about 1.1%. So overall the model is improved slightly.
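
The size of the improvement can be checked with a few lines of arithmetic. The sketch below uses the test-set MSE values printed in the cell outputs of this notebook and also reports RMSE, which is in the same units as the ratings; the dictionary keys are labels chosen for this example.

```python
import math

# Test-set MSE values copied from the cell outputs in this notebook
mse_by_model = {
    "original ALS": 0.844817767274214,
    "grid search, round 1": 0.838989834844714,
    "grid search, round 2": 0.8354403750646043,
}

baseline = mse_by_model["original ALS"]
for name, model_mse in mse_by_model.items():
    # Relative reduction in MSE compared with the original model
    relative_gain = (baseline - model_mse) / baseline
    print(f"{name}: MSE={model_mse:.4f}  RMSE={math.sqrt(model_mse):.4f}  gain={relative_gain:.2%}")
```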

## 6. Output top 10 movies for all the users with the following format:
userID<\tab>itemID1,itemID2,itemID3 ...,itemID10

In [19]:
# Generate the top 10 movie recommendations for each user with recommendForAllUsers(n), which returns n recommendations per user, using our best model.
recommendations_df = best_model2.recommendForAllUsers(10)

recommendations_df.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|    471|[{1159, 4.5934134...|
|    463|[{887, 4.2665234}...|
|    833|[{1187, 4.397768}...|
|    496|[{320, 4.3859844}...|
|    148|[{408, 4.9318495}...|
|    540|[{169, 4.759341},...|
|    392|[{483, 5.0419016}...|
|    243|[{1449, 4.45107},...|
|    623|[{50, 4.4439306},...|
|    737|[{127, 4.577489},...|
|    897|[{50, 4.6613936},...|
|    858|[{9, 4.5225873}, ...|
|     31|[{1022, 4.632285}...|
|    516|[{169, 4.7193413}...|
|    580|[{50, 4.593873}, ...|
|    251|[{1449, 4.6142626...|
|    451|[{313, 4.223077},...|
|     85|[{483, 4.2958164}...|
|    137|[{1019, 5.341129}...|
|    808|[{127, 5.1286597}...|
+-------+--------------------+
only showing top 20 rows



In [20]:
# Convert DataFrame to RDD
recommendations_rdd = recommendations_df.rdd

print(recommendations_rdd.collect())

[Row(user_id=471, recommendations=[Row(item_id=1159, rating=4.593413352966309), Row(item_id=8, rating=4.415029525756836), Row(item_id=932, rating=4.3966264724731445), Row(item_id=477, rating=4.298967361450195), Row(item_id=82, rating=4.144421100616455), Row(item_id=465, rating=4.1242218017578125), Row(item_id=720, rating=4.02098274230957), Row(item_id=225, rating=4.01944637298584), Row(item_id=699, rating=4.003026485443115), Row(item_id=1167, rating=3.9358370304107666)]), Row(user_id=463, recommendations=[Row(item_id=887, rating=4.266523361206055), Row(item_id=19, rating=4.224681854248047), Row(item_id=116, rating=4.09044075012207), Row(item_id=253, rating=4.062330722808838), Row(item_id=221, rating=4.033151626586914), Row(item_id=124, rating=4.026793956756592), Row(item_id=20, rating=4.002568244934082), Row(item_id=1449, rating=3.991797685623169), Row(item_id=285, rating=3.936876058578491), Row(item_id=302, rating=3.9311270713806152)]), Row(user_id=833, recommendations=[Row(item_id=11

In [21]:
# Use explode to get one row per recommendation, with user_id, item_id, and rating in separate columns
top10_recommendations_df = recommendations_df\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('user_id', col("rec_exp.item_id"), col("rec_exp.rating"))

# Show the top 10 movie recommendations for all users
top10_recommendations_df.show()

+-------+-------+---------+
|user_id|item_id|   rating|
+-------+-------+---------+
|    471|   1159|4.5934134|
|    471|      8|4.4150295|
|    471|    932|4.3966265|
|    471|    477|4.2989674|
|    471|     82| 4.144421|
|    471|    465| 4.124222|
|    471|    720|4.0209827|
|    471|    225|4.0194464|
|    471|    699|4.0030265|
|    471|   1167| 3.935837|
|    463|    887|4.2665234|
|    463|     19| 4.224682|
|    463|    116|4.0904408|
|    463|    253|4.0623307|
|    463|    221|4.0331516|
|    463|    124| 4.026794|
|    463|     20|4.0025682|
|    463|   1449|3.9917977|
|    463|    285| 3.936876|
|    463|    302| 3.931127|
+-------+-------+---------+
only showing top 20 rows



In [22]:
# Drop "rating" column
top10_recommendations_no_rating_df = top10_recommendations_df.drop("rating")

top10_recommendations_no_rating_df.show()

+-------+-------+
|user_id|item_id|
+-------+-------+
|    471|   1159|
|    471|      8|
|    471|    932|
|    471|    477|
|    471|     82|
|    471|    465|
|    471|    720|
|    471|    225|
|    471|    699|
|    471|   1167|
|    463|    887|
|    463|     19|
|    463|    116|
|    463|    253|
|    463|    221|
|    463|    124|
|    463|     20|
|    463|   1449|
|    463|    285|
|    463|    302|
+-------+-------+
only showing top 20 rows



In [23]:
# Convert to RDD
top10_recommendations_rdd = top10_recommendations_no_rating_df.rdd

top10_recommendations_rdd.take(5)

[Row(user_id=471, item_id=1159),
 Row(user_id=471, item_id=8),
 Row(user_id=471, item_id=932),
 Row(user_id=471, item_id=477),
 Row(user_id=471, item_id=82)]

In [24]:
# Map the RDD and group by user_id and output top 10 item_id as list in a second column
top10_recommendations_grouped_list = top10_recommendations_rdd.map(lambda x: (x[0], x[1])).groupByKey().mapValues(list).collect()

top10_recommendations_grouped_list[:5]

[(800, [1449, 64, 313, 318, 169, 223, 98, 963, 12, 79]),
 (400, [313, 127, 318, 64, 258, 169, 114, 272, 50, 98]),
 (200, [50, 1643, 1169, 174, 318, 64, 98, 172, 1449, 603]),
 (600, [127, 1449, 98, 185, 64, 515, 318, 187, 100, 357]),
 (601, [179, 408, 919, 169, 1084, 168, 474, 114, 173, 1449])]
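
One caveat: Spark's groupByKey does not guarantee the order of values within a group, so the lists above are not necessarily sorted by predicted rating. If ranked order matters, one option is to keep the rating and sort explicitly. The sketch below shows the idea in plain Python on a few rows taken from the exploded output and deliberately shuffled; `group_top_items` is a hypothetical helper, not part of the notebook.

```python
from collections import defaultdict

# (user_id, item_id, predicted_rating) rows, deliberately out of order
rows = [
    (471, 8, 4.4150295),
    (463, 19, 4.224682),
    (471, 1159, 4.5934134),
    (463, 887, 4.2665234),
    (471, 932, 4.3966265),
]


def group_top_items(rows):
    """Group item ids per user, ordered by predicted rating (highest first)."""
    by_user = defaultdict(list)
    for user_id, item_id, predicted in rows:
        by_user[user_id].append((predicted, item_id))
    return {
        user_id: [item_id for _, item_id in sorted(scored, reverse=True)]
        for user_id, scored in by_user.items()
    }


grouped = group_top_items(rows)
print(grouped)
```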

Output top 10 movies for users in this format:
userID<\tab>itemID1,itemID2,itemID3 ...,itemID10
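
A minimal sketch of this line format, assuming each user comes as a (user_id, list of item ids) pair like the grouped list above; `format_recommendation_line` is a hypothetical helper name.

```python
def format_recommendation_line(user_id, item_ids):
    """Return 'userID<TAB>item1,item2,...' for one user."""
    return f"{user_id}\t{','.join(str(item_id) for item_id in item_ids)}"


# One (user_id, item_ids) pair copied from the grouped list above
line = format_recommendation_line(800, [1449, 64, 313, 318, 169, 223, 98, 963, 12, 79])
print(line)
```

Joining the ids directly avoids relying on the string form of a Python list.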

In [32]:
# Create txt file and output top 10 movies for users
with open("HW4_Output.txt", "w") as outfile:
    outfile.write("\n".join(f"{user_id}\t{','.join(map(str, item_ids))}" for user_id, item_ids in top10_recommendations_grouped_list))

In [33]:
# Open and print file contents of HW4_Output.txt
with open('HW4_Output.txt', 'r') as f:
    OutputTXT = f.read()
print(OutputTXT)

800	1449,64,313,318,169,223,98,963,12,79
400	313,127,318,64,258,169,114,272,50,98
200	50,1643,1169,174,318,64,98,172,1449,603
600	127,1449,98,185,64,515,318,187,100,357
601	179,408,919,169,1084,168,474,114,173,1449
1	169,1449,119,50,408,173,253,12,114,1467
801	313,272,318,64,22,963,316,79,12,333
201	1174,1070,1131,1643,93,56,1642,357,320,340
401	1643,196,162,318,735,427,1449,178,79,371
602	50,64,1169,174,169,172,313,22,408,963
402	1449,318,483,127,169,64,1643,408,515,511
202	604,516,96,963,258,178,1286,174,1472,451
2	127,1449,963,100,302,251,64,285,1512,197
802	183,195,313,1019,1169,326,96,1218,79,176
603	114,1483,313,189,50,169,921,12,1240,1169
3	1643,340,347,48,902,321,172,262,316,318
403	963,272,64,313,22,318,50,1269,223,735
203	50,83,1137,313,172,12,1169,1007,1084,432
803	285,1449,100,1512,408,512,1642,272,515,56
804	169,50,408,114,174,64,1019,1269,1169,172
604	670,48,262,567,718,1642,100,173,50,960
4	189,320,169,1643,114,318,922,1642,64,30
404	22,1169,313,1019,64,87,1269,12,966,17