### Spark HW2 Movie Recommendation (Deadline: 6/07/2021)
In this notebook, we use the Alternating Least Squares (ALS) algorithm through the Spark APIs to predict movie ratings in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).
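
To make the idea behind ALS concrete, here is a minimal NumPy sketch on a toy rating matrix. It is not Spark's implementation: the function name `als_toy`, the matrix `R`, and all hyperparameter values are made up for illustration. The sketch alternates between two steps: with the item factors fixed it solves a small regularized least-squares problem for each user, then with the user factors fixed it does the same for each item.

```python
import numpy as np

def als_toy(R, mask, rank=2, reg=0.1, n_iter=20, seed=0):
    """Factor R ~= U @ V.T using only the observed entries (mask == True)."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    ridge = reg * np.eye(rank)
    for _ in range(n_iter):
        # Fix V and solve a small ridge regression for every user.
        for u in range(n_users):
            obs = mask[u]
            Vo = V[obs]
            U[u] = np.linalg.solve(Vo.T @ Vo + ridge, Vo.T @ R[u, obs])
        # Fix U and solve a small ridge regression for every item.
        for i in range(n_items):
            obs = mask[:, i]
            Uo = U[obs]
            V[i] = np.linalg.solve(Uo.T @ Uo + ridge, Uo.T @ R[obs, i])
    return U, V

# Toy data (hypothetical values): 0 marks "not rated".
R = np.array([[5, 3, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0

U, V = als_toy(R, mask)
pred = U @ V.T  # predicted rating for every (user, item) pair
train_rmse = np.sqrt(np.mean((pred[mask] - R[mask]) ** 2))
print(round(float(train_rmse), 3))
```

The entries of `pred` where `mask` is `False` are the model's guesses for unrated movies, which is what a recommender ultimately ranks.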

In [0]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline



In [0]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part 1: Data ETL and Data Exploration

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [0]:
movies_df = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/movies/ratings.csv", format='csv', header = True)
links_df = spark.read.load("/FileStore/tables/movies/links.csv", format='csv', header = True)
tags_df = spark.read.load("/FileStore/tables/movies/tags.csv", format='csv', header = True)

In [0]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [0]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [0]:
links_df.show(5)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [0]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [0]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [0]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

3446 out of 9724 movies are rated by only one user


## Part 1: Spark SQL and OLAP

In [0]:
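# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")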



### The number of Users

In [0]:
%sql 
SELECT COUNT(DISTINCT userID)
FROM ratings

count(DISTINCT userID)
610


### The number of Movies

In [0]:
%sql 
SELECT COUNT(DISTINCT movieID)
FROM movies

count(DISTINCT movieID)
9742


### How many movies have been rated by users? List the movies that have never been rated

In [0]:
%sql 
SELECT a.title, a.genres
FROM movies a LEFT JOIN ratings b ON a.movieID = b.movieID
WHERE b.rating IS NULL

title,genres
"Innocents, The (1961)",Drama|Horror|Thriller
Niagara (1953),Drama|Thriller
For All Mankind (1989),Documentary
"Color of Paradise, The (Rang-e khoda) (1999)",Drama
I Know Where I'm Going! (1945),Drama|Romance|War
"Chosen, The (1981)",Drama
"Road Home, The (Wo de fu qin mu qin) (1999)",Drama|Romance
Scrooge (1970),Drama|Fantasy|Musical
Proof (1991),Comedy|Drama|Romance
"Parallax View, The (1974)",Thriller


In [0]:
%sql 
SELECT COUNT(DISTINCT movieID)
FROM ratings

count(DISTINCT movieID)
9724
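
The `LEFT JOIN ... WHERE b.rating IS NULL` query above is an anti-join: it keeps only the rows of `movies` that have no match in `ratings`. The sketch below reproduces the pattern on two tiny, made-up tables using Python's built-in `sqlite3` module rather than Spark, so the table contents and variable names are illustrative assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE movies (movieId INTEGER, title TEXT);
    CREATE TABLE ratings (userId INTEGER, movieId INTEGER, rating REAL);
    INSERT INTO movies VALUES (1, 'A'), (2, 'B'), (3, 'C');
    INSERT INTO ratings VALUES (10, 1, 4.0), (11, 1, 3.5), (10, 3, 5.0);
""")

# Anti-join: movies with no matching row in ratings.
unrated = conn.execute("""
    SELECT a.title
    FROM movies a LEFT JOIN ratings b ON a.movieId = b.movieId
    WHERE b.rating IS NULL
""").fetchall()

# Number of distinct movies that received at least one rating.
n_rated = conn.execute(
    "SELECT COUNT(DISTINCT movieId) FROM ratings"
).fetchone()[0]

print(unrated, n_rated)
conn.close()
```

Counting distinct `movieId` values in `ratings` rather than in `movies` is what separates "movies that were rated" from "movies in the catalogue".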


### List Movie Genres

In [0]:
%sql
SELECT genres
FROM movies

genres
Adventure|Animation|Children|Comedy|Fantasy
Adventure|Children|Fantasy
Comedy|Romance
Comedy|Drama|Romance
Comedy
Action|Crime|Thriller
Comedy|Romance
Adventure|Children
Action
Action|Adventure|Thriller


In [0]:
# to split the genres we can use get_dummies function in pandas
movies_pd = movies_df.toPandas()

In [0]:
gen_split = movies_pd['genres'].str.get_dummies(sep="|").sum()
print(gen_split)

(no genres listed)      34
Action                1828
Adventure             1263
Animation              611
Children               664
Comedy                3756
Crime                 1199
Documentary            440
Drama                 4361
Fantasy                779
Film-Noir               87
Horror                 978
IMAX                   158
Musical                334
Mystery                573
Romance               1596
Sci-Fi                 980
Thriller              1894
War                    382
Western                167
dtype: int64
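
What `str.get_dummies(sep="|").sum()` computes can be seen from a plain-Python sketch: split each string on `|` and count, per genre, the number of rows it appears in. The five sample strings are the first rows of the `genres` column shown above; the variable names are illustrative.

```python
from collections import Counter

# First five rows of the genres column, in the same "A|B|C" format.
genres_column = [
    "Adventure|Animation|Children|Comedy|Fantasy",
    "Adventure|Children|Fantasy",
    "Comedy|Romance",
    "Comedy|Drama|Romance",
    "Comedy",
]

# set() mirrors the 0/1 indicator columns: a genre counts once per row.
genre_counts = Counter(
    genre
    for row in genres_column
    for genre in set(row.split("|"))
)

for genre, count in sorted(genre_counts.items()):
    print(f"{genre:<12}{count}")
```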


### Movies in Each Category

In [0]:
gen_split = movies_pd['genres'].str.get_dummies(sep="|")
temp_movie = movies_pd.drop(['genres'], axis=1)
new_movies = pd.concat([temp_movie, gen_split], axis=1)
new_movies.head()

,movieId,title,(no genres listed),Action,Adventure,Animation,Children,Comedy,Crime,Documentary,...,Film-Noir,Horror,IMAX,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western
0,1,Toy Story (1995),0,0,1,1,1,1,0,0,...,0,0,0,0,0,0,0,0,0,0
1,2,Jumanji (1995),0,0,1,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,3,Grumpier Old Men (1995),0,0,0,0,0,1,0,0,...,0,0,0,0,0,1,0,0,0,0
3,4,Waiting to Exhale (1995),0,0,0,0,0,1,0,0,...,0,0,0,0,0,1,0,0,0,0
4,5,Father of the Bride Part II (1995),0,0,0,0,0,1,0,0,...,0,0,0,0,0,0,0,0,0,0


In [0]:
new_movies_df = spark.createDataFrame(new_movies)
new_movies_df.createOrReplaceTempView("new_movies")



In [0]:
%sql
SELECT title
FROM new_movies
WHERE Action = 1

title
Heat (1995)
Sudden Death (1995)
GoldenEye (1995)
Cutthroat Island (1995)
Money Train (1995)
Assassins (1995)
Dead Presidents (1995)
Mortal Kombat (1995)
Lawnmower Man 2: Beyond Cyberspace (1996)
From Dusk Till Dawn (1996)


In [0]:
%sql
SELECT title
FROM new_movies
WHERE Drama = 1

title
Waiting to Exhale (1995)
"American President, The (1995)"
Nixon (1995)
Casino (1995)
Sense and Sensibility (1995)
Money Train (1995)
Copycat (1995)
Powder (1995)
Leaving Las Vegas (1995)
Othello (1995)


## Part 2: Spark ALS based approach for training model
We will use Spark ML to predict the ratings. We reuse `ratings_df`, drop the `timestamp` column, and cast the remaining columns to numeric types so that each row has the form (user, item, rating).

In [0]:
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [0]:
movie_ratings=ratings_df.drop('timestamp')

In [0]:
# Convert column types: IDs to integer, rating to float (the CSV was read as strings)
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [0]:
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.
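
As a rough illustration of what a grid search costs, the plain-Python sketch below enumerates the same candidate values for `maxIter` and `regParam` that the cells below pass to `ParamGridBuilder`, together with the 5 folds used by the cross-validator. The variable names are illustrative, and no Spark is involved.

```python
from itertools import product

# Candidate values mirroring the grid used in this notebook.
grid = {
    "maxIter": [1, 5, 10],
    "regParam": [0.01, 1.0, 2.0],
}
num_folds = 5

# Every combination of candidate values is one configuration to try.
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

# k-fold cross-validation fits each configuration once per fold.
total_fits = len(param_maps) * num_folds

print(len(param_maps), total_fits)
```

Each extra value added to one hyperparameter multiplies the number of fits, so the grid is best kept small on a shared cluster.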

In [0]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [0]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])

In [0]:
print(training.head(10))

[Row(userId=1, movieId=6, rating=4.0), Row(userId=1, movieId=47, rating=5.0), Row(userId=1, movieId=50, rating=5.0), Row(userId=1, movieId=101, rating=5.0), Row(userId=1, movieId=110, rating=4.0), Row(userId=1, movieId=151, rating=5.0), Row(userId=1, movieId=163, rating=5.0), Row(userId=1, movieId=216, rating=5.0), Row(userId=1, movieId=223, rating=3.0), Row(userId=1, movieId=231, rating=5.0)]


In [0]:
#Create ALS model
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")

#fit the model
model = als.fit(training)

In [0]:
#Tune model using ParamGridBuilder
builder = (
    ParamGridBuilder()
    .baseOn({als.rank: 10, als.userCol: "userId", als.itemCol: "movieId", als.ratingCol: "rating"})
    .addGrid(als.maxIter, [1, 5, 10])
    .addGrid(als.regParam, [0.01, 1.0, 2.0])
    .build()
)

# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Build Cross validation 
cv = CrossValidator(estimator=als, estimatorParamMaps=builder, evaluator=evaluator, numFolds=5)

In [0]:
cvModel = cv.fit(training)

In [0]:
# Obtain the best model from CV
bestModel = cvModel.bestModel

### Model testing
And finally, make a prediction and check the testing error.
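
For reference, the RMSE metric is the square root of the mean squared difference between actual and predicted ratings. The plain-Python sketch below computes it on made-up (rating, prediction) pairs and also shows why the ALS estimator above was created with `coldStartStrategy="drop"`: a user or movie that was not seen during training gets a NaN prediction, and a single NaN makes the whole metric NaN. The helper `rmse` and the sample values are illustrative assumptions.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    squared = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared) / len(squared))

# Hypothetical (rating, prediction) pairs.
pairs = [(4.0, 3.5), (3.0, 3.0), (5.0, 4.0)]
print(rmse(pairs))

# One NaN prediction (cold-start user or movie) poisons the metric.
with_cold_start = pairs + [(4.0, float("nan"))]
print(rmse(with_cold_start))

# Dropping NaN rows first, as coldStartStrategy="drop" does, restores it.
kept = [(a, p) for a, p in with_cold_start if not math.isnan(p)]
print(rmse(kept))
```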

In [0]:
#Generate predictions and evaluate using RMSE
predictions=bestModel.transform(test)
rmse = evaluator.evaluate(predictions)

In [0]:
#Print evaluation metrics and model parameters
print("RMSE = " + str(rmse))
print("**Best Model**")
print(" Rank:" + str(bestModel._java_obj.parent().getRank()))
print(" MaxIter:" + str(bestModel._java_obj.parent().getMaxIter()))
print(" RegParam:" + str(bestModel._java_obj.parent().getRegParam()))

RMSE = 1.0934309069692294
**Best Model**
 Rank:10
 MaxIter:5
 RegParam:0.01

In [0]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|   4896|   4.0| 3.8046253|
|   148|  40815|   4.0|  4.132256|
|   148|  50872|   3.0| 3.0393445|
|   148|  81847|   4.5|  1.955942|
|   148|  98243|   4.5|  5.278136|
|   148|  99149|   3.0| 3.2202404|
|   148| 108932|   4.0| 3.6175215|
|   148| 115617|   3.5| 3.4617686|
|   463|    296|   4.0|  4.248542|
|   463|    520|   4.0| 3.9542644|
|   463|    552|   3.5| 2.9018903|
|   463|   1690|   4.0|  3.108027|
|   463|   3448|   3.0|  4.010786|
|   463|   5952|   5.0| 3.9307723|
|   463|   7320|   4.0| 3.3047616|
|   471|    296|   4.0|  5.351489|
|   471|    356|   3.0|  4.377989|
|   471|   8874|   3.5|  4.029876|
|   471|  44191|   3.5| 3.6614168|
|   471|  92259|   4.5| 3.9955904|
+------+-------+------+----------+
only showing top 20 rows



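### Apply the model and check its performance
Note that `movie_ratings` includes the rows the model was trained on, so the RMSE below is lower than the test RMSE and should not be read as an estimate of performance on unseen ratings.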

In [0]:
alldata=bestModel.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

RMSE = 0.6628964952844711


In [0]:
alldata.createOrReplaceTempView("alldata")



In [0]:
%sql select * from alldata

userId,movieId,rating,prediction
148,356,4.0,3.4569361
148,1197,3.0,3.6965964
148,4308,4.0,3.5245883
148,4886,3.0,3.7179108
148,4896,4.0,3.8046253
148,4993,3.0,3.124133
148,5618,3.0,3.1390483
148,5816,4.0,3.6933594
148,5952,3.0,3.1456192
148,6377,3.0,3.392334


## Recommend movies to users with id: 575, 232
You can choose other users to recommend movies to.

In [0]:
userRecs = bestModel.recommendForAllUsers(10)
userRecs.createOrReplaceTempView("userRec")

In [0]:
%sql
SELECT *
FROM userRec
WHERE userId = 575 OR userId = 232

userId,recommendations
232,"List(List(3200, 4.769612), List(7121, 4.699797), List(7706, 4.6666393), List(85342, 4.640274), List(1147, 4.61867), List(53123, 4.580992), List(5066, 4.5721054), List(3508, 4.568757), List(158872, 4.555845), List(6987, 4.535132))"
575,"List(List(3814, 7.39736), List(55276, 7.2607408), List(2195, 7.0349455), List(2318, 6.851413), List(800, 6.8418713), List(3616, 6.692211), List(2384, 6.5455213), List(4040, 6.5013547), List(106489, 6.4895554), List(69644, 6.46819))"
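
Conceptually, the score ALS assigns to a (user, movie) pair is the dot product of the user's and the movie's latent factor vectors, and the recommendations are the movies with the highest scores. The NumPy sketch below illustrates that ranking step with made-up factors; `top_k_items` is a hypothetical helper, not a Spark API. Because the scores are unconstrained dot products, they can exceed the 5.0 rating ceiling, as the scores for user 575 above do.

```python
import numpy as np

def top_k_items(user_vec, item_factors, item_ids, k):
    """Return the k (item_id, score) pairs with the highest dot-product score."""
    scores = item_factors @ user_vec
    order = np.argsort(-scores)[:k]  # indices of the largest scores first
    return [(int(item_ids[i]), float(scores[i])) for i in order]

# Hypothetical rank-2 factors for four movies and one user.
item_ids = np.array([10, 20, 30, 40])
item_factors = np.array([[1.0, 0.0],
                         [0.5, 0.5],
                         [0.0, 3.0],
                         [3.0, 1.0]])
user_vec = np.array([2.0, 1.0])

recs = top_k_items(user_vec, item_factors, item_ids, k=2)
print(recs)
```

If scores must stay on the rating scale for display, clipping them to the valid range afterwards is one option; the ranking itself is unaffected.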


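## Find similar movies for movies with id: 463, 471
You can find similar movies based on the ALS results. Note that `recommendForAllItems(10)`, used below, returns the 10 users with the highest predicted rating for each movie, not similar movies; movie-to-movie similarity would have to be computed from the learned item factors.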

In [0]:
movieRecs = bestModel.recommendForAllItems(10)
movieRecs.createOrReplaceTempView("movieRec")

In [0]:
%sql
SELECT *
FROM movieRec
WHERE movieId = 463 OR movieId = 471

movieId,recommendations
471,"List(List(147, 10.20135), List(259, 8.709133), List(502, 7.64036), List(126, 7.159145), List(35, 6.863675), List(531, 6.620311), List(418, 6.2766795), List(375, 6.2339005), List(383, 6.1521726), List(497, 6.092638))"
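
One common way to get movie-to-movie similarity from an ALS model is to compare the learned item factor vectors, for example with cosine similarity. The NumPy sketch below shows the computation on made-up vectors; `most_similar` is a hypothetical helper, and it assumes no factor vector is all zeros. In Spark, the vectors would come from the fitted model's `itemFactors` DataFrame (columns `id` and `features`).

```python
import numpy as np

def most_similar(target_id, item_ids, item_factors, k):
    """Return the k items whose factor vectors have the highest cosine
    similarity to the target item's vector (the target itself is excluded)."""
    # Normalize rows so that a dot product equals cosine similarity.
    unit = item_factors / np.linalg.norm(item_factors, axis=1, keepdims=True)
    target = unit[item_ids.index(target_id)]
    sims = unit @ target
    ranked = sorted(zip(item_ids, sims), key=lambda pair: -pair[1])
    return [(i, float(s)) for i, s in ranked if i != target_id][:k]

# Hypothetical rank-2 item factors keyed by movieId.
item_ids = [463, 471, 500, 600]
item_factors = np.array([[1.0, 0.0],
                         [2.0, 0.1],
                         [0.0, 1.0],
                         [-1.0, 0.0]])

similar = most_similar(463, item_ids, item_factors, k=2)
print(similar)
```

Cosine similarity ignores vector length, so two movies count as similar when their factors point in the same direction, whatever their overall predicted popularity.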
