# **Movie Recommendation Engine Based on ALS Method in Apache Spark**
In this notebook, I use the Alternating Least Squares (ALS) algorithm from Spark ML to predict movie ratings in the MovieLens small dataset.


## Data ETL and Data Exploration

In [None]:
!ls

sample_data


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!tar xf spark-3.2.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
!ls

sample_data  spark-3.2.1-bin-hadoop2.7	spark-3.2.1-bin-hadoop2.7.tgz


In [None]:
spark.version

'3.2.1'

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
# Magic function that renders the figure in a notebook
%matplotlib inline

os.environ["PYSPARK_PYTHON"] = "python3"

In [None]:
!wget --no-check-certificate https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip ml-latest-small.zip
!ls ml-latest-small/

--2022-02-12 02:12:39--  https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip’


2022-02-12 02:12:39 (8.33 MB/s) - ‘ml-latest-small.zip’ saved [978202/978202]

Archive:  ml-latest-small.zip
   creating: ml-latest-small/
  inflating: ml-latest-small/links.csv  
  inflating: ml-latest-small/tags.csv  
  inflating: ml-latest-small/ratings.csv  
  inflating: ml-latest-small/README.txt  
  inflating: ml-latest-small/movies.csv  
links.csv  movies.csv  ratings.csv  README.txt	tags.csv


In [None]:
movies_df = spark.read.csv("./ml-latest-small/movies.csv", inferSchema=True, header=True)
ratings_df = spark.read.csv("./ml-latest-small/ratings.csv", inferSchema=True, header=True)
tags_df = spark.read.csv("./ml-latest-small/tags.csv", inferSchema=True, header=True)
links_df = spark.read.csv("./ml-latest-small/links.csv", inferSchema=True, header=True)

In [None]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [None]:
links_df.show(5)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows



In [None]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [None]:
tmp1 = ratings_df.groupBy("userId").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [None]:
tmp3 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp4 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp3, tmp4))

3446 out of 9724 movies are rated by only one user


## Part 1: Spark SQL and OLAP

In [None]:
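# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")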



### Q1: The number of users

In [None]:
q1_result = spark.sql("SELECT COUNT(DISTINCT userId) AS number_of_users FROM ratings")
q1_result.show()

+---------------+
|number_of_users|
+---------------+
|            610|
+---------------+



### Q2: The number of movies

In [None]:
q2_result = spark.sql("SELECT COUNT(DISTINCT movieId) AS number_of_movies FROM movies")
q2_result.show()

+----------------+
|number_of_movies|
+----------------+
|            9742|
+----------------+



### Q3: How many movies have been rated by users? List the movies that have never been rated

In [None]:
q3_result_rated_movies = spark.sql("SELECT COUNT(DISTINCT movieId) AS number_of_rated_movies FROM ratings")
q3_result_rated_movies.show()

+----------------------+
|number_of_rated_movies|
+----------------------+
|                  9724|
+----------------------+



In [None]:
q3_result_nrated_movies_list = spark.sql("SELECT * FROM movies WHERE movieId NOT IN (SELECT DISTINCT movieId FROM ratings)")
q3_result_nrated_movies_list.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|   1076|Innocents, The (1...|Drama|Horror|Thri...|
|   2939|      Niagara (1953)|      Drama|Thriller|
|   3338|For All Mankind (...|         Documentary|
|   3456|Color of Paradise...|               Drama|
|   4194|I Know Where I'm ...|   Drama|Romance|War|
|   5721|  Chosen, The (1981)|               Drama|
|   6668|Road Home, The (W...|       Drama|Romance|
|   6849|      Scrooge (1970)|Drama|Fantasy|Mus...|
|   7020|        Proof (1991)|Comedy|Drama|Romance|
|   7792|Parallax View, Th...|            Thriller|
|   8765|This Gun for Hire...|Crime|Film-Noir|T...|
|  25855|Roaring Twenties,...|Crime|Drama|Thriller|
|  26085|Mutiny on the Bou...|Adventure|Drama|R...|
|  30892|In the Realms of ...|Animation|Documen...|
|  32160|Twentieth Century...|              Comedy|
|  32371|Call Northside 77...|Crime|Drama|Film-...|
|  34482|Bro

### Q4: List the movie genres

In [None]:
# Solution 1 for Q4 - Use lateral view explode to split one row into many rows
q4_result_solution1 = spark.sql("SELECT DISTINCT genres_list FROM movies LATERAL VIEW EXPLODE(SPLIT(genres, '[|]')) AS genres_list ORDER BY genres_list")
#q4_result_solution1 = spark.sql("SELECT DISTINCT EXPLODE(SPLIT(genres, '[|]')) AS genres FROM movies ORDER BY genres")
q4_result_solution1.show()

+------------------+
|       genres_list|
+------------------+
|(no genres listed)|
|            Action|
|         Adventure|
|         Animation|
|          Children|
|            Comedy|
|             Crime|
|       Documentary|
|             Drama|
|           Fantasy|
|         Film-Noir|
|            Horror|
|              IMAX|
|           Musical|
|           Mystery|
|           Romance|
|            Sci-Fi|
|          Thriller|
|               War|
|           Western|
+------------------+



In [None]:
# Solution 2 for Q4 - Split the genres manually
genres = movies_df.select("genres").distinct().toPandas()
# Store the distinct genres by using set
genres_set = set()
# Iterate each genre in each row and add it to the set
for row in genres["genres"]:
  for genre in row.split("|"):
    genres_set.add(genre)
# Convert the set to a list and print as a dataframe after sorting
genres_list = sorted(list(genres_set))
pd.DataFrame(genres_list, columns=["genres_list"])

Unnamed: 0,genres_list
0,(no genres listed)
1,Action
2,Adventure
3,Animation
4,Children
5,Comedy
6,Crime
7,Documentary
8,Drama
9,Fantasy


### Q5: Movies in each category

In [None]:
q5_result_category_numbers = spark.sql("SELECT genres_list, COUNT(*) AS number_of_movies FROM movies LATERAL VIEW EXPLODE(SPLIT(genres, '[|]')) AS genres_list GROUP BY genres_list ORDER BY genres_list")
q5_result_category_numbers.show()

+------------------+----------------+
|       genres_list|number_of_movies|
+------------------+----------------+
|(no genres listed)|              34|
|            Action|            1828|
|         Adventure|            1263|
|         Animation|             611|
|          Children|             664|
|            Comedy|            3756|
|             Crime|            1199|
|       Documentary|             440|
|             Drama|            4361|
|           Fantasy|             779|
|         Film-Noir|              87|
|            Horror|             978|
|              IMAX|             158|
|           Musical|             334|
|           Mystery|             573|
|           Romance|            1596|
|            Sci-Fi|             980|
|          Thriller|            1894|
|               War|             382|
|           Western|             167|
+------------------+----------------+



In [None]:
q5_result_category_movies = spark.sql("SELECT genres_list, CONCAT_WS(',', COLLECT_SET(title)) AS movies_list FROM movies LATERAL VIEW EXPLODE(SPLIT(genres, '[|]')) AS genres_list GROUP BY genres_list ORDER BY genres_list")
q5_result_category_movies.show()

+------------------+--------------------+
|       genres_list|         movies_list|
+------------------+--------------------+
|(no genres listed)|T2 3-D: Battle Ac...|
|            Action|Stealing Rembrand...|
|         Adventure|Ice Age: Collisio...|
|         Animation|Ice Age: Collisio...|
|          Children|Ice Age: Collisio...|
|            Comedy|Hysteria (2011),H...|
|             Crime|Stealing Rembrand...|
|       Documentary|The Barkley Marat...|
|             Drama|Airport '77 (1977...|
|           Fantasy|Masters of the Un...|
|         Film-Noir|Rififi (Du rififi...|
|            Horror|Underworld: Rise ...|
|              IMAX|Harry Potter and ...|
|           Musical|U2: Rattle and Hu...|
|           Mystery|Before and After ...|
|           Romance|Vampire in Brookl...|
|            Sci-Fi|Push (2009),SORI:...|
|          Thriller|Element of Crime,...|
|               War|General, The (192...|
|           Western|Man Who Shot Libe...|
+------------------+--------------

## Part 2: Spark ALS based approach for training model
We will use Spark ML's ALS implementation to predict the ratings. The model only needs (user, item, rating) triples, so we take the `ratings_df` loaded earlier and drop its `timestamp` column.
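
For intuition about what "alternating least squares" means, here is a minimal pure-Python sketch of a rank-1 version on a toy rating dictionary. It is an illustration only, not Spark's implementation: the function name `als_rank1`, the toy ratings, and the plain ridge penalty are all assumptions made for this example. Each sweep fixes one set of factors and solves a one-dimensional regularized least-squares problem for the other, which is why the objective can never increase from one sweep to the next.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, n_iters=10):
    """Rank-1 ALS on observed {(user, item): rating} entries (reg must be > 0).

    Returns the user factors, the item factors, and the objective value
    recorded at initialisation and after every full sweep.
    """
    u = [1.0] * n_users
    v = [1.0] * n_items

    def objective():
        # Squared error on observed entries plus a ridge penalty on all factors.
        err = sum((r - u[i] * v[j]) ** 2 for (i, j), r in ratings.items())
        penalty = reg * (sum(x * x for x in u) + sum(x * x for x in v))
        return err + penalty

    history = [objective()]
    for _ in range(n_iters):
        # Step 1: hold item factors fixed, solve the closed form for each user.
        for i in range(n_users):
            num = sum(r * v[j] for (a, j), r in ratings.items() if a == i)
            den = reg + sum(v[j] ** 2 for (a, j) in ratings if a == i)
            u[i] = num / den
        # Step 2: hold user factors fixed, solve the closed form for each item.
        for j in range(n_items):
            num = sum(r * u[i] for (i, b), r in ratings.items() if b == j)
            den = reg + sum(u[i] ** 2 for (i, b) in ratings if b == j)
            v[j] = num / den
        history.append(objective())
    return u, v, history


# Hypothetical toy data: 3 users, 3 movies, 6 observed ratings.
toy_ratings = {(0, 0): 5.0, (0, 1): 3.0, (1, 0): 4.0,
               (1, 2): 1.0, (2, 1): 2.0, (2, 2): 5.0}
u, v, history = als_rank1(toy_ratings, n_users=3, n_items=3)
print("objective at start:", history[0])
print("objective at end:  ", history[-1])
```

A predicted rating in this sketch is simply `u[i] * v[j]`; with a higher rank it becomes a dot product of two factor vectors, which is the quantity the Spark model produces in the `prediction` column.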

In [None]:
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [None]:
movie_ratings=ratings_df.drop('timestamp')
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



### ALS Model Selection and Evaluation

In [None]:
# Import packages
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [None]:
# Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])

In [None]:
# Create ALS model
als_model = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")

In [None]:
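# Tune model using ParamGridBuilder
# Note: alpha only affects ALS when implicitPrefs=True; with the explicit ratings
# used here it does not change the fit, so this axis only multiplies the grid size
params = ParamGridBuilder() \
            .addGrid(als_model.maxIter, [3, 5, 10]) \
            .addGrid(als_model.rank, [5, 10, 15]) \
            .addGrid(als_model.regParam, [0.01, 0.05, 0.1]) \
            .addGrid(als_model.alpha, [0.1, 0.5, 1]) \
            .build()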

In [None]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [None]:
# Build Cross validation 
cv = CrossValidator(estimator=als_model, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

In [None]:
#Fit ALS model to training data
cv_model = cv.fit(training)

In [None]:
#Extract best model from the tuning exercise using ParamGridBuilder
best_model = cv_model.bestModel
# Print each parameter of the best model
best_params = cv_model.getEstimatorParamMaps()[np.argmin(cv_model.avgMetrics)]
for i,j in best_params.items():
  print('-> '+i.name+': '+str(j))

-> maxIter: 10
-> rank: 5
-> regParam: 0.1
-> alpha: 0.5


### Model testing
And finally, make a prediction and check the testing error.
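
One detail matters when checking the test error: the ALS estimator above was created with `coldStartStrategy="drop"`. With Spark's default strategy, a user or movie that appears only in the test split gets a NaN prediction, and a single NaN turns the whole RMSE into NaN. The sketch below reproduces that effect in plain Python; the helper name `rmse_dropping_nan` and the three sample values are made up for illustration.

```python
import math


def rmse_dropping_nan(actual, predicted):
    """RMSE over the pairs whose prediction is not NaN."""
    pairs = [(a, p) for a, p in zip(actual, predicted) if not math.isnan(p)]
    if not pairs:
        raise ValueError("no valid predictions to evaluate")
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))


# Hypothetical values: the second prediction stands for a cold-start row.
actual = [4.0, 3.0, 5.0]
predicted = [3.5, float("nan"), 4.5]

# Naive RMSE: the NaN propagates through the sum and the square root.
naive = math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))
print("naive RMSE is NaN:", math.isnan(naive))
print("RMSE after dropping NaN rows:", rmse_dropping_nan(actual, predicted))
```

The trade-off is that dropped rows are not counted at all, so the reported RMSE describes only users and movies the model has seen during training.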

In [None]:
#Generate predictions and evaluate using RMSE
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [None]:
#Print evaluation metrics and model parameters
print("RMSE = "+str(rmse))
print("**Best Model**")
print(" Rank: ", str(best_model._java_obj.parent().getRank())) 
print(" MaxIter: ", str(best_model._java_obj.parent().getMaxIter())) 
print(" RegParam: ", str(best_model._java_obj.parent().getRegParam())) 
print(" Alpha: ", str(best_model._java_obj.parent().getAlpha()))

RMSE = 0.8866523916895872
**Best Model**
 Rank:  5
 MaxIter:  10
 RegParam:  0.1
 Alpha:  0.5


In [None]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   580|  44022|   3.5| 3.2944596|
|   133|    471|   4.0| 2.7501032|
|   362|   1591|   4.0| 2.4876447|
|   155|   3175|   4.0| 3.5329058|
|    34|   3997|   2.0|  2.841766|
|   368|   1580|   3.0| 2.8941004|
|   101|   3175|   4.0| 3.3813784|
|   115|   3175|   4.0| 3.5748672|
|   385|   1238|   3.0|  3.833984|
|    28|   1645|   2.5|   2.98396|
|   159|   1088|   4.0|  3.105312|
|   606|   1959|   3.5| 3.8424022|
|   606|   6466|   4.0|  4.148471|
|   602|    471|   4.0| 3.1445143|
|    91|    471|   1.0| 2.7188714|
|    93|   1580|   5.0|  4.364394|
|    93|   1591|   4.0| 2.2007782|
|   233|   1580|   3.0| 2.8405023|
|   599|   1959|   3.0| 2.7461076|
|   599|   3997|   0.5|  1.564797|
+------+-------+------+----------+
only showing top 20 rows



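### Apply the model and check its performance
Note that this RMSE is computed on all ratings, most of which were used for training, so it is expected to be lower than the test RMSE above.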

In [None]:
alldata=best_model.transform(movie_ratings)
rmse_alldata = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse_alldata))

RMSE = 0.6934922039563032


In [None]:
alldata.createOrReplaceTempView("alldata")



In [None]:
spark.sql("SELECT * FROM alldata").show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   463|   1088|   3.5| 3.2347317|
|   137|   1580|   3.5|  3.195099|
|   580|   1580|   4.0| 3.6432002|
|   580|   3175|   2.5| 3.5013561|
|   580|  44022|   3.5| 3.2944596|
|   133|    471|   4.0| 2.7501032|
|   322|   1580|   3.5| 3.0273738|
|   362|   1591|   4.0| 2.4876447|
|   362|   1645|   5.0| 3.6733685|
|   593|   1580|   1.5| 2.5874534|
|   597|    471|   2.0| 4.2311735|
|   597|   1580|   3.0| 3.4178874|
|   597|   1959|   4.0| 4.1104193|
|   597|   2366|   5.0|  4.287929|
|   108|   1959|   5.0| 4.2124724|
|   155|   1580|   4.0| 3.7836523|
|   155|   3175|   4.0| 3.5329058|
|    34|   1580|   2.5|  3.022368|
|    34|   3997|   2.0|  2.841766|
|   368|   1580|   3.0| 2.8941004|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
spark.sql("SELECT * FROM movies JOIN alldata ON movies.movieId=alldata.movieId").show()

+-------+--------------------+--------------------+------+-------+------+----------+
|movieId|               title|              genres|userId|movieId|rating|prediction|
+-------+--------------------+--------------------+------+-------+------+----------+
|   1088|Dirty Dancing (1987)|Drama|Musical|Rom...|   463|   1088|   3.5| 3.2347317|
|   1580|Men in Black (a.k...|Action|Comedy|Sci-Fi|   137|   1580|   3.5|  3.195099|
|   1580|Men in Black (a.k...|Action|Comedy|Sci-Fi|   580|   1580|   4.0| 3.6432002|
|   3175| Galaxy Quest (1999)|Adventure|Comedy|...|   580|   3175|   2.5| 3.5013561|
|  44022|Ice Age 2: The Me...|Adventure|Animati...|   580|  44022|   3.5| 3.2944596|
|    471|Hudsucker Proxy, ...|              Comedy|   133|    471|   4.0| 2.7501032|
|   1580|Men in Black (a.k...|Action|Comedy|Sci-Fi|   322|   1580|   3.5| 3.0273738|
|   1591|        Spawn (1997)|Action|Adventure|...|   362|   1591|   4.0| 2.4876447|
|   1645|The Devil's Advoc...|Drama|Mystery|Thr...|   362|   1645

## Recommend movies to users with id: 575, 232

In [None]:
# StringType is defined in pyspark.sql.types
from pyspark.sql.types import StringType

In [None]:
# Define the function to recommend topK movies to specific user
def topKRecommend(k, user_id, model):
  '''
  k: The number of movies recommended to the user
  user_id: The id of the user who's going to be recommended movies
  model: The model used for recommendation
  '''
  all_recommend = model.recommendForAllUsers(k)
  user_recommend = all_recommend.where(all_recommend.userId == user_id).toPandas()

  if user_recommend.empty:
    print("The user with id {} cannot be found.".format(user_id))
    return None

  rec_list = []
  for rec_pair in user_recommend.iloc[0,1]:
    rec_list.append(list(rec_pair))

  rec_df = spark.createDataFrame(pd.DataFrame(rec_list, columns=["movieId", "rec_rate"]))
  #movies_df = movies_df.join(rec_df)
  return rec_df

def combineMovieDetails(details_df, rec_df):
  '''
  details_df: The dataframe including the detail information of movies
  rec_df: The dataframe including the recommended movies and the prediction score
  return: a dataframe including the detail information for all recommended movies and corresponding prediction score
  '''
  # topKRecommend returns None when the user is unknown
  if rec_df is None:
    return None
  rec_df = rec_df.select(rec_df["movieId"].cast(StringType()), rec_df["rec_rate"])
  return details_df.select(details_df["movieId"].cast(StringType()), details_df["title"], details_df["genres"]).join(rec_df, "movieId").orderBy("rec_rate", ascending = False)


In [None]:
# The 10 movies recommended to user with id 575
rec_movies_575 = combineMovieDetails(movies_df, topKRecommend(10, 575, best_model))
rec_movies_575.show()



+-------+--------------------+--------------------+------------------+
|movieId|               title|              genres|          rec_rate|
+-------+--------------------+--------------------+------------------+
|  33649|  Saving Face (2004)|Comedy|Drama|Romance| 4.940224647521973|
| 183897| Isle of Dogs (2018)|    Animation|Comedy|4.9130096435546875|
|   3379| On the Beach (1959)|               Drama|   4.8609299659729|
|   7587|Samouraï, Le (God...|Crime|Drama|Thriller| 4.853343486785889|
|    945|      Top Hat (1935)|Comedy|Musical|Ro...| 4.813371181488037|
| 104339|In a World... (2013)|              Comedy| 4.804159164428711|
|   5075|  Waydowntown (2000)|              Comedy| 4.753145217895508|
|   5666|Rules of Attracti...|Comedy|Drama|Roma...| 4.745994567871094|
| 131724|The Jinx: The Lif...|         Documentary| 4.743083477020264|
|   6666|Discreet Charm of...|Comedy|Drama|Fantasy| 4.717129707336426|
+-------+--------------------+--------------------+------------------+



In [None]:
# The 15 movies recommended to user with id 232
rec_movie_232 = combineMovieDetails(movies_df, topKRecommend(15, 232, best_model))
rec_movie_232.show()



+-------+--------------------+--------------------+------------------+
|movieId|               title|              genres|          rec_rate|
+-------+--------------------+--------------------+------------------+
|   7587|Samouraï, Le (God...|Crime|Drama|Thriller| 5.061102390289307|
|   3379| On the Beach (1959)|               Drama| 4.939198970794678|
|    945|      Top Hat (1935)|Comedy|Musical|Ro...| 4.902270317077637|
|  33649|  Saving Face (2004)|Comedy|Drama|Romance| 4.842164516448975|
|   6666|Discreet Charm of...|Comedy|Drama|Fantasy| 4.784900665283203|
| 134796|  Bitter Lake (2015)|         Documentary|4.7595953941345215|
|   7071|Woman Under the I...|               Drama|4.7595953941345215|
| 138966|Nasu: Summer in A...|           Animation|4.7595953941345215|
| 179135|Blue Planet II (2...|         Documentary|4.7595953941345215|
| 184245|De platte jungle ...|         Documentary|4.7595953941345215|
|  26073|Human Condition I...|           Drama|War|4.7595953941345215|
|  742

## Find the similar movies for movies with id: 463, 471


In [None]:
import collections
from pyspark.sql.functions import col
# Define the function to find the similar movies
def findSimilarMovies(k, target_id, model, method, movie_details):
  '''
  k: The number of movies similar to the target movie
  target_id: The id of the target movie
  model: The ALS model which is used to find similar movies
  method: The method to calculate the similarity between movies
  movie_details: The dataframe including the detail information of all movies
  return: A dataframe including the information of the similar movies
  '''
  movie_factors = model.itemFactors.toPandas()
  movie_details = movie_details.select(movie_details["movieId"].cast(StringType()), movie_details["title"], movie_details["genres"])
  # Check whether there is a movie with the target id
  try:
    target_feature = movie_factors.loc[movie_factors.id==target_id, 'features'].to_numpy()[0]
  except IndexError:
    # No factor vector exists for this id (the movie is not in the model)
    print("Cannot find movie with id {}".format(target_id))
    return 

  # Find the similar movies based on cosine similarity between movies
  if method == "cosine_similarity":
    # Collect rows in a list first; DataFrame.append was removed in pandas 2.0
    rows = []
    for movie_id, feature in movie_factors.to_numpy():
      cosine_similarity = np.dot(target_feature, feature) / (np.linalg.norm(target_feature) * np.linalg.norm(feature))
      rows.append({'movieId': str(movie_id), 'cosine_similarity': float(cosine_similarity)})
    similar_movie = pd.DataFrame(rows, columns=["movieId", "cosine_similarity"])

    similar_movie_sd = spark.createDataFrame(similar_movie).orderBy("cosine_similarity", ascending=False)

    similar_movie_cs = similar_movie_sd.filter(similar_movie_sd['cosine_similarity'] < 0.99).limit(k).join(movie_details, 'movieId')
    #similar_movie_cs = movie_details.select(movie_details["movieId"].cast(StringType()), movie_details["title"], movie_details["genres"]).join(similar_movie_cs, "movieId")
    return similar_movie_cs.orderBy(similar_movie_cs['cosine_similarity'], ascending=False).drop(similar_movie_cs['cosine_similarity'])
  
  # Find the similar movies based on euclidean distance between movies
  if method == "euclidean_distance":
    # Collect rows in a list first; DataFrame.append was removed in pandas 2.0
    rows = []
    for movie_id, feature in movie_factors.to_numpy():
      euclidean_distance = np.linalg.norm(np.array(target_feature) - np.array(feature))
      rows.append({'movieId': str(movie_id), 'euclidean_distance': float(euclidean_distance)})
    similar_movie = pd.DataFrame(rows, columns=["movieId", "euclidean_distance"])
    
    similar_movie_sd = spark.createDataFrame(similar_movie).orderBy("euclidean_distance", ascending=True)
    #movie_details = movie_details.select(movie_details["movieId"].cast(StringType()), movie_details["title"], movie_details["genres"])
    similar_movie_ed = similar_movie_sd.filter(similar_movie_sd['euclidean_distance'] > 0).limit(k).join(movie_details, 'movieId')
    #similar_movie_ed = movie_details.select(movie_details["movieId"].cast(StringType()), movie_details["title"], movie_details["genres"]).join(similar_movie_ed, "movieId")
    return similar_movie_ed.orderBy(similar_movie_ed['euclidean_distance'], ascending=True).drop(similar_movie_ed['euclidean_distance'])


In [None]:
similar_movies_463 = findSimilarMovies(10, 463, best_model, "cosine_similarity", movies_df)



Cannot find movie with id 463


In [None]:
similar_movies_471_cs = findSimilarMovies(10, 471, best_model, "cosine_similarity", movies_df)
similar_movies_471_cs.show()



+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|    824|Kaspar Hauser (1993)|       Drama|Mystery|
|    389|Colonel Chabert, ...|   Drama|Romance|War|
|  76077|Hot Tub Time Mach...|       Comedy|Sci-Fi|
|  27731|Cat Returns, The ...|Adventure|Animati...|
|     13|        Balto (1995)|Adventure|Animati...|
|   8738|Woman Is a Woman,...|Comedy|Drama|Musi...|
|  26147|Thousand Clowns, ...|Comedy|Drama|Romance|
|   6408|Animals are Beaut...|  Comedy|Documentary|
|   2108|   L.A. Story (1991)|      Comedy|Romance|
|    596|    Pinocchio (1940)|Animation|Childre...|
+-------+--------------------+--------------------+



In [None]:
similar_movies_471_ed = findSimilarMovies(10, 471, best_model, "euclidean_distance", movies_df)
similar_movies_471_ed.show()



+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|   1293|       Gandhi (1982)|               Drama|
|   3699|      Starman (1984)|Adventure|Drama|R...|
|   1147|When We Were King...|         Documentary|
|  76077|Hot Tub Time Mach...|       Comedy|Sci-Fi|
|   1497|  Double Team (1997)|              Action|
|   3928|Abbott and Costel...|       Comedy|Horror|
|   2108|   L.A. Story (1991)|      Comedy|Romance|
|    596|    Pinocchio (1940)|Animation|Childre...|
|    661|James and the Gia...|Adventure|Animati...|
|   3308|Flamingo Kid, The...|        Comedy|Drama|
+-------+--------------------+--------------------+



## Report
### Motivation
With the growing popularity of artificial intelligence in recent years, more and more machine learning algorithms are being applied in industry. Recommendation systems are a particularly active area: they help companies, especially Internet companies, improve user experience and daily active users by recommending content that users are likely to enjoy. Collaborative filtering is one of the most popular recommendation techniques, and it can be implemented with the Alternating Least Squares (ALS) model in Spark ML. I therefore set out to design and implement a movie recommendation system based on a movie rating dataset.

### Step 1: Environment Setup and Data ETL
Download Spark and set it up in Colab, then download and import the datasets.
### Step 2: Data Analysis and Preprocessing
Manipulate the data with SQL to gain some intuitive insights.
### Step 3: Hyperparameter Selection and Model Building
Choose the best hyperparameters for the ALS model through cross-validation, and train the model on the training dataset.
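
To get a feel for the cost of this search, the short sketch below counts how many models the grid and fold settings used in this notebook imply. The grid values and the fold count are copied from the tuning cells above; the variable names are just for this example.

```python
from itertools import product

# Grid values copied from the ParamGridBuilder cell in this notebook.
grid = {
    "maxIter": [3, 5, 10],
    "rank": [5, 10, 15],
    "regParam": [0.01, 0.05, 0.1],
    "alpha": [0.1, 0.5, 1],
}
num_folds = 5  # numFolds passed to CrossValidator

# Every combination of grid values is one candidate configuration.
combos = list(product(*grid.values()))
fits = len(combos) * num_folds
print(len(combos), fits)  # 81 405

# Without the alpha axis the grid would shrink by a factor of three.
combos_without_alpha = len(combos) // len(grid["alpha"])
print(combos_without_alpha, combos_without_alpha * num_folds)  # 27 135
```

Since `alpha` only plays a role for implicit-feedback ALS, dropping that axis should give the same selection on explicit ratings at roughly a third of the training time.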
### Step 4: Model Evaluation
Evaluate the model by measuring the root mean square error (RMSE) on the testing dataset.
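
As a reminder of what the metric measures, here is a small plain-Python version of RMSE applied to the first five rows of the predictions table shown earlier. It is only a sketch of the formula (the function name `rmse` is an assumption for this example); in the notebook itself the value comes from Spark's `RegressionEvaluator`.

```python
import math


def rmse(actual, predicted):
    """Root mean square error between two equally long sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# First five (rating, prediction) pairs from the predictions output above.
ratings = [3.5, 4.0, 4.0, 4.0, 2.0]
predictions = [3.2944596, 2.7501032, 2.4876447, 3.5329058, 2.841766]
print("RMSE on these five rows:", rmse(ratings, predictions))
```

Because the errors are squared before averaging, a few large misses (such as the 4.0 rating predicted as about 2.49) weigh more heavily than many small ones.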
### Step 5: Model Application - Recommend Movies to Specific Users
Define a function based on the ALS model that scores the movies and selects those most likely to be enjoyed by the given user.
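
Conceptually, the score of a movie for a user is the dot product of the user's factor vector and the movie's factor vector, and the recommendation is the k highest-scoring movies. The sketch below shows that idea with made-up two-dimensional factors; `top_k_items`, the ids, and the vectors are hypothetical and stand in for what the Spark model computes internally.

```python
import heapq


def top_k_items(user_vec, item_factors, k):
    """Return the k (item_id, score) pairs with the highest dot-product score."""
    scores = (
        (item_id, sum(a * b for a, b in zip(user_vec, vec)))
        for item_id, vec in item_factors.items()
    )
    # nlargest returns the pairs sorted from highest to lowest score.
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])


# Hypothetical factors for one user and four movies.
user_vec = [1.0, 0.5]
item_factors = {
    101: [4.0, 0.0],
    102: [1.0, 1.0],
    103: [3.0, 3.0],
    104: [0.0, 2.0],
}
print(top_k_items(user_vec, item_factors, 2))  # [(103, 4.5), (101, 4.0)]
```

Like the notebook's function, this sketch does not filter out movies the user has already rated; doing so would need an extra step against the ratings table.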
### Step 6: Model Application - Find Similar Movies
Define a function that uses one of the matrices learned by the ALS model, the item-feature matrix, to compute the similarity between the target movie and all other movies with two different methods, cosine similarity and Euclidean distance, and return the most similar movies.
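
The two measures can rank candidates differently, which is consistent with the two result tables for movie 471 not being identical. The following sketch uses made-up two-dimensional vectors to show why: cosine similarity looks only at direction, while Euclidean distance also depends on vector length. The function and variable names are illustrative assumptions.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def euclidean_distance(a, b):
    """Straight-line distance between two vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


# Hypothetical factor vectors: one candidate points the same way as the
# target but is much longer, the other is close by but slightly rotated.
target = [1.0, 1.0]
candidates = {"same_direction": [4.0, 4.0], "nearby": [1.0, 0.5]}

by_cosine = max(candidates, key=lambda n: cosine_similarity(target, candidates[n]))
by_distance = min(candidates, key=lambda n: euclidean_distance(target, candidates[n]))
print("closest by cosine similarity:", by_cosine)
print("closest by Euclidean distance:", by_distance)
```

Which measure is preferable depends on whether the length of a factor vector is considered meaningful for similarity; the notebook reports both so the results can be compared.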

### Output and Conclusion
- ALS is an intuitive way to implement a recommendation system: it is a collaborative filtering method based on matrix factorization, which represents each user and each movie as a vector of latent factors and is therefore relatively easy to interpret.
- Based on cross-validation, the best model (rank 5, maxIter 10, regParam 0.1) was selected. It reaches an RMSE of about 0.89 on the test set and was used to recommend movies to users and to find similar movies.
- However, the ALS model also has some disadvantages. For example, it needs to be retrained periodically, and training takes a long time. In the future, I would like to try implementing the recommendation system with other methods, such as Wide & Deep.