### Movie Recommendation Engine Development using Apache Spark
In this notebook, we will use an Alternating Least Squares (ALS) algorithm with Spark APIs to predict the ratings for the movies in [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/)

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

from pyspark.sql.functions import col


In [3]:
# Install library for finding Spark
!pip install -q findspark
# Import the libary
import findspark
# Initiate findspark
findspark.init()
# Check the location for Spark
findspark.find()

'/content/spark-3.2.1-bin-hadoop3.2'

In [4]:
# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark

In [5]:
# Import a Spark function from library
from pyspark.sql.functions import col

In [6]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

In [7]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Part1: Data ETL and Data Exploration

In [8]:
movies_df = spark.read.load("/content/drive/MyDrive/HW2_movie/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/content/drive/MyDrive/HW2_movie/ratings.csv", format='csv', header = True)
links_df = spark.read.load("/content/drive/MyDrive/HW2_movie/links.csv", format='csv', header = True)
tags_df = spark.read.load("/content/drive/MyDrive/HW2_movie/tags.csv", format='csv', header = True)

In [9]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [10]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [11]:
links_df.show(5)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [12]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [13]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [14]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

3446 out of 9724 movies are rated by only one user


## Part 1: Spark SQL and OLAP

In [15]:
movies_df.registerTempTable("movies")
ratings_df.registerTempTable("ratings")
links_df.registerTempTable("links")
tags_df.registerTempTable("tags")



### Q1: The number of Users

In [18]:
num_usr = spark.sql("Select Count(Distinct userId) as Number_of_Users from ratings")
num_usr.show()

+---------------+
|Number_of_Users|
+---------------+
|            610|
+---------------+



### Q2: The number of Movies

In [20]:
num_movie = spark.sql("Select Count(Distinct movieId) as Number_of_Movies from movies")
num_movie.show()

+----------------+
|Number_of_Movies|
+----------------+
|            9742|
+----------------+



### Q3:  How many movies are rated by users? List movies not rated before

In [23]:
rated_movie = spark.sql("Select Count(Distinct movieID) as Number_of_Movies from ratings")
rated_movie.show()

+----------------+
|Number_of_Movies|
+----------------+
|            9724|
+----------------+



In [36]:
not_rated_movie = spark.sql('select * from movies where movieID not in (select movieID from ratings)')
not_rated_movie.show(truncate=False)

+-------+--------------------------------------------+------------------------+
|movieId|title                                       |genres                  |
+-------+--------------------------------------------+------------------------+
|1076   |Innocents, The (1961)                       |Drama|Horror|Thriller   |
|2939   |Niagara (1953)                              |Drama|Thriller          |
|3338   |For All Mankind (1989)                      |Documentary             |
|3456   |Color of Paradise, The (Rang-e khoda) (1999)|Drama                   |
|4194   |I Know Where I'm Going! (1945)              |Drama|Romance|War       |
|5721   |Chosen, The (1981)                          |Drama                   |
|6668   |Road Home, The (Wo de fu qin mu qin) (1999) |Drama|Romance           |
|6849   |Scrooge (1970)                              |Drama|Fantasy|Musical   |
|7020   |Proof (1991)                                |Comedy|Drama|Romance    |
|7792   |Parallax View, The (1974)      

### Q4: List Movie Genres

In [35]:
movie_genre = spark.sql("Select genres from movies")
movie_genre.show(truncate=False)

+-------------------------------------------+
|genres                                     |
+-------------------------------------------+
|Adventure|Animation|Children|Comedy|Fantasy|
|Adventure|Children|Fantasy                 |
|Comedy|Romance                             |
|Comedy|Drama|Romance                       |
|Comedy                                     |
|Action|Crime|Thriller                      |
|Comedy|Romance                             |
|Adventure|Children                         |
|Action                                     |
|Action|Adventure|Thriller                  |
|Comedy|Drama|Romance                       |
|Comedy|Horror                              |
|Adventure|Animation|Children               |
|Drama                                      |
|Action|Adventure|Romance                   |
|Crime|Drama                                |
|Drama|Romance                              |
|Comedy                                     |
|Comedy                           

In [45]:
movie_genre = spark.sql("""
    SELECT DISTINCT genre
    FROM (
        SELECT explode(split(genres, '[|]')) AS genre
        FROM movies
    )
""")

movie_genre.show(truncate=False)

+------------------+
|genre             |
+------------------+
|Crime             |
|Romance           |
|Thriller          |
|Adventure         |
|Drama             |
|War               |
|Documentary       |
|Fantasy           |
|Mystery           |
|Musical           |
|Animation         |
|Film-Noir         |
|(no genres listed)|
|IMAX              |
|Horror            |
|Western           |
|Comedy            |
|Children          |
|Action            |
|Sci-Fi            |
+------------------+



### Q5: Movie for Each Category

In [47]:
num_category = spark.sql("""
    SELECT genre AS Category, COUNT(*) AS number
    FROM (
        SELECT explode(split(genres, '[|]')) AS genre
        FROM movies
    )
    GROUP BY genre
    ORDER BY number DESC
""")
num_category.show(truncate=False)

+------------------+------+
|Category          |number|
+------------------+------+
|Drama             |4361  |
|Comedy            |3756  |
|Thriller          |1894  |
|Action            |1828  |
|Romance           |1596  |
|Adventure         |1263  |
|Crime             |1199  |
|Sci-Fi            |980   |
|Horror            |978   |
|Fantasy           |779   |
|Children          |664   |
|Animation         |611   |
|Mystery           |573   |
|Documentary       |440   |
|War               |382   |
|Musical           |334   |
|Western           |167   |
|IMAX              |158   |
|Film-Noir         |87    |
|(no genres listed)|34    |
+------------------+------+



In [50]:
movie_category_list = spark.sql("""
    SELECT genre AS Category, collect_list(title) AS list_of_movies
    FROM (
        SELECT explode(split(genres, '[|]')) AS genre, title
        FROM movies
    )
    GROUP BY genre
""")

movie_category_list.show()

+------------------+--------------------+
|          Category|      list_of_movies|
+------------------+--------------------+
|             Crime|[Heat (1995), Cas...|
|           Romance|[Grumpier Old Men...|
|          Thriller|[Heat (1995), Gol...|
|         Adventure|[Toy Story (1995)...|
|             Drama|[Waiting to Exhal...|
|               War|[Richard III (199...|
|       Documentary|[Nico Icon (1995)...|
|           Fantasy|[Toy Story (1995)...|
|           Mystery|[Copycat (1995), ...|
|           Musical|[Pocahontas (1995...|
|         Animation|[Toy Story (1995)...|
|         Film-Noir|[Devil in a Blue ...|
|(no genres listed)|[La cravate (1957...|
|              IMAX|[Apollo 13 (1995)...|
|            Horror|[Dracula: Dead an...|
|           Western|[Desperado (1995)...|
|            Comedy|[Toy Story (1995)...|
|          Children|[Toy Story (1995)...|
|            Action|[Heat (1995), Sud...|
|            Sci-Fi|[Powder (1995), C...|
+------------------+--------------

## Part2: Spark ALS based approach for training model
We will use an Spark ML to predict the ratings, so let's reload "ratings.csv" using ``sc.textFile`` and then convert it to the form of (user, item, rating) tuples.

In [51]:
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [52]:
movie_ratings=ratings_df.drop('timestamp')

In [53]:
# Data type convert
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [54]:
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.

In [55]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [56]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])

In [83]:
#Create ALS model
als_model = ALS(
    rank = 10,
    maxIter = 5,
    regParam = 0.01,
    userCol = "userId",
    itemCol = "movieId",
    ratingCol = "rating",
    coldStartStrategy = "drop",
    seed = 6
)

In [84]:
#Tune model using ParamGridBuilder
paramGrid = ParamGridBuilder() \
    .addGrid(als_model.rank, [5, 10, 15, 20, 25])\
    .addGrid(als_model.regParam, [0.1, 0.01, 0.001])\
    .addGrid(als_model.maxIter, [3, 5, 10])\
    .addGrid(als_model.alpha, [0.1, 0.01, 0.001])\
    .build()

In [85]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

In [88]:
# Build Cross validation
crossval = CrossValidator(
    estimator=als_model,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=6
)

In [89]:
#Fit ALS model to training data
import time
start_time = time.time()
model = crossval.fit(training)
end_time = time.time()

In [90]:
#Extract best model from the tuning exercise using ParamGridBuilder
bestModel=model.bestModel
best_params = model.getEstimatorParamMaps()[np.argmin(model.avgMetrics)]
print('Best ALS model parameters by CV:')
for i,j in best_params.items():
  print('-> '+i.name+': '+str(j))

prediction_train=model.transform(training)
rmse_train = evaluator.evaluate(prediction_train)
print("Root-mean-square error for training data is " + str(rmse_train))

Best ALS model parameters by CV:
-> rank: 25
-> regParam: 0.1
-> maxIter: 10
-> alpha: 0.1
Root-mean-square error for training data is 0.4818740526428917


### Model testing
And finally, make a prediction and check the testing error.

In [92]:
#Generate predictions and evaluate using RMSE
predictions=bestModel.transform(test)
rmse = evaluator.evaluate(predictions)

In [94]:
#Print evaluation metrics and model parameters
print ("RMSE = "+str(rmse))
print ("**Best Model**")
print (" Rank: ", str(bestModel._java_obj.parent().getRank())),
print (" MaxIter: ", str(bestModel._java_obj.parent().getMaxIter())),
print (" RegParam: ", str(bestModel._java_obj.parent().getRegParam()))

RMSE = 0.8826439394838237
**Best Model**
 Rank:  25
 MaxIter:  10
 RegParam:  0.1


In [95]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   137|   1580|   3.5|  3.390127|
|   580|   1580|   4.0| 3.3505359|
|   322|   1580|   3.5| 3.0585606|
|   362|   1645|   5.0| 3.6939654|
|   593|   1580|   1.5|  3.222214|
|   597|   1580|   3.0| 3.5908246|
|   597|   1959|   4.0|  4.074137|
|   368|   2366|   4.0| 3.5536225|
|    28|   1580|   3.0| 3.0961149|
|   587|   1580|   4.0|  4.073407|
|   587|   3175|   5.0| 4.2180715|
|    27|   1580|   3.0| 3.5590591|
|   332|   1645|   3.5| 2.9509902|
|   577|   1580|   3.0| 3.3064036|
|   606|   1580|   2.5| 3.3959742|
|   606|   1829|   3.5| 1.9650246|
|   388|  44022|   4.5| 3.1383395|
|   602|    471|   4.0| 3.1731167|
|    91|    471|   1.0|  3.026765|
|    91|   1645|   3.0|  3.621868|
+------+-------+------+----------+
only showing top 20 rows



### Model apply and see the performance

In [96]:
alldata=bestModel.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

RMSE = 0.58334755662859


In [97]:
alldata.registerTempTable("alldata")



In [100]:
all_data = spark.sql("select * from alldata")
all_data.show(truncate=False)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|463   |1088   |3.5   |3.4004312 |
|137   |1580   |3.5   |3.390127  |
|580   |1580   |4.0   |3.3505359 |
|580   |3175   |2.5   |3.3571498 |
|580   |44022  |3.5   |3.423721  |
|133   |471    |4.0   |3.544727  |
|322   |1580   |3.5   |3.0585606 |
|362   |1591   |4.0   |3.242827  |
|362   |1645   |5.0   |3.6939654 |
|593   |1580   |1.5   |3.222214  |
|597   |471    |2.0   |3.3143203 |
|597   |1580   |3.0   |3.5908246 |
|597   |1959   |4.0   |4.074137  |
|597   |2366   |5.0   |4.421212  |
|108   |1959   |5.0   |4.603735  |
|155   |1580   |4.0   |3.9388194 |
|155   |3175   |4.0   |4.015964  |
|34    |1580   |2.5   |3.0300171 |
|34    |3997   |2.0   |1.8331724 |
|368   |1580   |3.0   |3.1835866 |
+------+-------+------+----------+
only showing top 20 rows



In [101]:
spark.sql("select * from movies join alldata on movies.movieId=alldata.movieId").show(truncate=False)

+-------+--------------------------------+-----------------------------------+------+-------+------+----------+
|movieId|title                           |genres                             |userId|movieId|rating|prediction|
+-------+--------------------------------+-----------------------------------+------+-------+------+----------+
|1088   |Dirty Dancing (1987)            |Drama|Musical|Romance              |463   |1088   |3.5   |3.4004312 |
|1580   |Men in Black (a.k.a. MIB) (1997)|Action|Comedy|Sci-Fi               |137   |1580   |3.5   |3.390127  |
|1580   |Men in Black (a.k.a. MIB) (1997)|Action|Comedy|Sci-Fi               |580   |1580   |4.0   |3.3505359 |
|3175   |Galaxy Quest (1999)             |Adventure|Comedy|Sci-Fi            |580   |3175   |2.5   |3.3571498 |
|44022  |Ice Age 2: The Meltdown (2006)  |Adventure|Animation|Children|Comedy|580   |44022  |3.5   |3.423721  |
|471    |Hudsucker Proxy, The (1994)     |Comedy                             |133   |471    |4.0   |3.54

## Recommend moive to users with id: 575, 232.
you can choose some users to recommend the moives

In [114]:
def TopRecommend(k,id,model):
  all_recommended = model.recommendForAllUsers(k)
  user_recommended = all_recommended.where(all_recommended.userId==id).toPandas()

  if user_recommended.shape[0]==0:
    print(f"No recommendations found for user with ID {user_id}")
    return None

  user_recommended = user_recommended.iloc[0,1]
  user_recommended = pd.DataFrame(user_recommended, columns=['movieId', 'predicted_ratings'])
  recommendations = None

  for movie_id in user_recommended['movieId']:
    if recommendations is None:
      recommendations = movies_df.where(movies_df.movieId==str(movie_id))
    else:
      recommendations = recommendations.union(movies_df.where(movies_df.movieId==str(movie_id)))

  recommendations = pd.concat([recommendations.toPandas(), user_recommended['predicted_ratings']], axis=1)
  recommendations.index = range(1, k+1)

  return recommendations

In [116]:
TopRecommend(15,575,bestModel)



Unnamed: 0,movieId,title,genres,predicted_ratings
1,1411,Hamlet (1996),Crime|Drama|Romance,4.935855
2,177593,"Three Billboards Outside Ebbing, Missouri (2017)",Crime|Drama,4.91415
3,4973,"Amelie (Fabuleux destin d'Amélie Poulain, Le) ...",Comedy|Romance,4.85364
4,296,Pulp Fiction (1994),Comedy|Crime|Drama|Thriller,4.75836
5,430,Calendar Girl (1993),Comedy|Drama,4.756208
6,608,Fargo (1996),Comedy|Crime|Drama|Thriller,4.74083
7,7096,Rivers and Tides (2001),Documentary,4.716104
8,1732,"Big Lebowski, The (1998)",Comedy|Crime,4.694263
9,16,Casino (1995),Crime|Drama,4.64483
10,6442,Belle époque (1992),Comedy|Romance,4.610984


In [117]:
TopRecommend(15,232,bestModel)



Unnamed: 0,movieId,title,genres,predicted_ratings
1,80906,Inside Job (2010),Documentary,4.491966
2,172583,Investigation Held by Kolobki (1986),Animation,4.39345
3,166183,Junior and Karlson (1968),Adventure|Animation|Children,4.39345
4,163112,Winnie the Pooh Goes Visiting (1971),Animation,4.39345
5,163072,Winnie Pooh (1969),Animation|Children,4.39345
6,173963,Empties (2007),Comedy,4.39345
7,173351,Wow! A Talking Fish! (1983),Animation|Children|Comedy|Fantasy,4.39345
8,32892,Ivan's Childhood (a.k.a. My Name is Ivan) (Iva...,Drama|War,4.39345
9,147250,The Adventures of Sherlock Holmes and Doctor W...,(no genres listed),4.39345
10,130970,George Carlin: Life Is Worth Losing (2005),Comedy,4.39345


## Find the similar moives for moive with id: 463, 471
You can find the similar moives based on the ALS results

In [128]:
movie_factors = bestModel.itemFactors

exprs = [f"features[{i}] as feature{i}" for i in range(bestModel.rank)]
exprs.insert(0, "id as movieId")

movie_factors = movie_factors.selectExpr(*exprs)
movie_factors.createOrReplaceTempView('movie_factors')
movie_factors.show()

+-------+------------+-----------+-----------+-----------+------------+------------+----------+-----------+-----------+------------+----------+-----------+------------+------------+------------+-------------+------------+------------+------------+------------+------------+------------+------------+-----------+----------+
|movieId|    feature0|   feature1|   feature2|   feature3|    feature4|    feature5|  feature6|   feature7|   feature8|    feature9| feature10|  feature11|   feature12|   feature13|   feature14|    feature15|   feature16|   feature17|   feature18|   feature19|   feature20|   feature21|   feature22|  feature23| feature24|
+-------+------------+-----------+-----------+-----------+------------+------------+----------+-----------+-----------+------------+----------+-----------+------------+------------+------------+-------------+------------+------------+------------+------------+------------+------------+------------+-----------+----------+
|     10|  0.28435665|-0.396923



In [129]:
def dist_similar(k, mid):
    '''
    k: number of similar movies to find
    mid: id of the movie to find similarities
    '''
    # Get the feature vector of the target movie
    movie_info = spark.sql(f'select * from movie_factors where movieId={mid}').toPandas()

    # If no movie is found, return to the prompt
    if movie_info.shape[0] == 0:
        print(f'No movie with id {mid} is found in the data.')
        return None, None

    temp = ['select movieId,']

    # Iterate over each feature dimension and compute the square of each dimension component of the Euclidean distance
    for i in range(bestModel.rank):
        val = movie_info.iloc[0, i + 1]
        comd = f'power(feature{i} - {val}, 2)'
        if i < bestModel.rank - 1:
            temp.append(f'{comd} as sd{i},')
        else:
            temp.append(f'{comd} as sd{i}')

    # Constructing query statements: excluding the target movie itself
    temp.append(f'from movie_factors where movieId != {mid}')

    # caculate the distence
    ssd = spark.sql(' '.join(temp))

    # Calculate the sum of squares of all dimensions and take the square root to get the final Euclidean distance
    ssd = ssd.selectExpr(
        'movieId',
        f'sqrt(sd0 + sd1 + sd2 + sd3 + sd4) as euclidean_dist'
    ).orderBy('euclidean_dist', ascending=True).limit(k).toPandas()

    # Get details of the k most similar movies
    out = None
    for i in ssd['movieId']:
        if out is None:
            out = movies_df.where(movies_df.movieId == str(i))
        else:
            out = out.union(movies_df.where(movies_df.movieId == str(i)))

    # switch the result as Pandas DataFrame
    out = out.toPandas()
    out.index = range(1, k + 1)
    return out, ssd

In [134]:
out, ssd = dist_similar(10, 471)
print(out)

   movieId                                              title  \
1     5025                               Orange County (2002)   
2     7743                                   Explorers (1985)   
3   157296                                Finding Dory (2016)   
4    33683  High Tension (Haute tension) (Switchblade Roma...   
5     3502                                     My Life (1993)   
6     1428                                  Angel Baby (1995)   
7    55282                            30 Days of Night (2007)   
8      718              Visitors, The (Visiteurs, Les) (1993)   
9      335                                  Underneath (1995)   
10    5401                          Undercover Brother (2002)   

                        genres  
1                       Comedy  
2    Adventure|Children|Sci-Fi  
3   Adventure|Animation|Comedy  
4              Horror|Thriller  
5                        Drama  
6                        Drama  
7              Horror|Thriller  
8        Comedy|Fant

In [135]:
out, ssd = dist_similar(10, 463)
print(out)

No movie with id 463 is found in the data.
None



## Report
### motivation
In this project we built a personalized movie recommendation system using Spark’s Alternating Least Squares (ALS) algorithm. The system aims to predict users’ movie ratings and recommend movies they are likely to enjoy based on historical data.

1. step1. Data Loading and Preprocessing\
In this step, we loaded the movie and ratings data from CSV files and transformed them into a format suitable for machine learning using Spark’s DataFrame API. We cleaned the data, converting columns like userId, movieId, and rating into appropriate data types for processing.

2. step2. Model Building Using ALS\
The ALS model was built using the Spark MLlib library. The training data was divided into an 80/20 split, and a grid search was applied to find the best hyperparameters (rank, regularization, etc.). The model was then trained on the dataset to predict movie ratings.

3. step3. Model Evaluation and Fine-Tuning\
After training, the model’s performance was evaluated using Root Mean Square Error (RMSE). The model was fine-tuned by adjusting hyperparameters to minimize the RMSE, resulting in the best set of parameters.
4. step4. Making Predictions and Recommendations\
Finally, the trained ALS model was used to recommend movies to users. Recommendations were based on user-specific preferences and historical ratings. Additionally, similar movies were suggested based on movie feature similarity.

### output and conclusion
The final model provided personalized movie recommendations with an RMSE of around 0.88 on the test set, indicating good prediction accuracy. The project successfully demonstrates how collaborative filtering techniques can be applied to build a scalable recommendation system.