# Building a Recommendation System in PySpark

## Movies Dataset

In [1]:
# import necessary libraries
from pyspark.sql import SparkSession

# instantiate SparkSession object
# spark = SparkSession.builder.master('local').getOrCreate()

spark = SparkSession\
        .builder\
        .appName('ALSExample').config('spark.driver.host', 'localhost')\
        .getOrCreate()

In [2]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv('ratings.csv', header=True, inferSchema=True)

In [3]:
#checking data types
movie_ratings.dtypes

[('userId', 'int'),
 ('movieId', 'int'),
 ('rating', 'double'),
 ('timestamp', 'int')]

In [4]:
#drop unwanted columns
movie_ratings = movie_ratings.drop('timestamp')

In [5]:
# The data is already preprocessed, so we proceed to fit ALS
#Fitting the Alternating Least Squares Model
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS

# split into training and testing sets
(training, test) = movie_ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, rank=4, regParam=0.01, userCol='userId', itemCol='movieId', ratingCol='rating',
          coldStartStrategy='drop')

# fit the ALS model to the training set
model = als.fit(training)
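Spark's `ALS` estimator hides the optimization itself. As a rough illustration of what "alternating least squares" means, the sketch below factorizes a tiny, made-up dense rating matrix in NumPy: it holds the item factors fixed and solves a small ridge regression for each user, then holds the user factors fixed and does the same for each item. The function `als_toy`, the toy matrix and the plain L2 penalty `reg` are hypothetical simplifications, not Spark's implementation, which runs on distributed blocks and additionally scales the penalty by each user's or item's number of ratings.

```python
import numpy as np

def als_toy(R, mask, rank=2, reg=0.1, iters=50, seed=0):
    """Toy dense ALS. R is (n_users, n_items); mask is True where a rating exists."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    V = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    penalty = reg * np.eye(rank)
    for _ in range(iters):
        # Hold V fixed and solve a ridge regression for each user's factors
        for u in range(n_users):
            Vo = V[mask[u]]  # factors of the items this user rated
            U[u] = np.linalg.solve(Vo.T @ Vo + penalty, Vo.T @ R[u, mask[u]])
        # Hold U fixed and solve a ridge regression for each item's factors
        for i in range(n_items):
            Uo = U[mask[:, i]]  # factors of the users who rated this item
            V[i] = np.linalg.solve(Uo.T @ Uo + penalty, Uo.T @ R[mask[:, i], i])
    return U, V

# Hypothetical 4 users x 4 items; 0 means "not rated" in this toy example
R = np.array([[5, 3, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0
U, V = als_toy(R, mask)
pred = U @ V.T  # predicted rating for every (user, item) pair, including unrated ones
```

Each half-step has a closed-form solution, which is why ALS parallelizes well: every user (or item) update is independent of the others within that half-step.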

In [6]:
#Evaluating model performance

# importing appropriate library
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
                                predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print('Root-mean-square error = ' + str(rmse))

Root-mean-square error = 0.9821188167469224
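`RegressionEvaluator` with `metricName='rmse'` computes the root-mean-square error, the square root of the mean of `(rating - prediction) ** 2` over all rows. The plain-Python sketch below applies that formula to a few hypothetical (rating, prediction) pairs and also shows why `coldStartStrategy='drop'` was set: ALS predicts NaN for users or items it never saw in training, a single NaN turns the whole metric into NaN, and dropping those rows leaves a finite score.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (label, prediction) pairs."""
    return math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))

# Hypothetical (rating, prediction) pairs; the last row mimics a cold-start NaN
pairs = [(4.0, 3.5), (3.0, 3.0), (5.0, 4.0), (2.0, float("nan"))]
print(rmse(pairs))  # nan: one NaN poisons the mean

# What coldStartStrategy='drop' effectively does: discard rows with NaN predictions
kept = [(y, p) for y, p in pairs if not math.isnan(p)]
print(rmse(kept))  # sqrt((0.25 + 0 + 1) / 3), about 0.6455
```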


In [7]:
#Cross-validation to Find the Optimal Model

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# initialize the ALS model
als_model = ALS(userCol='userId', itemCol='movieId', 
                ratingCol='rating', coldStartStrategy='drop')

# create the parameter grid                 
params = ParamGridBuilder()\
          .addGrid(als_model.regParam, [0.01, 0.001, 0.1])\
          .addGrid(als_model.rank, [4, 10, 50]).build()


# instantiate the CrossValidator estimator (it uses 3 folds by default)
cv = CrossValidator(estimator=als_model, estimatorParamMaps=params, evaluator=evaluator, parallelism=4)
best_model = cv.fit(movie_ratings)

# The best model has a rank of 50, so we will use that in our future models with this dataset
best_model.bestModel.rank

50
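`ParamGridBuilder` builds the Cartesian product of the listed values, so the grid above holds 3 × 3 = 9 parameter maps. With the default `numFolds=3`, `CrossValidator` trains 9 × 3 = 27 ALS models on the folds and then refits the best combination on the full dataset; `parallelism=4` only controls how many of those fits run at once. The pure-Python sketch below (an illustration, not Spark code) enumerates the same grid and counts the fits.

```python
from itertools import product

reg_params = [0.01, 0.001, 0.1]
ranks = [4, 10, 50]
num_folds = 3  # CrossValidator's default numFolds

# One parameter map per (regParam, rank) combination, like ParamGridBuilder
grid = [{"regParam": r, "rank": k} for r, k in product(reg_params, ranks)]

fold_fits = len(grid) * num_folds
total_fits = fold_fits + 1  # plus the final refit of the best parameters on all data
print(len(grid), fold_fits, total_fits)  # 9 27 28
```

This is why cross-validation is noticeably slower than the single fit earlier, and why adding a third grid dimension multiplies the cost again.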

In [10]:
#Incorporating movie names
movie_titles = spark.read.csv('movies.csv',header='true',inferSchema='true')
movie_titles.head(5)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

We will eventually be matching up the movie ids with the movie titles. In the cell below, create a function `name_retriever()` that takes in a `movie_id` and the movie titles DataFrame and returns the corresponding movie title as a string.

In [11]:
def name_retriever(movie_id, movie_title_df):
    # filter the titles DataFrame to the matching movieId and return that row's title
    return movie_title_df.where(movie_title_df.movieId == movie_id).take(1)[0]['title']

In [12]:
print(name_retriever(1023, movie_titles))

Winnie the Pooh and the Blustery Day (1968)
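`name_retriever()` launches a separate Spark job for every lookup, which is fine for a single title but slow inside a loop. Because the titles table is small, one alternative is to collect the `movieId`/`title` pairs to the driver once, e.g. with `dict(movie_titles.select('movieId', 'title').collect())`, and look titles up in a plain dictionary. The sketch below assumes that collection has already happened and uses rows copied from the `head(5)` output above; `build_title_lookup` is a hypothetical helper name.

```python
def build_title_lookup(pairs):
    """Map movieId -> title from (movieId, title) pairs collected to the driver."""
    return {movie_id: title for movie_id, title in pairs}

# Rows taken from the movie_titles.head(5) output; in practice these would come
# from movie_titles.select('movieId', 'title').collect()
pairs = [(1, "Toy Story (1995)"),
         (2, "Jumanji (1995)"),
         (3, "Grumpier Old Men (1995)")]

titles = build_title_lookup(pairs)
print(titles.get(2, "<unknown id>"))    # Jumanji (1995)
print(titles.get(999, "<unknown id>"))  # <unknown id>
```

Using `.get()` with a default also avoids the `IndexError` that `take(1)[0]` raises when an id is missing.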


Now it's time to actually get some recommendations! The fitted ALS model (an `ALSModel`) has built-in methods called `.recommendForUserSubset()` and `.recommendForAllUsers()`. We'll start off with a subset of users.

In [13]:
users = movie_ratings.select(als.getUserCol()).distinct().limit(1)
userSubsetRecs = model.recommendForUserSubset(users, 10)
recs = userSubsetRecs.take(1)

`recs` is a list of rows, each holding a user's recommended items as (movieId, rating) pairs. Now try to get the name of this user's top recommended movie by passing the first item in the list to the function you just created.

In [14]:
# use indexing to obtain the movie id of top predicted rated item
first_recommendation = recs[0]['recommendations'][0][0]

# use the name retriever function to get the values
name_retriever(first_recommendation,movie_titles)

'Violet & Daisy (2011)'

We can also make recommendations for everyone, although this will take longer. The next cell creates a DataFrame with the top 5 recommendations for every user and then selects one user (userId 3) to inspect their predictions:

In [15]:
recommendations = model.recommendForAllUsers(5)
recommendations.where(recommendations.userId == 3).collect()

[Row(userId=3, recommendations=[Row(movieId=91470, rating=7.31921911239624), Row(movieId=2017, rating=6.326812744140625), Row(movieId=101577, rating=6.122920989990234), Row(movieId=6932, rating=5.955191135406494), Row(movieId=3385, rating=5.838411808013916)])]
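An ALS prediction is the dot product of a user's factor vector and an item's factor vector, and `recommendForAllUsers(5)` keeps the 5 highest-scoring items per user. The NumPy sketch below reproduces that ranking step on small, hand-picked factor matrices (hypothetical values, not taken from the fitted model); `top_k_items` is an illustrative helper, not a Spark API. Since nothing bounds the dot product to the range of the input ratings, scores above 5, like those in the output above, are expected. To our knowledge, neither this sketch nor Spark's method removes movies the user has already rated.

```python
import numpy as np

def top_k_items(user_factors, item_factors, item_ids, k):
    """Return the k highest-scoring (item_id, score) pairs for every user."""
    scores = user_factors @ item_factors.T    # (n_users, n_items) predicted ratings
    top = np.argsort(-scores, axis=1)[:, :k]  # column indices, best first
    return [[(item_ids[j], float(scores[u, j])) for j in top[u]]
            for u in range(scores.shape[0])]

# Hypothetical rank-2 factors for 2 users and 4 items
U = np.array([[1.0, 2.0],
              [2.0, 0.5]])
V = np.array([[1.0, 1.0],   # item 10
              [2.0, 0.0],   # item 20
              [0.0, 3.0],   # item 30
              [3.0, 1.0]])  # item 40
recs = top_k_items(U, V, item_ids=[10, 20, 30, 40], k=2)
print(recs[0])  # [(30, 6.0), (40, 5.0)]
print(recs[1])  # [(40, 6.5), (20, 4.0)]
```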

Now it's time to put together everything you've learned in this section to create a function that takes in a new user and some movies they've rated and then prints the $n$ highest-recommended movies. This function has several steps:

* Add the new ratings to the DataFrame (hint: look into using the `.union()` method)
* Fit the ALS model
* Make recommendations for the user of choice
* Print out the names of the top $n$ recommendations in a reader-friendly manner

In [16]:
#Rate new movies
def new_user_recs(user_id, new_ratings, rating_df, movie_title_df, num_recs):
    # turn the new_ratings list into a Spark DataFrame, then cast each column to the
    # type it has in rating_df (e.g. int ratings -> double) so union() lines up by position
    new_user_ratings = spark.createDataFrame(new_ratings, rating_df.columns)
    new_user_ratings = new_user_ratings.select(
        [new_user_ratings[c].cast(t).alias(c) for c, t in rating_df.dtypes])
    
    # combine the new ratings df with the rating_df
    movie_ratings_combined = rating_df.union(new_user_ratings)
    
    # create an ALS model with the cross-validated rank of 50 and fit it on all ratings
    als = ALS(maxIter=5, rank=50, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop")
    model = als.fit(movie_ratings_combined)
    
    # make recommendations for all users using the recommendForAllUsers method
    recommendations = model.recommendForAllUsers(num_recs)
    
    # get recommendations specifically for the new user that has been added to the DataFrame
    recs_for_user = recommendations.where(recommendations.userId == user_id).take(1)
    
    for ranking, (movie_id, rating) in enumerate(recs_for_user[0]['recommendations']):
        movie_string = name_retriever(movie_id,movie_title_df)
        print('Recommendation {}: {}  | predicted score :{}'.format(ranking+1,movie_string,rating))

In [17]:
user_id = 100000
user_ratings_1 = [(user_id,3253,5),
                  (user_id,2459,5),
                  (user_id,2513,4),
                  (user_id,6502,5),
                  (user_id,1091,5),
                  (user_id,441,4)]
new_user_recs(user_id,
              new_ratings=user_ratings_1,
              rating_df=movie_ratings,
              movie_title_df=movie_titles,
              num_recs=10)

Recommendation 1: Princess Bride, The (1987)  | predicted score :5.555953025817871
Recommendation 2: Star Wars: Episode IV - A New Hope (1977)  | predicted score :5.5542731285095215
Recommendation 3: Forrest Gump (1994)  | predicted score :5.53633451461792
Recommendation 4: Star Wars: Episode VI - Return of the Jedi (1983)  | predicted score :5.520097255706787
Recommendation 5: Usual Suspects, The (1995)  | predicted score :5.494187831878662
Recommendation 6: Monty Python and the Holy Grail (1975)  | predicted score :5.454071998596191
Recommendation 7: Lord of the Rings: The Fellowship of the Ring, The (2001)  | predicted score :5.450919151306152
Recommendation 8: Life Is Beautiful (La Vita è bella) (1997)  | predicted score :5.449873924255371
Recommendation 9: Shawshank Redemption, The (1994)  | predicted score :5.444922924041748
Recommendation 10: Highlander (1986)  | predicted score :5.437699317932129


***So there we have it! Our recommendation system generates the top 10 movie recommendations for a new user.***

## Summary
You built a model using Spark, performed some parameter selection, and updated the model every time new user preferences came in. You looked at how Spark's ALS implementation can be used to build a scalable and efficient recommendation system. You also saw that such systems can become computationally expensive (here the whole model is refit for every new user), which could be a problem for an online system running on traditional computational platforms. Spark's distributed computing architecture makes it practical to deploy such recommendation systems in real-world applications (think Amazon, Spotify).

In [18]:
spark.stop()