### Download dataset
<b>Dataset location: </b>http://files.grouplens.org/datasets/movielens/ml-latest-small.zip

In [28]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that backs every DataFrame in this notebook
spark = (
    SparkSession.builder
    .appName('Use Collaborative Filtering for movie recommendations')
    .getOrCreate()
)

# Read the ratings CSV, using its first row as the column names
ratingsReader = spark.read.format('csv').option('header', 'true')
rawData = ratingsReader.load('../datasets/movielens/ratings.csv')

In [34]:
# Preview five rows. Apply limit() before toPandas() so only those rows are
# collected on the driver instead of the whole ratings table.
rawData.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,31,2.5,1260759144
1,1,1029,3.0,1260759179
2,1,1061,3.0,1260759182
3,1,1129,2.0,1260759185
4,1,1172,4.0,1260759205


#### Pick all columns except the timestamp (won't be used in the recommendation model)

In [35]:
from pyspark.sql.functions import col

# Keep only the three columns the recommender needs, cast to numeric types
selectedColumns = [
    col('userId').cast('int'),
    col('movieId').cast('int'),
    col('rating').cast('float'),
]
dataset = rawData.select(*selectedColumns)

# Preview five rows; limit() first so the full dataset is not pulled to the driver
dataset.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating
0,1,31,2.5
1,1,1029,3.0
2,1,1061,3.0
3,1,1129,2.0
4,1,1172,4.0


#### Check the distribution of rating in the dataset

In [36]:
# Summary statistics of the rating column, computed with pandas
ratingColumn = dataset.select('rating')
ratingColumn.toPandas().describe()

Unnamed: 0,rating
count,100004.0
mean,3.543608
std,1.058048
min,0.5
25%,3.0
50%,4.0
75%,4.0
max,5.0


#### Split into training and test data sets

In [37]:
# 80/20 train/test split. The seed is fixed so that re-running the notebook
# yields the same split and therefore comparable metrics between runs.
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed=42)

### Define the Collaborative Filtering model
Uses the Alternating Least Squares algorithm to learn the latent factors
* <b>maxIter: </b>The maximum number of iterations to run
* <b>regParam: </b>Specifies the regularization parameter in ALS (the `pyspark.ml` default is 0.1)
* <b>coldStartStrategy: </b> Strategy for handling unknown or new users/items during prediction (those that were not encountered in training). Options are 'drop' and 'nan'. We will drop unknown users/items from the predictions

In [38]:
from pyspark.ml.recommendation import ALS

# ALS estimator for explicit ratings. The seed is fixed so that training is
# repeatable across runs; without it the fitted factors (and the predictions
# and RMSE derived from them) differ every time the notebook is executed.
als = ALS(maxIter=5,
          regParam=0.1,
          userCol='userId',
          itemCol='movieId',
          ratingCol='rating',
          coldStartStrategy='drop',
          seed=42)

#### Build the ALSModel using the model definition and training data

In [39]:
# Fit the ALS estimator on the training split to obtain the model
model = als.fit(dataset=trainingData)

#### Get the predictions for the test data

In [40]:
# Score the held-out test split with the fitted model
predictions = model.transform(testData)
# Preview five rows; limit() first so only those rows are collected on the driver
predictions.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,prediction
0,440,471,3.0,3.351176
1,306,471,3.0,3.5143
2,15,471,3.0,2.631747
3,358,471,5.0,3.940906
4,659,471,4.0,3.343483


#### Compare the distribution of values for ratings and predictions

In [41]:
# Side-by-side summary statistics of actual ratings and predicted ratings
ratingVsPrediction = predictions.select('rating', 'prediction')
ratingVsPrediction.toPandas().describe()

Unnamed: 0,rating,prediction
count,19084.0,19084.0
mean,3.564635,3.394229
std,1.049842,0.746356
min,0.5,-0.550637
25%,3.0,2.957347
50%,4.0,3.473495
75%,4.0,3.913187
max,5.0,5.703673


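Here the maximum predicted rating is about 5.7, which is above the maximum possible rating of 5, and the minimum is about -0.55, which is below the lowest rating of 0.5. This is because the predictions are not bounded and we haven't clipped them to the valid rating range.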

### Get the Root Mean Square Error on the test data. 

**We can use RMSE because we have used explicit ratings here for modeling**

In [42]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE between the actual 'rating' column and the model's 'prediction' column
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

rmse = evaluator.evaluate(predictions)
print('RMSE = ', rmse)

RMSE =  0.9128278539080936


#### The ALS model can be used to get predictions for all users
Specify the number of predictions you would like for each user

In [43]:
# Top 3 movie recommendations for every user
userRecsAll = model.recommendForAllUsers(numItems=3)
userRecsAll

DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]

#### View the recommendations
For each userId there is a list of tuples, each holding a movieId and its predicted rating for that user

In [44]:
# Each entry in `recommendations` is a (movieId, rating) tuple.
# Preview five users; limit() first so the recommendations for every user are
# not all collected on the driver just to display a handful of rows.
userRecsAll.limit(5).toPandas()

Unnamed: 0,userId,recommendations
0,471,"[(65037, 4.660741329193115), (8132, 4.65989589..."
1,463,"[(67504, 4.828551292419434), (83411, 4.8285512..."
2,496,"[(1180, 6.262843132019043), (8535, 5.351010322..."
3,148,"[(67504, 5.351772308349609), (83411, 5.3517723..."
4,540,"[(1180, 5.591663360595703), (1162, 5.553853511..."


#### Get the top user recommendations for each movie
* The users who are most likely to like a particular movie
* Get the top 3 users

In [45]:
# The reverse of the per-user recommendations: for every movie, the 3 users
# most likely to like it
movieRecsAll = model.recommendForAllItems(3)
# Preview five movies; limit() first so only those rows are collected on the driver
movieRecsAll.limit(5).toPandas()

Unnamed: 0,movieId,recommendations
0,1580,"[(46, 5.074670791625977), (543, 4.822387695312..."
1,5300,"[(357, 4.206568241119385), (545, 3.86646676063..."
2,6620,"[(156, 4.609171390533447), (450, 4.42620134353..."
3,7340,"[(621, 4.454606533050537), (644, 4.10312461853..."
4,32460,"[(298, 4.939304351806641), (145, 4.82760429382..."


#### Get recommendations for a subset of users
* Start off by creating a list of users who make up our subset
* Convert that list to a dataframe which will be used shortly

In [46]:
from pyspark.sql.types import IntegerType

# We rarely need recommendations for every user, so list the users of interest
usersList = [148, 463, 267]

# One integer column, renamed to the user column name the model was trained with
userIdColumn = spark.createDataFrame(usersList, IntegerType())
usersDF = userIdColumn.toDF('userId')

usersDF.take(3)

[Row(userId=148), Row(userId=463), Row(userId=267)]

#### Use the recommendForUserSubset function
This gets the recommendations for specific users

In [47]:
# Top 5 movie recommendations for each user listed in usersDF
userRecs = model.recommendForUserSubset(dataset=usersDF, numItems=5)
userRecs.toPandas()

Unnamed: 0,userId,recommendations
0,148,"[(83318, 5.351772308349609), (67504, 5.3517723..."
1,463,"[(83318, 4.828551292419434), (67504, 4.8285512..."
2,267,"[(65037, 5.2384562492370605), (93320, 5.235360..."


#### Extract recommendations for specific user
* We get a list comprising a Row object which in turn contains a list of Rows
* To get the movie names from the movieIds, we will need to perform some transformations

In [52]:
# Narrow the recommendations down to user 148
isUser148 = userRecs.userId == 148
userMoviesList = userRecs.where(isUser148).select('recommendations')

userMoviesList.collect()

[Row(recommendations=[Row(movieId=83318, rating=5.351772308349609), Row(movieId=67504, rating=5.351772308349609), Row(movieId=83411, rating=5.351772308349609), Row(movieId=83359, rating=5.351772308349609), Row(movieId=7096, rating=5.078452110290527)])]

#### Extract the list of recommendations
We get the list of Rows containing the movieId and rating for the user

In [53]:
# The single collected row holds the list of (movieId, rating) Rows
firstRow = userMoviesList.collect()[0]
moviesList = firstRow.recommendations
moviesList

[Row(movieId=83318, rating=5.351772308349609),
 Row(movieId=67504, rating=5.351772308349609),
 Row(movieId=83411, rating=5.351772308349609),
 Row(movieId=83359, rating=5.351772308349609),
 Row(movieId=7096, rating=5.078452110290527)]

#### Create a DataFrame containing the movieId and rating as columns
Use the moviesList created previously

In [54]:
# Turn the list of Rows back into a DataFrame with movieId and rating columns
moviesDF = spark.createDataFrame(data=moviesList)
moviesDF.toPandas()

Unnamed: 0,movieId,rating
0,83318,5.351772
1,67504,5.351772
2,83411,5.351772
3,83359,5.351772
4,7096,5.078452


#### The movie names are stored in a csv file called movies.csv
Load that into another dataframe

In [55]:
# Load the movie metadata (movieId, title, genres) from movies.csv
movieData = spark.read.csv('../datasets/movielens/movies.csv',
                              header=True,
                              ignoreLeadingWhiteSpace= True)
# Preview five rows; limit() first so the whole movie table is not pulled to the driver
movieData.limit(5).toPandas()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [56]:
# Top 5 movie recommendations for user 148: attach title and genres to each
# recommended movieId and list the highest predicted rating first
recommendedMovies = (
    movieData
    .join(moviesDF, on=['movieId'])
    .orderBy('rating', ascending=False)
    .select('title', 'genres', 'rating')
)

recommendedMovies.toPandas()

Unnamed: 0,title,genres,rating
0,"Goat, The (1921)",Comedy,5.351772
1,Land of Silence and Darkness (Land des Schweig...,Documentary,5.351772
2,Cops (1922),Comedy,5.351772
3,"Play House, The (1921)",Comedy,5.351772
4,Rivers and Tides (2001),Documentary,5.078452


In [57]:
from pyspark.sql.types import IntegerType

def getRecommendationsForUser(userId, numRecs):
    """Return the top movie recommendations for a single user.

    Relies on the notebook-level ``spark`` session, the fitted ALS ``model``
    and the ``movieData`` DataFrame loaded from movies.csv.

    Parameters
    ----------
    userId : int
        Id of the user to recommend for.
    numRecs : int
        Maximum number of movies to recommend.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``title``, ``genres`` and ``rating`` (the predicted rating),
        ordered by ``rating`` descending. The DataFrame is empty when the
        model produces no recommendations for ``userId``.
    """
    # Local import keeps this cell runnable on its own
    from pyspark.sql.functions import col, explode

    usersDF = spark.\
    createDataFrame([userId], IntegerType()).\
    toDF('userId')

    userRecs = model.recommendForUserSubset(usersDF, numRecs)

    # Flatten the array of (movieId, rating) structs inside Spark. Indexing
    # collect()[0] raised IndexError when the model returned no row for the
    # user; explode() simply yields zero rows in that case, and it also avoids
    # a round-trip through the driver.
    moviesDF = userRecs\
    .select(explode('recommendations').alias('rec'))\
    .select(col('rec.movieId').alias('movieId'),
            # Cast keeps `rating` a double, as it was when the DataFrame was
            # rebuilt from collected Python floats.
            col('rec.rating').cast('double').alias('rating'))

    recommendedMovies = movieData.join(moviesDF, on=['movieId'])\
    .orderBy('rating', ascending=False)\
    .select('title', 'genres', 'rating')

    return recommendedMovies

In [58]:
# Top 10 movie recommendations for user 219
recommendationsForUser = getRecommendationsForUser(userId=219, numRecs=10)
recommendationsForUser.toPandas()

Unnamed: 0,title,genres,rating
0,"Hello, Dolly! (1969)",Comedy|Musical|Romance,5.434948
1,Land of Silence and Darkness (Land des Schweig...,Documentary,5.366544
2,"Goat, The (1921)",Comedy,5.366544
3,Cops (1922),Comedy,5.366544
4,"Play House, The (1921)",Comedy,5.366544
5,Philomena (2013),Comedy|Drama,5.198465
6,Love Me If You Dare (Jeux d'enfants) (2003),Drama|Romance,5.135067
7,Before Midnight (2013),Drama|Romance,5.094049
8,Dead Man's Shoes (2004),Crime|Thriller,5.023376
9,Pandorum (2009),Horror|Sci-Fi|Thriller,4.978452
