<a href="https://colab.research.google.com/github/Zhangmingyang-Su/Big-Data-Project/blob/master/Movie_Recommendation_System_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Spark Moive Recommendation
In this notebook, we will use an Alternating Least Squares (ALS) algorithm with Spark APIs to predict the ratings for the movies in [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/)

In [0]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

In [0]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part1: Data ETL and Data Exploration

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("moive analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [0]:
movies_df = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links_df = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags_df = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [0]:
movies_df.show(5)

In [0]:
ratings_df.show(5)

In [0]:
links_df.show(5)

In [0]:
tags_df.show(5)

In [0]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [0]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))



```
# This is formatted as code
```

## Part 2: Spark SQL and OLAP

In [0]:
movies_df.registerTempTable("movies")
ratings_df.registerTempTable("ratings")
links_df.registerTempTable("links")
tags_df.registerTempTable("tags")

### Q1: The number of Users

In [0]:
%sql select count(distinct userId) as users_number from ratings

users_number
610


### Q2: The number of Movies

In [0]:
%sql select count(movieId) as movies_number from movies

movies_number
9742


### Q3:  How many movies are rated by users? List movies not rated before

In [0]:
%sql select title, genres
from movies
where movieId not in (select m.movieId from ratings r join movies m on m.movieId = r.movieId) 

title,genres
"Innocents, The (1961)",Drama|Horror|Thriller
Niagara (1953),Drama|Thriller
For All Mankind (1989),Documentary
"Color of Paradise, The (Rang-e khoda) (1999)",Drama
I Know Where I'm Going! (1945),Drama|Romance|War
"Chosen, The (1981)",Drama
"Road Home, The (Wo de fu qin mu qin) (1999)",Drama|Romance
Scrooge (1970),Drama|Fantasy|Musical
Proof (1991),Comedy|Drama|Romance
"Parallax View, The (1974)",Thriller


In [0]:
%sql select count(distinct(m.movieId)) as Number_movies_by_users from ratings r join movies m on m.movieId = r.movieId

Number_movies_by_users
9724


### Q4: List Movie Genres

In [0]:
%sql
select title, genres from movies

title,genres
Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
Jumanji (1995),Adventure|Children|Fantasy
Grumpier Old Men (1995),Comedy|Romance
Waiting to Exhale (1995),Comedy|Drama|Romance
Father of the Bride Part II (1995),Comedy
Heat (1995),Action|Crime|Thriller
Sabrina (1995),Comedy|Romance
Tom and Huck (1995),Adventure|Children
Sudden Death (1995),Action
GoldenEye (1995),Action|Adventure|Thriller


### Q5: Movie for Each Category

In [0]:
%sql select t.tag, count(*) as number_movies from tags t join movies m on t.movieId = m.movieId group by 1 order by 2 DESC

tag,number_movies
In Netflix queue,131
atmospheric,36
thought-provoking,24
superhero,24
Disney,23
funny,23
surreal,23
religion,22
quirky,21
dark comedy,21


In [0]:
%sql 

## Part3: Spark ALS based approach for training model

*   List item
*   List item


We will use an Spark ML to predict the ratings, so let's reload "ratings.csv" using ``sc.textFile`` and then convert it to the form of (user, item, rating) tuples.

In [0]:
ratings_df.show()

In [0]:
movie_ratings=ratings_df.drop('timestamp')
movie_ratings.show()

In [0]:
# Data type convert
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [0]:
movie_ratings.show()



```
# This is formatted as code
```

## Part4: ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.

In [0]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [0]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])

In [0]:
#Create ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

In [0]:
#Tune model using ParamGridBuilder
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 20]) \
            .addGrid(als.maxIter, [5, 10]) \
            .addGrid(als.regParam, [0.01]) \
            .build()

In [0]:
# Define evaluator as RMSE
RMSE_evaluator= RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

In [0]:
# Build Cross validation 
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=param_grid,
                          evaluator=RMSE_evaluator,
                          numFolds=2)

In [0]:
#Fit ALS model to training data
cvModel = crossval.fit(training)

In [0]:
#Extract best model from the tuning exercise using ParamGridBuilder
best_model = cvModel.bestModel

*italicized text*### Model testing
And finally, make a prediction and check the testing error.

In [0]:
#Generate predictions and evaluate using RMSE
predictions=best_model.transform(test)
rmse = RMSE_evaluator.evaluate(predictions)

In [0]:
#Print evaluation metrics and model parameters
print ("RMSE = "+str(rmse))
print ("**Best Model**")
print (" Rank:" + str(best_model.rank)), 
print (" MaxIter:" + str(best_model._java_obj.parent().getMaxIter())), 
print (" RegParam:" + str(best_model._java_obj.parent().getRegParam())), 

In [0]:
predictions.show()

### Model apply and see the performance

In [0]:
alldata=best_model.transform(movie_ratings)
rmse = RMSE_evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

In [0]:
alldata.registerTempTable("alldata")

In [0]:
%sql select * from alldata

userId,movieId,rating,prediction
191,148,5.0,5.3208623
133,471,4.0,3.916665
597,471,2.0,4.0604944
385,471,4.0,3.1902223
436,471,3.0,4.0343404
602,471,4.0,4.8920736
91,471,1.0,2.4611886
409,471,3.0,3.886233
372,471,3.0,2.3894331
599,471,2.5,3.1700532


In [0]:
%sql select * from movies join alldata on movies.movieId=alldata.movieId

movieId,title,genres,userId,movieId.1,rating,prediction
148,"Awfully Big Adventure, An (1995)",Drama,191,148,5.0,5.3208623
471,"Hudsucker Proxy, The (1994)",Comedy,133,471,4.0,3.916665
471,"Hudsucker Proxy, The (1994)",Comedy,597,471,2.0,4.0604944
471,"Hudsucker Proxy, The (1994)",Comedy,385,471,4.0,3.1902223
471,"Hudsucker Proxy, The (1994)",Comedy,436,471,3.0,4.0343404
471,"Hudsucker Proxy, The (1994)",Comedy,602,471,4.0,4.8920736
471,"Hudsucker Proxy, The (1994)",Comedy,91,471,1.0,2.4611886
471,"Hudsucker Proxy, The (1994)",Comedy,409,471,3.0,3.886233
471,"Hudsucker Proxy, The (1994)",Comedy,372,471,3.0,2.3894331
471,"Hudsucker Proxy, The (1994)",Comedy,599,471,2.5,3.1700532


## Recommend moive to users with id: 575, 232. 
you can choose some users to recommend the moives

In [0]:
from pyspark.sql.functions import lit

def recommendMovies(model, user, nbRecommendations):
    # Create a Spark DataFrame with the specified user and all the movies listed in the ratings DataFrame
    dataSet = movie_ratings.select('movieId').distinct().withColumn('userId', lit(user))

    # Create a Spark DataFrame with the movies that have already been rated by this user
    moviesAlreadyRated = movie_ratings.filter(movie_ratings.userId == user).select('movieId', 'userId')

    # Apply the recommender system to the data set without the already rated movies to predict ratings
    predictions = best_model.transform(dataSet.subtract(moviesAlreadyRated)).dropna().orderBy('prediction', ascending=False).limit(nbRecommendations).select('movieId', 'prediction')

    # Join with the movies DataFrame to get the movies titles and genres
    recommendations = predictions.join(movies_df, predictions.movieId == movies_df.movieId).select(predictions.movieId, movies_df.title, movies_df.genres, predictions.prediction)

#     recommendations.show(truncate=False)
    return recommendations

In [0]:
# recommend movie to user with id 575
print('Top 10 Recommendations for user 575:')
recommendMovies(best_model, 575, 10).toPandas()

Unnamed: 0,movieId,title,genres,prediction
0,1095,Glengarry Glen Ross (1992),Drama,6.862888
1,1711,Midnight in the Garden of Good and Evil (1997),Crime|Drama|Mystery,6.109157
2,2290,Stardust Memories (1980),Comedy|Drama,6.411269
3,2387,Very Bad Things (1998),Comedy|Crime,6.177738
4,2476,Heartbreak Ridge (1986),Action|War,5.975768
5,3742,Battleship Potemkin (1925),Drama|War,5.991602
6,3836,Kelly's Heroes (1970),Action|Comedy|War,6.188545
7,5747,Gallipoli (1981),Drama|War,6.395149
8,59900,You Don't Mess with the Zohan (2008),Comedy,6.126216
9,92535,Louis C.K.: Live at the Beacon Theater (2011),Comedy,6.10204


In [0]:
# recommend movie to user with id 232
print('Top 10 Recommendations for user 232:')
recommendMovies(best_model, 232, 10).toPandas()

Unnamed: 0,movieId,title,genres,prediction
0,599,"Wild Bunch, The (1969)",Adventure|Western,4.887916
1,3972,"Legend of Drunken Master, The (Jui kuen II) (1...",Action|Comedy,4.855138
2,4437,Suspiria (1977),Horror,4.829184
3,26133,"Charlie Brown Christmas, A (1965)",Animation|Children|Comedy,5.299094
4,53123,Once (2006),Drama|Musical|Romance,4.901923
5,56782,There Will Be Blood (2007),Drama|Western,4.819464
6,91658,"Girl with the Dragon Tattoo, The (2011)",Drama|Thriller,4.922301
7,102123,This Is the End (2013),Action|Comedy,4.832106
8,142488,Spotlight (2015),Thriller,4.789491
9,148626,"Big Short, The (2015)",Drama,4.785877


## Find the similar moives for moive with id: 4782
You can find the similar moives based on the ALS results

In [0]:
# find all the item-based recommendtions
movieRecs = best_model.recommendForAllItems(10)
movieRecs.show()

In [0]:
movieRecs.registerTempTable("MR")
movies_df.registerTempTable("movies")

In [0]:
# find similar movie with movieId 4782 based on item-based recommendation. By the way, i am not sure about this.

In [0]:
%sql 
select a1.title, 
       a1.genres, 
       a.movieId_1, 
       a.movieId_2 
from movies a1 join
(select m1.movieId as movieId_1, 
       m2.movieId as movieId_2 
from MR m1, MR m2
where m1.recommendations = m2.recommendations and m1.movieId != m2.movieId) a 
on a1.movieId = a.movieId_1
where a.movieId_1 = 4782
union all
select a1.title, 
       a1.genres, 
       a.movieId_1, 
       a.movieId_2 
from movies a1 join
(select m1.movieId as movieId_1, 
       m2.movieId as movieId_2 
from MR m1, MR m2
where m1.recommendations = m2.recommendations and m1.movieId != m2.movieId) a 
on a1.movieId = a.movieId_1
where a.movieId_1 = 5456

title,genres,movieId_1,movieId_2
Sidewalks of New York (2001),Comedy|Romance,4782,5456
Wagons East (1994),Comedy|Western,5456,4782


## Write the report 
motivation
1. step1
2. step2
3. step3
4. step4  
output and conclusion

## Report Conclusion
1.First of all, loaded data to the Spark Filestore, then trying to do Data ETL and Exploration.  
2.Used Spark SQL to extract movie, rating based on content infomation.  
3.Preprocessed data such as data cleaning, converting data type.  
4.Implemented ALS model to traing the data, and using grid search method to tune the hyperparameter with cross-validation such as rank, maxiter, Regparam. Finally, we extract best model to make a prediction.  
5.After built model, using RMSE evaluator to evaluate the performance and apply the ALS model.  
6.Applied ALS model, recommending top 10 movies to specific user based on User-based method, and recommend similar movies for specific movies based on Item-based method.