### Spark Movie Recommendation
In this notebook, we use the Alternating Least Squares (ALS) algorithm with Spark APIs to predict ratings for the movies in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

To recommend movies to a new user, we can let them choose specific genres of interest first, and then recommend movies in those genres.

In [3]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType
import math
dbutils.library.installPyPI("koalas")
import databricks.koalas as ks
from numpy import dot
from numpy.linalg import norm
%matplotlib inline

In [4]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part1: Data ETL and Data Exploration

In [6]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [7]:
movies_df = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links_df = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags_df = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [8]:
movies_df.show(5)

In [9]:
ratings_df.show(5)

In [10]:
links_df.show(5)

In [11]:
tags_df.show(5)

In [12]:
print("Is each table free of missing data?")
print('movies_df: {}'.format(movies_df.count() == movies_df.na.drop().count()))
print('ratings_df: {}'.format(ratings_df.count() == ratings_df.na.drop().count()))
print('links_df: {}'.format(links_df.count() == links_df.na.drop().count()))
print('tags_df: {}'.format(tags_df.count() == tags_df.na.drop().count()))

In [13]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [14]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

In [15]:
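# registerTempTable is deprecated; createOrReplaceTempView is its replacement
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")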

### Q1: The number of Users

In [17]:
user_count=ratings_df.select('userID').distinct().count()
print('The total number of distinct users is: {}'.format(user_count))

### Q2: The number of Movies

In [19]:
movie_count=movies_df.select('movieID').distinct().count()
print('The total number of distinct movies is: {}'.format(movie_count))

### Q3: How many movies are rated by users? List the movies that have not been rated

In [21]:
# numbers of movies that are rated by users
rated = ratings_df.select('movieID')\
.where('rating is not null')\
.withColumnRenamed('movieId', 'ratedId').distinct()
rated_count = rated.distinct().count()
print('{} movies are rated by users.'.format(rated_count))

In [22]:
#List movies that are not rated by users
unrated=movies_df.join(rated, movies_df.movieId==rated.ratedId, "left_outer")\
.where('ratedId is null')\
.select('movieId','title','genres')
unrated.show()

In [23]:
unrated_count = unrated.distinct().count()
percent_of_unrated = round(unrated_count/movie_count*100,2)
print('{}% of movies are not rated by users.'.format(percent_of_unrated))

In [24]:
# number of ratings per movie
movie_dis = ratings_df.groupBy('movieId').count()
movie_dis.show()

In [25]:
#Visualize the percentile distribution of movies with regard to number of ratings
fig = plt.figure(figsize=(8,6))
sns.set(style="whitegrid")
sns.distplot(movie_dis.toPandas()['count'], color='green', hist=True)
plt.title('Distribution of Movie Ratings') 
plt.ylabel('percentage')
plt.xlabel('number of ratings per movie')
plt.show()

The distribution of the number of ratings per movie is right-skewed (a long tail to the right), meaning most movies have very few ratings (fewer than about 100).
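The direction of the skew can be checked numerically. The following is a minimal sketch in plain Python; the counts are made-up illustrative values rather than numbers from the dataset, and `sample_skewness` is a hypothetical helper. A sample whose mass sits at small values with a long tail toward large values has a mean above its median and positive skewness, which is conventionally called right-skewed.

```python
from statistics import mean, median

def sample_skewness(values):
    """Return the (biased) moment coefficient of skewness: m3 / m2**1.5."""
    mu = mean(values)
    n = len(values)
    m2 = sum((v - mu) ** 2 for v in values) / n
    m3 = sum((v - mu) ** 3 for v in values) / n
    return m3 / m2 ** 1.5

# Hypothetical ratings-per-movie counts: many small values, a few large ones
counts = [1, 1, 1, 2, 2, 3, 4, 5, 8, 12, 40, 150, 329]

skew = sample_skewness(counts)
print(mean(counts) > median(counts))  # the long right tail pulls the mean above the median
print(skew > 0)                       # positive skewness means right-skewed
```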

In [27]:
# number of ratings per user
user_dis = ratings_df.groupBy('userId').count()
user_dis.toPandas().head(20)

| | userId | count |
|---|---|---|
| 0 | 296 | 27 |
| 1 | 467 | 22 |
| 2 | 125 | 360 |
| 3 | 451 | 34 |
| 4 | 7 | 152 |
| 5 | 51 | 359 |
| 6 | 124 | 50 |
| 7 | 447 | 78 |
| 8 | 591 | 54 |
| 9 | 307 | 975 |


In [28]:
#Visualize the percentile distribution of users with regard to number of ratings
fig = plt.figure(figsize=(8,6))
sns.set(style="whitegrid")
sns.distplot(user_dis.toPandas()['count'], color='salmon', hist=True)
plt.title('Distribution of Ratings per User')
plt.ylabel('percentage')
plt.xlabel('number of ratings per user')
plt.show()

The distribution of the number of ratings per user is right-skewed: most users have fewer than about 200 ratings.

In [30]:
#Average rating of each movie
temp = spark.sql(
'''select movieId as movie_id, avg(rating) as average_rating from ratings
group by movieId''').toPandas()
temp.head(10)

| | movie_id | average_rating |
|---|---|---|
| 0 | 296 | 4.197068 |
| 1 | 1090 | 3.984127 |
| 2 | 115713 | 3.910714 |
| 3 | 3210 | 3.47619 |
| 4 | 88140 | 3.546875 |
| 5 | 829 | 2.666667 |
| 6 | 2088 | 2.5 |
| 7 | 2294 | 3.244444 |
| 8 | 4821 | 3.1 |
| 9 | 48738 | 3.975 |


### Q4: List Movie Genres

In [32]:
movies_df.select('genres').distinct().show()


In [33]:
#Split the genres column so that each (title, genre) pair gets its own row
movies_temp=movies_df.where('genres is not null')\
.select('title', F.explode(F.split('genres', r'\|'))\
.alias('genres'))
movies_temp.toPandas().head(10)

| | title | genres |
|---|---|---|
| 0 | Toy Story (1995) | Adventure |
| 1 | Toy Story (1995) | Animation |
| 2 | Toy Story (1995) | Children |
| 3 | Toy Story (1995) | Comedy |
| 4 | Toy Story (1995) | Fantasy |
| 5 | Jumanji (1995) | Adventure |
| 6 | Jumanji (1995) | Children |
| 7 | Jumanji (1995) | Fantasy |
| 8 | Grumpier Old Men (1995) | Comedy |
| 9 | Grumpier Old Men (1995) | Romance |


### Q5: Movies in Each Category

In [35]:
movies_pdf = movies_temp.toPandas()
category = pd.DataFrame((movies_pdf.genres).value_counts()).reset_index()
category.columns = ['Category','Number']
category.head(10)

| | Category | Number |
|---|---|---|
| 0 | Drama | 4361 |
| 1 | Comedy | 3756 |
| 2 | Thriller | 1894 |
| 3 | Action | 1828 |
| 4 | Romance | 1596 |
| 5 | Adventure | 1263 |
| 6 | Crime | 1199 |
| 7 | Sci-Fi | 980 |
| 8 | Horror | 978 |
| 9 | Fantasy | 779 |


In [36]:
plt.figure(figsize=(60,10))
sns.set(style="whitegrid")
tmp=sns.barplot(x='Category', y='Number', data=category)
tmp.set_title('Number of movies per category')
display(tmp)

In [37]:
movies_genres=movies_temp.groupBy('genres')\
.agg(F.collect_list("title")\
.alias('titles'))
genre_set = set(movies_genres.toPandas()['genres'])
genre_set

## Part2: Spark ALS based approach for training model
We will use Spark ML's ALS implementation to predict the ratings. It expects numeric (user, item, rating) columns, so we first drop the timestamp and cast the remaining columns of `ratings_df` to numeric types.
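For intuition about what ALS does, the following is a minimal pure-Python sketch of alternating least squares with a single latent factor (rank 1). It is not Spark's implementation: the toy ratings dictionary, the function names `als_rank1` and `squared_error`, and the regularization value are all illustrative assumptions. With the item factors held fixed, each user factor has a closed-form regularized least-squares solution, and vice versa, so the algorithm simply alternates between the two updates.

```python
def als_rank1(ratings, n_iter=20, reg=0.1):
    """Rank-1 ALS on a dict {(user, item): rating}; returns (user_f, item_f)."""
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    user_f = {u: 1.0 for u in users}
    item_f = {i: 1.0 for i in items}
    for _ in range(n_iter):
        # Fix the item factors, solve each user's ridge regression in closed form
        for u in users:
            num = sum(r * item_f[i] for (uu, i), r in ratings.items() if uu == u)
            den = reg + sum(item_f[i] ** 2 for (uu, i) in ratings if uu == u)
            user_f[u] = num / den
        # Fix the user factors, solve each item's ridge regression in closed form
        for i in items:
            num = sum(r * user_f[u] for (u, ii), r in ratings.items() if ii == i)
            den = reg + sum(user_f[u] ** 2 for (u, ii) in ratings if ii == i)
            item_f[i] = num / den
    return user_f, item_f

def squared_error(ratings, user_f, item_f):
    """Sum of squared differences between observed and predicted ratings."""
    return sum((r - user_f[u] * item_f[i]) ** 2 for (u, i), r in ratings.items())

# Hypothetical toy data: (userId, movieId) -> rating
toy = {(1, 10): 5.0, (1, 20): 3.0, (2, 10): 4.0,
       (2, 30): 2.0, (3, 20): 3.5, (3, 30): 2.5}

before = squared_error(toy, {u: 1.0 for u in (1, 2, 3)}, {i: 1.0 for i in (10, 20, 30)})
user_f, item_f = als_rank1(toy)
after = squared_error(toy, user_f, item_f)
print(before, after)  # the fitted factors reproduce the observed ratings much better

# The prediction for an unrated pair is the product of the two factors
print(user_f[1] * item_f[30])
```

Spark's ALS does the same thing with vectors of length `rank` instead of scalars, solving a small linear system per user and per item in each pass.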

In [39]:
ratings_df.show()

In [40]:
#Convert data types
movie_ratings=ratings_df.drop('timestamp')
movie_ratings=movie_ratings.withColumn('userId',movie_ratings['userId'].cast(IntegerType()))
movie_ratings=movie_ratings.withColumn('movieId',movie_ratings['movieId'].cast(IntegerType()))
movie_ratings=movie_ratings.withColumn('rating',movie_ratings['rating'].cast(FloatType()))

In [41]:
movie_ratings.show()

### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.

In [43]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [44]:
#Create training and test sets (80/20 random split)
(training,test)=movie_ratings.randomSplit([0.8,0.2])
#Create the ALS estimator; coldStartStrategy="drop" removes rows with NaN predictions
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
#Build the hyperparameter grid using ParamGridBuilder
paramGrid = ParamGridBuilder()\
            .addGrid(als.regParam, [0.03,0.001,0.1])\
            .addGrid(als.maxIter, [3,5,10])\
            .addGrid(als.rank, [5,10,15])\
            .build()
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName='rmse', labelCol = 'rating', predictionCol = 'prediction')
# Build 5-fold cross validation
crossval = CrossValidator(estimator=als,
                         estimatorParamMaps=paramGrid,
                         evaluator=evaluator,
                         numFolds=5)
#Fit a single ALS model with the settings above (not used below; the cross-validated model is)
model = als.fit(training)
#Run the grid search with cross validation on the training data
cvModel = crossval.fit(training)
#Evaluate the tuned model on the training data; the test error is computed in the next section
predictions = cvModel.transform(training)
rmse = evaluator.evaluate(predictions)
print('Training root-mean-square error = ' + str(rmse))
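To get a feel for the cost of this search, here is a small sketch that counts how many ALS fits the grid and the 5-fold cross validation above imply. It uses only the candidate values from the cell above; the variable names are illustrative.

```python
from itertools import product

# Same candidate values as the ParamGridBuilder grid above
reg_params = [0.03, 0.001, 0.1]
max_iters = [3, 5, 10]
ranks = [5, 10, 15]
num_folds = 5

grid = list(product(reg_params, max_iters, ranks))
n_combinations = len(grid)               # one entry per hyperparameter combination
n_cv_fits = n_combinations * num_folds   # each combination is fit once per fold
n_total_fits = n_cv_fits + 1             # plus one refit of the best combination on all training data

print(n_combinations, n_cv_fits, n_total_fits)
```

This is why the cross-validation cell takes much longer to run than a single `als.fit` call.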

### Model testing
Finally, make predictions on the test set and check the test error.

In [46]:
#Generate predictions and evaluate using RMSE
best_model = cvModel.bestModel
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [47]:
#Print evaluation metrics and model parameters
print ("RMSE = "+str(rmse))
print ("**Best Model**")
print (" Rank:"+str(best_model.rank))
print (" MaxIter:"+str(best_model._java_obj.parent().getMaxIter()))
print (" RegParam:"+str(best_model._java_obj.parent().getRegParam()))

In [48]:
predictions.show()

### Apply the model and examine its performance

In [50]:
#Apply the best model to the full dataset (training + test) and evaluate
alldata=best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

In [51]:
alldata=alldata.withColumn('rounded_predictions', F.round(F.col('prediction')*2)/2)
alldata.createOrReplaceTempView("alldata")
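The `rounded_predictions` column snaps each prediction to the nearest half star, matching the 0.5-step rating scale. The sketch below reproduces the idea in plain Python; `round_to_half` is a hypothetical helper. One caveat worth knowing: Spark's `F.round` is documented to use HALF_UP rounding, whereas Python's built-in `round` rounds ties to the nearest even number, so the two can disagree on exact ties such as 3.25.

```python
import math

def round_to_half(x):
    """Round a positive value to the nearest 0.5, with ties rounded up."""
    return math.floor(x * 2 + 0.5) / 2

for pred in (3.24, 3.25, 3.74, 4.9):
    # columns: raw prediction, half-up result, result with Python's built-in round
    print(pred, round_to_half(pred), round(pred * 2) / 2)
```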

In [52]:
#Show the predictions of all movies' ratings
alldata.show()

In [53]:
# Join the prediction result with the movies dataset
all_movies_rating = alldata.join(movies_df,alldata.movieId == movies_df.movieId, 'left')
all_movies_rating.show()

In [54]:
#Rank the movie by the movie's total rating counts
movie_rating = ratings_df.groupBy('movieId').count()\
.orderBy('count', ascending = False).toPandas()
movie_rating.head(10)

In [55]:
movie_rating.tail(10)

In [56]:
#plot actual and predicted ratings for the movie with the most ratings - take movie 356 as an example
movie_top = alldata.where('movieId=356')\
                       .select('rating','rounded_predictions')\
                       .orderBy('rating')
rmse_movie = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='rounded_predictions')
print(rmse_movie.evaluate(movie_top))
movie_top.show()

In [57]:
movie_top = movie_top.toPandas()
fig = plt.figure()
plt.plot(movie_top.index, movie_top.rating, 'o', label='rating')
plt.plot(movie_top.index, movie_top.rounded_predictions, '.', label='rounded_predictions')
plt.xlabel('index')
plt.ylabel('ratings')
plt.legend()
plt.title('Actual & predicted ratings for movie with 329 ratings')
display(fig)

In [58]:
rating_rank = ratings_df.groupBy('movieId').count()

In [59]:
#plot actual and predicted ratings for movie with 1 rating
movie_least = alldata.join(rating_rank, rating_rank.movieId==alldata.movieId, 'left')\
.where('count=1').select('rating', 'rounded_predictions').orderBy('rating')
rmse_movie2 = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='rounded_predictions')
print(rmse_movie2.evaluate(movie_least))

In [60]:
movie_least = movie_least.toPandas()
fig = plt.figure()
plt.plot(movie_least.index, movie_least.rating, 'o', label='rating')
plt.plot(movie_least.index, movie_least.rounded_predictions, '-', label='rounded_predictions')
plt.xlabel('index')
plt.ylabel('ratings')
plt.legend()
plt.title('Actual & predicted ratings for movies with 1 rating')
display(fig)

In [61]:
#Rank the users by their total rating counts
user_rating = ratings_df.groupBy('userId').count()\
.orderBy('count', ascending = False).toPandas()
user_rating.head(10)

In [62]:
user_rating.tail(10)

In [63]:
#plot actual and predicted ratings of the most active user - take user 414 as an example
user_top = alldata.where('userId=414')\
                       .select('rating','rounded_predictions')\
                       .orderBy('rating')
rmse_movie = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='rounded_predictions')
print(rmse_movie.evaluate(user_top))
user_top.show()

In [64]:
user_top  = user_top.toPandas()
fig = plt.figure()
plt.plot(user_top.index, user_top.rating, 'o', label='rating')
plt.plot(user_top.index, user_top.rounded_predictions, '.', label='rounded_predictions')
plt.xlabel('index')
plt.ylabel('ratings')
plt.legend()
plt.title('Actual & predicted ratings for the most active user')
display(fig)

In [65]:
#plot actual and predicted ratings of the least active user - take user 189 as an example
user_least = alldata.where('userId=189')\
                       .select('rating','rounded_predictions')\
                       .orderBy('rating')
rmse_movie = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='rounded_predictions')
print(rmse_movie.evaluate(user_least))
user_least.show()

In [66]:
user_least = user_least.toPandas()
fig = plt.figure()
plt.plot(user_least.index, user_least.rating, 'o', label='rating')
plt.plot(user_least.index, user_least.rounded_predictions, '-', label='rounded_predictions')
plt.xlabel('index')
plt.ylabel('ratings')
plt.legend()
plt.title('Actual & predicted ratings for the least active user')
display(fig)

In [67]:
#Plot the mse of movie ratings by movies of the same rating numbers
alldata_df = alldata.toPandas()
rating_num = alldata_df.groupby(['movieId']).size().reset_index(name='number_of_ratings')
rating_movie = alldata_df.merge(rating_num, on=['movieId'])
rating_movie['MSE']=(rating_movie['rating']-rating_movie['prediction'])**2
rating_movie=rating_movie[['number_of_ratings','MSE']].groupby(by=['number_of_ratings'])['MSE'].mean().reset_index()
rating_movie['Percentile_rank']=rating_movie.number_of_ratings.rank(pct=True)
rating_movie['Percentile_rank']=rating_movie['Percentile_rank'].apply(lambda x: round(x,2))
rating_movie.head(10)


In [68]:
fig = plt.figure()
temp=sns.scatterplot(x='Percentile_rank', y='MSE', data=rating_movie,hue='MSE', legend=False, palette='Blues')
temp.set_title('mean squared error of movies with the same rating numbers')
display(temp)

In [69]:
#Plot the mse of movie ratings by user of the same rating numbers
rating_num = alldata_df.groupby(['userId']).size().reset_index(name='number_of_ratings')
rating_user = alldata_df.merge(rating_num, on=['userId'])
rating_user['MSE']=(rating_user['rating']-rating_user['prediction'])**2
rating_user=rating_user[['number_of_ratings','MSE']].groupby(by=['number_of_ratings'])['MSE'].mean().reset_index()
rating_user['Percentile_rank']=rating_user.number_of_ratings.rank(pct=True)
rating_user['Percentile_rank']=rating_user['Percentile_rank'].apply(lambda x: round(x,2))
rating_user.head(10)

In [70]:
fig = plt.figure()
tmp=sns.scatterplot(x='Percentile_rank', y='MSE', data=rating_user,hue='MSE', legend=False, palette='Blues')
tmp.set_title('mean squared error of users with the same rating numbers')
display(tmp)

## Recommend movies to users with id: 575, 232
You can choose other users to recommend movies to.

In [72]:
all_movies_rating.where(all_movies_rating.userId==575)\
.select('userId', 'rating', 'title', F.explode(F.split('genres', r'\|'))\
.alias('genres'))\
.orderBy('rating', ascending = False).show()
#The user 575 gives the highest ratings to movies of genre Comedy, Drama, Crime and Thriller.

In [73]:
all_movies_rating.where(all_movies_rating.userId==232)\
.select('userId', 'rating', 'title', F.explode(F.split('genres', r'\|'))\
.alias('genres'))\
.orderBy('rating', ascending = False).show()
#The user 232 gives the highest ratings to movies of genre Adventure, Crime, IMAX, Sci-Fi, Animation, Fantasy, Action, Drama....

In [74]:
#Generate the top-10 movie recommendations for all users
recommend_for_all=best_model.recommendForAllUsers(10)

In [75]:
#Recommend top-10 movies to user 575
user=575

recommend_for_all=best_model.recommendForAllUsers(10)

recommend_top10=recommend_for_all.where(recommend_for_all.userId==user)\
.select('userId', F.explode('recommendations'))\
.select('userId', F.col('col').movieId.alias('movieId'), F.col('col').rating.alias('rating'))

recommend_top10=recommend_top10.join(movies_df, recommend_top10.movieId==movies_df.movieId, 'left')\
.select('userId', movies_df.movieId, 'title','genres')

recommend_top10.show()

In [76]:
#Recommend top-10 movies to user 232
user=232

recommend_for_all=best_model.recommendForAllUsers(10)

recommend_top10=recommend_for_all.where(recommend_for_all.userId==user)\
.select('userId', F.explode('recommendations'))\
.select('userId', F.col('col').movieId.alias('movieId'), F.col('col').rating.alias('rating'))

recommend_top10=recommend_top10.join(movies_df, recommend_top10.movieId==movies_df.movieId, 'left')\
.select('userId', movies_df.movieId, 'title','genres')

recommend_top10.show()
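Conceptually, a factorization model scores a (user, movie) pair as the dot product of the user's and the movie's latent factor vectors, and a top-N list is just the N highest-scoring movies. The sketch below illustrates that idea in plain Python; the factor values are made up and `top_n` is a hypothetical helper, not part of the Spark API.

```python
import heapq

def top_n(user_vec, item_factors, n=2):
    """Return the n (movieId, score) pairs with the highest dot-product score."""
    scores = {
        movie_id: sum(u * v for u, v in zip(user_vec, vec))
        for movie_id, vec in item_factors.items()
    }
    return heapq.nlargest(n, scores.items(), key=lambda kv: kv[1])

# Hypothetical 2-dimensional latent factors (real ones come from the fitted model)
user_vec = [1.0, 0.5]
item_factors = {
    101: [4.0, 0.0],   # score 1.0*4.0 + 0.5*0.0 = 4.0
    102: [1.0, 4.0],   # score 1.0*1.0 + 0.5*4.0 = 3.0
    103: [0.5, 0.5],   # score 1.0*0.5 + 0.5*0.5 = 0.75
}

recs = top_n(user_vec, item_factors, n=2)
print(recs)
```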

## Find similar movies for the movies with id: 463, 471
You can find similar movies based on the ALS results.

In [78]:
movie_factors = best_model.itemFactors
movie_factors = movie_factors.toPandas()
movie_factors.head(10)

In [79]:

movie_factors.loc[movie_factors['id'] == 471, 'features']



In [80]:
list(movie_factors['features'][0])

In [81]:
movies_kdf = movies_df.toPandas()
movies_kdf.head(10)

In [82]:
def similar_movies(features, movieId):
  #Look up the latent-factor vector of the target movie
  try:
    target_id_feature = movie_factors.loc[movie_factors['id'] == movieId, 'features'].to_numpy()[0]
  except IndexError:
    return 'There is no movie with id ' + str(movieId)
  #Cosine similarity between the target vector and every movie's factor vector
  similarities=[]
  for feature in features:
    similarity=float(np.dot(target_id_feature, feature)/(np.linalg.norm(target_id_feature)*np.linalg.norm(feature)))
    similarities.append(similarity)
  ks_similarity = ks.DataFrame({'similarity' : similarities}, index = movie_factors.id.to_numpy()).reset_index().toPandas()
  ks_similarity.columns = ['movieId','similarity']
  #movieId is a string in movies_kdf (the CSV was read without a schema), so cast before merging
  ks_similarity['movieId'] = ks_similarity['movieId'].apply(lambda x: str(x))
  #Keep 11 rows: the movie itself plus its 10 most similar movies
  movie_similarity = movies_kdf.merge(ks_similarity, on = ['movieId']).sort_values(by='similarity',ascending = False).head(11)
  return movie_similarity
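The similarity used in `similar_movies` is the cosine of the angle between two latent-factor vectors. As a minimal standalone sketch (the vectors below are made up, and `cosine_similarity` is a hypothetical helper written without NumPy), note that it depends only on direction, not on vector length:

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Hypothetical latent-factor vectors
target = [1.0, 2.0, 0.0]
same_direction = [2.0, 4.0, 0.0]   # a scaled copy of the target
orthogonal = [0.0, 0.0, 3.0]       # shares no component with the target

print(cosine_similarity(target, same_direction))  # close to 1: same direction, length ignored
print(cosine_similarity(target, orthogonal))      # 0: no shared direction
```

Because a movie always has similarity 1 with itself, the function above returns 11 rows so that 10 other movies remain after the target.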

In [83]:
#Find the top10 similar movies to movie 463, including itself
similar_movies(movie_factors['features'], 463)



In [84]:
#Find the top10 similar movies to movie 471, including itself
similar_movies(movie_factors['features'], 471)

# Report
**Motivation:** I am a big fan of movies, and I like rating and writing about movies on social media apps. While using those apps, I noticed that the more movies I rated, the better the recommendations matched my taste. I find this feature fascinating and wanted to understand how such recommendations work, so that I can build recommendation systems in the future that provide customized user experiences and help a company increase user loyalty.

**Data and Method:** In this work, ALS is used to build a recommendation system based on data from GroupLens (Small: 100,000 ratings and 3,600 tag applications applied to 9,000 movies by 600 users. Last updated 9/2018.). Data sparsity is an important challenge in industry: many users have only one or two ratings, and many movies have only a few ratings. ALS, as a matrix-factorization-based method, scales well and can still give accurate predictions on such sparse data.

**Step 1: Data Exploration and Feature Engineering**
<br>
(1) There are 9742 movies in total, rated by 610 users, and 0.18% of the movies are not rated by any user.<br>
(2) The distributions of the number of ratings per movie and per user are strongly right-skewed. However, a few movies and users have an exceptionally large number of ratings. The impact of this is analyzed in the modeling section.<br>
(3) In terms of movie genres, 'Drama', 'Comedy' and 'Thriller' are the top 3 genres in the dataset.<br>
(4) Cast the data types and split the movie genres into a list format for easier use.<br>

**Step 2: Model Training and Evaluation**
<br>
(1) Built a recommendation model from historical movie rating data using alternating least squares, a matrix-factorization approach to collaborative filtering.<br>
(2) Set the cold start strategy of the ALS model to 'drop', so that rows whose prediction is 'NaN' (users or movies not seen during training) are dropped from the prediction output. The evaluation metric is then computed over the non-NaN predictions only, so the result is valid.<br>
(3) The parameters tuned are rank (the number of latent features for each user and movie), regParam (the regularization parameter), and maxIter (the maximum number of iterations).<br>
(4) Tuned the model parameters with a grid search and selected the best model by 5-fold cross validation, using rmse (root mean squared error) as the evaluation metric.<br>
(5) The best model has 5 latent features, with rmse = 0.64 on the training dataset and 0.88 on the testing dataset.<br>
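
To illustrate why the cold start setting matters for evaluation, here is a minimal plain-Python sketch. The (actual, predicted) pairs are made up and `rmse` is a hypothetical helper: a single NaN prediction makes the metric NaN, while filtering out undefined predictions first (the spirit of `coldStartStrategy="drop"`) gives a usable number.

```python
import math

def rmse(pairs):
    """Root mean squared error over (actual, predicted) pairs."""
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))

# Hypothetical (actual, predicted) pairs; NaN marks a user or movie unseen in training
predictions = [(4.0, 3.5), (3.0, 3.0), (5.0, float("nan")), (2.0, 2.5)]

print(rmse(predictions))  # nan: one undefined prediction poisons the whole metric

# Evaluate only the rows that have a defined prediction
kept = [(a, p) for a, p in predictions if not math.isnan(p)]
print(rmse(kept))
```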

**Step 3: Model Testing and Analysis**
<br>
(1) Applied the best model to the full dataset. The rmse is 0.69.<br>
(2) Explored the prediction accuracy by plotting predicted vs. actual ratings for the users and movies with the most and fewest ratings, then visualized the mse (mean squared error) of the predictions against the number of ratings per user/movie.<br>
(3) Conclusions of the above analysis:<br>
Predictions for active users are more accurate, which matches intuition.<br>
Prediction accuracy per movie is relatively stable, and the MSE shows no downward trend as the number of ratings increases, although intuitively we would expect movies with more ratings to be predicted more accurately.<br>
One possible explanation: very popular movies are rated by all kinds of users, some of whom are active users with a lot of rating history, and some of whom are inactive users with very few ratings. The latter group can introduce bias into the predictions for popular movies, which may partially explain why movies with more ratings are not predicted significantly more accurately.<br>
The dataset in this exercise is not very large, so to keep enough data for modeling I didn't exclude any users. Alternatively, next time I could apply a kNN-based clustering of users and movies before training the model to reduce the dimensionality of the data, which would partially address the sparsity issue.<br>

**Step 4: Model Application on Movie Recommendation**
<br>
(1) Recommended the top 10 movies to selected users, based on collaborative filtering over the 5 latent features learned by the model.<br>
(2) Identified the top 10 movies most similar to selected movies, based on the cosine similarity of their 5 latent feature vectors.