<a href="https://colab.research.google.com/github/MandyZhangxy/movie-recommendation/blob/master/Movie_Recommendation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Movie Recommendation Project**
In this project, I use the Alternating Least Squares (ALS) algorithm from the Spark ML API to predict movie ratings in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

## **Running Pyspark in Colab**

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## **Setting up environment**

In [4]:
%matplotlib inline
import os

import numpy as np
import pandas as pd
import seaborn as sns

from IPython.core.display import display, HTML
from IPython.core.magic import register_cell_magic, register_line_cell_magic, register_line_magic
from matplotlib import pyplot as plt
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import array, col, count, mean, sum, udf, when
from pyspark.sql.types import DoubleType, IntegerType, StringType, Row

import warnings
warnings.filterwarnings("ignore")

sns.set_style("white")
sns.set_color_codes()


In [5]:
!ls

sample_data  spark-2.4.5-bin-hadoop2.7	spark-2.4.5-bin-hadoop2.7.tgz


In [0]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### **Data ETL and Data Exploration**

In [8]:
from google.colab import files
files.upload()

movies_df = spark.read.load("movies.csv", format="csv",header = True)
ratings_df = spark.read.load("ratings.csv", format = "csv", header = True)
links_df = spark.read.load("links.csv", format = "csv", header = True)
tags_df = spark.read.load("tags.csv", format = "csv", header = True)

Saving links.csv to links.csv
Saving movies.csv to movies.csv
Saving ratings.csv to ratings.csv
Saving tags.csv to tags.csv


In [9]:
!ls

links.csv   ratings.csv  spark-2.4.5-bin-hadoop2.7	tags.csv
movies.csv  sample_data  spark-2.4.5-bin-hadoop2.7.tgz


In [10]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [11]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [12]:
links_df.show(5)




In [13]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [14]:
ratings_df.groupBy("userID").count().show()

+------+-----+
|userID|count|
+------+-----+
|   296|   27|
|   467|   22|
|   125|  360|
|   451|   34|
|     7|  152|
|    51|  359|
|   124|   50|
|   447|   78|
|   591|   54|
|   307|  975|
|   475|  155|
|   574|   23|
|   169|  269|
|   205|   27|
|   334|  154|
|   544|   22|
|   577|  161|
|   581|   40|
|   272|   31|
|   442|   20|
+------+-----+
only showing top 20 rows



In [15]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [16]:
tmp1 = ratings_df.groupBy("movieId").count().toPandas()['count']==1
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1.sum(), tmp2))

3446 out of 9724 movies are rated by only one user


## **Spark SQL and OLAP**

### Register the DataFrame as a local temporary view

In [0]:
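# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")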

### The number of Users


In [18]:
spark.sql(
"select count(distinct userId) as total_users from ratings"
).show()

+-----------+
|total_users|
+-----------+
|        610|
+-----------+



### The number of Movies

In [19]:
spark.sql(
    "select count(distinct movieId) as total_movies from movies"

).show()

+------------+
|total_movies|
+------------+
|        9742|
+------------+



### Number of movies that are rated by users

In [20]:
spark.sql(
    '''
    with t as 
    (select m.movieId, title, genres,userId, rating from 
    movies m left join ratings r
    on m.movieId = r.movieId)

    select count(distinct movieId) 
    from t
    where rating is not null
    '''
).show()

+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                   9724|
+-----------------------+



### Movies not rated before:

In [21]:
spark.sql(
    '''
    with t as 
    (select m.movieId, title, genres,userId, rating from 
    movies m left join ratings r
    on m.movieId = r.movieId)

    select movieId, title,genres
    from t
    where rating is null
    group by 1,2,3
    '''
).show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|   4194|I Know Where I'm ...|   Drama|Romance|War|
|   1076|Innocents, The (1...|Drama|Horror|Thri...|
|  30892|In the Realms of ...|Animation|Documen...|
|  26085|Mutiny on the Bou...|Adventure|Drama|R...|
|   5721|  Chosen, The (1981)|               Drama|
|  32160|Twentieth Century...|              Comedy|
|   2939|      Niagara (1953)|      Drama|Thriller|
|  25855|Roaring Twenties,...|Crime|Drama|Thriller|
|  32371|Call Northside 77...|Crime|Drama|Film-...|
|   6849|      Scrooge (1970)|Drama|Fantasy|Mus...|
|   3338|For All Mankind (...|         Documentary|
|  85565|  Chalet Girl (2011)|      Comedy|Romance|
|   7792|Parallax View, Th...|            Thriller|
|   6668|Road Home, The (W...|       Drama|Romance|
|   3456|Color of Paradise...|               Drama|
|   7020|        Proof (1991)|Comedy|Drama|Romance|
|   8765|Thi

### List All Movie Genres

In [22]:
spark.sql(
    """
    select movie_genres from movies
    lateral view explode(split(genres, '[|]')) as movie_genres 
    where movie_genres <> "(no genres listed)"
    group by 1
    order by 1
    """
).show()

+------------+
|movie_genres|
+------------+
|      Action|
|   Adventure|
|   Animation|
|    Children|
|      Comedy|
|       Crime|
| Documentary|
|       Drama|
|     Fantasy|
|   Film-Noir|
|      Horror|
|        IMAX|
|     Musical|
|     Mystery|
|     Romance|
|      Sci-Fi|
|    Thriller|
|         War|
|     Western|
+------------+



### Count Movies in Each Category

In [23]:
spark.sql(
    """
select movie_genres,count(*) as total_movies from movies
lateral view explode(split(genres,'[|]')) as movie_genres
group by 1
""").show()

+------------------+------------+
|      movie_genres|total_movies|
+------------------+------------+
|             Crime|        1199|
|           Romance|        1596|
|          Thriller|        1894|
|         Adventure|        1263|
|             Drama|        4361|
|               War|         382|
|       Documentary|         440|
|           Fantasy|         779|
|           Mystery|         573|
|           Musical|         334|
|         Animation|         611|
|         Film-Noir|          87|
|(no genres listed)|          34|
|              IMAX|         158|
|            Horror|         978|
|           Western|         167|
|            Comedy|        3756|
|          Children|         664|
|            Action|        1828|
|            Sci-Fi|         980|
+------------------+------------+
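
The `lateral view explode(split(genres, '[|]'))` pattern used above turns one row per movie into one row per (movie, genre) pair before grouping. A minimal plain-Python sketch of the same idea, using only the three rows from the `movies_df.show(5)` output whose `genres` value is not truncated (the variable names are illustrative, not part of the notebook):

```python
from collections import Counter

# (movieId, genres) pairs copied from the movies_df.show(5) output above
rows = [(3, "Comedy|Romance"), (4, "Comedy|Drama|Romance"), (5, "Comedy")]

# "explode": one (movieId, genre) pair per genre listed for the movie
exploded = [(movie_id, genre) for movie_id, genres in rows for genre in genres.split("|")]

# "group by genre, count(*)"
counts = Counter(genre for _, genre in exploded)
print(counts)
```

Spark's `split` takes a regular expression, which is why the SQL wraps the pipe in a character class (`'[|]'`), while Python's `str.split` takes the separator literally.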



### List all movie names in each category

In [24]:
spark.sql(
    """
    select t1.movie_genres, concat_ws("|",collect_set(t1.title)) as list_of_movies
    from
    (
    select title,movie_genres from movies
    lateral view explode(split(genres, '[|]')) as movie_genres
    group by 1,2
    ) t1
    group by 1    
    """
).show()

+------------------+--------------------+
|      movie_genres|      list_of_movies|
+------------------+--------------------+
|             Crime|Stealing Rembrand...|
|           Romance|Vampire in Brookl...|
|          Thriller|Element of Crime,...|
|         Adventure|Ice Age: Collisio...|
|             Drama|Airport '77 (1977...|
|               War|General, The (192...|
|       Documentary|The Barkley Marat...|
|           Fantasy|Masters of the Un...|
|           Mystery|Before and After ...|
|           Musical|U2: Rattle and Hu...|
|         Animation|Ice Age: Collisio...|
|         Film-Noir|Rififi (Du rififi...|
|(no genres listed)|T2 3-D: Battle Ac...|
|              IMAX|Harry Potter and ...|
|            Horror|Sweeney Todd (200...|
|           Western|Man Who Shot Libe...|
|            Comedy|Hysteria (2011)|H...|
|          Children|Ice Age: Collisio...|
|            Action|Stealing Rembrand...|
|            Sci-Fi|Push (2009)|SORI:...|
+------------------+--------------------+

## **Spark ALS based approach for training model**
This section uses the ALS implementation in Spark ML to predict the ratings.

ALS (and matrix factorization algorithms in general) can help with three major problems of the user-item matrix:
* **Popularity bias**: the system recommends the movies with the most interactions, without any personalization.
* **Item cold-start problem**: movies newly added to the catalogue have few or no interactions, while the recommender relies on a movie’s interactions to make recommendations.
* **Scalability issue**: the approach fails to scale to much larger datasets as more and more users and movies are added to the database.


Matrix factorization algorithms work by decomposing the user-item interaction matrix into the product of two lower-dimensional rectangular matrices. One matrix can be seen as the user matrix, where rows represent users and columns are latent factors. The other is the item matrix, where rows are latent factors and columns represent items.
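
To make the alternating idea concrete, here is a minimal pure-Python sketch of ALS on a tiny, hypothetical rating set. It is not Spark's implementation (no distributed blocking, no `nonnegative` constraint); the function names, the toy data, and the plain L2 penalty are assumptions for illustration. Item factors are stored one row per item, i.e. the transpose of the item matrix described above.

```python
import random

def solve(a, b):
    """Solve a x = b for a small square system (Gauss-Jordan, partial pivoting)."""
    n = len(b)
    m = [a[i][:] + [b[i]] for i in range(n)]
    for c in range(n):
        p = max(range(c, n), key=lambda r: abs(m[r][c]))
        m[c], m[p] = m[p], m[c]
        for r in range(n):
            if r != c:
                f = m[r][c] / m[c][c]
                m[r] = [x - f * y for x, y in zip(m[r], m[c])]
    return [m[i][n] / m[i][i] for i in range(n)]

def update(fixed, observed, rank, reg):
    """Re-solve every row's factors by ridge regression while `fixed` is held constant."""
    solved = []
    for obs in observed:  # obs = [(index into fixed, rating), ...]
        a = [[reg if i == j else 0.0 for j in range(rank)] for i in range(rank)]
        b = [0.0] * rank
        for idx, rating in obs:
            f = fixed[idx]
            for i in range(rank):
                b[i] += rating * f[i]
                for j in range(rank):
                    a[i][j] += f[i] * f[j]
        solved.append(solve(a, b))
    return solved

def loss(ratings, users, items, reg):
    """Squared error on the observed ratings plus the L2 penalty on all factors."""
    err = sum((r - sum(x * y for x, y in zip(users[u], items[i]))) ** 2
              for u, i, r in ratings)
    return err + reg * sum(x * x for vec in users + items for x in vec)

def als(ratings, n_users, n_items, rank=2, reg=0.1, iters=10, seed=0):
    rng = random.Random(seed)
    by_user = [[] for _ in range(n_users)]
    by_item = [[] for _ in range(n_items)]
    for u, i, r in ratings:
        by_user[u].append((i, r))
        by_item[i].append((u, r))
    items = [[rng.random() for _ in range(rank)] for _ in range(n_items)]
    history = []
    for _ in range(iters):
        users = update(items, by_user, rank, reg)  # fix items, solve for users
        items = update(users, by_item, rank, reg)  # fix users, solve for items
        history.append(loss(ratings, users, items, reg))
    return users, items, history

# Hypothetical toy data: (user index, item index, rating)
toy = [(0, 0, 5.0), (0, 2, 1.0), (0, 3, 1.0), (1, 0, 5.0), (1, 1, 5.0), (1, 2, 1.0),
       (2, 0, 1.0), (2, 1, 1.0), (2, 3, 5.0), (3, 1, 1.0), (3, 2, 5.0), (3, 3, 5.0)]
users, items, history = als(toy, n_users=4, n_items=4)
print(history[0], history[-1])
```

Each half-step is an exact least-squares solve with the other side frozen, so the regularized loss recorded in `history` should never increase from one iteration to the next.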

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.ml.tuning import TrainValidationSplit,ParamGridBuilder

In [26]:
ratings_df.head(5)

[Row(userId='1', movieId='1', rating='4.0', timestamp='964982703'),
 Row(userId='1', movieId='3', rating='4.0', timestamp='964981247'),
 Row(userId='1', movieId='6', rating='4.0', timestamp='964982224'),
 Row(userId='1', movieId='47', rating='5.0', timestamp='964983815'),
 Row(userId='1', movieId='50', rating='5.0', timestamp='964982931')]

In [27]:
ratings_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [0]:
# convert data type in each column
ratings_df = ratings_df.withColumn("rating", ratings_df["rating"].cast("float"))
ratings_df = ratings_df.withColumn("userId", ratings_df["userId"].cast("float"))
ratings_df = ratings_df.withColumn("movieId", ratings_df["movieId"].cast("float"))

In [29]:
ratings_df.printSchema()

root
 |-- userId: float (nullable = true)
 |-- movieId: float (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: string (nullable = true)



In [0]:
# split into training and testing datasets
(training, test) = ratings_df.randomSplit([0.8, 0.2],seed = 1)

In [0]:
# Create ALS model
als = ALS(userCol = "userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop", nonnegative=True)

In [0]:
# Tune model using ParamGridBuilder
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [8, 10, 12])
    .addGrid(als.maxIter, [10, 13, 16])
    .addGrid(als.regParam, [0.1, 0.15, 0.2])
    .build()
)
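
The grid above is the Cartesian product of the three value lists. A short plain-Python sketch (the dictionary `grid` simply mirrors the values passed to `addGrid`; it is not a Spark object) shows how many candidate settings the tuner has to fit:

```python
from itertools import product

# Same candidate values as in the ParamGridBuilder call above
grid = {
    "rank": [8, 10, 12],
    "maxIter": [10, 13, 16],
    "regParam": [0.1, 0.15, 0.2],
}

# One dict per combination, e.g. {"rank": 8, "maxIter": 10, "regParam": 0.1}
combos = [dict(zip(grid, values)) for values in product(*grid.values())]
print(len(combos))
```

Every combination costs one full ALS fit, so adding a value to any list grows the tuning time multiplicatively.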


In [0]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
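
For reference, RMSE is the square root of the mean squared difference between actual and predicted ratings. The sketch below computes it by hand for the first five (rating, prediction) pairs that appear in the `predictions` output later in this notebook; the helper name `rmse_by_hand` is illustrative.

```python
import math

def rmse_by_hand(pairs):
    """Root mean squared error over (actual, predicted) pairs."""
    return math.sqrt(sum((actual - predicted) ** 2 for actual, predicted in pairs) / len(pairs))

# (rating, prediction) pairs copied from the first five rows of the predictions table
sample = [(2.0, 2.810391), (3.0, 3.8446262), (3.0, 4.0631766),
          (3.0, 3.7057185), (4.0, 3.663424)]
print(rmse_by_hand(sample))
```

Because errors are squared before averaging, a few large misses raise RMSE more than many small ones.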

In [0]:
# Tune with TrainValidationSplit (a single train/validation split rather than k-fold cross-validation):
tvs = TrainValidationSplit(
    estimator = als,
    estimatorParamMaps=param_grid,
    evaluator = evaluator
)

In [0]:
# Fit ALS model to training data
model = tvs.fit(training)

In [0]:
# Extract best model from the tuning exercise using ParamGridBuilder
best_model = model.bestModel

### Model Performance Review

In [38]:
# Generate predictions and evaluate using RMSE
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

# Print evaluation metrics and model parameters
print("RMSE = " + str(rmse))
print("**Best Model**")
print(" Rank:", best_model.rank)
print(" Max Iteration:", best_model._java_obj.parent().getMaxIter())
print(" Regularization Parameter:", best_model._java_obj.parent().getRegParam())

RMSE = 0.8683452447775787
**Best Model**
 Rank: 8
 Max Iteration: 16
 Regularization Parameter: 0.15


In [39]:
predictions.sort("userId","rating").show()

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|   1.0| 2253.0|   2.0|964981775|  2.810391|
|   1.0| 2093.0|   3.0|964981710| 3.8446262|
|   1.0| 1580.0|   3.0|964981125| 4.0631766|
|   1.0| 2528.0|   3.0|964982328| 3.7057185|
|   1.0| 1920.0|   4.0|964981780|  3.663424|
|   1.0| 2143.0|   4.0|964981725| 3.5299761|
|   1.0|  356.0|   4.0|964980962|  4.957389|
|   1.0| 2268.0|   4.0|964982989| 4.4709234|
|   1.0| 2000.0|   4.0|964982211|  4.498873|
|   1.0| 2174.0|   4.0|964981680|  4.363098|
|   1.0| 3479.0|   4.0|964981725| 3.8514338|
|   1.0| 2012.0|   4.0|964984176|  4.112434|
|   1.0| 2193.0|   4.0|964981710| 3.9646702|
|   1.0|  733.0|   4.0|964982400| 4.0464654|
|   1.0| 2797.0|   4.0|964981710|  4.580347|
|   1.0|  590.0|   4.0|964982546| 4.1709065|
|   1.0| 2105.0|   4.0|964981725| 3.7055383|
|   1.0|  441.0|   4.0|964980868| 4.7324114|
|   1.0|  804.0|   4.0|964980499| 3.7447224|
|   1.0| 1

### Integrate all movie details with prediction

In [49]:
# predictions for every (user, movie) pair in the ratings data, training rows included
all_ratings = best_model.transform(ratings_df)
all_ratings.sort("userId","rating").show(5)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|   1.0| 3176.0|   1.0|964983504| 3.6588602|
|   1.0| 2617.0|   2.0|964982588|  3.573943|
|   1.0| 2389.0|   2.0|964983094| 2.1139872|
|   1.0| 2253.0|   2.0|964981775|  2.810391|
|   1.0| 1219.0|   2.0|964983393|  4.621076|
+------+-------+------+---------+----------+
only showing top 5 rows



In [55]:
# combine all tables together
all_ratings.createOrReplaceTempView("movie_ratings")
output = spark.sql("""
select m.movieId,m.title, m.genres, mr.userId, mr.rating, mr.prediction as predicted_rating from movie_ratings mr
join movies m
on m.movieId = mr.movieId
order by 1
""")
output.show(5)

+-------+----------------+--------------------+------+------+----------------+
|movieId|           title|              genres|userId|rating|predicted_rating|
+-------+----------------+--------------------+------+------+----------------+
|      1|Toy Story (1995)|Adventure|Animati...| 580.0|   3.0|       3.3192892|
|      1|Toy Story (1995)|Adventure|Animati...| 451.0|   5.0|        4.193929|
|      1|Toy Story (1995)|Adventure|Animati...| 137.0|   4.0|       3.5978155|
|      1|Toy Story (1995)|Adventure|Animati...|  31.0|   5.0|        4.268749|
|      1|Toy Story (1995)|Adventure|Animati...| 471.0|   5.0|       3.2545033|
+-------+----------------+--------------------+------+------+----------------+
only showing top 5 rows



### Recommend Movies to Users with User Id 575 and 232

In [0]:
# recommend top 10 movies for each user:
user_recs = best_model.recommendForAllUsers(10)
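
Conceptually, a top-N recommendation from a factorization model scores each item by the dot product of the user's factor vector with the item's factor vector and keeps the N highest scores. The sketch below illustrates that idea in plain Python; the factor values and the `top_n` helper are hypothetical, and Spark's own implementation is distributed and more involved.

```python
import heapq

def top_n(user_vec, item_factors, n):
    """Return the n (item_id, score) pairs with the highest dot-product score."""
    scored = (
        (sum(u * v for u, v in zip(user_vec, vec)), item_id)
        for item_id, vec in item_factors.items()
    )
    return [(item_id, score) for score, item_id in heapq.nlargest(n, scored)]

# Hypothetical latent factors (rank 2) for one user and three items
user_vec = [1.0, 0.0]
item_factors = {10: [2.0, 0.0], 20: [0.5, 9.0], 30: [1.0, 1.0]}
print(top_n(user_vec, item_factors, 2))
```

The scores are unbounded dot products, which is why predicted ratings in the tables below can exceed the 5.0 maximum of the rating scale.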

In [0]:
# helper function to get recommendations for a single user
def get_recs_for_user(recs, user_id):
  # keep only the row for the requested user
  recs = recs.filter(recs.userId == user_id)
  # "recommendations" is an array of (movieId, rating) structs, so each
  # selected column holds one list for this user
  recs = recs.select("recommendations.movieId", "recommendations.rating").toPandas()
  movies = recs.iloc[0, 0]
  ratings = recs.iloc[0, 1]
  ratings_matrix = pd.DataFrame(movies, columns = ["movieId"])
  ratings_matrix["ratings"] = ratings
  ratings_matrix_ps = spark.createDataFrame(ratings_matrix)
  ratings_matrix_ps.createOrReplaceTempView("rec_user_id")
  # movieId in the movies view is a string, so "order by 1" sorts lexicographically
  output = spark.sql("""
  select m.movieId, m.title, m.genres, mr.ratings as predicted_rating 
  from rec_user_id mr
  join movies m
  on m.movieId = mr.movieId
  order by 1
  """)
  return output

In [70]:
rec_575 = get_recs_for_user(user_recs, 575)
rec_575.show()

+-------+--------------------+--------------------+-----------------+
|movieId|               title|              genres| predicted_rating|
+-------+--------------------+--------------------+-----------------+
|  25947|Unfaithfully Your...|              Comedy|5.152613639831543|
|  26326|Holy Mountain, Th...|               Drama|5.103254795074463|
|  27555|        Fubar (2002)|              Comedy|5.154544830322266|
|  33649|  Saving Face (2004)|Comedy|Drama|Romance|5.230558395385742|
|  40491|Match Factory Gir...|        Comedy|Drama|5.099648952484131|
|  68945|Neon Genesis Evan...|Action|Animation|...|5.295875549316406|
|   8477|    Jetée, La (1962)|      Romance|Sci-Fi|5.188567638397217|
|  84847|         Emma (2009)|Comedy|Drama|Romance|5.218502998352051|
|  87234|    Submarine (2010)|Comedy|Drama|Romance|5.070328712463379|
|  99764|It's Such a Beaut...|Animation|Comedy|...|5.099648952484131|
+-------+--------------------+--------------------+-----------------+



In [71]:
rec_232 = get_recs_for_user(user_recs, 232)
rec_232.show()

+-------+--------------------+--------------------+------------------+
|movieId|               title|              genres|  predicted_rating|
+-------+--------------------+--------------------+------------------+
| 117531|    Watermark (2014)|         Documentary|  4.62070894241333|
| 138966|Nasu: Summer in A...|           Animation|  4.62070894241333|
| 179135|Blue Planet II (2...|         Documentary|  4.62070894241333|
| 184245|De platte jungle ...|         Documentary|  4.62070894241333|
|  26073|Human Condition I...|           Drama|War|  4.62070894241333|
|  59018| Visitor, The (2007)|       Drama|Romance|4.6876444816589355|
|  68945|Neon Genesis Evan...|Action|Animation|...| 4.751951694488525|
|   7481|   Enemy Mine (1985)|Adventure|Drama|S...|4.6876444816589355|
|   7841|Children of Dune ...|      Fantasy|Sci-Fi|4.6876444816589355|
|  84273|Zeitgeist: Moving...|         Documentary|  4.62070894241333|
+-------+--------------------+--------------------+------------------+

