# Moive Recommendation

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar xf spark-3.5.5-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.5.5/spark-3.5.5-bin-hadoop3"

In [3]:
import findspark
findspark.init("spark-3.5.5-bin-hadoop3")
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [50]:
import numpy as np
import pandas as pd
import math

## 1. Data Processing and Data Exploration

data source: https://grouplens.org/datasets/movielens/latest/

In [49]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
movies_df = spark.read.load("drive/My Drive/Movie_data/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("drive/My Drive/Movie_data/ratings.csv", format='csv', header = True)
links_df = spark.read.load("drive/My Drive/Movie_data/links.csv", format='csv', header = True)
tags_df = spark.read.load("drive/My Drive/Movie_data/tags.csv", format='csv', header = True)

In [7]:
dataframes = {
    "movies_df": movies_df,
    "ratings_df": ratings_df,
    "links_df": links_df,
    "tags_df": tags_df
}

In [8]:
for df_name, df in dataframes.items():
  print(f"Data frame: {df_name}")
  df.printSchema()
  df.show(5, truncate=False)

Data frame: movies_df
root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows

Data frame: ratings_df
root
 |--

In [9]:
from pyspark.sql.functions import col, sum

for df_name, df in dataframes.items():
  print(f"Data frame {df_name}")
  df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

Data frame movies_df
+-------+-----+------+
|movieId|title|genres|
+-------+-----+------+
|      0|    0|     0|
+-------+-----+------+

Data frame ratings_df
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+

Data frame links_df
+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      0|     0|     8|
+-------+------+------+

Data frame tags_df
+------+-------+---+---------+
|userId|movieId|tag|timestamp|
+------+-------+---+---------+
|     0|      0|  0|        0|
+------+-------+---+---------+



In [10]:
links_df = links_df.na.drop()

In [11]:
from pyspark.sql.functions import from_unixtime

# Make timestamp more readable
ratings_df = ratings_df.withColumn("timestamp", from_unixtime(col("timestamp")))
tags_df = tags_df.withColumn("timestamp", from_unixtime(col("timestamp")))

In [12]:
ratings_df.show(5, truncate=False)
tags_df.show(5, truncate=False)

+------+-------+------+-------------------+
|userId|movieId|rating|timestamp          |
+------+-------+------+-------------------+
|1     |1      |4.0   |2000-07-30 18:45:03|
|1     |3      |4.0   |2000-07-30 18:20:47|
|1     |6      |4.0   |2000-07-30 18:37:04|
|1     |47     |5.0   |2000-07-30 19:03:35|
|1     |50     |5.0   |2000-07-30 18:48:51|
+------+-------+------+-------------------+
only showing top 5 rows

+------+-------+---------------+-------------------+
|userId|movieId|tag            |timestamp          |
+------+-------+---------------+-------------------+
|2     |60756  |funny          |2015-10-24 19:29:54|
|2     |60756  |Highly quotable|2015-10-24 19:29:56|
|2     |60756  |will ferrell   |2015-10-24 19:29:52|
|2     |89774  |Boxing story   |2015-10-24 19:33:27|
|2     |89774  |MMA            |2015-10-24 19:33:20|
+------+-------+---------------+-------------------+
only showing top 5 rows



### Sql analysis

In [13]:
movies_df.registerTempTable("movies")
ratings_df.registerTempTable("ratings")
links_df.registerTempTable("links")
tags_df.registerTempTable("tags")



In [14]:
# User Number and Movie Number
user_numb = spark.sql("SELECT COUNT(DISTINCT userId) AS user_numb FROM ratings")
movie_numb = spark.sql("SELECT COUNT(DISTINCT movieId) AS movie_numb FROM movies")
user_numb.show()
movie_numb.show()

+---------+
|user_numb|
+---------+
|      610|
+---------+

+----------+
|movie_numb|
+----------+
|      9742|
+----------+



In [93]:
# Show movie genres
movie_genres = spark.sql("SELECT DISTINCT genres FROM movies")
print(f"movie genres count: {movie_genres.count()}")
movie_genres.show(10, truncate=False)

movie genres count: 951
+------------------------------------------+
|genres                                    |
+------------------------------------------+
|Comedy|Horror|Thriller                    |
|Adventure|Sci-Fi|Thriller                 |
|Action|Adventure|Drama|Fantasy            |
|Action|Drama|Horror                       |
|Action|Animation|Comedy|Sci-Fi            |
|Animation|Children|Drama|Musical|Romance  |
|Action|Adventure|Drama                    |
|Adventure|Sci-Fi                          |
|Documentary|Musical|IMAX                  |
|Adventure|Children|Fantasy|Sci-Fi|Thriller|
+------------------------------------------+
only showing top 10 rows



In [94]:
movie_genres_count = spark.sql("SELECT genres, COUNT(*) AS count FROM movies GROUP BY 1 Order by 2 DESC")
movie_genres_count.show(10, truncate=False)

+--------------------+-----+
|genres              |count|
+--------------------+-----+
|Drama               |1053 |
|Comedy              |946  |
|Comedy|Drama        |435  |
|Comedy|Romance      |363  |
|Drama|Romance       |349  |
|Documentary         |339  |
|Comedy|Drama|Romance|276  |
|Drama|Thriller      |168  |
|Horror              |167  |
|Horror|Thriller     |135  |
+--------------------+-----+
only showing top 10 rows



In [170]:
# count rated movies
rated_movies = spark.sql("SELECT movieID, Title FROM movies WHERE movieID IN (SELECT movieId from ratings)")
print(f"movie genres count: {rated_movies.count()}")
rated_movies.show(10, truncate=False)

movie genres count: 9724
+-------+----------------------------------+
|movieID|Title                             |
+-------+----------------------------------+
|1      |Toy Story (1995)                  |
|2      |Jumanji (1995)                    |
|3      |Grumpier Old Men (1995)           |
|4      |Waiting to Exhale (1995)          |
|5      |Father of the Bride Part II (1995)|
|6      |Heat (1995)                       |
|7      |Sabrina (1995)                    |
|8      |Tom and Huck (1995)               |
|9      |Sudden Death (1995)               |
|10     |GoldenEye (1995)                  |
+-------+----------------------------------+
only showing top 10 rows



In [96]:
# Count unique movie genres
unique_genres = spark.sql("SELECT genres, COUNT(*) as count FROM (SELECT explode(split(genres, '[|]')) as genres, movieID FROM movies) Group By 1 Order by 2 DESC")
print(f"movie genres count: {unique_genres.count()}")
unique_genres.show(10, truncate=False)

movie genres count: 20
+---------+-----+
|genres   |count|
+---------+-----+
|Drama    |4361 |
|Comedy   |3756 |
|Thriller |1894 |
|Action   |1828 |
|Romance  |1596 |
|Adventure|1263 |
|Crime    |1199 |
|Sci-Fi   |980  |
|Horror   |978  |
|Fantasy  |779  |
+---------+-----+
only showing top 10 rows



## 2. Spark ALS model training and tuning

### modify data type for training

In [19]:
movie_ratings = ratings_df.drop("timestamp")

In [20]:
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", col("userId").cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", col("movieId").cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", col("rating").cast(FloatType()))

### prepare model and tuning

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [22]:
(training,test) = movie_ratings.randomSplit([0.8,0.2])

In [23]:
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

In [24]:
paramGrid = (ParamGridBuilder()
          .addGrid(als.regParam, [0.05, 0.1, 0.3, 0.5, 0.8])
          .addGrid(als.rank, [5, 8, 11, 15])
          .addGrid(als.maxIter, [3, 5, 8, 10])
          .build())

In [25]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                    predictionCol="prediction")

In [26]:
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

In [27]:
cvModel = cv.fit(training)

In [28]:
bestModel=cvModel.bestModel

### Model Training

In [89]:
test_predictions = bestModel.transform(test)
rmse_1 = evaluator.evaluate(test_predictions)

In [90]:
#test set predict result
print ("Best Model Performance:")
print (f"RMSE: {rmse_1}")
print (f" Rank: {bestModel._java_obj.parent().getRank()}"),
print (f" MaxIter: {bestModel._java_obj.parent().getMaxIter()}"),
print (f" RegParam: {bestModel._java_obj.parent().getRegParam()}")

Best Model Performance:
RMSE: 0.8915467028329968
 Rank: 5
 MaxIter: 10
 RegParam: 0.1


In [91]:
test_predictions.show(10)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   463|   1088|   3.5| 3.1597826|
|   133|    471|   4.0|   2.82497|
|   322|   1580|   3.5| 3.1128342|
|   597|   1580|   3.0| 3.7918675|
|   155|   1580|   4.0| 3.8850126|
|   115|   1580|   4.0| 3.7900617|
|   115|   1645|   4.0| 3.4983606|
|   115|   3175|   4.0|  3.814225|
|   183|   1580|   4.0| 3.7332616|
|    27|   2142|   3.0| 2.8103154|
+------+-------+------+----------+
only showing top 10 rows



In [92]:
full_predictions = bestModel.transform(movie_ratings)
full_predictions.show(10)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   463|   1088|   3.5| 3.1597826|
|   137|   1580|   3.5|  3.188569|
|   580|   1580|   4.0| 3.4207795|
|   580|   3175|   2.5| 3.4886749|
|   580|  44022|   3.5|  3.494078|
|   133|    471|   4.0|   2.82497|
|   322|   1580|   3.5| 3.1128342|
|   362|   1591|   4.0| 3.1952415|
|   362|   1645|   5.0|  3.961551|
|   593|   1580|   1.5| 2.8105989|
+------+-------+------+----------+
only showing top 10 rows



In [34]:
#whole dataset predict result
rmse_2 = evaluator.evaluate(full_predictions)
print (f"Full data prediction RMSE: {rmse_2}")

Full data prediction RMSE: 0.6925898347691948


## 3. Use model to recommand movies

In [51]:
import pyspark.pandas as ps



In [37]:
recommendations = bestModel.recommendForAllUsers(5)

In [86]:
recommendations.show(5, truncate=False)

+------+--------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                   |
+------+--------------------------------------------------------------------------------------------------+
|1     |[{3379, 6.1649156}, {33649, 6.108824}, {141718, 6.1074157}, {45503, 5.7321615}, {6818, 5.700225}] |
|2     |[{33649, 5.3681726}, {3379, 5.246966}, {6201, 5.206136}, {4495, 5.206136}, {141718, 5.1683354}]   |
|3     |[{5048, 5.955384}, {70946, 5.9453845}, {26865, 5.8731227}, {86347, 5.7541847}, {48322, 5.5967617}]|
|4     |[{3925, 5.490872}, {187, 5.286545}, {6201, 5.2766247}, {4495, 5.2766247}, {25825, 5.2600913}]     |
|5     |[{141718, 5.181051}, {132333, 5.0689464}, {5490, 5.0689464}, {6818, 5.0267277}, {3379, 4.9511504}]|
+------+--------------------------------------------------------------------------------------------------+
only showing top 5 rows



In [68]:
from pyspark.sql.functions import explode
def movieRec(Id):
  user_recs = recommendations.filter(col("userId") == Id)
  exploded_recs = user_recs.select(explode(col("recommendations")).alias("rec"))
  movie_ids = exploded_recs.select(col("rec.movieId"))
  recs_list = [str(row.movieId) for row in movie_ids.collect()]
  recommended_movies = movies_df.filter(col("movieId").isin(recs_list))
  return recommended_movies

In [72]:
print("Recommended movies for user with id '168':")
movieRec(168).show(truncate=False)

Recommended movies for user with id '168':
+-------+-----------------------------------------------------------------------------------------------------------------------------+--------------------------+
|movieId|title                                                                                                                        |genres                    |
+-------+-----------------------------------------------------------------------------------------------------------------------------+--------------------------+
|3379   |On the Beach (1959)                                                                                                          |Drama                     |
|6818   |Come and See (Idi i smotri) (1985)                                                                                           |Drama|War                 |
|8477   |Jetée, La (1962)                                                                                                             |Romance

##4. Find the similar moives

In [191]:
item_factors = bestModel.itemFactors
item_factors.show(5, truncate=False)

+---+-------------------------------------------------------------+
|id |features                                                     |
+---+-------------------------------------------------------------+
|10 |[0.669068, -0.98661375, 1.0303719, -1.404348, 0.0310954]     |
|20 |[0.58208466, -0.581814, 0.33827534, -1.0681218, 0.80129224]  |
|30 |[-0.88411576, -1.349364, -0.09334107, -0.65296865, 1.5343711]|
|40 |[-0.2663687, -1.488251, 1.1736697, -1.5591568, -0.09360958]  |
|50 |[0.38101068, -1.3306297, 0.55440944, -1.8172596, 0.7394904]  |
+---+-------------------------------------------------------------+
only showing top 5 rows



In [189]:
item_features = item_factors.select("id", "features").rdd.collect()
features_dict = {row['id']: np.array(row['features']) for row in item_features}

In [223]:
def cosine_similarity(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
def euclidean_distance(a, b):
    return float(np.linalg.norm(a - b))

def find_similar_movies(id, method = "cosine"):
  try:
    target_movie_factor = features_dict[id]
  except:
    print("no movie with this id found")
    return None

  similarMovie = pd.DataFrame(columns=('movieId',f'{method}Index'))
  if method == "cosine":
    for id, features in features_dict.items():
      similarity = cosine_similarity(target_movie_factor, features)
      new_row = pd.DataFrame({'movieId': [str(id)], f'{method}Index': [similarity]})
      similarMovie = pd.concat([similarMovie, new_row], ignore_index=True)
    top_similar_id = similarMovie.sort_values(by=f'{method}Index', ascending=False)[1:6]
    movies_pd = movies_df.toPandas()
    top_similar_movies = top_similar_id.merge(movies_pd, on='movieId')
  elif method == "euclidean":
    for id, features in features_dict.items():
      distance = euclidean_distance(target_movie_factor, features)
      new_row = pd.DataFrame({'movieId': [str(id)], f'{method}Index': [distance]})
      similarMovie = pd.concat([similarMovie, new_row], ignore_index=True)
    top_similar_id = similarMovie.sort_values(by=f'{method}Index', ascending=False)[1:6]
    movies_pd = movies_df.toPandas()
    top_similar_movies = top_similar_id.merge(movies_pd, on='movieId')
  else:
    print("Invalid method. Use 'cosine' or 'euclidean'.")
    return None
  return top_similar_movies

In [231]:
find_similar_movies(6333)

  similarMovie = pd.concat([similarMovie, new_row], ignore_index=True)


Unnamed: 0,movieId,cosineIndex,title,genres
0,8132,0.998849,Gladiator (1992),Action|Drama
1,468,0.997539,Englishman Who Went Up a Hill But Came Down a ...,Comedy|Romance
2,91978,0.997261,Man on a Ledge (2012),Crime|Thriller
3,96281,0.996686,ParaNorman (2012),Adventure|Animation|Comedy
4,5955,0.996635,Antwone Fisher (2002),Drama


In [234]:
find_similar_movies(668, 'euclidean')

  similarMovie = pd.concat([similarMovie, new_row], ignore_index=True)


Unnamed: 0,movieId,euclideanIndex,title,genres
0,73042,3.568052,Alvin and the Chipmunks: The Squeakquel (2009),Animation|Children|Comedy|Musical
1,121097,3.133988,To Grandmother's House We Go (1992),Adventure|Children|Comedy
2,8138,3.069052,Attack of the Giant Leeches (1959),Horror|Sci-Fi
3,5048,3.038736,Snow Dogs (2002),Adventure|Children|Comedy
4,8372,2.997258,Garfield: The Movie (2004),Animation|Children|Comedy


In [235]:
find_similar_movies(463)

no movie with this id found
