### Spark Movie Recommendation
In this notebook, the Alternating Least Squares (ALS) algorithm is used with Spark APIs to predict movie ratings in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

!pip install pyspark
!pip install mlflow
#dbutils.library.installPyPI("mlflow")
#dbutils.library.restartPython()
import mlflow

In [None]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part1: Data ETL and Data Exploration

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/drive');
DATA_PATH = "drive/My Drive/Big Data Project/Data/MovieLens27M/"

In [None]:
movies_df = spark.read.load(DATA_PATH + "movies.csv", format='csv', header = True)
ratings_df = spark.read.load(DATA_PATH + "ratings.csv", format='csv', header = True)
links_df = spark.read.load(DATA_PATH + "links.csv", format='csv', header = True)
tags_df = spark.read.load(DATA_PATH + "tags.csv", format='csv', header = True)

In [None]:
type(movies_df)

In [None]:
movies_df.count()

In [None]:
#movies_df.show(5)

movies_df.createOrReplaceTempView("movies_df")

display (spark.sql("SELECT * FROM movies_df limit 5"))

In [None]:
#ratings_df.show(5)

ratings_df.createOrReplaceTempView("ratings_df")

display (spark.sql("SELECT * FROM ratings_df limit 5"))

In [None]:
links_df.show(5)

links_df.createOrReplaceTempView("links_df")

display (spark.sql("SELECT * FROM links_df limit 5"))

In [None]:
#tags_df.show(5)

tags_df.createOrReplaceTempView("tags_df")

display (spark.sql("SELECT * FROM tags_df limit 5"))

In [None]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [None]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

## Part 1: Spark SQL and OLAP

In [None]:
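# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")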

### Q1: The number of Users

In [None]:
# %sql
num_users = spark.sql("SELECT count (distinct userID) as num_users FROM ratings")
display(num_users)

In [None]:
ratings_df.select("userId").distinct().count()

In [None]:
type(ratings_df.select("userId"))

### Q2: The number of Movies

In [None]:
#%sql 
num_movies = spark.sql("SELECT count (distinct movieID) as num_movies FROM movies")
display(num_movies)

In [None]:
movies_df.select('movieID').distinct().count()

In [None]:
movies_df.select('movieID').count()

### Q3:  How many movies are rated by users? List movies not rated before

In [None]:
rated_by_users = ratings_df.select('movieID').distinct().count()
print('How many movies are rated by users?', rated_by_users)

In [None]:
%sql 
SELECT movies.title, movies.genres, ratings.rating FROM movies left JOIN ratings ON ratings.movieId = movies.movieID WHERE ratings.rating IS null LIMIT 10

### Q4: List Movie Genres

In [None]:
%sql
SELECT DISTINCT(genres) FROM movies LIMIT 10

In [None]:
%sql
SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 1), '|', -1) as genre FROM movies
UNION
SELECT  SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 2), '|', -1) as genre FROM movies
UNION
SELECT  SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 3), '|', -1) as genre FROM movies
UNION
SELECT  SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 4), '|', -1) as genre FROM movies
UNION
SELECT  SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 5), '|', -1) as genre FROM movies
UNION
SELECT  SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 6), '|', -1) as genre FROM movies
ORDER BY genre;

-- I do not like this method: it is verbose and only covers the first six genres of each movie

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, mean, udf, lit, current_timestamp, unix_timestamp, array_contains
extract_genres = udf(lambda x: x.split("|"), ArrayType(StringType()))
movies_df_clean = movies_df.select("movieId", "title", extract_genres("genres").alias("genres"))
#display(movies_df_clean)

movies_df_clean.createOrReplaceTempView("movies_df_clean")

display (spark.sql("SELECT * FROM movies_df_clean limit 5"))

In [None]:
genres_result = list(set(movies_df_clean.select('genres').rdd.flatMap(tuple).flatMap(tuple).collect()))
genres_result

### Q5: Movie for Each Category

In [None]:
movie_pdf = movies_df.toPandas()
movie_pdf['genres'].str.get_dummies(sep='|').head()

In [None]:
list_of_movie = list(movie_pdf['title'])

## Part2: Spark ALS based approach for training model
We will use Spark ML to predict the ratings, so we reuse the `ratings_df` DataFrame loaded from "ratings.csv" and reduce it to (userId, movieId, rating) rows.
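
For intuition about what ALS does before handing the work to Spark, here is a minimal NumPy sketch of the alternating updates on a tiny dense rating matrix. It is an illustration under simplifying assumptions (a zero means "unrated", plain L2 regularization, made-up ratings, and a hypothetical helper named `als_sketch`), not Spark's implementation.

```python
import numpy as np

def als_sketch(R, rank=2, reg=0.1, iters=20, seed=0):
    """Factor R ~ U @ V.T using only the observed (non-zero) entries."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    observed = R > 0
    ridge = reg * np.eye(rank)
    for _ in range(iters):
        # Hold item factors fixed and solve a small ridge regression per user
        for u in range(n_users):
            idx = observed[u]
            if idx.any():
                Vu = V[idx]
                U[u] = np.linalg.solve(Vu.T @ Vu + ridge, Vu.T @ R[u, idx])
        # Hold user factors fixed and solve a small ridge regression per item
        for i in range(n_items):
            idx = observed[:, i]
            if idx.any():
                Ui = U[idx]
                V[i] = np.linalg.solve(Ui.T @ Ui + ridge, Ui.T @ R[idx, i])
    return U, V

# Made-up ratings: rows are users, columns are movies, 0 means not rated
R = np.array([[5, 3, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)

U, V = als_sketch(R)
pred = U @ V.T
mask = R > 0
train_rmse = np.sqrt(np.mean((pred[mask] - R[mask]) ** 2))
print("training RMSE on observed entries:", round(float(train_rmse), 3))
```

Each inner solve is an ordinary regularized least-squares problem, which is why the method alternates: with one factor matrix held fixed, the other has a closed-form update.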

In [None]:
#ratings_df.show(10)

ratings_df.createOrReplaceTempView("ratings_df")

display (spark.sql("SELECT * FROM ratings_df limit 5"))

In [None]:
movie_ratings=ratings_df.drop('timestamp')

In [None]:
# Cast the columns to numeric types (the CSV was read with every column as a string)
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [None]:
#movie_ratings.show(10)

movie_ratings.createOrReplaceTempView("movie_ratings")

display (spark.sql("SELECT * FROM movie_ratings limit 10"))

### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.
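
To see why a large grid is expensive, it helps to count model fits: grid search with k-fold cross-validation trains one model per parameter combination per fold, and Spark's `CrossValidator` then refits the best combination on the full training set. A plain-Python sketch of that arithmetic, using the grid values that appear in this notebook (the helper name `count_fits` is hypothetical):

```python
from itertools import product

def count_fits(grid, num_folds):
    """Model fits for grid search with k-fold CV, plus one final refit."""
    combos = list(product(*grid.values()))
    return len(combos) * num_folds + 1

full_grid = {
    "regParam": [0.01, 0.5, 1, 1.5],
    "rank": [10, 15, 20, 25],
    "maxIter": [1, 5, 10, 15],
}
small_grid = {"regParam": [0.01], "rank": [10], "maxIter": [15]}

print(count_fits(full_grid, num_folds=5))   # 64 combinations * 5 folds + 1 = 321
print(count_fits(small_grid, num_folds=2))  # 1 combination * 2 folds + 1 = 3
```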

In [None]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [None]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])

In [None]:
# Create ALS model
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

In [None]:
# First, print the list of parameters
print(als.explainParams())

In [None]:
# Tune the model using ParamGridBuilder
# Cross-validation takes a long time, so only a single parameter combination is tried here

paramGrid = (ParamGridBuilder()
             .addGrid(als.regParam, [0.01])
             .addGrid(als.rank, [10])
             .addGrid(als.maxIter, [15])
             .build())

# paramGrid = (ParamGridBuilder()
#              .addGrid(als.regParam, [0.01, 0.5, 1, 1.5])
#              .addGrid(als.rank, [10, 15, 20, 25])
#              .addGrid(als.maxIter, [1, 5, 10, 15])
#              .build())

In [None]:
# Define evaluator as RMSE

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [None]:
from pyspark.ml.tuning import CrossValidator
# Build the cross-validator
# 5-fold would be preferable, but it takes too long, so only 2 folds are used
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2)

# Run cross validations
cvModel = cv.fit(training)
# This will likely take a fair amount of time because of the number of models being created and tested

In [None]:
# Extract the best model selected by CV
best_model = cvModel.bestModel

In [None]:
# Report the hyperparameters of the best model obtained via CV
print("**Best Model**")
print(" Rank:", best_model.rank)
print(" MaxIter:", best_model._java_obj.parent().getMaxIter())
print(" RegParam:", best_model._java_obj.parent().getRegParam())

### Model testing
And finally, make a prediction and check the testing error.
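
The testing error used here is the root-mean-square error, the square root of the mean squared difference between actual and predicted ratings. As a sanity check on what `RegressionEvaluator` reports, here is a minimal NumPy sketch with made-up numbers (the helper `rmse_sketch` and the sample values are assumptions for illustration only):

```python
import numpy as np

def rmse_sketch(actual, predicted):
    """Root-mean-square error between two equally long sequences."""
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    return float(np.sqrt(np.mean((actual - predicted) ** 2)))

sample_ratings = [4.0, 3.0, 5.0, 2.0]
sample_predictions = [3.5, 3.0, 4.0, 3.0]

# Squared errors are 0.25, 0, 1, 1; their mean is 0.5625, whose square root is 0.75
print(rmse_sketch(sample_ratings, sample_predictions))
```

Because the errors are squared before averaging, a few badly mispredicted ratings raise the RMSE more than many small misses.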

In [None]:
#Generate predictions and evaluate using RMSE
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [None]:
#Print RMSE 
print ("RMSE = "+str(rmse))

In [None]:
# Refit an ALS model on the training data with the best hyperparameters found by the tuning exercise

als_best = ALS(maxIter=15, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als_best.fit(training)

In [None]:
#predictions.show(10)

predictions.createOrReplaceTempView("predictions")

display (spark.sql("SELECT * FROM predictions limit 10"))

### Model apply and see the performance

In [None]:
alldata=best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

In [None]:
alldata.createOrReplaceTempView("alldata")

In [None]:
%sql SELECT * FROM alldata LIMIT 10

In [None]:
%sql SELECT * FROM movies JOIN alldata ON movies.movieId=alldata.movieId LIMIT 10

## Recommend movies to users with id: 575, 232
You can choose any users to recommend movies to.
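
Conceptually, recommending for a user means scoring every movie with the dot product of the user's and the movie's latent factors, dropping movies the user has already rated, and keeping the top N. A minimal NumPy sketch with made-up factors (the helper `top_n_unseen` and all numbers are illustrative assumptions, not values from the trained model):

```python
import numpy as np

def top_n_unseen(user_vec, item_factors, item_ids, seen_ids, n=2):
    """Return the n highest-scoring (item_id, score) pairs the user has not rated."""
    scores = item_factors @ user_vec
    # Push already-rated items to the bottom so they are never recommended
    unseen = np.array([item_id not in seen_ids for item_id in item_ids])
    scores = np.where(unseen, scores, -np.inf)
    order = np.argsort(-scores)[:n]
    return [(int(item_ids[k]), float(scores[k]))
            for k in order if np.isfinite(scores[k])]

item_ids = np.array([10, 20, 30, 40])
item_factors = np.array([[1.0, 0.0],
                         [0.8, 0.2],
                         [0.0, 1.0],
                         [0.5, 0.5]])
user_vec = np.array([4.0, 1.0])

# Movie 10 would score highest, but the user has already rated it
recs = top_n_unseen(user_vec, item_factors, item_ids, seen_ids={10}, n=2)
print(recs)
```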

In [None]:
# Recommend 10 movies for each user
user_recs = best_model.recommendForAllUsers(10)
#user_recs.show(10)

user_recs.createOrReplaceTempView("user_recs")

display (spark.sql("SELECT * FROM user_recs limit 10"))

In [None]:
user_recs.first()

In [None]:
user_recs.createOrReplaceTempView("als_recs_temp")

In [None]:
# Separate the values of 'recommendations' in user_recs into one row per recommended movie

explode_rec = spark.sql('SELECT userId,\
                                explode(recommendations) AS MovieRec\
                                FROM als_recs_temp')
#explode_rec.show(10)


explode_rec.createOrReplaceTempView("explode_rec")

display (spark.sql("SELECT * FROM explode_rec limit 10"))

In [None]:
fianl_recs = spark.sql("SELECT userId,\
                               movieIds_and_ratings.movieId AS movieId,\
                               movieIds_and_ratings.rating AS prediction\
                               FROM als_recs_temp\
                               LATERAL VIEW explode(recommendations) exploded_table AS movieIds_and_ratings")

In [None]:
#fianl_recs.show(10)


fianl_recs.createOrReplaceTempView("fianl_recs")

display (spark.sql("SELECT * FROM fianl_recs limit 10"))

In [None]:
# Before recommending, filter out movies the user has already rated: left-join the recommendations with movie_ratings and keep only the rows where rating is null

final_rec = fianl_recs.join(movie_ratings,['userId','movieId'],'left').filter(movie_ratings.rating.isNull())
#display(final_rec)

final_rec.createOrReplaceTempView("final_rec")

display (spark.sql("SELECT * FROM final_rec LIMIT 5"))

In [None]:
final_rec.createOrReplaceTempView("final_rec")
movies_df.createOrReplaceTempView("movies_df")

### Recommended films for userId = 575

In [None]:
%sql
SELECT userId,
       title
FROM final_rec t1
LEFT JOIN movies_df t2
ON t1.movieId = t2.movieId
WHERE t1.userId=575
LIMIT 10

### Recommended films for userId = 273

In [None]:
%sql
SELECT userId,
       title
FROM final_rec t1
LEFT JOIN movies_df t2
ON t1.movieId = t2.movieId
WHERE t1.userId=273
LIMIT 5

## Find similar movies for the movies with id: 463, 471
You can find similar movies based on the ALS results.
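
The idea is that movies whose latent factor vectors point in similar directions tend to be rated similarly, so cosine similarity between item factors can serve as a movie-to-movie similarity measure. A minimal NumPy sketch on made-up vectors (the ids, factors, and helper names are illustrative assumptions; in the notebook the vectors would come from the trained model's item factors):

```python
import numpy as np

def cosine_similarity(v1, v2):
    """Cosine of the angle between two vectors."""
    v1 = np.asarray(v1, dtype=float)
    v2 = np.asarray(v2, dtype=float)
    return float(v1 @ v2 / (np.linalg.norm(v1) * np.linalg.norm(v2)))

def most_similar(target_id, factors, k=2):
    """Return the k (movie_id, similarity) pairs closest to target_id."""
    target = factors[target_id]
    sims = [(movie_id, cosine_similarity(target, vec))
            for movie_id, vec in factors.items() if movie_id != target_id]
    return sorted(sims, key=lambda pair: -pair[1])[:k]

# Made-up two-dimensional item factors keyed by movie id
factors = {
    1: [1.0, 0.0],
    2: [2.0, 0.1],   # nearly the same direction as movie 1
    3: [0.0, 1.0],   # orthogonal to movie 1
    4: [-1.0, 0.0],  # opposite direction to movie 1
}

neighbors = most_similar(1, factors)
print(neighbors)
```

Cosine similarity ignores vector length, so movie 2 ranks first even though its factors are twice as large as movie 1's.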

In [None]:
# First extract the productFeatures matrix
# The productFeatures matrix will be used to create an item-item collaborative filtering recommendation model
# Aliased so it does not shadow the pyspark.ml ALS class imported earlier
from pyspark.mllib.recommendation import ALS as MLlibALS

model_a = MLlibALS.train(movie_ratings, rank=10, iterations=15,
                         lambda_=0.01)
model_a.productFeatures().count()

In [None]:
# Look at the feature vector of movie 471
movie_feature = model_a.productFeatures().lookup(471)[0]

In [None]:
# Next define a cosine similarity function to measure movie similarity
from numpy import linalg as LA

def cosineSimilarity(vec1, vec2):
  return vec1.dot(vec2) / (LA.norm(vec1) * LA.norm(vec2))

In [None]:
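# Load the movie titles file as an RDD
sc = spark.sparkContext  # `sc` is only predefined on some platforms, so take it from the session
movies_file = os.path.join("/FileStore/tables/", 'movies.csv')
movies_sc = sc.textFile(movies_file)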

movies_sc_header = movies_sc.take(1)[0]

movies_data = movies_sc.filter(lambda line: line!=movies_sc_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()

movies_titles = movies_data.map(lambda x: (int(x[0]),x[1]))

In [None]:
movies_sc_header

In [None]:
movies_data

In [None]:
movies_titles

In [None]:
# Build similarity matrix for movieid 471 using the product features matrix

similarMovies = model_a.productFeatures().map(lambda products:(products[0],
                                        cosineSimilarity(np.asarray(products[1]), movie_feature))).join(movies_titles).map(lambda r: (r[1][1], r[1][0], r[0]))

# Sort the top 10 most similar movies descendingly by cosine similarity measure
# similarMovies.takeOrdered(11, key=lambda x: -x[1])

In [None]:
from pyspark.sql.functions import col
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors, VectorUDT

In [None]:
a = best_model.itemFactors
# display(a.cache())

a.createOrReplaceTempView("a")

display (spark.sql("SELECT * FROM a limit 5"))

In [None]:
a.createOrReplaceTempView("movie_on_movie")

In [None]:
%sql
SELECT features FROM movie_on_movie WHERE id = 471

In [None]:
%sql
SELECT * FROM ratings WHERE movieId = 463 LIMIT 10

In [None]:
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=12345, bucketLength=1.0)
#a.printSchema()
#change features columns into dense vector
to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
data = a.select("id", to_vector("features").alias("features"))
#data.printSchema()

In [None]:
model = brp.fit(data)
model.transform(data)

In [None]:
model.approxNearestNeighbors(data, Vectors.dense([-0.73946416, -1.03179, -0.83905196, -0.6525196, -0.3816911, -0.88358724, -0.47698575, -0.15836999, 0.36126232, -0.6475737]), 6).collect()

In [None]:
# Similar movies for the movie with id: 471

In [None]:
%sql
SELECT * FROM movies
WHERE movieId IN (6296,97057,3476,1059,4346)

In [None]:
%sql
SELECT features FROM movie_on_movie WHERE id = 463

In [None]:
model.approxNearestNeighbors(data, Vectors.dense([0.93929714, 0.015614069, -0.3408886, 0.3818301, 0.19762212, -1.4255825, 0.99496984, -0.065754086, 0.43202916, -0.8621043]), 6).collect()

In [None]:
# Similar movies for the movie with id: 463

In [None]:
%sql
SELECT * FROM movies
WHERE movieId IN (5321,49007,554,7276,7224)

Based on the above, we obtain the movies that are most similar to the movie with id 471. They are:
William Shakespeare's Romeo + Juliet (1996),
Jacob's Ladder (1990),
Bride of the Wind (2001),
Stop Making Sense (1984),
Mighty Wind, A (2003),
Kon-Tiki (2012).

## The Report 
### Motivation: 
I explore the ALS collaborative filtering recommendation engine in Spark MLlib and use it, as a scalable matrix factorization technique, to produce user-based movie recommendations.

### Step1

First, I load four datasets (movies, ratings, links, and tags) and explore them to get some basic information: the number of users, the number of movies, the number of ratings per user and per movie, and the distribution of movies across genres.

### Step2

After preprocessing the data, I build an ALS model on the rating data to predict ratings, which are treated as the degree of each user's preference for a movie. The parameters (maxIter, rank, regParam) are tuned by grid search with cross-validation (2-fold in this notebook to limit run time, although 5-fold was the original intent) to obtain the model with the smallest RMSE on the validation folds. This is taken as the best model for prediction.

### Step3

Using the best model from the previous step, I predict the ratings for the movies in the test set and compute the RMSE to evaluate model performance before moving on to the next step.

### Step4  

In this step, I use the predictions of the best model to recommend 5 movies each for userID 575 and 232, and I find the 5 movies most similar to the movies with movieID 471 and 463 using approximate nearest neighbor search on the movie feature vectors.

### Conclusion
The RMSE of the best ALS model on the test data is 0.71, which indicates good performance in predicting movie ratings. The ALS model can provide both movie recommendations based on a user's preferences and movies similar to a specific movie, which shows its effectiveness as one of the most critical techniques in recommendation systems. Further work could improve the model, for example by using information from other data sets such as movie genres and tags, building an ALS model that incorporates both explicit and implicit feedback, trying other techniques such as KNN or deep learning, and ensembling several methods.