# Recommender Model using Alternating Least Squares

The goal of this project is to build a recommender engine that uses collaborative filtering to recommend movies for users to watch. Collaborative filtering means that the engine looks at movies watched and rated by many users, and then recommends to a particular user the movies that similar users rated favorably. Alternating Least Squares (ALS) does this by learning a small vector of latent factors for every user and every movie, so that the dot product of a user's vector and a movie's vector approximates the rating.
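The "similar users" intuition can be illustrated with a small neighborhood-style sketch in plain Python. This is not what ALS does internally (ALS learns latent factors rather than comparing users directly); it only shows the idea of recommending what similar users rated highly. The toy ratings and the names `toy_ratings`, `cosine` and `recommend` are hypothetical.

```python
from math import sqrt

# Hypothetical toy ratings: user -> {movie: rating}
toy_ratings = {
    "alice": {"A": 5.0, "B": 4.0, "C": 1.0},
    "bob":   {"A": 4.5, "B": 4.0, "C": 1.5, "D": 5.0, "E": 2.0},
    "carol": {"A": 1.0, "B": 2.0, "C": 5.0, "D": 1.0, "E": 5.0},
}

def cosine(u, v):
    """Cosine similarity computed over the movies both users rated."""
    common = set(u) & set(v)
    if not common:
        return 0.0
    dot = sum(u[m] * v[m] for m in common)
    norm_u = sqrt(sum(u[m] ** 2 for m in common))
    norm_v = sqrt(sum(v[m] ** 2 for m in common))
    return dot / (norm_u * norm_v)

def recommend(user, ratings, k=1):
    """Rank unseen movies by the similarity-weighted average rating of other users."""
    totals, weights = {}, {}
    for other, their in ratings.items():
        if other == user:
            continue
        sim = cosine(ratings[user], their)
        if sim <= 0:
            continue
        for movie, r in their.items():
            if movie in ratings[user]:
                continue  # only recommend movies the user has not rated
            totals[movie] = totals.get(movie, 0.0) + sim * r
            weights[movie] = weights.get(movie, 0.0) + sim
    ranked = sorted(totals, key=lambda m: totals[m] / weights[m], reverse=True)
    return ranked[:k]

print(recommend("alice", toy_ratings, k=2))  # ['D', 'E']
```

Bob's ratings of A, B and C are close to Alice's, so his high rating of D outweighs Carol's high rating of E.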

# Imports

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize a Spark Session

In [2]:
spark = (SparkSession.builder
         .config("spark.driver.memory", "64g")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", 2)
         .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1m")
         .config("spark.dynamicAllocation.maxExecutors", 12)  # the property name is plural: maxExecutors
         .config("spark.dynamicAllocation.executorIdleTimeout", "2min")
         .appName("SparkML")
         .getOrCreate())

# Load the data sets

In [3]:
ratings = spark.read.csv("Data/ratings.csv", header=True, inferSchema=True)
ratings = ratings.withColumn('rating', F.col('rating').cast('float'))
ratings.dtypes

[('userId', 'int'),
 ('movieId', 'int'),
 ('rating', 'float'),
 ('timestamp', 'int')]

In [4]:
movies = spark.read.csv("Data/movies.csv", header=True, inferSchema=True)
movies = movies.select('movieId', 'title', 'genres').withColumn('movieId', F.col('movieId').cast('int'))
movies.dtypes

[('movieId', 'int'), ('title', 'string'), ('genres', 'string')]

# Recommender model

In [5]:
# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 555)

# Alternating Least Squares model
als = (
    ALS()
    .setUserCol("userId")
    .setItemCol("movieId")
    .setRatingCol("rating")
    .setNonnegative(True)
    .setImplicitPrefs(False)
    .setColdStartStrategy("drop")
    .setSeed(555)
)
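To make the "alternating" part concrete, here is a minimal pure-Python sketch of explicit-feedback ALS, assuming a tiny dense problem: holding the item factors fixed, each user's factor vector is a ridge-regression solution, then the roles swap. The regularization is scaled by each user's or item's number of ratings, the scheme Spark's collaborative filtering guide describes for `regParam`. Unlike `setNonnegative(True)`, this sketch does not constrain factors to be nonnegative. The helper names (`solve`, `update`, `als_sketch`, `train_rmse`) and the toy matrix are hypothetical; `rank`, `reg` and `iters` play the roles of `rank`, `regParam` and `maxIter`.

```python
import random

def solve(a, b):
    """Solve the small dense system a @ x = b by Gaussian elimination."""
    n = len(b)
    m = [list(row) + [b[i]] for i, row in enumerate(a)]  # augmented matrix
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            f = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= f * m[col][c]
    x = [0.0] * n
    for r in reversed(range(n)):
        x[r] = (m[r][n] - sum(m[r][c] * x[c] for c in range(r + 1, n))) / m[r][r]
    return x

def update(fixed, observed, rank, reg):
    """Ridge-regress every row's factor vector against the fixed factors.

    observed maps a row id to its list of (other id, rating) pairs.
    """
    new = {}
    for row, pairs in observed.items():
        lam = reg * len(pairs)  # lambda scaled by the row's number of ratings
        a = [[lam if i == j else 0.0 for j in range(rank)] for i in range(rank)]
        b = [0.0] * rank
        for other, r in pairs:
            v = fixed[other]
            for i in range(rank):
                b[i] += r * v[i]
                for j in range(rank):
                    a[i][j] += v[i] * v[j]
        new[row] = solve(a, b)
    return new

def als_sketch(triples, rank=2, reg=0.01, iters=10, seed=555):
    """Alternate between solving user factors and item factors."""
    rng = random.Random(seed)
    by_user, by_item = {}, {}
    for u, i, r in triples:
        by_user.setdefault(u, []).append((i, r))
        by_item.setdefault(i, []).append((u, r))
    items = {i: [rng.random() for _ in range(rank)] for i in by_item}
    users = {}
    for _ in range(iters):
        users = update(items, by_user, rank, reg)  # item factors held fixed
        items = update(users, by_item, rank, reg)  # user factors held fixed
    return users, items

def train_rmse(triples, users, items):
    """RMSE of the factor dot products against the training ratings."""
    se = sum((r - sum(p * q for p, q in zip(users[u], items[i]))) ** 2
             for u, i, r in triples)
    return (se / len(triples)) ** 0.5

# Hypothetical 4x4 rating matrix with an exact rank-2 structure
toy_matrix = [[2, 1, 1, 3], [1, 2, 1, 1], [3, 3, 2, 4], [4, 5, 3, 5]]
toy_triples = [(u, i, float(r)) for u, row in enumerate(toy_matrix) for i, r in enumerate(row)]
users, items = als_sketch(toy_triples)
print(round(train_rmse(toy_triples, users, items), 3))
```

Because `update` only loops over observed `(id, rating)` pairs, missing ratings are simply skipped, which is what lets ALS work on a sparse ratings table.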

In [6]:
# Explanation of the parameters available for ALS
als.explainParams()

"alpha: alpha for implicit preference (default: 1.0)\nblockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. (default: 4096)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)\ncoldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan, current: drop)\nfinalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)\nimplicitPrefs: whether to use implicit preference (default: False, current: False)\nintermediate

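The coldStartStrategy parameter matters here because random splits and cross-validation folds can leave users or movies in the held-out data that never appear in the training data. With the default "nan", their predictions are NaN and the RMSE becomes NaN; "drop" removes those rows before evaluation. A minimal sketch of what "drop" keeps, using hypothetical `(userId, movieId)` pairs and a hypothetical helper `drop_cold_start`:

```python
def drop_cold_start(test_pairs, train_pairs):
    """Keep only test rows whose user and movie both appear in the training data."""
    seen_users = {u for u, _ in train_pairs}
    seen_items = {i for _, i in train_pairs}
    return [(u, i) for u, i in test_pairs if u in seen_users and i in seen_items]

# Hypothetical (userId, movieId) pairs
toy_train = [(1, 10), (1, 20), (2, 10)]
toy_test = [(1, 20), (2, 30), (3, 10)]
print(drop_cold_start(toy_test, toy_train))  # [(1, 20)]
```

Movie 30 and user 3 have no learned factors, so both of those rows are dropped.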
# Hyper-Parameter Tuning

In [7]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [50, 100]) \
            .addGrid(als.regParam, [.01, .05]) \
            .addGrid(als.maxIter, [5, 10]) \
            .build()

           
# Define RMSE as the evaluation metric and print the number of parameter combinations
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print("Num models to be tested: ", len(param_grid))

Num models to be tested:  8
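The count of 8 is the Cartesian product of the grid values (2 ranks × 2 regParams × 2 maxIters). A plain-Python sketch of that expansion, assuming the same values as param_grid above (`toy_grid` and `param_maps` are hypothetical names), also shows how many ALS fits the 3-fold cross validation below implies: one per (parameter map, fold) pair, plus one refit of the best map on the full training set.

```python
from itertools import product

# Same values as the param_grid above, written as plain Python lists
toy_grid = {"rank": [50, 100], "regParam": [0.01, 0.05], "maxIter": [5, 10]}
param_maps = [dict(zip(toy_grid, values)) for values in product(*toy_grid.values())]
print(len(param_maps))  # 8
print(param_maps[0])    # {'rank': 50, 'regParam': 0.01, 'maxIter': 5}

num_folds = 3
# One fit per (param map, fold), plus one refit of the best map on all of 'train'
total_fits = len(param_maps) * num_folds + 1
print(total_fits)       # 25
```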


# Use Cross Validation

In [8]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

# Fit the cross validator to the 'train' dataset
model = cv.fit(train)

# Extract the best model found by cross validation
best_model = model.bestModel
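Conceptually, CrossValidator splits the training data into folds, fits each parameter map on all but one fold, scores it on the held-out fold, averages the scores per map (what model.avgMetrics exposes), and refits the map with the lowest RMSE on all of the training data. The pure-Python sketch below follows those steps, but it swaps ALS for a much simpler hypothetical stand-in model (item means shrunk toward the global mean by `lam`) so that it stays short; Spark's own fold assignment also differs in detail. All names and data are hypothetical.

```python
import random
from statistics import mean

def k_fold(rows, k, seed):
    """Shuffle the rows, then deal them round-robin into k folds."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    return [shuffled[f::k] for f in range(k)]

def fit_damped_means(train, lam):
    """Stand-in model: each item's mean rating, shrunk toward the global mean by lam."""
    g = mean(r for _, _, r in train)
    sums, counts = {}, {}
    for _, item, r in train:
        sums[item] = sums.get(item, 0.0) + (r - g)
        counts[item] = counts.get(item, 0) + 1
    return lambda item: g + sums.get(item, 0.0) / (counts.get(item, 0) + lam)

def heldout_rmse(predict, rows):
    return mean((r - predict(item)) ** 2 for _, item, r in rows) ** 0.5

def cross_validate(rows, grid, k=3, seed=555):
    """Return the grid value with the lowest mean held-out RMSE, plus all means."""
    folds = k_fold(rows, k, seed)
    avg_metrics = []
    for lam in grid:  # every lam must be > 0 to avoid dividing by zero
        scores = []
        for f in range(k):
            train_part = [row for j, fold in enumerate(folds) if j != f for row in fold]
            scores.append(heldout_rmse(fit_damped_means(train_part, lam), folds[f]))
        avg_metrics.append(mean(scores))
    best = grid[avg_metrics.index(min(avg_metrics))]
    return best, avg_metrics

# Hypothetical (userId, movieId, rating) rows
toy_rows = [(u, item, float(1 + (u + 2 * item) % 5)) for u in range(12) for item in range(4)]
best_lam, avg_metrics = cross_validate(toy_rows, [0.1, 1.0, 10.0])
final_predict = fit_damped_means(toy_rows, best_lam)  # refit on all rows, like bestModel
print(best_lam, [round(m, 3) for m in avg_metrics])
```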

In [9]:
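# Check the best hyperparameters (regParam, rank, maxIter).
# _java_obj is an internal attribute; model.avgMetrics (one mean RMSE per entry
# of param_grid) is a public way to see how every combination scored.
print(best_model._java_obj.parent().getRegParam(),
      best_model._java_obj.parent().getRank(),
      best_model._java_obj.parent().getMaxIter())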


0.05 100 10


In [10]:
# Check the performance on the test set (the CrossValidatorModel predicts with its best model)
test_predictions = model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.9330405567880267
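RMSE is the square root of the mean squared difference between true and predicted ratings, so a test RMSE of about 0.93 means a typical prediction is off by roughly one star. A minimal sketch of the metric on hypothetical values:

```python
from math import sqrt

def rmse(actual, predicted):
    """Root mean squared error: sqrt of the mean of squared differences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared) / len(squared))

# Hypothetical ratings: every prediction is off by half a star
print(rmse([4.0, 3.0, 5.0, 2.0], [3.5, 3.5, 4.5, 2.5]))  # 0.5
```

Because errors are squared before averaging, a few large misses (such as the 4.0 rating predicted as 1.89 in the table below) raise RMSE more than many small ones.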


In [11]:
# Compare predictions to true values
test_predictions.select('userId', 'movieId', 'rating', 'prediction').show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   597|    471|   2.0|  4.040897|
|   436|    471|   3.0|  3.901048|
|   602|    471|   4.0| 2.6662421|
|   555|    471|   3.0|  3.367009|
|   287|    471|   4.5| 2.7757618|
|    32|    471|   3.0| 3.5545151|
|   414|    471|   5.0| 3.6757836|
|    44|    833|   2.0| 1.9619899|
|   307|    833|   1.0| 1.3297665|
|   492|    833|   4.0| 1.8884012|
|   599|   1088|   2.5| 2.2798963|
|    47|   1088|   4.0|  2.456705|
|   132|   1088|   4.0| 2.8928294|
|   381|   1088|   3.5| 3.6785822|
|   555|   1088|   4.0| 3.1095972|
|    84|   1088|   3.0| 3.3638976|
|    10|   1088|   3.0| 2.4173305|
|   200|   1088|   4.0| 3.4741125|
|   188|   1088|   4.0|  3.901537|
|   525|   1088|   4.5| 3.3552601|
+------+-------+------+----------+
only showing top 20 rows



# Make Recommendations

In [12]:
nrecommendations = best_model.recommendForAllUsers(10) # Generate 10 recommendations for all users
nrecommendations.limit(10).show(truncate = False)

+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                      |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|471   |[{2324, 4.7729206}, {79702, 4.6961594}, {50, 4.4881716}, {1148, 4.468414}, {1, 4.466831}, {158966, 4.41828}, {527, 4.4088154}, {1203, 4.3847857}, {168252, 4.3651834}, {92259, 4.3546176}]           |
|463   |[{2324, 4.641824}, {293, 4.544984}, {110, 4.481902}, {8874, 4.467929}, {54997, 4.4452295}, {68157, 4.4204025}, {318, 4.381118}, {51255, 4.378471}, {750, 4.364389}, 
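Each recommendation score is the predicted rating: the dot product of a user's factor vector and a movie's factor vector (the learned vectors are stored in the model's userFactors and itemFactors DataFrames). Spark computes this at scale across executors; the pure-Python sketch below only shows the underlying math, using hypothetical rank-2 factors and a hypothetical helper `recommend_for_all_users`.

```python
import heapq

def recommend_for_all_users(user_factors, item_factors, k):
    """Score every item for every user with a dot product and keep the k best."""
    result = {}
    for user, u in user_factors.items():
        scores = [(item, sum(a * b for a, b in zip(u, v)))
                  for item, v in item_factors.items()]
        result[user] = heapq.nlargest(k, scores, key=lambda pair: pair[1])
    return result

# Hypothetical rank-2 factors
toy_user_factors = {1: [1.0, 0.0], 2: [0.0, 1.0]}
toy_item_factors = {10: [2.0, 1.0], 20: [1.0, 3.0], 30: [0.5, 0.5]}
print(recommend_for_all_users(toy_user_factors, toy_item_factors, k=2))
# {1: [(10, 2.0), (20, 1.0)], 2: [(20, 3.0), (10, 1.0)]}
```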

In [13]:
nrecommendations = (
    nrecommendations
    .withColumn("rec_exp", F.explode("recommendations"))
    .select('userId',
            F.col("rec_exp.movieId").alias("Recommended movieId"),
            F.col("rec_exp.rating").alias("Predicted rating"))
)

nrecommendations.limit(10).show()

+------+-------------------+----------------+
|userId|Recommended movieId|Predicted rating|
+------+-------------------+----------------+
|   471|               2324|       4.7729206|
|   471|              79702|       4.6961594|
|   471|                 50|       4.4881716|
|   471|               1148|        4.468414|
|   471|                  1|        4.466831|
|   471|             158966|         4.41828|
|   471|                527|       4.4088154|
|   471|               1203|       4.3847857|
|   471|             168252|       4.3651834|
|   471|              92259|       4.3546176|
+------+-------------------+----------------+
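F.explode turns each user's array of (movieId, rating) structs into one row per element, repeating the userId; rows whose array is empty or null produce no output (F.explode_outer would keep them with nulls). The same flattening in plain Python, on hypothetical data:

```python
# Hypothetical nested recommendations: (userId, [(movieId, predicted rating), ...])
toy_recs = [(1, [(10, 4.8), (20, 4.6)]), (2, [(30, 4.4)]), (3, [])]

# One output row per array element, repeating the userId; user 3's empty
# array yields no rows, matching F.explode
flat = [(user, movie, score) for user, recs in toy_recs for movie, score in recs]
print(flat)  # [(1, 10, 4.8), (1, 20, 4.6), (2, 30, 4.4)]
```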



# Check the Recommendations

In [14]:
# Collect user 2's recommended movie IDs with a single collect() call
rec_movies = [row['Recommended movieId']
              for row in nrecommendations.where('userId = 2').select('Recommended movieId').collect()]
rec_movies

[131724, 441, 122882, 58998, 60756, 106782, 7361, 4848, 148626, 56782]

In [15]:
# Check a particular user's watched and reviewed movies and sort by the highest rating
(ratings.join(movies, ratings['movieId'] == movies['movieId'])
        .filter('userId = 2')
        .sort('rating', ascending=False)
        .limit(10)
        .show(truncate=False))

+------+-------+------+----------+-------+----------------------------------------------------+--------------------------------+
|userId|movieId|rating|timestamp |movieId|title                                               |genres                          |
+------+-------+------+----------+-------+----------------------------------------------------+--------------------------------+
|2     |60756  |5.0   |1445714980|60756  |Step Brothers (2008)                                |Comedy                          |
|2     |106782 |5.0   |1445714966|106782 |Wolf of Wall Street, The (2013)                     |Comedy|Crime|Drama              |
|2     |80906  |5.0   |1445715172|80906  |Inside Job (2010)                                   |Documentary                     |
|2     |89774  |5.0   |1445715189|89774  |Warrior (2011)                                      |Drama                           |
|2     |122882 |5.0   |1445715272|122882 |Mad Max: Fury Road (2015)                           |Ac

In [16]:
# Check what movies were recommended to this user by our recommender engine
movies.where(F.col('movieId').isin(rec_movies)).show(truncate=False)

+-------+----------------------------------------------------+--------------------------------------+
|movieId|title                                               |genres                                |
+-------+----------------------------------------------------+--------------------------------------+
|441    |Dazed and Confused (1993)                           |Comedy                                |
|4848   |Mulholland Drive (2001)                             |Crime|Drama|Film-Noir|Mystery|Thriller|
|7361   |Eternal Sunshine of the Spotless Mind (2004)        |Drama|Romance|Sci-Fi                  |
|56782  |There Will Be Blood (2007)                          |Drama|Western                         |
|58998  |Forgetting Sarah Marshall (2008)                    |Comedy|Romance                        |
|60756  |Step Brothers (2008)                                |Comedy                                |
|106782 |Wolf of Wall Street, The (2013)                     |Comedy|Crime|Drama  

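Many of the recommendations are Comedy and Drama titles, which matches the genres of the movies this user rated 5.0 (for example Step Brothers and The Wolf of Wall Street), so the recommendations look sensible. Note, however, that some recommended movies (60756, 106782 and 122882) were already rated by this user: recommendForAllUsers scores every movie the model was trained on and does not exclude movies a user has already seen.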
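Because recommendForAllUsers ranks movies a user has already rated alongside new ones, a common post-processing step is to drop those movies before showing the list. The sketch below assumes a hypothetical helper `filter_seen` and uses user 2's `rec_movies` together with the five rated movieIds visible in the truncated output above (so the rated set is incomplete). In Spark, the same filter could be written as a `left_anti` join between the exploded recommendations and `ratings` on the user and movie IDs; to still end up with 10 unseen movies, request more than 10 candidates before filtering.

```python
def filter_seen(recommended, rated, k):
    """Drop movies the user already rated, keeping the original order, at most k."""
    seen = set(rated)
    return [movie for movie in recommended if movie not in seen][:k]

# rec_movies for user 2, and the rated movieIds visible in the truncated output above
toy_recommended = [131724, 441, 122882, 58998, 60756, 106782, 7361, 4848, 148626, 56782]
toy_rated = [60756, 106782, 80906, 89774, 122882]
print(filter_seen(toy_recommended, toy_rated, k=10))
# [131724, 441, 58998, 7361, 4848, 148626, 56782]
```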

In [17]:
# End the spark session
spark.stop()