In [4]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that backs every DataFrame in this notebook.
sessionBuilder = SparkSession.builder.appName(
    'Use Collaborative Filtering for movie recommendations'
)
spark = sessionBuilder.getOrCreate()

# Load the MovieLens ratings file; its first line is treated as the header row.
ratingsReader = spark.read.format('csv').option('header', 'true')
rawData = ratingsReader.load('datasets/movielens/ratings.csv')

In [3]:
#rawData.toPandas().head()

#### Pick all columns except the timestamp

In [5]:
from pyspark.sql.functions import col

# Keep only the three columns the recommender needs (the timestamp is dropped)
# and cast each of them to an explicit numeric type.
dataset = rawData.select(
    col('userId').cast('int'),
    col('movieId').cast('int'),
    col('rating').cast('float'),
)

# Preview five rows. limit() runs in Spark, so only those rows are collected
# instead of converting the whole dataset to pandas just to show its head.
dataset.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating
0,1,31,2.5
1,1,1029,3.0
2,1,1061,3.0
3,1,1129,2.0
4,1,1172,4.0


#### Check the distribution of rating in the dataset

In [6]:
# Summary statistics of the rating column, computed by pandas after
# collecting that single column.
ratingsPdf = dataset.select('rating').toPandas()
ratingsPdf.describe()

Unnamed: 0,rating
count,100004.0
mean,3.543608
std,1.058048
min,0.5
25%,3.0
50%,4.0
75%,4.0
max,5.0


#### Split into training and test data sets

In [7]:
# 80/20 train/test split. A fixed seed is passed so the split (and therefore
# the fitted model and the metrics computed from it) can be reproduced on a
# fresh Restart & Run All instead of changing on every execution.
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed=42)

### Define the Collaborative Filtering model

Uses the Alternating Least Squares algorithm to learn the latent factors

    - maxIter: The maximum number of iterations to run
    - regParam: Specifies the regularization parameter in ALS (defaults to 0.1 in pyspark.ml's ALS; set explicitly to 0.1 below)
    - coldStartStrategy: Strategy for handling unknown or new users/items during prediction (those that were not encountered in training). Options are 'drop' and 'nan'. We will drop unknown users/items from the predictions



In [8]:
from pyspark.ml.recommendation import ALS

# Hyper-parameters and column bindings for the ALS estimator.
alsParams = {
    'maxIter': 5,
    'regParam': 0.1,
    'userCol': 'userId',
    'itemCol': 'movieId',
    'ratingCol': 'rating',
    # Drop predictions for users/items that were not seen during training.
    'coldStartStrategy': 'drop',
}
als = ALS(**alsParams)

### Build the ALSModel using the model definition and training data

In [10]:
# Fit the ALS estimator on the training split; the resulting model is what
# the later cells use for predictions and recommendations.
model = als.fit(trainingData)

#### Get the predictions for the test data

In [11]:
# Score the held-out test rows with the fitted model.
predictions = model.transform(testData)

# Preview five rows, limiting in Spark so the full prediction set is not
# pulled into pandas just to display its head.
predictions.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,prediction
0,232,463,4.0,3.630554
1,380,463,3.0,2.934693
2,534,463,4.0,3.69206
3,548,471,4.0,3.150016
4,292,471,3.5,3.94711


#### Compare the distribution of values for ratings and predictions

In [12]:
# Side-by-side summary statistics of the actual and predicted ratings.
comparisonPdf = predictions.select('rating', 'prediction').toPandas()
comparisonPdf.describe()

Unnamed: 0,rating,prediction
count,19550.0,19550.0
mean,3.559335,3.395237
std,1.051218,0.748203
min,0.5,-0.099011
25%,3.0,2.961224
50%,4.0,3.477877
75%,4.0,3.918231
max,5.0,5.505121


#### Get the Root Mean Square Error on the test data

In [13]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE between the true 'rating' column and the model's 'prediction' column.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)

rmse = evaluator.evaluate(predictions)
print('RMSE = ', rmse)

RMSE =  0.9173916784685738


#### The ALS model can be used to get predictions for all users
Specify the number of predictions you would like for each user

In [14]:
# Number of movies to recommend to each user.
recsPerUser = 3

# Recommendations for every user in the model; the bare name shows the schema.
userRecsAll = model.recommendForAllUsers(recsPerUser)
userRecsAll

DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]

#### View the recommendations
For each userId there is a list of tuples, each holding a movieId and its predicted rating for that user

In [15]:
# Preview five users' recommendations; limit in Spark so the recommendations
# for every user are not collected to the driver just to show a few rows.
userRecsAll.limit(5).toPandas()

Unnamed: 0,userId,recommendations
0,471,"[(3414, 5.053248405456543), (83411, 4.90093469..."
1,463,"[(67504, 4.9483642578125), (83411, 4.948364257..."
2,496,"[(3414, 5.679231643676758), (115569, 5.3586602..."
3,148,"[(67504, 5.735400199890137), (83411, 5.7354001..."
4,540,"[(67504, 5.983666896820068), (83411, 5.9836668..."


#### Get the top user recommendations for each movie
* The users who are most likely to like a particular movie
* Get the top 3 users

In [16]:
# For every movie, the 3 users with the highest predicted rating for it.
movieRecsAll = model.recommendForAllItems(3)

# Preview five movies; limit in Spark rather than collecting every movie's
# recommendations into pandas first.
movieRecsAll.limit(5).toPandas()

Unnamed: 0,movieId,recommendations
0,1580,"[(543, 4.801964282989502), (287, 4.80122661590..."
1,5300,"[(113, 4.604008197784424), (401, 4.56217908859..."
2,6620,"[(465, 4.738456726074219), (577, 4.63346385955..."
3,32460,"[(298, 4.923527240753174), (670, 4.85837888717..."
4,54190,"[(545, 4.807101249694824), (477, 4.36607646942..."


#### Get recommendations for a subset of users
* Start off by creating a list of users who make up our subset
* Convert that list to a dataframe which will be used shortly

In [17]:
from pyspark.sql.types import IntegerType

usersList = [148, 463, 267]

# Build a one-column DataFrame of integer ids, then name that column 'userId'.
userIdsUnnamed = spark.createDataFrame(usersList, IntegerType())
usersDF = userIdsUnnamed.toDF('userId')

usersDF.take(3)

[Row(userId=148), Row(userId=463), Row(userId=267)]

#### Use the recommendForUserSubset function
This gets the recommendations for specific users

In [18]:
# Number of movies to recommend to each user in the subset.
recsPerSelectedUser = 5

userRecs = model.recommendForUserSubset(usersDF, recsPerSelectedUser)
userRecs.toPandas()

Unnamed: 0,userId,recommendations
0,148,"[(67504, 5.735400199890137), (83318, 5.7354001..."
1,463,"[(83318, 4.9483642578125), (83411, 4.948364257..."
2,267,"[(83318, 5.408815860748291), (67504, 5.4088158..."


#### Extract recommendations for specific user
* We get a list comprising a Row object which in turn contains a list of Rows
* To get the movie names from the movieIds we will need to perform some transformations

In [19]:
# Keep only user 148's row and project its 'recommendations' array column.
isUser148 = userRecs['userId'] == 148
userMoviesList = userRecs.filter(isUser148).select('recommendations')

userMoviesList.collect()

[Row(recommendations=[Row(movieId=67504, rating=5.735400199890137), Row(movieId=83318, rating=5.735400199890137), Row(movieId=83411, rating=5.735400199890137), Row(movieId=83359, rating=5.735400199890137), Row(movieId=3414, rating=5.230541229248047)])]

#### Extract the list of recommendations
We get the list of Rows containing the movieId and rating for the user

In [20]:
# Bring the result to the driver and unwrap the first row's recommendations.
collectedRows = userMoviesList.collect()
moviesList = collectedRows[0]['recommendations']
moviesList

[Row(movieId=67504, rating=5.735400199890137),
 Row(movieId=83318, rating=5.735400199890137),
 Row(movieId=83411, rating=5.735400199890137),
 Row(movieId=83359, rating=5.735400199890137),
 Row(movieId=3414, rating=5.230541229248047)]

#### Create a DataFrame containing the movieId and rating as columns
Use the moviesList created previously

In [21]:
# Turn the list of recommendation Rows back into a Spark DataFrame
# (one row per recommended movie) and display it via pandas.
moviesDF = spark.createDataFrame(moviesList)
moviesDF.toPandas()

Unnamed: 0,movieId,rating
0,67504,5.7354
1,83318,5.7354
2,83411,5.7354
3,83359,5.7354
4,3414,5.230541


#### The movie names are stored in a csv file called movies.csv
Load that into another dataframe

In [22]:
# Read the movie metadata through the `spark` session created at the top of
# the notebook. The previous `sqlContext` name is never defined here, so the
# cell only worked when the kernel happened to pre-populate it.
movieData = spark.read.csv('datasets/movielens/movies.csv',
                           header=True,
                           ignoreLeadingWhiteSpace=True)

# Preview five rows without collecting the whole movie table into pandas.
movieData.limit(5).toPandas()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [23]:
# Attach title/genres to each recommended movieId, then show the highest
# predicted ratings first.
moviesWithRatings = movieData.join(moviesDF, on=['movieId'])
recommendedMovies = (
    moviesWithRatings
    .orderBy('rating', ascending=False)
    .select('title', 'genres', 'rating')
)

recommendedMovies.toPandas()

Unnamed: 0,title,genres,rating
0,Land of Silence and Darkness (Land des Schweig...,Documentary,5.7354
1,"Goat, The (1921)",Comedy,5.7354
2,Cops (1922),Comedy,5.7354
3,"Play House, The (1921)",Comedy,5.7354
4,Love Is a Many-Splendored Thing (1955),Drama|Romance|War,5.230541


In [24]:
from pyspark.sql.types import IntegerType

def getRecommendationsForUser(userId, numRecs):
    """Return the top ``numRecs`` movie recommendations for a single user.

    Relies on the notebook-level ``spark`` session, the fitted ``model`` and
    the ``movieData`` DataFrame.

    Parameters
    ----------
    userId : int
        Id of the user to recommend for.
    numRecs : int
        Maximum number of movies to recommend.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``title``, ``genres`` and ``rating``, ordered by ``rating``
        descending. The frame is empty (rather than an ``IndexError`` being
        raised) when the model yields no recommendations for ``userId``.
    """
    # Local import so the function does not depend on which notebook cells
    # have already been executed.
    from pyspark.sql.functions import col, explode

    usersDF = spark.createDataFrame([userId], IntegerType()).toDF('userId')
    userRecs = model.recommendForUserSubset(usersDF, numRecs)

    # Flatten the per-user array of (movieId, rating) structs inside Spark.
    # This avoids collecting to the driver and indexing row 0, which fails
    # when no row comes back for the requested user.
    moviesDF = (
        userRecs
        .select(explode('recommendations').alias('rec'))
        .select(
            col('rec.movieId').alias('movieId'),
            # Cast to double to keep the column type produced when the rows
            # were previously rebuilt from Python floats.
            col('rec.rating').cast('double').alias('rating'),
        )
    )

    recommendedMovies = (
        movieData.join(moviesDF, on=['movieId'])
        .orderBy('rating', ascending=False)
        .select('title', 'genres', 'rating')
    )

    return recommendedMovies

In [25]:
recommendationsForUser = getRecommendationsForUser(219, 10)
recommendationsForUser.toPandas()

Unnamed: 0,title,genres,rating
0,Event Horizon (1997),Horror|Sci-Fi|Thriller,5.493504
1,"Outlaw Josey Wales, The (1976)",Action|Adventure|Drama|Thriller|Western,5.375618
2,Pink Flamingos (1972),Comedy,5.29305
3,Let It Be (1970),Documentary,5.277246
4,May (2002),Drama|Horror,5.260504
5,"Goat, The (1921)",Comedy,5.200023
6,Cops (1922),Comedy,5.200023
7,"Play House, The (1921)",Comedy,5.200023
8,Land of Silence and Darkness (Land des Schweig...,Documentary,5.200023
9,Lake of Fire (2006),Documentary,5.174538


In [1]:
#some