### Spark Movie Recommendation
In this notebook, we will use an Alternating Least Squares (ALS) algorithm with Spark APIs to predict the ratings for the movies in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

In [0]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

In [0]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part1: Data ETL and Data Exploration

In [0]:
# Please download the data from https://grouplens.org/datasets/movielens/latest/ first, then upload it to Databricks
# Refer to hw1 and complete this part yourself :)



In [0]:
movies_df = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links_df = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags_df = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [0]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [0]:
ratings_df.show(10)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
+------+-------+------+---------+
only showing top 10 rows



In [0]:
links_df.show(5)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [0]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [0]:
tmp1 = ratings_df.groupBy("userId").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [0]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

3446 out of 9724 movies are rated by only one user


## Part 1: Spark SQL and OLAP 

In [0]:
# createOrReplaceTempView replaces the deprecated registerTempTable
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")



### Q1: The number of Users

In [0]:
q1_result=spark.sql("Select Count(Distinct userId) as Number_of_Users from ratings")
q1_result.show()

+---------------+
|Number_of_Users|
+---------------+
|            610|
+---------------+



### Q2: The number of Movies

In [0]:
q3_result_1=spark.sql("Select Count(movieId) as Number_of_Rated_Movies From movies Where movieId in (Select movieId From ratings)")
q3_result_1.show()

+----------------------+
|Number_of_Rated_Movies|
+----------------------+
|                  9724|
+----------------------+



### Q3: How many movies are rated by users? List movies not rated before

In [0]:
q3_result_2=spark.sql("Select movieId, title From movies Where movieID not in (Select movieId From ratings)")
q3_result_2.show()

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|   1076|Innocents, The (1...|
|   2939|      Niagara (1953)|
|   3338|For All Mankind (...|
|   3456|Color of Paradise...|
|   4194|I Know Where I'm ...|
|   5721|  Chosen, The (1981)|
|   6668|Road Home, The (W...|
|   6849|      Scrooge (1970)|
|   7020|        Proof (1991)|
|   7792|Parallax View, Th...|
|   8765|This Gun for Hire...|
|  25855|Roaring Twenties,...|
|  26085|Mutiny on the Bou...|
|  30892|In the Realms of ...|
|  32160|Twentieth Century...|
|  32371|Call Northside 77...|
|  34482|Browning Version,...|
|  85565|  Chalet Girl (2011)|
+-------+--------------------+



### Q4: List Movie Genres

In [0]:
q4_result=spark.sql("Select Distinct explode(split(genres,'[|]')) as genres From movies Order by 1")
q4_result.show()

+------------------+
|            genres|
+------------------+
|(no genres listed)|
|            Action|
|         Adventure|
|         Animation|
|          Children|
|            Comedy|
|             Crime|
|       Documentary|
|             Drama|
|           Fantasy|
|         Film-Noir|
|            Horror|
|              IMAX|
|           Musical|
|           Mystery|
|           Romance|
|            Sci-Fi|
|          Thriller|
|               War|
|           Western|
+------------------+
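
The `'[|]'` pattern in the query above is a regular-expression character class: a bare `|` would mean alternation, so the brackets make it match a literal pipe. As a rough illustration of what `split` followed by `explode` does, here is a plain-Python sketch on three rows from the movies preview (movie IDs 3, 4, and 5). The variable names are made up for this example, and it mimics the SQL rather than calling Spark.

```python
import re
from collections import Counter

# (movieId, genres) pairs copied from the movies preview earlier in the notebook
movies = [
    (3, "Comedy|Romance"),
    (4, "Comedy|Drama|Romance"),
    (5, "Comedy"),
]

# split: one list of genres per movie; explode: one output row per list element
rows = [(movie_id, genre)
        for movie_id, genres in movies
        for genre in re.split("[|]", genres)]

# Equivalent of "Select Distinct ... Order by 1"
distinct = sorted({genre for _, genre in rows})

# Equivalent of "Group By genre" with a count of movies
counts = Counter(genre for _, genre in rows)

print(rows)
print(distinct)
print(counts.most_common())
```

The same exploded rows feed both the distinct genre list (Q4) and the per-genre movie counts (Q5).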



### Q5: Movie for Each Category

In [0]:
q5_result_1=spark.sql("Select genres,Count(movieId) as Number_of_Movies From(Select explode(split(genres,'[|]')) as genres, movieId From movies) Group By 1 Order by 2 DESC")
q5_result_1.show()

+------------------+----------------+
|            genres|Number_of_Movies|
+------------------+----------------+
|             Drama|            4361|
|            Comedy|            3756|
|          Thriller|            1894|
|            Action|            1828|
|           Romance|            1596|
|         Adventure|            1263|
|             Crime|            1199|
|            Sci-Fi|             980|
|            Horror|             978|
|           Fantasy|             779|
|          Children|             664|
|         Animation|             611|
|           Mystery|             573|
|       Documentary|             440|
|               War|             382|
|           Musical|             334|
|           Western|             167|
|              IMAX|             158|
|         Film-Noir|              87|
|(no genres listed)|              34|
+------------------+----------------+



## Part2: Spark ALS based approach for training model
We will use Spark ML to predict the ratings. The `ratings_df` DataFrame loaded above is reused: the `timestamp` column is dropped and the remaining columns are cast to numeric types, giving (user, item, rating) rows.
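
To make the "alternating" idea concrete, here is a minimal NumPy sketch of ALS on a tiny made-up rating matrix. It is not Spark's implementation, which differs in details such as how regularization is scaled and how the work is distributed. The matrix `R`, the helper name `als_sketch`, and the hyperparameter values are assumptions for illustration only. With the item factors held fixed, each user's factor vector is the solution of a small ridge regression over the movies that user rated, and the roles are then swapped.

```python
import numpy as np

def als_sketch(R, rank=2, reg=0.1, iters=10, seed=0):
    """Factor R ~ U @ V.T using only the observed (non-NaN) entries."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    observed = ~np.isnan(R)
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    I = np.eye(rank)
    for _ in range(iters):
        # Fix V, solve a ridge regression for each user over the movies they rated
        for u in range(n_users):
            idx = observed[u]
            if idx.any():
                Vu = V[idx]
                U[u] = np.linalg.solve(Vu.T @ Vu + reg * I, Vu.T @ R[u, idx])
        # Fix U, solve a ridge regression for each movie over the users who rated it
        for i in range(n_items):
            idx = observed[:, i]
            if idx.any():
                Ui = U[idx]
                V[i] = np.linalg.solve(Ui.T @ Ui + reg * I, Ui.T @ R[idx, i])
    return U, V

# Toy data (hypothetical): rows are users, columns are movies, NaN means "not rated"
R = np.array([[5.0, 4.0, np.nan, 1.0],
              [4.0, np.nan, 1.0, 1.0],
              [1.0, 1.0, 5.0, np.nan],
              [np.nan, 1.0, 4.0, 5.0]])

U, V = als_sketch(R)
pred = U @ V.T  # includes predictions for the unrated cells
rmse_observed = np.sqrt(np.nanmean((pred - R) ** 2))
print("RMSE on observed entries:", rmse_observed)
```

The product `U @ V.T` also fills in the cells that were NaN in `R`, which is where the rating predictions for unseen user and movie pairs come from.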

In [0]:
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [0]:
movie_ratings=ratings_df.drop('timestamp')

In [0]:
# Data type convert
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [0]:
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.
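
As a rough guide to the cost of a grid search with cross-validation, the sketch below enumerates a grid in plain Python and counts the model fits. The dictionary `grid` is a hypothetical stand-in for a Spark parameter grid, with values mirroring the ones tried in this notebook. The count assumes each combination is trained once per fold and the winning combination is then refit once on the whole training set.

```python
from itertools import product

# Hypothetical stand-in for a parameter grid (not a Spark object)
grid = {"regParam": [0.1, 0.3], "rank": [10], "maxIter": [5]}
num_folds = 3

# Cartesian product of all candidate values, one dict per combination
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]

# One fit per combination per fold, plus one final refit of the best combination
total_fits = len(combinations) * num_folds + 1

for params in combinations:
    print(params)
print("models trained:", total_fits)
```

Adding more values to any list multiplies the number of combinations, so the grid is best widened one parameter at a time.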

In [0]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [0]:
#Create test and train set
(training, test) = movie_ratings.randomSplit([0.8, 0.2])

In [0]:
#Create ALS model
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
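
A note on `coldStartStrategy="drop"`: when a user or movie in the evaluation data was not seen during training, ALS has no factors for it, and with the default strategy the prediction is NaN. The plain-Python sketch below, using made-up numbers and a hypothetical helper `rmse_pairs`, shows why that matters: a single NaN prediction turns the whole RMSE into NaN, while dropping those rows first gives a usable number.

```python
import math

# Hypothetical (rating, prediction) pairs; NaN marks a user or movie unseen in training
pairs = [(4.0, 3.5), (5.0, float("nan")), (3.0, 3.0)]

def rmse_pairs(pairs):
    """RMSE over (rating, prediction) pairs."""
    return math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))

rmse_with_nan = rmse_pairs(pairs)  # NaN propagates through the sum

# What "drop" amounts to: remove rows whose prediction is NaN before evaluating
kept = [(r, p) for r, p in pairs if not math.isnan(p)]
rmse_dropped = rmse_pairs(kept)

print(rmse_with_nan)
print(rmse_dropped)
```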

In [0]:
#Tune model using ParamGridBuilder
paramGrid = (ParamGridBuilder()
             .addGrid(als.regParam, [0.1, 0.3])  # Try fewer values, e.g., 0.1 and 0.3
             .addGrid(als.rank, [10])            # Test only rank=10 to start
             .addGrid(als.maxIter, [5])          # Fix maxIter to 5
             .build())


In [0]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [0]:
# Build Cross validation 
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)


In [0]:
# Fit the ALS model
cvModel = cv.fit(training)

In [0]:
#Extract best model from the tuning exercise using ParamGridBuilder
bestModel=cvModel.bestModel

### Model testing
And finally, make a prediction and check the testing error.
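
For reference, the RMSE reported by the evaluator is the square root of the mean squared difference between the actual and predicted ratings. A minimal sketch in plain Python, with made-up ratings and a hypothetical helper name `rmse_of`:

```python
import math

def rmse_of(actual, predicted):
    """Root-mean-square error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))

# Hypothetical ratings and predictions, not taken from the model
actual = [4.0, 5.0, 3.0]
predicted = [4.5, 4.0, 3.5]
example_rmse = rmse_of(actual, predicted)
print(example_rmse)
```

Because the errors are squared before averaging, one prediction that is off by a full star weighs as much as four predictions that are off by half a star.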

In [0]:
#Generate predictions and evaluate using RMSE
predictions=bestModel.transform(test)
rmse = evaluator.evaluate(predictions)

In [0]:
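#Print evaluation metrics and model parameters
# bestModel was fit with the grid entry that has the lowest average cross-validated RMSE
bestParams = paramGrid[cvModel.avgMetrics.index(min(cvModel.avgMetrics))]
print("RMSE = " + str(rmse))
print("**Best Model**")
print(" Rank:", bestParams[als.rank])
print(" MaxIter:", bestParams[als.maxIter])
print(" RegParam:", bestParams[als.regParam])

RMSE = 0.9146168868010629
**Best Model**
 Rank:
 MaxIter:
 RegParam: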

In [0]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|      6|   4.0|  4.414794|
|     1|     50|   5.0|   4.81697|
|     1|     70|   3.0|  4.018732|
|     1|    223|   3.0| 4.4153004|
|     1|    333|   5.0| 4.1544886|
|     1|    367|   4.0| 3.5648677|
|     1|    441|   4.0| 4.4232807|
|     1|    552|   4.0| 3.5246463|
|     1|    590|   4.0| 4.2634654|
|     1|    661|   5.0| 3.7678907|
|     1|    943|   4.0| 4.4622855|
|     1|   1009|   3.0| 3.3845632|
|     1|   1023|   5.0| 3.8209221|
|     1|   1031|   5.0| 3.9315696|
|     1|   1032|   5.0| 3.7770712|
|     1|   1090|   4.0|  4.574177|
|     1|   1127|   4.0|  3.861291|
|     1|   1197|   5.0|  4.684467|
|     1|   1198|   5.0|  4.681014|
|     1|   1213|   5.0| 4.6583586|
+------+-------+------+----------+
only showing top 20 rows



### Apply the model and check its performance

In [0]:
alldata=bestModel.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

RMSE = 0.8303056903433411


In [0]:
alldata.createOrReplaceTempView("alldata")

In [0]:
%sql select * from alldata

userId,movieId,rating,prediction
1,1,4.0,4.2983937
1,3,4.0,3.5502489
1,6,4.0,4.414794
1,47,5.0,4.4428205
1,50,5.0,4.81697
1,70,3.0,4.018732
1,101,5.0,4.3310103
1,110,4.0,4.4655643
1,151,5.0,3.9751935
1,157,5.0,3.4518933


In [0]:
%sql select * from movies join alldata on movies.movieId=alldata.movieId

movieId,title,genres,userId,movieId.1,rating,prediction
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,1,1,4.0,4.2983937
3,Grumpier Old Men (1995),Comedy|Romance,1,3,4.0,3.5502489
6,Heat (1995),Action|Crime|Thriller,1,6,4.0,4.414794
47,Seven (a.k.a. Se7en) (1995),Mystery|Thriller,1,47,5.0,4.4428205
50,"Usual Suspects, The (1995)",Crime|Mystery|Thriller,1,50,5.0,4.81697
70,From Dusk Till Dawn (1996),Action|Comedy|Horror|Thriller,1,70,3.0,4.018732
101,Bottle Rocket (1996),Adventure|Comedy|Crime|Romance,1,101,5.0,4.3310103
110,Braveheart (1995),Action|Drama|War,1,110,4.0,4.4655643
151,Rob Roy (1995),Action|Drama|Romance|War,1,151,5.0,3.9751935
157,Canadian Bacon (1995),Comedy|War,1,157,5.0,3.4518933


## Recommend movies to users with id: 575, 232
You can choose some users and recommend movies to them.

In [0]:
# Install and import Koalas if needed
!pip install koalas
import databricks.koalas as ks



In [0]:
from pyspark.sql import functions as F

userRecs = bestModel.recommendForAllUsers(10)

# Convert PySpark DataFrames to Koalas DataFrames (userId becomes the index of userRecs_ks)
userRecs_ks = userRecs.to_koalas(index_col='userId')
movies_ks = movies_df.to_koalas()

def movieRecommendation(inputId):
  # Filter in Spark: userId is an integer column, and each recommendation is a (movieId, rating) struct
  recs = (userRecs.filter(F.col('userId') == int(inputId))
          .select(F.explode('recommendations').alias('rec'))
          .select(F.col('rec.movieId').cast('string').alias('movieId')))
  # movies_df was read without a schema, so its movieId is a string; hence the cast above
  return movies_df.join(recs, on='movieId', how='inner').toPandas()


In [0]:
# Get top movie recommendations for user with ID 85
recommended_movies_85 = movieRecommendation(85)
print("Top recommendations for user 85:")
print(recommended_movies_85)

In [0]:
print(userRecs_ks.index.unique())


Int64Index([148, 463, 471, 496, 243, 392, 540,  31, 516,  85,
            ...
            521,  36, 187, 208, 315, 393,  89, 401, 422, 517],
           dtype='int64', name='userId', length=610)


## Find similar movies for the movies with id: 463, 471
You can find similar movies based on the ALS results.
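
Two common ways to compare item factor vectors are cosine similarity (the angle between them) and Euclidean distance (how far apart they are). The NumPy sketch below computes both against every row at once on a small made-up factor matrix. The function name `top_k_similar` and the factor values are assumptions for illustration and do not come from the trained model.

```python
import numpy as np

def top_k_similar(factors, query_index, k=2, metric="cosine_similarity"):
    """Return indices of the k rows of `factors` closest to row `query_index`."""
    query = factors[query_index]
    if metric == "cosine_similarity":
        norms = np.linalg.norm(factors, axis=1) * np.linalg.norm(query)
        scores = factors @ query / norms
        order = np.argsort(-scores)          # larger is more similar
    elif metric == "euclidean_distance":
        scores = np.linalg.norm(factors - query, axis=1)
        order = np.argsort(scores)           # smaller is more similar
    else:
        raise ValueError("unknown metric: " + metric)
    order = order[order != query_index]      # drop the query movie itself
    return order[:k]

# Made-up 2-dimensional item factors (hypothetical values)
factors = np.array([[1.0, 0.0],
                    [2.0, 0.1],
                    [0.0, 1.0],
                    [0.9, 0.5]])

by_cosine = top_k_similar(factors, 0, metric="cosine_similarity")
by_distance = top_k_similar(factors, 0, metric="euclidean_distance")
print(by_cosine)
print(by_distance)
```

In this toy data the two metrics rank the neighbors differently: row 1 points in almost the same direction as row 0 but is twice as long, so it wins on cosine similarity and loses on Euclidean distance.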

In [0]:
itemFactors=bestModel.itemFactors.to_koalas()

In [0]:
def similarMovies(inputId, matrix='cosine_similarity'):
  if matrix not in ('cosine_similarity', 'euclidean_distance'):
    raise ValueError("matrix must be 'cosine_similarity' or 'euclidean_distance'")

  # Bring the item factors (columns: id, features) to the driver once
  factors = itemFactors.to_pandas()
  match = factors.loc[factors['id'] == int(inputId), 'features']
  if match.empty:
    return 'There is no movie with id ' + str(inputId)
  movieFeature = np.array(match.iloc[0])

  rows = []
  for id, feature in zip(factors['id'], factors['features']):
    feature = np.array(feature)
    if matrix == 'cosine_similarity':
      score = np.dot(movieFeature, feature) / (np.linalg.norm(movieFeature) * np.linalg.norm(feature))
    else:
      score = np.linalg.norm(movieFeature - feature)
    rows.append({'movieId': str(id), matrix: score})
  # Build the frame in one step; DataFrame.append is deprecated and slow inside a loop
  similarMovie = pd.DataFrame(rows, columns=['movieId', matrix])

  # Highest similarity or smallest distance first; row 0 is the input movie itself, so skip it
  top10 = similarMovie.sort_values(by=[matrix], ascending=(matrix == 'euclidean_distance'))[1:11]
  joint = top10.merge(movies_ks.to_pandas(), on='movieId', how='inner')
  return joint[['movieId','title','genres']]

In [0]:
similarMovies(463)

Out[80]: 'There is no movie with id 463'

In [0]:
print('Similar movies based on cosine similarity matrix are as follows.')
similarMovies(471, 'cosine_similarity')

Similar movies based on cosine similarity matrix are as follows.


Unnamed: 0,movieId,title,genres
0,475,In the Name of the Father (1993),Drama
1,3479,Ladyhawke (1985),Adventure|Fantasy|Romance
2,224,Don Juan DeMarco (1995),Comedy|Drama|Romance
3,1305,"Paris, Texas (1984)",Drama|Romance
4,2147,"Clan of the Cave Bear, The (1986)",Adventure|Drama|Fantasy
5,3194,"Way We Were, The (1973)",Drama|Romance
6,46967,Scoop (2006),Comedy|Fantasy|Mystery
7,49347,Fast Food Nation (2006),Drama
8,27821,"Interpreter, The (2005)",Drama|Thriller
9,345,"Adventures of Priscilla, Queen of the Desert, ...",Comedy|Drama



## Write the report 
Motivation: As artificial intelligence spreads through the internet industry, more and more e-commerce platforms are tailoring their recommendation systems to provide better service. Collaborative filtering is one of the most popular recommendation approaches, and it can be implemented with the Alternating Least Squares (ALS) model in Spark ML. Building a movie recommender for users of movie rating sites is an interesting and worthwhile exercise.

step1. Data ETL and Data Exploration

I first loaded the rating data, created the corresponding Spark DataFrames, and checked the basic information of the dataset.

step2. Online Analytical Processing

I analyzed the dataset from multiple angles with Spark SQL and gained some intuitive insights.

step3. Model Selection

I built the ALS model and tuned its hyperparameters with 3-fold cross-validation, keeping the model with the best hyperparameters as the final model.

step4. Model Evaluation

Finally, I evaluated the recommendation model by measuring the root-mean-square error of its rating predictions on the test set.

step5. Model Application: Recommend movies to users

For given users, I wrote a function that directly recommends 10 movies they may be interested in, based on the model.

step6. Model Application: Find similar movies

I also applied the ALS results to finding similar movies for a given movie. I used two metrics to evaluate the similarity between movies, cosine similarity and Euclidean distance, which can be used separately depending on the situation.

Output and Conclusion

In this project, I built an ALS model with Spark APIs on the MovieLens dataset, predicted movie ratings, and made specific recommendations to users accordingly. The RMSE of the best model is approximately 0.91 on the test set and 0.83 on the full dataset.