# Big Data Platforms

## Movie Recommender System - Collaborative Filtering

Dataset:

https://www.kaggle.com/rounakbanik/the-movies-dataset

Source:

http://www.3leafnodes.com/apache-spark-introduction-recommender-system

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Start (or reuse) the Spark session used throughout this notebook
spark = SparkSession.builder.appName('MovieRecommender').getOrCreate()

### Import Data

In [28]:
ratings = spark.read.csv("T:\\courses\\BigData\\data\\the-movies-dataset\\ratings_small.csv", inferSchema=True, header=True)
movies = spark.read.csv("T:\\courses\\BigData\\data\\the-movies-dataset\\movies_metadata.csv", inferSchema=True, header=True)

In [3]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



### Data Exploration

In [4]:
ratings.columns

['userId', 'movieId', 'rating', 'timestamp']

In [5]:
movies.columns

['adult',
 'belongs_to_collection',
 'budget',
 'genres',
 'homepage',
 'id',
 'imdb_id',
 'original_language',
 'original_title',
 'overview',
 'popularity',
 'poster_path',
 'production_companies',
 'production_countries',
 'release_date',
 'revenue',
 'runtime',
 'spoken_languages',
 'status',
 'tagline',
 'title',
 'video',
 'vote_average',
 'vote_count']

In [6]:
# Keep only the columns ALS needs; the timestamp is not used
ratings = ratings.select(['userId', 'movieId', 'rating'])

In [7]:
ratings.head(5)

[Row(userId=1, movieId=31, rating=2.5),
 Row(userId=1, movieId=1029, rating=3.0),
 Row(userId=1, movieId=1061, rating=3.0),
 Row(userId=1, movieId=1129, rating=2.0),
 Row(userId=1, movieId=1172, rating=4.0)]

In [8]:
ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
|     1|   1263|   2.0|
|     1|   1287|   2.0|
|     1|   1293|   2.0|
|     1|   1339|   3.5|
|     1|   1343|   2.0|
|     1|   1371|   2.5|
|     1|   1405|   1.0|
|     1|   1953|   4.0|
|     1|   2105|   4.0|
|     1|   2150|   3.0|
|     1|   2193|   2.0|
|     1|   2294|   2.0|
|     1|   2455|   2.5|
|     1|   2968|   1.0|
|     1|   3671|   3.0|
+------+-------+------+
only showing top 20 rows



In [9]:
ratings.describe().show()

+-------+------------------+------------------+------------------+
|summary|            userId|           movieId|            rating|
+-------+------------------+------------------+------------------+
|  count|            100004|            100004|            100004|
|   mean| 347.0113095476181|12548.664363425463| 3.543608255669773|
| stddev|195.16383797819535|26369.198968815268|1.0580641091070326|
|    min|                 1|                 1|               0.5|
|    max|               671|            163949|               5.0|
+-------+------------------+------------------+------------------+



In [10]:
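# 80/20 train/test split; no seed is set, so results vary between runs (randomSplit also accepts a seed)
training, test = ratings.randomSplit([0.8, 0.2])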

### ALS

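[Alternating Least Squares (ALS)](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) is the model we’ll use to fit our data and find similarities. It factorizes the sparse user-movie rating matrix into a matrix of user factors and a matrix of item factors. ALS is an iterative optimization process: each iteration holds one set of factors fixed and solves a least-squares problem for the other, so the product of the two factor matrices moves closer and closer to the observed ratings.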
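
To make the alternating step concrete, here is a minimal pure-Python sketch of ALS for a rank-1 factorization. It is an illustration under simplifying assumptions, not Spark's implementation: the ratings, the `reg` value, and the helper names are made up, each user and item gets a single factor, and the penalty is a plain L2 term. Spark's ALS uses higher-rank factors and runs distributed.

```python
import random

# Tiny made-up sample of observed ratings: (user index, item index, rating)
ratings = [
    (0, 0, 5.0), (0, 1, 3.0),
    (1, 0, 4.0), (1, 2, 1.0),
    (2, 1, 2.0), (2, 2, 5.0),
]
n_users, n_items = 3, 3
reg = 0.1  # hypothetical L2 regularization strength


def objective(u, v):
    """Regularized squared error over the observed ratings only."""
    err = sum((r - u[i] * v[j]) ** 2 for i, j, r in ratings)
    return err + reg * (sum(x * x for x in u) + sum(x * x for x in v))


def solve_users(v):
    """Hold item factors fixed; solve each user's least-squares problem in closed form."""
    u = []
    for i in range(n_users):
        num = sum(r * v[j] for ui, j, r in ratings if ui == i)
        den = reg + sum(v[j] ** 2 for ui, j, r in ratings if ui == i)
        u.append(num / den)
    return u


def solve_items(u):
    """Hold user factors fixed; solve each item's least-squares problem in closed form."""
    v = []
    for j in range(n_items):
        num = sum(r * u[i] for i, vj, r in ratings if vj == j)
        den = reg + sum(u[i] ** 2 for i, vj, r in ratings if vj == j)
        v.append(num / den)
    return v


rng = random.Random(0)
u = [rng.uniform(0.5, 1.5) for _ in range(n_users)]
v = [rng.uniform(0.5, 1.5) for _ in range(n_items)]

history = [objective(u, v)]
for _ in range(10):
    u = solve_users(v)   # alternate: update users ...
    history.append(objective(u, v))
    v = solve_items(u)   # ... then update items
    history.append(objective(u, v))

print(history[0], history[-1])
```

Because each half-step minimizes the objective exactly over one set of factors, the recorded objective never increases from one step to the next.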

### Cold Start Predictions

When the test set contains cold-start users or items (ones that were not in the training data, so the model has no factors for them), the predictions for those rows are NaN. A single NaN also causes evaluation metrics such as the root-mean-square error to come out as NaN. To solve this problem, the rows can be dropped with <code>predictions.na.drop()</code>. A more streamlined way is to pass <code>coldStartStrategy="drop"</code> as a model parameter.
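
The effect of a NaN prediction on the metric can be reproduced without Spark. The sketch below uses made-up (rating, prediction) pairs and a hypothetical `rmse` helper; it only illustrates why the NaN rows have to be dropped before evaluating.

```python
import math

# Made-up (actual rating, predicted rating) pairs; NaN marks a cold-start row
pairs = [(4.0, 3.5), (3.0, float("nan")), (5.0, 4.0), (2.0, 3.0)]


def rmse(rows):
    """Root-mean-square error over (actual, predicted) pairs."""
    return math.sqrt(sum((a - p) ** 2 for a, p in rows) / len(rows))


rmse_all = rmse(pairs)  # NaN propagates through the sum

# Equivalent in spirit to predictions.na.drop(): keep rows with a real prediction
kept = [(a, p) for a, p in pairs if not math.isnan(p)]
rmse_kept = rmse(kept)

print(rmse_all, rmse_kept)
```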

In [21]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True)

In [22]:
# Fit the model on the training split, then predict ratings for the test split
model = als.fit(training)
predictions = model.transform(test)

In [23]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   126|    471|   5.0| 4.0813065|
|   274|    471|   5.0| 3.4201708|
|   292|    471|   3.5| 4.1559453|
|    19|    471|   3.0| 3.5502396|
|   659|    471|   4.0| 3.2783334|
|   380|    471|   4.0|  3.194829|
|   649|    471|   3.0| 3.2985878|
|   585|    471|   4.0| 3.7485852|
|   195|    471|   3.0| 3.0037432|
|   574|    471|   3.5| 3.5628338|
|   582|   1088|   3.5|  3.015482|
|    57|   1088|   4.0| 4.2917175|
|   531|   1088|   5.0| 2.2568505|
|   264|   1088|   4.0| 4.4893107|
|   418|   1088|   5.0| 4.7282343|
|   160|   1088|   4.0|  4.528123|
|   621|   1088|   3.5|  3.658979|
|   200|   1088|   1.0| 2.7792714|
|   303|   1088|   3.0| 3.1088583|
|   105|   1088|   3.0| 2.7955267|
+------+-------+------+----------+
only showing top 20 rows



In [24]:
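# Manual alternative to coldStartStrategy="drop"; a no-op here because the model already dropped NaN rows
predictions = predictions.na.drop()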
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   126|    471|   5.0| 4.0813065|
|   274|    471|   5.0| 3.4201708|
|   292|    471|   3.5| 4.1559453|
|    19|    471|   3.0| 3.5502396|
|   659|    471|   4.0| 3.2783334|
|   380|    471|   4.0|  3.194829|
|   649|    471|   3.0| 3.2985878|
|   585|    471|   4.0| 3.7485852|
|   195|    471|   3.0| 3.0037432|
|   574|    471|   3.5| 3.5628338|
|   582|   1088|   3.5|  3.015482|
|    57|   1088|   4.0| 4.2917175|
|   531|   1088|   5.0| 2.2568505|
|   264|   1088|   4.0| 4.4893107|
|   418|   1088|   5.0| 4.7282343|
|   160|   1088|   4.0|  4.528123|
|   621|   1088|   3.5|  3.658979|
|   200|   1088|   1.0| 2.7792714|
|   303|   1088|   3.0| 3.1088583|
|   105|   1088|   3.0| 2.7955267|
+------+-------+------+----------+
only showing top 20 rows



In [29]:
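# Attach titles and genres to the predictions.
# Caveat: `id` in movies_metadata.csv is a TMDB id, while `movieId` in the ratings file is a
# MovieLens id, so this direct join can attach the wrong title (the dataset's links file maps the two).
movies = movies.select('id', 'title', 'genres')
predictions = predictions.join(movies, movies.id == predictions.movieId)
predictions.show(truncate=False)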

+------+-------+------+----------+----+-----------+--------------------------------------------------------------+
|userId|movieId|rating|prediction|id  |title      |genres                                                        |
+------+-------+------+----------+----+-----------+--------------------------------------------------------------+
|126   |471    |5.0   |4.0813065 |471 |Bandyta    |[{'id': 18, 'name': 'Drama'}]                                 |
|274   |471    |5.0   |3.4201708 |471 |Bandyta    |[{'id': 18, 'name': 'Drama'}]                                 |
|292   |471    |3.5   |4.1559453 |471 |Bandyta    |[{'id': 18, 'name': 'Drama'}]                                 |
|19    |471    |3.0   |3.5502396 |471 |Bandyta    |[{'id': 18, 'name': 'Drama'}]                                 |
|659   |471    |4.0   |3.2783334 |471 |Bandyta    |[{'id': 18, 'name': 'Drama'}]                                 |
|380   |471    |4.0   |3.194829  |471 |Bandyta    |[{'id': 18, 'name': 'Drama'}]

### Prediction Performance 

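On the small dataset (100,004 ratings), the RMSE was 1.1244220 in one run and is 1.0402710 in the output below; the value differs between runs because the train/test split is random.

Training on the full ratings file (26,024,289 ratings) is expected to improve prediction performance, that is, to lower the RMSE. Run this notebook with the full dataset to see the improvement.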
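
For reference, the root-mean-square error over the evaluated rows can be written as follows. The symbols are notation introduced here, not taken from the notebook: $N$ is the number of evaluated rows, $r_{ui}$ is the rating user $u$ gave movie $i$, and $\hat{r}_{ui}$ is the model's prediction.

$$
\mathrm{RMSE} = \sqrt{\frac{1}{N} \sum_{(u,i)} \left( r_{ui} - \hat{r}_{ui} \right)^2 }
$$

An RMSE of about 1.04 therefore means a typical prediction is off by roughly one point on the 0.5 to 5.0 rating scale.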

In [30]:
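# RMSE between the `rating` column and the default `prediction` column.
# Note: `predictions` is now the joined DataFrame, so only rows that matched a movie are scored.
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating')
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))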

Root-mean-square error = 1.0402709708916424


### Predictions

In [31]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.show(10)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[4965, 5.910043]...|
|   463|[[5792, 5.6284003...|
|   496|[[65188, 7.088074...|
|   148|[[1192, 6.1758485...|
|   540|[[1162, 10.622456...|
|   392|[[3328, 8.12228],...|
|   243|[[3684, 5.260863]...|
|   623|[[7063, 8.133765]...|
|    31|[[850, 5.920644],...|
|   516|[[4967, 6.0645213...|
+------+--------------------+
only showing top 10 rows
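
Conceptually, each recommendation score is the dot product of a user's factor vector with an item's factor vector, and the top N items by score are kept. The sketch below shows that idea in pure Python. The factor values and the `recommend_for_all_users` helper are hypothetical, and Spark's own implementation is distributed and more involved. It also shows why scores such as 10.62 in the output above can exceed the 5.0 rating scale: a dot product is not clipped to the rating range.

```python
import heapq

# Hypothetical rank-2 factors of the kind ALS learns (values are made up)
user_factors = {1: [1.0, 0.5], 2: [0.2, 1.5]}
item_factors = {10: [2.0, 0.0], 20: [0.5, 2.0], 30: [1.0, 1.0], 40: [3.0, 4.0]}


def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def recommend_for_all_users(n):
    """Score every item for every user and keep the n highest-scoring items."""
    recs = {}
    for user, uf in user_factors.items():
        scored = [(item, dot(uf, vf)) for item, vf in item_factors.items()]
        recs[user] = heapq.nlargest(n, scored, key=lambda pair: pair[1])
    return recs


top2 = recommend_for_all_users(2)
for user, recs in top2.items():
    print(user, recs)
```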



In [33]:
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

movieRecs.show(10, truncate=False)

+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                                    |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1580   |[[113, 5.152397], [543, 5.014614], [546, 4.993715], [556, 4.824686], [28, 4.806556], [482, 4.7947316], [137, 4.7783413], [656, 4.76151], [484, 4.7389026], [89, 4.7019334]]        |
|5300   |[[337, 6.888681], [123, 6.7578964], [228, 5.8700266], [244, 5.8230276], [483, 5.5648847], [545, 5.4522266], [542, 5.448466], [155, 5.359071], [70, 5.3525], [411, 5.284403]]       |
|6620   |[[123, 8.250882], [337, 6.9006367], [395,

In [34]:
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)

users.show(10)

+------+
|userId|
+------+
|   148|
|   463|
|   471|
+------+



In [35]:
# Generate top 10 user recommendations for a specified set of movies
# (named movie_ids so the movies metadata DataFrame is not overwritten)
movie_ids = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movie_ids, 10)

movie_ids.show(10)

+-------+
|movieId|
+-------+
|   1580|
|   2659|
|   3794|
+-------+

