## **INTRODUCTION**

In this part, we will learn how to implement Machine Learning in Movie Recommendation. Let's started!

In [1]:
# install pyspark
!pip install pyspark



In [2]:
# call the dataset
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d shubhammehta21/movie-lens-small-latest-dataset

movie-lens-small-latest-dataset.zip: Skipping, found more recently modified local copy (use --force to force download)


In [3]:
# check our content information
!ls

kaggle.json  movie-lens-small-latest-dataset.zip  ratings.csv  sample_data
links.csv    movies.csv				  README.txt   tags.csv


In [8]:
# unzipping dataset
!unzip "movie-lens-small-latest-dataset.zip"

Archive:  movie-lens-small-latest-dataset.zip
  inflating: README.txt              
  inflating: links.csv               
  inflating: movies.csv              
  inflating: ratings.csv             
  inflating: tags.csv                


In [1]:
# import useful library
import pyspark as ps
from pyspark.sql import SQLContext, Row
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import udf, col, when
import numpy as np

In [2]:
# create spark environment
spark = ps.sql.SparkSession.builder \
            .master("local") \
            .appName("Spark ML") \
            .getOrCreate()

In [3]:
sc = spark.sparkContext
sqlContext = SQLContext(sc)



In [4]:
# read dataset 
sdf_rating = spark.read.csv('ratings.csv',
                            header=True,
                            inferSchema=True)
sdf_rating.drop(col("timestamp")).show(3, False)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |1      |4.0   |
|1     |3      |4.0   |
|1     |6      |4.0   |
+------+-------+------+
only showing top 3 rows



In [5]:
# read dataset 
sdf_movie = spark.read.csv('movies.csv',
                            header=True,
                            inferSchema=True)
sdf_movie.show(3, False)

+-------+-----------------------+-------------------------------------------+
|movieId|title                  |genres                                     |
+-------+-----------------------+-------------------------------------------+
|1      |Toy Story (1995)       |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)         |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)|Comedy|Romance                             |
+-------+-----------------------+-------------------------------------------+
only showing top 3 rows



## **Modelling phase**

In [6]:
# split dataset
sdf_train, sdf_test = sdf_rating.randomSplit([.8, .2])

In [7]:
# build ALS model
iterations = 10
regularization_parameter = 0.1

# rank 4
als = ALS(maxIter=iterations,
          regParam=regularization_parameter,
          rank=4, userCol='userId',
          itemCol='movieId', ratingCol='rating')

model = als.fit(sdf_train)
pred = model.transform(sdf_test).filter(col('prediction') != np.nan)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
                               predictionCol='prediction')
rmse = evaluator.evaluate(pred)
print("Root-mean-square-error: ", str(rmse))

Root-mean-square-error:  0.8796625524372769


In [8]:
# for rank 4-10
for i in range(4,10):
  als = ALS(maxIter=iterations,
            regParam=regularization_parameter,
            rank=i, userCol='userId', 
            itemCol='movieId', ratingCol='rating')
  model = als.fit(sdf_train)
  pred = model.transform(sdf_test).filter(col('prediction') != np.nan)
  evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
                                  predictionCol='prediction')
  rmse = evaluator.evaluate(pred)
  print("Root-mean-square-error with rank {}: {}".format(i, rmse))

Root-mean-square-error with rank 4: 0.8796625524372769
Root-mean-square-error with rank 5: 0.8863303129274225
Root-mean-square-error with rank 6: 0.8806676420787486
Root-mean-square-error with rank 7: 0.8844390708536144
Root-mean-square-error with rank 8: 0.8805893517832818
Root-mean-square-error with rank 9: 0.8864129454188683


In [10]:
# the best rank
rank = 9

In [11]:
# find-out best model using CrossValidator
als = ALS(maxIter=iterations, regParam=regularization_parameter,
          rank=rank, userCol="userId", itemCol="movieId", 
          ratingCol="rating")
paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, [0.1, 0.01, 0.18]) \
    .addGrid(als.rank, range(4, 6)) \
    .build()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", 
                                predictionCol="prediction")
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)
cvModel = crossval.fit(sdf_train)

In [14]:
# the best algoritma
cvModel_pred = cvModel.transform(sdf_test).filter(col("prediction") != np.nan)
rmse = evaluator.evaluate(cvModel_pred)
print("The optimal RMSE with cross validation is: {}".format(rmse))

The optimal RMSE with cross validation is: 0.8796625524372769


In [15]:
final_als = ALS(maxIter=10, regParam=0.1, rank=4, 
                userCol="userId", itemCol="movieId", 
                ratingCol="rating")
final_model = final_als.fit(sdf_train)

In [16]:
preds = final_model.transform(sdf_test)
preds.show(5)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|     47|   5.0|964983815| 4.7784996|
|     1|    260|   5.0|964981680|  5.041264|
|     1|    296|   3.0|964982967| 5.1031036|
|     1|    596|   5.0|964982838| 3.8803313|
|     1|    923|   5.0|964981529|  4.670707|
+------+-------+------+---------+----------+
only showing top 5 rows



In [21]:
preds.join(sdf_movie, "movieId")\
          .select("userId","title","genres","prediction").show(5)

+------+--------------------+--------------------+----------+
|userId|               title|              genres|prediction|
+------+--------------------+--------------------+----------+
|   148| Moulin Rouge (2001)|Drama|Musical|Rom...| 3.2637208|
|   148| Finding Nemo (2003)|Adventure|Animati...| 3.8192651|
|   148|Lord of the Rings...|Action|Adventure|...|  3.796996|
|   148|Harry Potter and ...|Adventure|Fantasy...| 3.8075116|
|   148|Harry Potter and ...|Adventure|Fantasy...| 3.5156643|
+------+--------------------+--------------------+----------+
only showing top 5 rows



In [22]:
sdf_links = spark.read.csv("links.csv", 
                           header=True,
                           inferSchema=True)
sdf_links.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)



In [25]:
for_one_user = preds\
                    .filter(col("userId")==450)\
                    .join(sdf_movie, "movieId")\
                    .join(sdf_links,"movieId")\
                    .select("userId","title","genres","tmdbId","prediction")
for_one_user.show(5)

+------+--------------------+--------------------+------+----------+
|userId|               title|              genres|tmdbId|prediction|
+------+--------------------+--------------------+------+----------+
|   450|Star Wars: Episod...|Action|Adventure|...|    11|  4.663552|
|   450|Higher Learning (...|               Drama| 16295|  2.924056|
|   450|Nightmare Before ...|Animation|Childre...|  9479| 3.9417827|
|   450|Boogie Nights (1997)|               Drama|  4995|  4.456446|
|   450|Nightmare on Elm ...|              Horror| 10160|   2.48462|
+------+--------------------+--------------------+------+----------+
only showing top 5 rows



In [27]:
import webbrowser
link = "https://wwww.themoviedb.org/movie/"
for movie in for_one_user.take(3):
  movieUrl = link + str(movie.tmdbId)
  print(movie.title)
  print(movie.tmdbId)
  webbrowser.open(movieUrl)

Star Wars: Episode IV - A New Hope (1977)
11
Higher Learning (1995)
16295
Nightmare Before Christmas, The (1993)
9479


In [None]:
# generate top 5 movies for each user
userRecomments = final_model.recommendForAllUsers(5)
# generate top 5 user recommendations for each movie
movieRecomments = final_model.recommendForAllItems(5)

In [33]:
userRecomments.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [31]:
userRecomments.select("userId","recommendations.movieId").show(5, False)

+------+----------------------------------+
|userId|movieId                           |
+------+----------------------------------+
|1     |[7842, 7121, 3379, 33649, 92494]  |
|2     |[7842, 33649, 3379, 131724, 26073]|
|3     |[70946, 8138, 61350, 26865, 430]  |
|4     |[4442, 5034, 25825, 26171, 5485]  |
|5     |[5490, 5915, 7121, 92494, 102217] |
+------+----------------------------------+
only showing top 5 rows



In [32]:
movieRecomments.select("movieId","recommendations.userId").show(5, False)

+-------+------------------------+
|movieId|userId                  |
+-------+------------------------+
|1      |[53, 43, 543, 452, 413] |
|2      |[53, 43, 543, 413, 554] |
|3      |[53, 543, 43, 452, 578] |
|4      |[543, 594, 43, 554, 584]|
|5      |[543, 53, 43, 413, 267] |
+-------+------------------------+
only showing top 5 rows



In [34]:
# generate top 10 movie recommendations for a specified set of users
users = sdf_rating.select("userId").distinct().limit(3);
users.show()

+------+
|userId|
+------+
|   148|
|   463|
|   471|
+------+



# **THANK YOU**