### Spark HW2 Movie Recommendation
In this notebook, we use the Alternating Least Squares (ALS) algorithm with Spark APIs to predict ratings for the movies in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

In [3]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part 1: Data ETL and Data Exploration

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [6]:
movies_df = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links_df = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags_df = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [7]:
movies_df.show(5)

In [8]:
ratings_df.show(5)

In [9]:
links_df.show(5)

In [10]:
tags_df.show(5)

In [11]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [12]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

## Part 1: Spark SQL and OLAP

In [14]:
# createOrReplaceTempView replaces the deprecated registerTempTable
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")

### Q1: The number of Users

In [16]:
%sql select count(distinct userID) from ratings

count(DISTINCT userID)
610


### Q2: The number of Movies

In [18]:
%sql select count(distinct movieID) from movies

count(DISTINCT movieID)
9742


### Q3: How many movies have been rated by users? List the movies that have never been rated

In [20]:
%sql select title, genres from movies where movieID not in (select distinct movieID from ratings)

title,genres
"Innocents, The (1961)",Drama|Horror|Thriller
Niagara (1953),Drama|Thriller
For All Mankind (1989),Documentary
"Color of Paradise, The (Rang-e khoda) (1999)",Drama
I Know Where I'm Going! (1945),Drama|Romance|War
"Chosen, The (1981)",Drama
"Road Home, The (Wo de fu qin mu qin) (1999)",Drama|Romance
Scrooge (1970),Drama|Fantasy|Musical
Proof (1991),Comedy|Drama|Romance
"Parallax View, The (1974)",Thriller


In [21]:
%sql select count(distinct movieID) from ratings 

count(DISTINCT movieID)
9724


### Q4: List Movie Genres

In [23]:
%sql select distinct genres from movies

genres
Comedy|Horror|Thriller
Adventure|Sci-Fi|Thriller
Action|Adventure|Drama|Fantasy
Action|Drama|Horror
Action|Animation|Comedy|Sci-Fi
Animation|Children|Drama|Musical|Romance
Action|Adventure|Drama
Adventure|Sci-Fi
Documentary|Musical|IMAX
Adventure|Children|Fantasy|Sci-Fi|Thriller


In [24]:
%sql select distinct explode(split(genres, '[|]')) as Genres from movies order by Genres 

Genres
(no genres listed)
Action
Adventure
Animation
Children
Comedy
Crime
Documentary
Drama
Fantasy


### Q5: Movies for Each Category

In [26]:
%sql select title, explode(split(genres, '[|]')) as Genres from movies order by Genres 

title,Genres
Too Funny to Fail: The Life and Death of The Dana Carvey Show (2017),(no genres listed)
Superfast! (2015),(no genres listed)
Ali Wong: Baby Cobra (2016),(no genres listed)
Serving in Silence: The Margarethe Cammermeyer Story (1995),(no genres listed)
A Christmas Story Live! (2017),(no genres listed)
Green Room (2015),(no genres listed)
A Midsummer Night's Dream (2016),(no genres listed)
The Forbidden Dance (1990),(no genres listed)
Ben-hur (2016),(no genres listed)
The Brand New Testament (2015),(no genres listed)


In [27]:
%sql select a.Genres, count(*) as Count from (select explode(split(genres, '[|]')) as Genres from movies) as a group by a.Genres order by a.Genres

Genres,Count
(no genres listed),34
Action,1828
Adventure,1263
Animation,611
Children,664
Comedy,3756
Crime,1199
Documentary,440
Drama,4361
Fantasy,779
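
The `explode(split(genres, '[|]'))` pattern used in the queries above can be mirrored in plain Python, which may help when checking the logic on a handful of rows. This is only a sketch: the three sample rows are copied from the query output shown earlier in this notebook, and `count_genres` is a hypothetical helper name, not a Spark function.

```python
from collections import Counter

# Sample (title, genres) rows copied from the movies table output above.
sample_movies = [
    ("Niagara (1953)", "Drama|Thriller"),
    ("For All Mankind (1989)", "Documentary"),
    ("Proof (1991)", "Comedy|Drama|Romance"),
]

def count_genres(rows):
    """Split each pipe-delimited genre string and count movies per genre."""
    counts = Counter()
    for _title, genres in rows:
        # str.split("|") plays the role of split(genres, '[|]') followed by explode.
        counts.update(genres.split("|"))
    return counts

genre_counts = count_genres(sample_movies)
for genre in sorted(genre_counts):
    print(genre, genre_counts[genre])
```

A movie with several genres contributes one count to each of them, which is why the per-genre counts add up to more than the number of movies.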


## Part 2: Training a Model with Spark ALS
We will use Spark ML to predict the ratings. Rather than reloading "ratings.csv" with ``sc.textFile``, we reuse ``ratings_df``, drop the ``timestamp`` column, and cast the remaining columns to numeric types so that each row has the form (user, item, rating).
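
For intuition about what ALS does with (user, item, rating) data, here is a minimal pure-Python sketch of the alternating idea, restricted to rank 1 so that each least-squares solve reduces to a single division. It is not Spark's implementation: the function names `als_rank1` and `objective` and the toy ratings are made up for illustration, and Spark's ALS works with higher ranks and, as far as its documentation describes, scales the regularization by the number of ratings, which this sketch does not.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, iters=10):
    """Rank-1 ALS on (user, item, rating) tuples with 0-based integer ids."""
    u = [1.0] * n_users  # one latent factor per user
    v = [1.0] * n_items  # one latent factor per item
    for _ in range(iters):
        # Hold item factors fixed; each user's factor has a closed form.
        for i in range(n_users):
            rated = [(j, r) for (a, j, r) in ratings if a == i]
            u[i] = sum(r * v[j] for j, r in rated) / (reg + sum(v[j] ** 2 for j, r in rated))
        # Hold user factors fixed; update each item's factor the same way.
        for j in range(n_items):
            rated = [(i, r) for (i, b, r) in ratings if b == j]
            v[j] = sum(r * u[i] for i, r in rated) / (reg + sum(u[i] ** 2 for i, r in rated))
    return u, v

def objective(ratings, u, v, reg):
    """Squared error on observed ratings plus an L2 penalty on the factors."""
    err = sum((r - u[i] * v[j]) ** 2 for (i, j, r) in ratings)
    return err + reg * (sum(x * x for x in u) + sum(x * x for x in v))

# Toy data (made up for illustration): 3 users, 3 items, some pairs unobserved.
toy = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0), (1, 2, 1.0), (2, 1, 2.0), (2, 2, 1.0)]
u, v = als_rank1(toy, n_users=3, n_items=3)
print("predicted rating for user 0, item 2:", u[0] * v[2])
```

Because each half-step exactly minimizes the objective over one block of factors, the objective never increases from one iteration to the next. The predicted rating for an unobserved pair is simply the product (in general, the dot product) of the user and item factors.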

In [29]:
ratings_df.show()

In [30]:
movie_ratings0=ratings_df.drop('timestamp')

In [31]:
# Convert column data types
from pyspark.sql.types import IntegerType, FloatType
movie_ratings0 = movie_ratings0.withColumn("userId", movie_ratings0["userId"].cast(IntegerType()))
movie_ratings0 = movie_ratings0.withColumn("movieId", movie_ratings0["movieId"].cast(IntegerType()))
movie_ratings0 = movie_ratings0.withColumn("rating", movie_ratings0["rating"].cast(FloatType()))

In [32]:
movie_ratings0.show()

In [33]:
from pyspark.sql.functions import monotonically_increasing_id
movie_ratings0 = movie_ratings0.withColumn("id", monotonically_increasing_id())
movie_ratings0.show()

In [34]:
movie_ratings0.count()

In [35]:
from pyspark.sql.functions import col
(movie_ratings, rest) = movie_ratings0.randomSplit([0.1, 0.9])
movie_ratings.count()

### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.
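
To get a feel for the cost of the grid search, the following sketch enumerates the same candidate values used in this notebook with `itertools.product`. It only counts combinations and does not touch Spark; the variable names `grid`, `combos`, and `num_folds` are chosen here for illustration.

```python
from itertools import product

# Candidate values copied from the ParamGridBuilder call in this notebook.
grid = {
    "rank": [5, 10],
    "maxIter": [5, 10],
    "regParam": [0.01, 0.1],
}
num_folds = 5  # matches numFolds passed to CrossValidator in this notebook

# Every combination of one value per hyperparameter.
names = list(grid)
combos = [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]

print(len(combos))              # 8 parameter settings
print(len(combos) * num_folds)  # 40 model fits across the folds
```

Each extra value added to one of the lists multiplies the number of fits, so even a small grid becomes expensive quickly. That is one likely reason the notebook tunes on a 10% sample of the ratings.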

In [37]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [38]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])
training.show()

In [39]:
training.count()

In [40]:
#Create ALS model 
als = ALS(regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")

In [41]:
type(als)

In [42]:
#Tune model using ParamGridBuilder
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [5, 10]) \
            .addGrid(als.maxIter, [5, 10]) \
            .addGrid(als.regParam, [0.01, 0.1]) \
            .build()
print(len(param_grid))

In [43]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [44]:
# Build the cross-validator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds = 5)

In [45]:
#Fit ALS model to training data
cvModel=cv.fit(training)

In [46]:
#Extract best model from the tuning exercise using ParamGridBuilder
best_model = cvModel.bestModel

### Model testing
And finally, make a prediction and check the testing error.
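
As a reference for what the RMSE metric measures, here is a small stand-alone sketch. The helper `rmse` and the toy values are made up for illustration; skipping NaN predictions is meant to mimic the effect of `coldStartStrategy="drop"`, which removes rows whose user or movie was not seen during training before they reach the evaluator.

```python
import math

def rmse(labels, predictions):
    """Root-mean-square error over pairs whose prediction is not NaN."""
    pairs = [(y, p) for y, p in zip(labels, predictions) if not math.isnan(p)]
    if not pairs:
        raise ValueError("no valid predictions to evaluate")
    return math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))

# Toy values (made up): the NaN stands in for a cold-start user or movie.
labels = [5.0, 3.0, 4.0, 2.0]
predictions = [4.0, 3.0, float("nan"), 4.0]
print(rmse(labels, predictions))
```

Since the errors are squared before averaging, a few large misses (such as the prediction that is off by 2 here) weigh more heavily than many small ones.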

In [48]:
#Generate predictions and evaluate using RMSE
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [49]:
#Print evaluation metrics and model parameters
print("RMSE = " + str(rmse))
print("**Best Model**")
print(" Rank: " + str(best_model.rank))
# maxIter and regParam are read from the parent ALS estimator via the Java object
java_als = best_model._java_obj.parent()
print(" MaxIter: " + str(java_als.getMaxIter()))
print(" RegParam: " + str(java_als.getRegParam()))

In [50]:
predictions.show()

### Apply the model and check its performance

In [52]:
alldata = best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))
alldata.count()

In [53]:
alldata.createOrReplaceTempView("alldata")

In [54]:
%sql select * from alldata order by id

userId,movieId,rating,id,prediction
1,50,5.0,4,4.8139777
1,673,3.0,39,3.0140967
1,1298,5.0,90,4.787555
1,1408,3.0,94,3.1122477
1,2596,5.0,168,4.868236
1,2616,4.0,169,3.8497894
1,2628,4.0,171,1.4667412
1,2797,4.0,182,4.071091
1,3034,5.0,199,4.866223
1,3053,5.0,201,2.9049928


In [55]:
%sql select * from movies join alldata on movies.movieId=alldata.movieId order by id

movieId,title,genres,userId,movieId.1,rating,id,prediction
50,"Usual Suspects, The (1995)",Crime|Mystery|Thriller,1,50,5.0,4,4.8139777
673,Space Jam (1996),Adventure|Animation|Children|Comedy|Fantasy|Sci-Fi,1,673,3.0,39,3.0140967
1298,Pink Floyd: The Wall (1982),Drama|Musical,1,1298,5.0,90,4.787555
1408,"Last of the Mohicans, The (1992)",Action|Romance|War|Western,1,1408,3.0,94,3.1122477
2596,SLC Punk! (1998),Comedy|Drama,1,2596,5.0,168,4.868236
2616,Dick Tracy (1990),Action|Crime,1,2616,4.0,169,3.8497894
2628,Star Wars: Episode I - The Phantom Menace (1999),Action|Adventure|Sci-Fi,1,2628,4.0,171,1.4667412
2797,Big (1988),Comedy|Drama|Fantasy|Romance,1,2797,4.0,182,4.071091
3034,Robin Hood (1973),Adventure|Animation|Children|Comedy|Musical,1,3034,5.0,199,4.866223
3053,"Messenger: The Story of Joan of Arc, The (1999)",Drama|War,1,3053,5.0,201,2.9049928


## Recommend movies to users with id: 575, 232
You can choose any users and recommend movies to them.
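
Conceptually, a top-k recommendation from a factor model scores every movie by the dot product of the user's factor vector with the movie's factor vector and keeps the highest scores. The sketch below shows that scoring step in plain Python; `recommend_top_k` is a hypothetical helper, and the 2-dimensional factors are made up (real ones would come from the fitted model's user and item factors).

```python
import heapq

def recommend_top_k(user_vec, item_factors, k=3):
    """Return the k (item_id, score) pairs with the highest dot-product score."""
    scores = (
        (item_id, sum(a * b for a, b in zip(user_vec, vec)))
        for item_id, vec in item_factors.items()
    )
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])

# Toy 2-dimensional factors (made up; real factors come from the fitted model).
item_factors = {10: [1.0, 0.0], 20: [0.0, 1.0], 30: [1.0, 1.0], 40: [-1.0, 0.5]}
user_vec = [2.0, 1.0]

print(recommend_top_k(user_vec, item_factors, k=3))
```

The sketch does not filter out movies the user has already rated, so in practice one may want to remove those before presenting recommendations.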

In [57]:
#recommend top 3 movies for each user
rec = best_model.recommendForAllUsers(3)
rec.show()

In [58]:
rec.count()

In [59]:
rec.where(rec.userId == 575).collect()

In [60]:
%sql select * from movies where movieId in (2568, 60756, 1721) order by movieId

movieId,title,genres
1721,Titanic (1997),Drama|Romance
2568,"Mod Squad, The (1999)",Action|Crime
60756,Step Brothers (2008),Comedy


In [61]:
rec.where(rec.userId == 232).collect()

In [62]:
%sql select * from movies where movieId in (1214, 2959, 2924) order by movieId

movieId,title,genres
1214,Alien (1979),Horror|Sci-Fi
2924,Drunken Master (Jui kuen) (1978),Action|Comedy
2959,Fight Club (1999),Action|Crime|Drama|Thriller


## Find similar movies for movies with id: 463, 471
You can find similar movies based on the ALS results.
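
The idea is that movies whose ALS item factors lie close together tend to be rated similarly. The notebook uses locality-sensitive hashing for an approximate search; as a point of comparison, the sketch below does an exact brute-force search by Euclidean distance. The helper `nearest_items` and the 2-dimensional factors are made up for illustration.

```python
import math

def nearest_items(query_id, item_factors, k=2):
    """Return the k item ids closest to query_id by Euclidean distance."""
    query = item_factors[query_id]
    distances = [
        (math.dist(query, vec), item_id)
        for item_id, vec in item_factors.items()
        if item_id != query_id  # skip the query movie itself
    ]
    return [item_id for _dist, item_id in sorted(distances)[:k]]

# Toy 2-dimensional item factors (made up for illustration).
item_factors = {1: [0.0, 0.0], 2: [0.1, 0.0], 3: [1.0, 1.0], 4: [0.0, 0.3]}
print(nearest_items(1, item_factors, k=2))
```

Brute force compares the query against every movie, which is fine for a few thousand items. An approximate method such as LSH trades some accuracy for speed on larger catalogs. Note that a movie absent from the item factors (for example, one with no ratings in the training sample) cannot be queried this way.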

In [64]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import col, udf  # udf is needed later to build dense vectors

In [65]:
%sql select * from movies where movieId = 471

movieId,title,genres
471,"Hudsucker Proxy, The (1994)",Comedy


In [66]:
a = best_model.itemFactors
display(a.cache())

id,features
10,"List(-0.17399456, -0.11281259, 0.8569175, -0.54631084, -0.55400944, 1.4655101, -0.64992434, 0.11219682, -0.58871645, -0.805032)"
20,"List(0.48783374, 0.26239246, 1.025266, -0.8355324, -0.64349926, 1.1789007, -0.38052866, 0.12471497, -0.36214808, 0.46206874)"
40,"List(0.011918607, -0.6823519, 1.2163125, 0.6471562, -1.8250132, -0.11542535, -1.0401864, -0.16205989, 0.13153061, -0.6745505)"
50,"List(0.41380128, 0.52764153, 0.3219214, -0.59345657, -0.8761349, 1.5146528, -0.88337696, 0.8208594, -1.0010954, -0.6537329)"
60,"List(-0.13563386, 0.9672694, 0.74212706, -0.73763955, -0.8961581, 1.0795335, -0.14413597, 0.6976324, -0.31207517, -0.4542721)"
70,"List(0.1868956, 0.11975827, 0.24218467, 0.28644493, -1.4317477, 1.8521532, -1.1187931, 0.25387195, -0.060563233, -0.38064763)"
100,"List(0.3087319, 0.5037343, 0.49833703, -0.14110358, -0.7671431, 0.92059064, 0.051934868, 0.93045247, 0.36248496, -0.062180847)"
110,"List(0.60796386, 0.70030755, 1.4684035, 0.12191183, -0.30424312, 1.4313833, -0.60322106, 0.3105281, -0.90391064, -0.6063106)"
140,"List(-0.4878867, 0.6201137, -0.76593447, -0.5902814, 0.33427775, 0.33656263, -1.7366278, 0.20049278, 0.20113528, -1.1813973)"
150,"List(0.5027478, -0.07749295, 0.5137244, -0.25358272, -1.2210498, 1.7142764, -0.7389047, 0.6288325, 0.05641706, -1.2028278)"


In [67]:
a.where(a.id == 471).collect()

In [68]:
a.where(a.id == 463).collect()

In [69]:
a.createOrReplaceTempView("itemMatrix")

In [70]:
%sql
select features from itemMatrix where id = 471

features
"List(0.4146921, -0.009149789, 1.2196895, -1.4785196, 0.5422292, 1.393389, 0.11748117, 0.44705722, 0.40512022, 0.38026378)"


In [71]:
%sql
select features from itemMatrix where id = 463

features


In [72]:
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=12345, bucketLength=1.0)
a.printSchema()
# Convert the features column (an array of floats) into a dense vector
to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
data = a.select("id", to_vector("features").alias("features"))
data.printSchema()

In [73]:
model = brp.fit(data)
model.transform(data)

In [74]:
model.approxNearestNeighbors(data, Vectors.dense([0.4146921,-0.009149789,1.2196895,-1.4785196,0.5422292,1.393389,0.11748117,0.44705722,0.40512022,0.38026378]), 6).collect()

In [75]:
%sql
select * from movies
where movieId in (100383,4226,1500,5957,246)

movieId,title,genres
246,Hoop Dreams (1994),Documentary
1500,Grosse Pointe Blank (1997),Comedy|Crime|Romance
4226,Memento (2000),Mystery|Thriller
5957,Two Weeks Notice (2002),Comedy|Romance
100383,Side Effects (2013),Crime|Drama|Mystery|Thriller


## Movie Recommendation with Spark ALS
1. Use Alternating Least Squares (ALS) with Spark to predict ratings for movies in the MovieLens small dataset.
2. Perform data ETL and data exploration to learn basic information and prepare the data for analysis.
3. Analyze the datasets in terms of movies, users, ratings, and genres via Spark SQL and OLAP.
4. Train an ALS model on a dataset of 10,000 data points and tune hyperparameters with cross-validation to select the best model. Use the best model to predict ratings (RMSE of 0.56), recommend the top 3 movies to users, and find similar movies using the ALS item matrix and k-nearest neighbors.