# Movie Recommendation Engines

## 1. Import packages and data

### 1.1 Spark configuration

In [134]:
# Import packages
import os
import pyspark
import pyspark.sql.functions as F  

from pyspark import SparkConf
from pyspark.context import SparkContext

from pyspark.sql.functions import min, max, avg,col
import pyspark.sql.types as T

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import HashingTF, MinHashLSH



In [2]:
# Import the PySpark module
from pyspark.sql import SparkSession

# Create SparkSession object
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('recommender') \
                    .getOrCreate()


# Print out Spark version
print(spark.version)


3.0.2


In [3]:
# Create checkpoint
sc = SparkContext.getOrCreate(SparkConf())
sc.setCheckpointDir('checkpoint/')


In [46]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')
print("Name: %s" % app_name)

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')
print("Driver TCP port: %s" % driver_tcp_port)

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
print("Number of partitions: %s" % num_partitions)


Name: recommender
Driver TCP port: 54271
Number of partitions: 200


### 1.2 Import data

In [4]:
# Read data from CSV file
ratings = spark.read.csv('ratings.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')
# Get number of records
print("The data contain %d records." % ratings.count())


The data contain 100836 records.


In [12]:
ratings.printSchema()


root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [14]:
ratings.show(5)


+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [7]:
# Read data from CSV file
movies = spark.read.csv('movies.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')
# Get number of records
print("The data contain %d records." % movies.count())


The data contain 9742 records.


In [15]:
movies.show(5)


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [13]:
movies.printSchema()


root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



## 2. Content-based recommendation engine
       The main approach is to calculate Jaccard similarity amongs movies based on their genres. 


### 2.1 Preprocess data

In [74]:
# Split genres on the | character 
genres_split = F.split(movies['genres'], '\|')
movies_genres_split = movies.withColumn('genre_split', genres_split)

In [93]:
# Create df with movieId and genre_split 
id_genres = movies_genres_split.select('movieId','genre_split')
id_genres.show(5)

+-------+--------------------+
|movieId|         genre_split|
+-------+--------------------+
|      1|[Adventure, Anima...|
|      2|[Adventure, Child...|
|      3|   [Comedy, Romance]|
|      4|[Comedy, Drama, R...|
|      5|            [Comedy]|
+-------+--------------------+
only showing top 5 rows



In [131]:
# Apply  HashingTF and MinHashLSH function to return the right format to feed to algorithm
classifier = Pipeline(stages=[
                              HashingTF(inputCol="genre_split", outputCol="vectors"),
                              MinHashLSH(inputCol="vectors", outputCol="lsh")
                             ]).fit(id_genres)





### 2.2 Calculate Jaccard similarity

In [132]:
# Save fitted model
classifier.write().save("content_based_filter")

# Load back to use
classifier_ = PipelineModel.load("content_based_filter")

Py4JJavaError: An error occurred while calling o38928.save.
: java.io.IOException: Path content_based_filter already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:683)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.super$save(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$4(Pipeline.scala:344)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent(events.scala:176)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent$(events.scala:171)
	at org.apache.spark.ml.util.Instrumentation.withSaveInstanceEvent(Instrumentation.scala:42)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$3(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$3$adapted(Pipeline.scala:344)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.save(Pipeline.scala:344)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [133]:
# Transform id_genres
id_genres_hashed = classifier.transform(id_genres)
id_genres_hashed.show(5)

+-------+--------------------+--------------------+----------------+
|movieId|         genre_split|             vectors|             lsh|
+-------+--------------------+--------------------+----------------+
|      1|[Adventure, Anima...|(262144,[4915,139...|[[5.87112014E8]]|
|      2|[Adventure, Child...|(262144,[13956,42...|[[5.87112014E8]]|
|      3|   [Comedy, Romance]|(262144,[4915,160...|[[9.15231855E8]]|
|      4|[Comedy, Drama, R...|(262144,[4915,160...|[[9.15231855E8]]|
|      5|            [Comedy]|(262144,[4915],[1...|[[9.15231855E8]]|
+-------+--------------------+--------------------+----------------+
only showing top 5 rows



In [98]:
# Calculate Jaccard similarity amongs movies

genres_matches = classifier.stages[-1].approxSimilarityJoin(id_genres_hashed, id_genres_hashed, 0.9)


In [99]:
genres_matches.show(5)

+--------------------+--------------------+------------------+
|            datasetA|            datasetB|           distCol|
+--------------------+--------------------+------------------+
|[1, [Adventure, A...|[37830, [Action, ...|0.5714285714285714|
|[1, [Adventure, A...|[54001, [Adventur...|0.7142857142857143|
|[1, [Adventure, A...|[106489, [Adventu...|0.6666666666666667|
|[2, [Adventure, C...|[3516, [Comedy, F...|               0.8|
|[3, [Comedy, Roma...|[1746, [Comedy], ...|               0.5|
+--------------------+--------------------+------------------+
only showing top 5 rows



In [107]:
# Return movie-paired df with distance
movies_dist = genres_matches.select (F.col('datasetA.movieId').alias('1st_movie'),
                                    F.col('datasetB.movieId').alias('2nd_movie'),
                                    F.col('distCol'))
movies_dist.show(5)

+---------+---------+------------------+
|1st_movie|2nd_movie|           distCol|
+---------+---------+------------------+
|        1|    37830|0.5714285714285714|
|        1|    54001|0.7142857142857143|
|        1|   106489|0.6666666666666667|
|        2|     3516|               0.8|
|        3|     1746|               0.5|
+---------+---------+------------------+
only showing top 5 rows



In [108]:
# Calculate similarity from distance

# Create function to calculate similarity
def similarity(distance):
    return 1- distance

# define UDF
udfsim = F.udf(similarity, T.FloatType())

# Create similarity using UDF
movies_dist = movies_dist.withColumn('similarity', udfsim(movies_dist.distCol))

# Show the DataFrame
movies_dist.show()

+---------+---------+-------------------+----------+
|1st_movie|2nd_movie|            distCol|similarity|
+---------+---------+-------------------+----------+
|        1|    37830| 0.5714285714285714|0.42857143|
|        1|    54001| 0.7142857142857143| 0.2857143|
|        1|   106489| 0.6666666666666667|0.33333334|
|        2|     3516|                0.8|       0.2|
|        3|     1746|                0.5|       0.5|
|        3|     2431| 0.6666666666666667|0.33333334|
|        3|     4347| 0.6666666666666667|0.33333334|
|        3|     6619|                0.5|       0.5|
|        3|    62439|                0.0|       1.0|
|        3|    66509| 0.6666666666666667|0.33333334|
|        3|    96616|                0.5|       0.5|
|        3|   104076|               0.75|      0.25|
|        4|     6415|0.33333333333333337| 0.6666667|
|        4|     8614|0.33333333333333337| 0.6666667|
|        4|    31049|               0.75|      0.25|
|        4|    80584|                0.0|     

In [116]:
# df with 1st movie title
sim_with_title = movies_dist.join(movies, F.col('1st_movie') == F.col('movieId'))\
                            .select('1st_movie',F.col('title').alias('1st_movie_title'),'similarity')

In [117]:
sim_with_title.show(2)

+---------+----------------+---------+----------+
|1st_movie| 1st_movie_title|2nd_movie|similarity|
+---------+----------------+---------+----------+
|        1|Toy Story (1995)|    37830|0.42857143|
|        1|Toy Story (1995)|    54001| 0.2857143|
+---------+----------------+---------+----------+
only showing top 2 rows



In [118]:
# df with 1st and 2nd movie title
sim_all_titles = sim_with_title.join(movies, F.col('2nd_movie')== F.col('movieId'))\
                               .select('1st_movie', '1st_movie_title',\
                                       '2nd_movie', F.col('title').alias('2nd_movie_title'),'similarity')

In [119]:
sim_all_titles.show(2)

+---------+----------------+---------+--------------------+----------+
|1st_movie| 1st_movie_title|2nd_movie|     2nd_movie_title|similarity|
+---------+----------------+---------+--------------------+----------+
|        1|Toy Story (1995)|    37830|Final Fantasy VII...|0.42857143|
|        1|Toy Story (1995)|    54001|Harry Potter and ...| 0.2857143|
+---------+----------------+---------+--------------------+----------+
only showing top 2 rows



### 2.3 Recommendations

In [122]:
# Create function to return top 5 recommended movies
def recommended_movies(movie_title):
    return sim_all_titles.filter(col('1st_movie_title')== movie_title)\
                         .select('2nd_movie', '2nd_movie_title','similarity')\
                         .orderBy("similarity", ascending = False).show(5)

In [125]:
recommended_movies("Captain America: The First Avenger (2011)")

+---------+--------------------+----------+
|2nd_movie|     2nd_movie_title|similarity|
+---------+--------------------+----------+
|    87232|X-Men: First Clas...|       1.0|
|    88140|Captain America: ...|       1.0|
|     5264|Clockstoppers (2002)|       0.8|
|     1127|   Abyss, The (1989)|       0.8|
|    70336|G.I. Joe: The Ris...|       0.8|
+---------+--------------------+----------+
only showing top 5 rows



## 3. Collaborative filtering engines
   This engine recommends movies for each user based on their ratings.   
   Using ALS algorithm, we will be able to find expected user's ratings on unseen movies based on their past behavior and from there, recommend high predicted rating movies to users.
       

### 3.1 Inspect data

Since ALS works better on sparsed data, we will inspect some figures

In [23]:
# Variables to to calculate sparsity
total_ratings = ratings.select('rating').count()
num_users = ratings.select('userId').distinct().count()
num_movies = ratings.select('movieId').distinct().count()

# Calculate sparsity
sparsity = (1.0 - float(total_ratings)/ (num_users * num_movies))*100
print( 'Rating is','%.2f' %sparsity + '% empty')


Rating is 98.30% empty


In [41]:
# Inspect min, max and, avg of ratings per movie and user
print(" Stats for ratings per movie:")
ratings.groupBy('movieId').count().select(min('count')).show()
ratings.groupBy('movieId').count().select(max('count')).show()
ratings.groupBy('movieId').count().select(avg('count')).show()

print(" Stats for ratings per user:")
ratings.groupBy('userId').count().select(min('count')).show()
ratings.groupBy('userId').count().select(max('count')).show()
ratings.groupBy('userId').count().select(avg('count')).show()


 Stats for ratings per movie:
+----------+
|min(count)|
+----------+
|         1|
+----------+

+----------+
|max(count)|
+----------+
|       329|
+----------+

+------------------+
|        avg(count)|
+------------------+
|10.369806663924312|
+------------------+

 Stats for ratings per user:
+----------+
|min(count)|
+----------+
|        20|
+----------+

+----------+
|max(count)|
+----------+
|      2698|
+----------+

+------------------+
|        avg(count)|
+------------------+
|165.30491803278687|
+------------------+



### 3.2 ALS

In [8]:
# Create train and test set using ratings df without timestamp
ratings_df = ratings.select('userId', 'movieId', 'rating')

(train,test) = ratings_df.randomSplit([0.8,0.2], seed =42)


In [9]:
# Create ALS model
als = ALS (userCol ='userId',
            itemCol = 'movieId',
            ratingCol = 'rating',
            nonnegative =True,
            implicitPrefs = False,
           coldStartStrategy="drop")


In [12]:
# Create param_grid to tune model
param_grid = ParamGridBuilder() \
            .addGrid(als.rank,  [80,100,150] ) \
            .addGrid(als.maxIter, [50,100,150]) \
            .addGrid(als.regParam,  [.05,.1,.15,.2] ) \
            .build()


In [13]:
# create evaluator object
evaluator = RegressionEvaluator(metricName="rmse", 
                                labelCol="rating", 
                                predictionCol="prediction") 
# Check number of test model
print ("Number of test models: ", len(param_grid))



Number of test models:  36


In [14]:
# create CrossValidator object
cv = CrossValidator(estimator=als, 
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)

# Confirm cv was built
print(cv)


CrossValidator_7620d1bdbd6e


In [15]:
ALS.checkpointInterval = 2


In [16]:
#Fit to train set
models = cv.fit(train)



NameError: name 'model' is not defined

In [29]:
# Return best model
best_model = models.bestModel
print(type(best_model))

# # Return "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Return "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Return "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
  Rank: 80
  MaxIter: 50
  RegParam: 0.15


In [23]:
# View RMSE
predictions = best_model.transform(test)
RMSE = evaluator.evaluate(predictions)
print(RMSE)


0.8664535766468544


### 3.3 Recommendations

In [30]:
# Generate 5 best recommendations for all users
recommendations = best_model.recommendForAllUsers(5)
recommendations.show()


+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[96004, 4.681238...|
|   463|[[33649, 4.896413...|
|   496|[[5747, 4.2322197...|
|   148|[[33649, 4.344435...|
|   540|[[96004, 5.216998...|
|   392|[[25771, 4.704468...|
|   243|[[67618, 5.429157...|
|    31|[[33649, 5.014798...|
|   516|[[4429, 4.8194695...|
|   580|[[6300, 4.6803193...|
|   251|[[96004, 5.563831...|
|   451|[[96004, 5.266256...|
|    85|[[1140, 4.856921]...|
|   137|[[96004, 4.808377...|
|    65|[[96004, 4.787784...|
|   458|[[67618, 5.209992...|
|   481|[[51931, 3.946999...|
|    53|[[96004, 6.692408...|
|   255|[[1739, 3.8876514...|
|   588|[[96004, 4.246173...|
+------+--------------------+
only showing top 20 rows



In [34]:
# Explode recommendations vector of each userId
recommendations = recommendations.withColumn("recommendations_explode", F.explode("recommendations"))\
                                 .select('userId', col("recommendations_explode.movieId"), col("recommendations_explode.rating"))
recommendations.show(10)

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|   471|  96004| 4.681238|
|   471|   3379| 4.681238|
|   471|  33649| 4.640755|
|   471| 177593| 4.614352|
|   471|  89904| 4.588294|
|   463|  33649|4.8964133|
|   463|  96004|4.8776793|
|   463|   3379|4.8776793|
|   463| 171495| 4.833774|
|   463|  78836| 4.792316|
+------+-------+---------+
only showing top 10 rows



In [54]:
# getmovierecom function to return recommended movies
def getmovierecom(Id):
    recom_df = recommendations.join(movies, on='movieId').filter(col('userId')==Id)
    return recom_df.select('movieId', 'title','genres','rating').show()


In [55]:
# test function for userId 1
getmovierecom(1)

+-------+--------------------+--------------------+---------+
|movieId|               title|              genres|   rating|
+-------+--------------------+--------------------+---------+
|  96004|Dragon Ball Z: Th...|Action|Adventure|...| 5.559495|
|   3379| On the Beach (1959)|               Drama| 5.559495|
| 132333|         Seve (2014)|   Documentary|Drama| 5.530093|
|  33649|  Saving Face (2004)|Comedy|Drama|Romance| 5.445059|
|   5915|Victory (a.k.a. E...|    Action|Drama|War|5.4239016|
+-------+--------------------+--------------------+---------+



In [56]:
# movierated function to return movies watched and rated by users
def movierated(Id):
    ratedtitle = ratings.join(movies, on='movieId'). filter(col('userId')==Id)
    return ratedtitle.select('movieId', 'title','genres','rating').show()

In [57]:
# see movied rated by userId 1 to compare with recommended movies
movierated(1)



+-------+--------------------+--------------------+------+
|movieId|               title|              genres|rating|
+-------+--------------------+--------------------+------+
|      1|    Toy Story (1995)|Adventure|Animati...|   4.0|
|      3|Grumpier Old Men ...|      Comedy|Romance|   4.0|
|      6|         Heat (1995)|Action|Crime|Thri...|   4.0|
|     47|Seven (a.k.a. Se7...|    Mystery|Thriller|   5.0|
|     50|Usual Suspects, T...|Crime|Mystery|Thr...|   5.0|
|     70|From Dusk Till Da...|Action|Comedy|Hor...|   3.0|
|    101|Bottle Rocket (1996)|Adventure|Comedy|...|   5.0|
|    110|   Braveheart (1995)|    Action|Drama|War|   4.0|
|    151|      Rob Roy (1995)|Action|Drama|Roma...|   5.0|
|    157|Canadian Bacon (1...|          Comedy|War|   5.0|
|    163|    Desperado (1995)|Action|Romance|We...|   5.0|
|    216|Billy Madison (1995)|              Comedy|   5.0|
|    223|       Clerks (1994)|              Comedy|   3.0|
|    231|Dumb & Dumber (Du...|    Adventure|Comedy|   5.

In [65]:
# save the ALS model
best_model.write().overwrite().save('ALS')