## Movie Recommendation System

### Imports

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math
import seaborn as sns
import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 67.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=62e9cbb7634b8a5dd1b7ee3973b16fcc848134c3dba1346a48cfef136d5111cb
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


### Spark Session

In [None]:
# Build (or reuse) the Spark entry point. getOrCreate() keeps this cell safe to
# re-run: constructing a SparkContext directly fails when one is already active.
conf = pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
# Keep `sc` available for any cell that uses the low-level context.
sc = spark.sparkContext
# Silence Spark's console logging so cell outputs stay readable.
sc.setLogLevel('OFF')
from pyspark.sql.functions import *



### Data Imports

In [None]:
def _read_csv(path):
    """Read a CSV file with a header row, letting Spark infer the column types."""
    reader = spark.read.option("inferSchema", "true")
    reader = reader.option("header", "true")
    return reader.csv(path)

movies = _read_csv("movies.csv")
ratings = _read_csv("ratings.csv")

### Temp Views for queries on the data

In [None]:
# Register both DataFrames under view names so later cells can query them with spark.sql().
movies.createOrReplaceTempView("movies_data")
ratings.createOrReplaceTempView("ratings_data")

### Data Exploration

In [None]:
# Print a preview of the movies DataFrame (columns: movieId, title, genres).
movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [None]:
# Print a preview of the ratings DataFrame (show() prints the top 20 rows).
ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
|     1|   1590|   2.5|1256677236|
|     1|   1591|   1.5|1256677475|
|     1|   2134|   4.5|1256677464|
|     1|   2478|   4.0|1256677239|
|     1|   2840|   3.0|1256677500|
|     1|   2986|   2.5|1256677496|
|     1|   3020|   4.0|1256677260|
|     1|   3424|   4.5|1256677444|
|     1|   3698|   3.5|1256677243|
|     1|   3826|   2.0|1256677210|
|     1|   3893|   3.5|1256677486|
|     2|    170|   3.5|1192913581|
|     2|    849|   3.5|1192913537|
|     2|   1186|   3.5|1192913611|
|     2|   1235|   3.0|1192913585|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
#Number of distinct userId values that appear in ratings_data
q1=spark.sql("""select count(distinct userId) as Number_of_users from ratings_data""")
q1.show()



+---------------+
|Number_of_users|
+---------------+
|         283228|
+---------------+



                                                                                

In [None]:
#Number of movies: total row count of the movies_data view
q2=spark.sql("""select count(*) as Number_of_movies from movies_data""")
q2.show()

+----------------+
|Number_of_movies|
+----------------+
|           58098|
+----------------+



In [None]:
#User rating range: smallest and largest rating value present in ratings_data
q3=spark.sql("""select min(rating) as Min_Rating , max(rating) as Max_Rating from ratings_data""")
q3.show()



+----------+----------+
|Min_Rating|Max_Rating|
+----------+----------+
|       0.5|       5.0|
+----------+----------+



                                                                                

In [None]:
#Movies which have no rating
# Counted with an anti-join (movies that have no matching row in ratings_data)
# rather than by subtracting from a hand-copied movie total, which silently goes
# stale whenever movies.csv changes.
q4=spark.sql("""select count(*) as Num_movies_with_no_rating
    from movies_data left anti join ratings_data
    on movies_data.movieId = ratings_data.movieId""")
q4.show()



+-------------------------+
|Num_movies_with_no_rating|
+-------------------------+
|                     4209|
+-------------------------+



                                                                                

In [None]:
#Sample of movies that have no rating: the left join keeps every movie, and a movie
#without any matching ratings row comes back with a null rating. Only 10 rows are kept.
q5=spark.sql("""select movies_data.title, movies_data.genres ,ratings_data.rating 
    from movies_data left JOIN ratings_data 
    on ratings_data.movieId = movies_data.movieID 
    where ratings_data.rating IS null LIMIT 10""")
q5.show()



+--------------------+--------------------+------+
|               title|              genres|rating|
+--------------------+--------------------+------+
|Baby Blue Marine ...|               Drama|  null|
|Wide Open Spaces ...|    Animation|Comedy|  null|
|Ashes and Blood (...|               Drama|  null|
|Music in the Air ...|Comedy|Musical|Ro...|  null|
|      Muggers (2000)|              Comedy|  null|
|Tarzan's Magic Fo...|Action|Adventure|...|  null|
| Black August (2007)|               Drama|  null|
|Steel of Fire War...|             Fantasy|  null|
| Yellow Fangs (1990)|    Action|Adventure|  null|
| Yes, Giorgio (1982)|Comedy|Musical|Ro...|  null|
+--------------------+--------------------+------+



                                                                                

In [None]:
#Popular movies: for each movie, count the rating rows equal to 5
#and list the movies with the highest counts first
q6=spark.sql("""select ratings_data.movieId ,movies_data.title, count(*) as Num_users_rated_5
            from ratings_data , movies_data
            where ratings_data.movieId=movies_data.movieId
            and rating=5
            group by ratings_data.movieId , movies_data.title
            order by Num_users_rated_5 desc""")
q6.show(truncate=False)



+-------+------------------------------------------------------------------------------+-----------------+
|movieId|title                                                                         |Num_users_rated_5|
+-------+------------------------------------------------------------------------------+-----------------+
|318    |Shawshank Redemption, The (1994)                                              |48762            |
|296    |Pulp Fiction (1994)                                                           |37458            |
|356    |Forrest Gump (1994)                                                           |32009            |
|260    |Star Wars: Episode IV - A New Hope (1977)                                     |31385            |
|593    |Silence of the Lambs, The (1991)                                              |30280            |
|527    |Schindler's List (1993)                                                       |30194            |
|2571   |Matrix, The (1999)          

                                                                                

### Cleaning the Data

In [None]:
#Cleaning Genre in Movies
def extract_genres(genres_col):
    """Split a pipe-delimited genres column into an array-of-strings column.

    Uses Spark's built-in ``split`` instead of a Python UDF: it avoids the
    per-row Python round trip, and a null genres value yields null instead of
    raising inside the lambda (``None.split``).

    Args:
        genres_col: column name or Column holding values such as
            ``"Comedy|Romance"``.

    Returns:
        A Column expression producing the list of genre strings.
    """
    # The pattern is a regular expression, so the pipe must be escaped.
    return split(genres_col, r"\|")

movies_clean = movies.select("movieId", "title", extract_genres("genres").alias("genres"))

In [None]:
# Register the cleaned movies frame as a view and print 10 of its rows without truncating the columns.
movies_clean.createOrReplaceTempView("movies_clean_data")
q7=spark.sql("select * from movies_clean_data limit 10")
q7.show(truncate=False)

+-------+----------------------------------+-------------------------------------------------+
|movieId|title                             |genres                                           |
+-------+----------------------------------+-------------------------------------------------+
|1      |Toy Story (1995)                  |[Adventure, Animation, Children, Comedy, Fantasy]|
|2      |Jumanji (1995)                    |[Adventure, Children, Fantasy]                   |
|3      |Grumpier Old Men (1995)           |[Comedy, Romance]                                |
|4      |Waiting to Exhale (1995)          |[Comedy, Drama, Romance]                         |
|5      |Father of the Bride Part II (1995)|[Comedy]                                         |
|6      |Heat (1995)                       |[Action, Crime, Thriller]                        |
|7      |Sabrina (1995)                    |[Comedy, Romance]                                |
|8      |Tom and Huck (1995)               |[Adven

In [None]:
#Drop the timestamp column, which is not used in the rest of the notebook.
#The view is registered again so that SQL queries see the frame without it.
ratings=ratings.drop('timestamp')
ratings.createOrReplaceTempView("ratings_data")
q8=spark.sql("select * from ratings_data limit 10")
q8.show(truncate=False)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |307    |3.5   |
|1     |481    |3.5   |
|1     |1091   |1.5   |
|1     |1257   |4.5   |
|1     |1449   |4.5   |
|1     |1590   |2.5   |
|1     |1591   |1.5   |
|1     |2134   |4.5   |
|1     |2478   |4.0   |
|1     |2840   |3.0   |
+------+-------+------+



In [None]:
#Give the id columns an integer type and the rating a float type before the frame
#is handed to the Spark ML model
ratings = (
    ratings
    .withColumn("userId", col("userId").cast(IntegerType()))
    .withColumn("movieId", col("movieId").cast(IntegerType()))
    .withColumn("rating", col("rating").cast(FloatType()))
)

### Recommendation model using ALS (Alternating Least Squares)

In [None]:
#Splitting Train and Test Data (80% / 20%)
# A fixed seed makes the sampling repeatable, so the metrics reported below can be
# reproduced after a kernel restart.
(training,test)=ratings.randomSplit([0.8,0.2], seed=42)

In [None]:
#Base ALS Model
# coldStartStrategy="drop" discards predictions for users/movies unseen during
# fitting, so they cannot turn the RMSE into NaN. The seed fixes the random factor
# initialisation so repeated runs are comparable.
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=42)

In [None]:
#Hyperparameter grid: every combination of the values below is evaluated (2 x 2 x 2 = 8 candidates)
grid_builder = ParamGridBuilder()
grid_builder.addGrid(als.regParam, [0.01,0.05])
grid_builder.addGrid(als.rank, [10,15])
grid_builder.addGrid(als.maxIter, [10,15])
paramGrid = grid_builder.build()

In [None]:
#RMSE evaluator: compares the "rating" label with the model's "prediction" column
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)

In [None]:
#Cross-validation to choose the best model
from pyspark.ml.tuning import CrossValidator
# Each candidate in paramGrid is scored on 2 folds of the training split; the seed
# fixes the fold assignment so the selected model is reproducible.
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2, seed=42)
cvModel = cv.fit(training)



In [None]:
#Parameters of the best model chosen by the Cross Validator
best_model = cvModel.bestModel
# maxIter and regParam are read from the parent ALS estimator through the JVM
# handle; the rank is available directly on the fitted model.
best_estimator = best_model._java_obj.parent()
print ("Best Model Parameters")
# Print the rank itself, not the whole model object.
print ("Rank: ", best_model.rank)
print (" MaxIter: ", str(best_estimator.getMaxIter()))
# getRegParam() returns the value; regParam() only returns the Param handle.
print (" RegParam:",  best_estimator.getRegParam())

Best Model Parameters
Rank:  ALSModel: uid=ALS_5cd0ffd49dbc, rank=10
 MaxIter:  15
 RegParam: ALS_5cd0ffd49dbc__regParam


In [None]:
#Building the best ALS Model and training
# The hyperparameters are read from the cross-validation winner instead of being
# copied by hand, so this cell cannot drift from what CrossValidator selected.
best_cv_model = cvModel.bestModel
best_cv_estimator = best_cv_model._java_obj.parent()
als = ALS(maxIter=best_cv_estimator.getMaxIter(), rank=best_cv_model.rank,
          regParam=best_cv_estimator.getRegParam(), userCol="userId", itemCol="movieId",
          ratingCol="rating", coldStartStrategy="drop", seed=42)
model = als.fit(training)

In [None]:
#Score the held-out test split with the fitted model, then evaluate those predictions
#with the evaluator defined earlier
predictions=model.transform(test)
rmse = evaluator.evaluate(predictions)

In [None]:
#Display the evaluation metric computed on the test-split predictions
rmse

0.8306759698070435

In [None]:
#Viewing predictions: actual rating next to the model's predicted rating
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|   1449|   4.5|  4.509094|
|     1|   1590|   2.5|  2.297095|
|     1|   3698|   3.5| 3.3633208|
|     3|    640|   3.0| 3.5243711|
|     3|    828|   4.0|  3.177774|
|     3|   1321|   4.0| 2.9146967|
|     5|     47|   4.0| 4.3403435|
|     5|    318|   5.0| 4.5279922|
|     5|    527|   4.5|  4.076883|
|     5|   1186|   2.0| 3.3310359|
|     5|   1213|   5.0|  4.334006|
|     5|   1222|   5.0|  4.186054|
|     5|   2060|   3.5| 2.8551536|
|     5|   2329|   4.5|  4.431355|
|     5|   2959|   4.5| 4.7137094|
|     5|   4973|   5.0| 4.5480027|
|     5|   6016|   5.0|  4.666273|
|     5|   8784|   4.5|  4.562799|
|     5|   8950|   4.5|   4.17676|
|     5|  44204|   4.0| 3.9779193|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
#Top-10 movie recommendations for every user, registered as a temp view for SQL access
user_recmds = model.recommendForAllUsers(10)
user_recmds.createOrReplaceTempView("user_recmds_data")



In [None]:
#Preview 20 rows of the per-user recommendations (userId, recommendations)
q9=spark.sql("select * from user_recmds_data limit 20")
q9.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    12|[{129358, 6.98660...|
|    22|[{132490, 11.5450...|
|    26|[{147471, 11.2547...|
|    27|[{152711, 14.4529...|
|    28|[{6149, 12.533043...|
|    31|[{152711, 10.8524...|
|    34|[{59905, 11.41351...|
|    44|[{184795, 18.1898...|
|    47|[{154624, 8.93109...|
|    53|[{151795, 8.37773...|
|    65|[{172833, 21.6217...|
|    76|[{102369, 10.8991...|
|    78|[{8876, 12.162045...|
|    81|[{185211, 8.31143...|
|    85|[{133743, 16.0921...|
|    91|[{124519, 16.9268...|
|    93|[{92083, 12.23409...|
|   101|[{152711, 10.4689...|
|   103|[{58207, 10.45387...|
|   108|[{152711, 13.9579...|
+------+--------------------+



In [None]:
#Converting to Pandas. toPandas() pulls the whole result onto the driver.
#NOTE(review): the displayed frame has ~282k rows - confirm driver memory is sufficient.
user_recmds_p=user_recmds.toPandas()

In [None]:
# Display the pandas frame of per-user recommendations.
user_recmds_p

Unnamed: 0,userId,recommendations
0,12,"[(129358, 6.986605644226074), (163060, 6.92760..."
1,22,"[(132490, 11.545042991638184), (8876, 11.49709..."
2,26,"[(147471, 11.254745483398438), (27771, 10.6584..."
3,27,"[(152711, 14.452960968017578), (128536, 12.421..."
4,28,"[(6149, 12.533042907714844), (155115, 12.10838..."
...,...,...
281918,283216,"[(100289, 15.648998260498047), (93061, 15.3358..."
281919,283217,"[(124131, 11.295330047607422), (59905, 11.2243..."
281920,283218,"[(126963, 7.237307071685791), (159779, 6.96765..."
281921,283225,"[(152711, 7.781731605529785), (69429, 7.024081..."


In [None]:
#Function for extracting movie recommendations for a user
def get_movies_for_user(user_id):
  """Return the recommendations stored for ``user_id``.

  The user is looked up in the module-level ``user_recmds_p`` frame with a
  single vectorised comparison, so the result does not depend on the rows
  being sorted by ``userId`` and no per-row Python loop is needed.

  Args:
    user_id: value compared against the ``userId`` column.

  Returns:
    The ``recommendations`` cell of the first matching row, or the string
    "UserId doesn't exist" when no row matches (including an empty frame).
  """
  matches = user_recmds_p.loc[user_recmds_p['userId'] == user_id, 'recommendations']
  if matches.empty:
    return "UserId doesn't exist"
  return matches.iloc[0]


In [None]:
#Test1: recommendations for userId 12
get_movies_for_user(12)

[Row(movieId=129358, rating=6.986605644226074),
 Row(movieId=163060, rating=6.927602291107178),
 Row(movieId=94101, rating=6.897332668304443),
 Row(movieId=108770, rating=6.844421863555908),
 Row(movieId=145787, rating=6.814711570739746),
 Row(movieId=140196, rating=6.728077411651611),
 Row(movieId=133087, rating=6.563270092010498),
 Row(movieId=167526, rating=6.368298530578613),
 Row(movieId=131554, rating=6.300866603851318),
 Row(movieId=134079, rating=6.213343143463135)]

In [None]:
#Test2: recommendations for userId 10177
get_movies_for_user(10177)

[Row(movieId=153010, rating=7.355489730834961),
 Row(movieId=117362, rating=7.297168731689453),
 Row(movieId=99901, rating=7.178287506103516),
 Row(movieId=116507, rating=7.104884147644043),
 Row(movieId=185211, rating=7.053183078765869),
 Row(movieId=116847, rating=6.961415767669678),
 Row(movieId=154624, rating=6.933083534240723),
 Row(movieId=60983, rating=6.888068675994873),
 Row(movieId=153018, rating=6.619941234588623),
 Row(movieId=142677, rating=6.582759857177734)]

In [None]:
#Top-10 user recommendations for every movie, registered as a temp view for SQL access
movie_recmds = model.recommendForAllItems(10)
movie_recmds.createOrReplaceTempView("movie_recmds_data")



In [None]:
#Preview 20 rows of the per-movie recommendations (movieId, recommendations)
q10=spark.sql("select * from movie_recmds_data limit 20")
q10.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|      1|[{139001, 10.0706...|
|      3|[{125354, 7.75680...|
|      5|[{108306, 7.29449...|
|      6|[{173843, 8.38528...|
|      9|[{140941, 7.91609...|
|     12|[{169403, 10.6502...|
|     13|[{133450, 9.28163...|
|     15|[{181384, 8.27760...|
|     16|[{270297, 9.14374...|
|     17|[{38421, 9.039085...|
|     19|[{200417, 10.8934...|
|     20|[{37662, 7.93781}...|
|     22|[{95602, 8.120821...|
|     26|[{145572, 6.97089...|
|     27|[{243243, 8.84399...|
|     28|[{58298, 9.086924...|
|     31|[{34976, 8.531595...|
|     34|[{190613, 12.1108...|
|     35|[{281956, 9.67580...|
|     37|[{243243, 8.93157...|
+-------+--------------------+



In [None]:
#Converting to Pandas. toPandas() pulls the whole per-movie result onto the driver.
movie_recmds_p=movie_recmds.toPandas()

In [None]:
#Function for extracting user recommendations for a movie
def get_users_for_movie(movie_id):
  """Return the recommendations stored for ``movie_id``.

  The movie is looked up in the module-level ``movie_recmds_p`` frame with a
  single vectorised comparison, so the result does not depend on the rows
  being sorted by ``movieId`` and no per-row Python loop is needed.

  Args:
    movie_id: value compared against the ``movieId`` column.

  Returns:
    The ``recommendations`` cell of the first matching row, or the string
    "MovieId doesn't exist" when no row matches (including an empty frame).
  """
  matches = movie_recmds_p.loc[movie_recmds_p['movieId'] == movie_id, 'recommendations']
  if matches.empty:
    return "MovieId doesn't exist"
  return matches.iloc[0]

In [None]:
#Test 1: recommended users for movieId 20
get_users_for_movie(20)

[Row(userId=37662, rating=7.937809944152832),
 Row(userId=111118, rating=7.344248294830322),
 Row(userId=266729, rating=7.266976356506348),
 Row(userId=266673, rating=7.121130466461182),
 Row(userId=95602, rating=7.106856822967529),
 Row(userId=238903, rating=7.074843406677246),
 Row(userId=201400, rating=6.959606170654297),
 Row(userId=34976, rating=6.878569602966309),
 Row(userId=240636, rating=6.811020851135254),
 Row(userId=61191, rating=6.656955242156982)]

In [None]:
#Test 2: recommended users for movieId 34
get_users_for_movie(34)

[Row(userId=190613, rating=12.110831260681152),
 Row(userId=139001, rating=10.300350189208984),
 Row(userId=210658, rating=9.101146697998047),
 Row(userId=146315, rating=9.072606086730957),
 Row(userId=266946, rating=8.844446182250977),
 Row(userId=238009, rating=8.80825138092041),
 Row(userId=38421, rating=8.797003746032715),
 Row(userId=214123, rating=8.792547225952148),
 Row(userId=50940, rating=8.76793384552002),
 Row(userId=82786, rating=8.762255668640137)]