# Problem 3: Collaborative Filtering Implementation

# Spark ALS based colloborative filtering model

Get the dataset from s3 bucket

In [2]:
import os
dbfs_dir = 's3://netflixdata-yr/'
movies_filename = dbfs_dir + 'movie_titles.txt'
testing_filename = dbfs_dir + 'TestingRatings.txt'
training_filename = dbfs_dir + 'TrainingRatings.txt'

Define the schemas

In [3]:
from pyspark.sql.types import *

movies_df_schema = StructType(
  [StructField('movie_Id', IntegerType()),
   StructField('year', IntegerType()),
   StructField('title', StringType())]
)

testing_df_schema = StructType(
  [StructField('movie_Id', IntegerType()),
   StructField('userId', IntegerType()),
   StructField('rating', DoubleType())]
)

training_df_schema = StructType(
  [StructField('movie_Id', IntegerType()),
   StructField('userId', IntegerType()),
   StructField('rating', DoubleType())]
)

Load and cache the data 

In [4]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *

movies_titles_df = sqlContext.read.options(header=False, inferSchema=False).schema(movies_df_schema).csv("s3://netflixdata-yr/movie_titles.txt")
testing_df = sqlContext.read.options(header=False, inferSchema=False).schema(testing_df_schema).csv("s3://netflixdata-yr/TestingRatings.txt")
training_df = sqlContext.read.options(header=False, inferSchema=False).schema(testing_df_schema).csv("s3://netflixdata-yr/TrainingRatings.txt")

movies_titles_df.cache()
testing_df.cache()
training_df.cache()

DataFrame[movie_Id: int, userId: int, rating: double]

 Merge movie titles dataframe with testing and training dataframe and drop the duplicate column movieID 

In [4]:
# testing_df = movies_titles_df.join(testing_ratings_df, movies_titles_df["movie_Id"]==testing_ratings_df["movieId"])
# testing_df = testing_df.drop("movieId")
# print(testing_df.show(3))

In [5]:
# training_df = movies_titles_df.join(training_ratings_df, movies_titles_df["movie_Id"]==training_ratings_df["movieId"])
# training_df = training_df.drop("movieId")
# print(training_df.show(3))

Creating temp view for analysis

In [7]:
from pyspark.sql import SQLContext
import pyspark
sqlContext = pyspark.SQLContext(sc)  
testing_df.createOrReplaceTempView('testing_df')
training_df.createOrReplaceTempView('training_df')

In [8]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [9]:
# Create ALS model
als = ALS(userCol="userId", itemCol="movie_Id", 
          ratingCol="rating", nonnegative = True, implicitPrefs = False, coldStartStrategy="drop")
type(als)

pyspark.ml.recommendation.ALS

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Adding hyperparameters values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [5,10,15]) \
            .addGrid(als.regParam, [.01, .05, 0.1]) \
            .build()
           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  9


In [11]:
# Cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
print(cv)

CrossValidator_a3ead770949b


In [12]:
#Fit cross validator to the training dataset
model = cv.fit(training_df)

#Extract best model from the cv model above
best_model = model.bestModel

                                                                                

In [13]:
# Print best_model &  ALS model parameters
print(type(best_model))
print("**Best Model**")
print("  Rank:", best_model._java_obj.parent().getRank())
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 15
  MaxIter: 10
  RegParam: 0.05


In [43]:
# train_predictions = best_model.transform(training_df)
# RMSE_train = evaluator.evaluate(train_predictions)
# print("RMSE for training data is:", RMSE_train)

# mae_train = evaluator.evaluate(train_predictions, {evaluator.metricName: "mae"})
# print("MAE for training data is: %.3f" % mae_train)

                                                                                

RMSE for training data is: 0.767821286433335




MAE for training data is: 0.606


                                                                                

In [14]:
# Generate test set predictions and evaluate using RMSE & MAE
test_predictions = best_model.transform(testing_df)
RMSE = evaluator.evaluate(test_predictions)
print("RMSE for testing data is:", RMSE)

mae = evaluator.evaluate(test_predictions, {evaluator.metricName: "mae"})
print("MAE for testing data is:: %.3f" % mae)

                                                                                

RMSE for testing data is: 0.8371780508585032




MAE for testing data is:: 0.660


                                                                                

In [15]:
#view top 10 test prediction movies
test_predictions.show(10)

+--------+-------+------+----------+
|movie_Id| userId|rating|prediction|
+--------+-------+------+----------+
|      28|2358799|   3.0| 3.9407172|
|     156| 973051|   5.0| 4.0387187|
|     851|1189060|   3.0| 3.4920712|
|    1100|2376892|   2.0| 2.2084737|
|    1123|1628484|   3.0| 3.4529614|
|    1289|1552084|   3.0| 3.5745318|
|    1744|2376892|   5.0| 3.7657683|
|    1851| 675056|   4.0| 3.5719743|
|    1983|2376892|   4.0|  3.174658|
|    1983|2629660|   3.0| 2.8910725|
+--------+-------+------+----------+
only showing top 10 rows



In [18]:
#  Recommendations for all users and list top 10
reco = best_model.recommendForAllUsers(10)
reco.limit(10).show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   481|[[6991, 5.7360873...|
|  2678|[[12125, 4.468851...|
|  3595|[[12293, 4.680737...|
|  6460|[[12232, 4.913296...|
|  7284|[[12544, 5.472037...|
|  7576|[[15557, 5.109460...|
|  9597|[[12952, 4.429915...|
| 15191|[[14283, 4.873109...|
| 15846|[[11284, 5.180236...|
| 20461|[[2939, 5.0058455...|
+------+--------------------+



                                                                                

In [19]:
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.functions import col, avg, when, count
#Unclean Recommendation Output
reco1 = reco\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movie_Id"), col("rec_exp.rating"))
reco1.limit(10).show()



+------+--------+---------+
|userId|movie_Id|   rating|
+------+--------+---------+
|   481|    6991|5.7360873|
|   481|   14648| 5.140416|
|   481|   10743| 5.124085|
|   481|   14361| 5.055481|
|   481|    7569| 5.051366|
|   481|   12952|5.0289598|
|   481|     634| 4.975964|
|   481|    4238|4.9639053|
|   481|   15567|  4.94801|
|   481|   10947| 4.940893|
+------+--------+---------+



                                                                                

In [21]:
#view the recommendation with movie titles
reco1.join(movies_titles_df, on='movie_Id').show(10)

                                                                                

+--------+------+---------+----+--------------------+
|movie_Id|userId|   rating|year|               title|
+--------+------+---------+----+--------------------+
|    6991|   481|5.7360873|2001|    A History of God|
|   14648|   481| 5.140416|2003|Finding Nemo (Ful...|
|   10743|   481| 5.124085|2001|Pearl Jam: Tourin...|
|   14361|   481| 5.055481|1999|SpongeBob SquareP...|
|    7569|   481| 5.051366|2004|Dead Like Me: Sea...|
|   12952|   481|5.0289598|2005|The God Who Wasn'...|
|     634|   481| 4.975964|1989|Christmas with Th...|
|    4238|   481|4.9639053|2000|           Inu-Yasha|
|   15567|   481|  4.94801|1987|Grateful Dead: Ti...|
|   10947|   481| 4.940893|2004|     The Incredibles|
+--------+------+---------+----+--------------------+
only showing top 10 rows



Predict recommendations for a single user

In [26]:
user_79 = training_df.filter('userId = 79').sort('rating', ascending=False)
user_79.join(movies_titles_df, on='movie_Id').show(10)

+--------+------+------+----+--------------------+
|movie_Id|userId|rating|year|               title|
+--------+------+------+----+--------------------+
|    2660|    79|   5.0|1989|When Harry Met Sally|
|    3538|    79|   5.0|1988|             Beaches|
|    3541|    79|   5.0|1981|History of the Wo...|
|    6971|    79|   5.0|1986|Ferris Bueller's ...|
|    8512|    79|   5.0|1999|The World Is Not ...|
|   13489|    79|   5.0|2000|              Attila|
|   13748|    79|   5.0|1996|The First Wives Club|
|   14185|    79|   5.0|1964|        Mary Poppins|
|    4569|    79|   5.0|1992|            3 Ninjas|
|    4640|    79|   5.0|1988|            Rain Man|
+--------+------+------+----+--------------------+
only showing top 10 rows



In [50]:
testing_df.filter('userId = 79').sort('rating', ascending=False).limit(10).show()

+--------+------+------+
|movie_Id|userId|rating|
+--------+------+------+
|   14648|    79|   5.0|
|    2913|    79|   4.0|
|   12497|    79|   4.0|
|    8163|    79|   3.0|
+--------+------+------+



In [31]:
#Recommending Movies with ALS for the user 79
single_user = testing_df.filter(testing_df['userId']==79).select(['movie_Id','rating','userId'])
single_user.join(movies_titles_df, on='movie_Id').show()

+--------+------+------+----+--------------------+
|movie_Id|rating|userId|year|               title|
+--------+------+------+----+--------------------+
|    2913|   4.0|    79|2004|   Finding Neverland|
|    8163|   3.0|    79|2004|        Two Brothers|
|   12497|   4.0|    79|2000|         Bring It On|
|   14648|   5.0|    79|2003|Finding Nemo (Ful...|
+--------+------+------+----+--------------------+



In [32]:
recomendations = model.transform(single_user)
reco_user_79 =recomendations.orderBy('prediction',ascending=False)
reco_user_79.join(movies_titles_df, on='movie_Id').show(10)

+--------+------+------+----------+----+--------------------+
|movie_Id|rating|userId|prediction|year|               title|
+--------+------+------+----------+----+--------------------+
|   14648|   5.0|    79| 4.4529114|2003|Finding Nemo (Ful...|
|    2913|   4.0|    79| 3.9445221|2004|   Finding Neverland|
|   12497|   4.0|    79| 3.5837138|2000|         Bring It On|
|    8163|   3.0|    79|  3.225082|2004|        Two Brothers|
+--------+------+------+----------+----+--------------------+



# Step 3: Does your approach work for your own preferences? 

# My Movie Ratings
Add self as a new user to the data set by creating a new unique user ID for self - here it is 11111
Selecting some movies that I have seen among those in the training set, and add my ratings for those and create a dataframe

In [33]:
from pyspark.sql import Row
my_user_id = 11111

my_rated_movies = [
     ( 2959, my_user_id, 3),
     ( 2571, my_user_id, 4),
     ( 1207, my_user_id,5),
     ( 296, my_user_id, 1),
     ( 2858, my_user_id, 5), 
     ( 1172, my_user_id, 5), 
     ( 593, my_user_id,1),
     ( 745, my_user_id,2), 
     ( 1198,my_user_id, 4),
     ( 6016, my_user_id, 1)    
]

my_ratings_df = sqlContext.createDataFrame(my_rated_movies, ['movie_Id', 'user_Id','rating'])
print ('My movie ratings:')
my_ratings_df.show(3)

My movie ratings:
+--------+-------+------+
|movie_Id|user_Id|rating|
+--------+-------+------+
|    2959|  11111|     3|
|    2571|  11111|     4|
|    1207|  11111|     5|
+--------+-------+------+
only showing top 3 rows



In [38]:
# Add My Movies to Training Dataset

In [34]:
from pyspark.sql import SparkSession

training_with_my_ratings_df = training_df.union(my_ratings_df)

user_data = (training_with_my_ratings_df[training_with_my_ratings_df.userId == '11111'])
user_data.show(3)


+--------+------+------+
|movie_Id|userId|rating|
+--------+------+------+
|    2959| 11111|   3.0|
|    2571| 11111|   4.0|
|    1207| 11111|   5.0|
+--------+------+------+
only showing top 3 rows



In [35]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [5, 10, 15]) \
            .addGrid(als.regParam, [.05, 0.1]) \
            .build()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  6


In [36]:
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
print(cv)

CrossValidator_83b0618e5bf2


In [49]:
# Train a Model with My Ratings

In [37]:
#cross validate to the new training dataset with added data
model_my_data = cv.fit(training_with_my_ratings_df)

#Extract best model from the cv model above for the new added data 
best_model_data = model_my_data.bestModel

                                                                                

In [39]:
# Print best_model &  ALS model parameters for the new added data set
print(type(best_model))
print("**Best Model**")
print("  Rank:", best_model._java_obj.parent().getRank())
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 15
  MaxIter: 10
  RegParam: 0.05


In [60]:
# #evaluate the new added data training set
# train_predictions_data = best_model_data.transform(training_with_my_ratings_df)
# RMSE_train_data = evaluator.evaluate(train_predictions_data)
# print("RMSE for training data with added data is:", RMSE_train_data)

# mae_train_data = evaluator.evaluate(train_predictions_data, {evaluator.metricName: "mae"})
# print("MAE for training data with added data is: %.3f" % mae_train_data)

                                                                                

RMSE for training data with added data is: 0.7682813125220835




MAE for training data with added data is: 0.607


                                                                                

In [None]:
# Check RMSE for the New Model with my Ratings

In [40]:
#evaluate test set from the model built on new added training data
test_predictions_data = best_model_data.transform(testing_df)
RMSE_data = evaluator.evaluate(test_predictions_data)
print("RMSE for testing data with model trained on added data is:", RMSE_data)

mae_data = evaluator.evaluate(test_predictions_data, {evaluator.metricName: "mae"})
print("MAE for testing data with model trained on added data is:: %.3f" % mae_data)

                                                                                

RMSE for testing data with model trained on added data is: 0.8375366933337033




MAE for testing data with model trained on added data is:: 0.661


                                                                                

In [41]:
#list to 5 predictions with added data
test_predictions_data.show(5)

+--------+-------+------+----------+
|movie_Id| userId|rating|prediction|
+--------+-------+------+----------+
|      28|2358799|   3.0|  3.798907|
|     156| 973051|   5.0|  4.303177|
|     851|1189060|   3.0| 3.3992517|
|    1100|2376892|   2.0| 2.2637594|
|    1123|1628484|   3.0| 3.4634411|
+--------+-------+------+----------+
only showing top 5 rows



In [42]:
reco_data = best_model_data.recommendForAllUsers(10)
reco_data.limit(5).show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  3039|[[2858, 5.317329]...|
|  3694|[[1172, 6.69683],...|
|  4247|[[1172, 7.3059635...|
|  7601|[[1172, 6.402927]...|
|  8095|[[1207, 7.9479885...|
+------+--------------------+



                                                                                

In [43]:
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.functions import col, avg, when, count

reco1_data = reco_data\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movie_Id"), col("rec_exp.rating"))

reco1_data.limit(5).show()



+------+--------+---------+
|userId|movie_Id|   rating|
+------+--------+---------+
|  3039|    1172| 5.317329|
|  3039|    1207| 5.317329|
|  3039|    2858| 5.317329|
|  3039|    7016| 4.563325|
|  3039|    3151|4.4051466|
+------+--------+---------+



                                                                                

In [48]:
# recommendations for me 
reco1_data.join(movies_titles_df, on='movie_Id').filter('userId = 11111').sort(desc('rating')).show(5)



+--------+------+---------+----+--------------------+
|movie_Id|userId|   rating|year|               title|
+--------+------+---------+----+--------------------+
|    2858| 11111|4.9714956|2000|Bounce: Bonus Mat...|
|    1207| 11111|4.9714956|1962|Experiment in Terror|
|    1172| 11111|4.9714956|1998| Krippendorf's Tribe|
|    2571| 11111|3.9771967|2002|Woodrow Wilson: A...|
|    1198| 11111|3.9771967|1971|The Cat O'Nine Tails|
+--------+------+---------+----+--------------------+
only showing top 5 rows



                                                                                

In [50]:
# Movies for myself in the training data
single_user_data= (training_with_my_ratings_df[training_with_my_ratings_df.userId == '11111'])
single_user_data.join(movies_titles_df, on='movie_Id').filter('userId = 11111').sort(desc('rating')).show(5)

+--------+------+------+----+--------------------+
|movie_Id|userId|rating|year|               title|
+--------+------+------+----+--------------------+
|    2858| 11111|   5.0|2000|Bounce: Bonus Mat...|
|    1207| 11111|   5.0|1962|Experiment in Terror|
|    1172| 11111|   5.0|1998| Krippendorf's Tribe|
|    2571| 11111|   4.0|2002|Woodrow Wilson: A...|
|    1198| 11111|   4.0|1971|The Cat O'Nine Tails|
+--------+------+------+----+--------------------+
only showing top 5 rows



In [52]:
# predicted movies based on ALS for me
recomendations_data = model_my_data.transform(single_user_data)
recomendations_data.orderBy('prediction',ascending=False).show(5)

+--------+------+------+----------+
|movie_Id|userId|rating|prediction|
+--------+------+------+----------+
|    2858| 11111|   5.0| 4.9714956|
|    1207| 11111|   5.0| 4.9714956|
|    1172| 11111|   5.0| 4.9714956|
|    2571| 11111|   4.0| 3.9771967|
|    1198| 11111|   4.0| 3.9771967|
+--------+------+------+----------+
only showing top 5 rows



Top movies titles recommended to me by ALS model

In [62]:
# recomendations_data.join(movies_titles_df, on='movie_Id').show(5)
array = [2959,2571,1207,296, 2858, 1172, 593, 745, 1198, 6016]  
top_reco =reco1_data.join(movies_titles_df, on='movie_Id').filter('userId = 11111').sort(desc('rating'))
top_reco_yr =top_reco.filter(top_reco.movie_Id.isin(array) == False)
top_reco_yr.orderBy('rating',ascending=True).show()

                                                                                

+--------+------+---------+----+--------------------+
|movie_Id|userId|   rating|year|               title|
+--------+------+---------+----+--------------------+
|    3941| 11111|1.6965362|1975|The French Connec...|
|   13878| 11111|1.7531966|1975|Sanford and Son: ...|
|    5369| 11111| 1.787698|1972|Sanford and Son: ...|
+--------+------+---------+----+--------------------+



In [38]:
# Get the learning curve

In [76]:
import math
def get_training_curve(train,rank,regular_param, iterations):
    errors = []
    for num_iters in range(1,iterations):
        model = ALS(rank = rank, maxIter = num_iters, regParam = regular_param, userCol = 'userId', itemCol = 'movie_Id', nonnegative = True ).fit(train)
        predictions = model.transform(train).fillna(0)
        condition = [train.userId == predictions.userId,  train.movie_Id == predictions.movie_Id]
        error = predictions.join(train,condition).select(predictions.prediction,train.rating).rdd.map(lambda x: (x[0]-x[1])**2).mean()
        errors.append(math.sqrt(error))
        predictions.select(predictions.userId,predictions.movie_Id,predictions.prediction).withColumnRenamed("prediction","rating")
    return errors

In [77]:
#use the best model parameters
errors = get_training_curve(training_df,rank=15,regular_param=0.5,iterations=10)

                                                                                

In [78]:
#get trainings set min error: Unable to plot learing curve as matpotlib is not working on pyspark
print("minimum training error", min(errors))

minimum training error 1.0025921288190505


In [79]:
# test_predictions error
test_error = math.sqrt(test_predictions.rdd.map(lambda x: (x.rating-x.prediction)**2).mean())
test_error

                                                                                

0.8371780505884567