<a href="https://colab.research.google.com/github/abhilashhn1993/movie-recommendation-engine/blob/main/ALS_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): every path visible in this notebook lives directly under /content,
# not under /content/drive - confirm the mount is actually needed.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Installing the dependencies

In [None]:
#Java JDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Downloading Spark
!wget -q http://apache.mirrors.pair.com/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz 
#Unzipping the hadoop file
!tar -xvf spark-3.0.1-bin-hadoop3.2.tgz

In [None]:
# Download the MovieLens "ml-latest" archive from GroupLens into the current
# working directory (saved as ml-latest.zip).
!wget http://files.grouplens.org/datasets/movielens/ml-latest.zip

In [None]:
#Unzip the file
!unzip ml-latest.zip

In [None]:
###################### SPARK SETUP ################################
#Install findspark
!pip install -q findspark

In [None]:
# Export the locations of the JDK and of the unpacked Spark distribution so
# that the Spark tooling started from this process can find them.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop3.2",
})

In [None]:
# Initialise a Spark session with the help of the findspark library.
import findspark
# Must run before the pyspark import below.
# NOTE(review): presumably resolves the installation through the SPARK_HOME
# variable exported earlier - confirm against the findspark docs.
findspark.init()
from pyspark.sql import SparkSession
# Local-mode master ("local[*]"); getOrCreate() returns the already-running
# session if there is one instead of building a second.
spark = SparkSession.builder.master("local[*]").getOrCreate()

Read the data files

In [None]:
# Locations of the three MovieLens CSV files used in this notebook, all inside
# the directory created by unzipping ml-latest.zip.
data_dir = '/content/ml-latest'
ratings_file = data_dir + '/ratings.csv'
movies_file = data_dir + '/movies.csv'
links_file = data_dir + '/links.csv'

In [None]:
def readFiles(filename):
  """Load a CSV file with a header row into a Spark DataFrame.

  Column names are taken from the first line of the file and column types are
  inferred from the data.

  Args:
    filename: Path of the CSV file to read.

  Returns:
    The DataFrame read through the notebook-level ``spark`` session.
  """
  # Built-in CSV source; replaces the legacy "com.databricks.spark.csv" format
  # alias and the header option that was previously passed twice.
  return spark.read.csv(filename, header=True, inferSchema=True)

In [None]:
# Load ratings, movies and links (in that order) through readFiles.
ratings, movies, links = (
    readFiles(csv_path)
    for csv_path in (ratings_file, movies_file, links_file)
)

In [None]:
# Preview the first five rows of the ratings frame.
ratings.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows



In [None]:
# Preview the first five rows of the movies frame.
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Preview the first five rows of the links frame.
links.show(5)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows



In [None]:
# The ratings frame is the one modelled below because it holds the rating values.
# Print its schema to check the column types that were inferred on load.
ratings.printSchema()

In [None]:
# Size of the ratings data, followed by a five-row preview.
row_count = ratings.count()
print('No. of rows: %d' % row_count)
ratings.show(5)

No. of rows: 27753444
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows



In [None]:
# Drop the timestamp column; the model below only reads userId, movieId and rating.
data = ratings.drop("timestamp")

# Print the schema to confirm that the timestamp column is gone.
data.printSchema()

In [None]:
# Randomly split the data 80% / 20% into train and test.
# The seed is fixed so that a re-run reproduces the same partition; without it
# every run trains and evaluates on different rows and the metrics drift.
train, test = data.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
# Fit a baseline recommendation model with Alternating Least Squares on the
# training split (evaluation happens in the next cell).
#  - nonnegative=True constrains the factors to be non-negative.
#  - coldStartStrategy="drop" discards predictions for users/movies unseen in
#    training, so the evaluation metric does not become NaN.
#  - seed fixes the random factor initialisation so the fit is repeatable.
model = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True, coldStartStrategy="drop", seed=42).fit(train)

# RMSE between the true "rating" column and the model's "prediction" column.
# (RegressionEvaluator is already imported in the imports cell above.)
evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")

In [None]:
# Score the held-out split with the baseline model and report its RMSE.
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
# Reuse the value computed above instead of transforming and evaluating the
# whole test set a second time.
print("New RMSE: ", rmse)

New RMSE:  0.8217521273769881


Implementing ALS with Cross Validation

In [None]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

In [None]:
# Try to improve on the baseline model by tuning ALS with cross-validation.
# coldStartStrategy="drop" is kept so that rows the model cannot score inside a
# fold are dropped rather than turning the evaluation metric into NaN.
# seed fixes the random factor initialisation so the search is repeatable.
model = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, coldStartStrategy="drop", seed=42)

# Parameter grid for the ALS estimator, built with ParamGridBuilder.
# Two parameters are tuned:
#   1. regParam (regularisation): 0.1, 0.05, 0.01, 0.001
#   2. rank (number of latent factors): 5, 10, 20, 30
# This gives 4 x 4 = 16 combinations.
paramGrid = ParamGridBuilder() \
    .addGrid(model.regParam, [0.1, 0.05, 0.01, 0.001]) \
    .addGrid(model.rank, [5, 10, 20, 30]) \
    .build()

# 5-fold cross-validator scored with the RMSE evaluator defined earlier
# (16 combinations x 5 folds = 80 ALS fits). The seed makes the fold assignment
# repeatable between runs.
crossvalidation = CrossValidator(estimator = model,
                     estimatorParamMaps = paramGrid,
                     evaluator = evaluator,
                     numFolds=5,
                     seed=42)

In [None]:
# Fit every grid combination with cross-validation on the training split and
# keep the model that scored best.
cv_model = crossvalidation.fit(train)
Best_model = cv_model.bestModel

Printing the Best Model's parameter values

In [None]:
# Report the type of the winning model and the hyper-parameters it was fitted with.
print(type(Best_model))
# The hyper-parameters are read from the parent (estimator) of the underlying
# Java model object; fetch that parent once and query it three times.
best_estimator = Best_model._java_obj.parent()
print("**Best Model**")
print("Rank: ", best_estimator.getRank())
print("MaxIter: ", best_estimator.getMaxIter())
print("RegParam: ", best_estimator.getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
Rank:  30
MaxIter:  10
RegParam:  0.05


In [None]:
# RMSE of the cross-validated best model on the held-out test split.
best_predictions = Best_model.transform(test)
best_rmse = evaluator.evaluate(best_predictions)
print("Best RMSE value is: ", best_rmse)

Best RMSE value is:  0.8037012461211825


Checking Predictions on the test set

In [None]:
# Predictions of the best model for the test split; preview ten rows.
pred = Best_model.transform(test)
pred.show(10)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|107339|    148|   4.0| 3.3288345|
| 93112|    148|   3.0| 2.9263139|
|106148|    148|   2.5| 2.7871637|
|234926|    148|   4.0| 2.7707722|
|253535|    148|   4.0| 2.7711174|
|207939|    148|   3.0| 2.8659055|
|220572|    148|   2.0| 2.7842884|
|244192|    148|   3.0| 3.0389357|
|102642|    148|   4.0|   3.34515|
|275860|    148|   3.0| 2.8727908|
+------+-------+------+----------+
only showing top 10 rows



In [None]:
# Attach title and genres to each prediction via movieId and preview five rows.
pred_with_titles = pred.join(movies, "movieId")
pred_with_titles.select("userId", "title", "genres", "prediction").show(5)

+------+--------------------+------+----------+
|userId|               title|genres|prediction|
+------+--------------------+------+----------+
|107339|Awfully Big Adven...| Drama| 3.3288345|
| 93112|Awfully Big Adven...| Drama| 2.9263139|
|106148|Awfully Big Adven...| Drama| 2.7871637|
|234926|Awfully Big Adven...| Drama| 2.7707722|
|253535|Awfully Big Adven...| Drama| 2.7711174|
+------+--------------------+------+----------+
only showing top 5 rows



In [None]:
# Predictions for one example user (id 234926), enriched with the movie title
# and genres (movies) and the TMDB id (links), all joined on movieId.
for_an_user = (
    pred.where(pred.userId == 234926)
    .join(movies, "movieId")
    .join(links, "movieId")
    .select("userId", "title", "tmdbId", "genres", "prediction")
)
for_an_user.show(5)

+------+--------------------+------+--------------+----------+
|userId|               title|tmdbId|        genres|prediction|
+------+--------------------+------+--------------+----------+
|234926|Awfully Big Adven...| 22279|         Drama| 2.7707722|
|234926|Angels and Insect...|  8447| Drama|Romance| 3.3438826|
|234926|     Spy Hard (1996)| 10535|        Comedy| 2.2870703|
|234926|    Barcelona (1994)| 16771|Comedy|Romance| 3.3588824|
|234926|    Tommy Boy (1995)| 11381|        Comedy| 2.4864728|
+------+--------------------+------+--------------+----------+
only showing top 5 rows



In [None]:
from IPython.display import Image
from IPython.display import display

In [None]:
import webbrowser
# Base URL of a TMDB movie page. The trailing slash is required so that the
# numeric id becomes its own path segment (".../movie/22279"); without it the
# id is glued onto the word "movie" (".../movie22279") and the link is broken.
link = "https://www.themoviedb.org/movie/"
for movie in for_an_user.take(2):
  print(movie.title)
  # A movie without a TMDB id has no page to open; skip it instead of
  # requesting ".../movie/None".
  if movie.tmdbId is None:
    continue
  url = link+str(movie.tmdbId)
  webbrowser.open(url)

Awfully Big Adventure, An (1995)
Aladdin (1992)


Retrieve the results and write them to a file

In [None]:
import pandas as pd

# Top-10 movie recommendations for every user, as a Spark DataFrame.
top_recommendations = Best_model.recommendForAllUsers(10)
top_recommendations.show()
# Convert the result to a pandas DataFrame for the post-processing below.
movie_recommendation = top_recommendations.toPandas()

In [None]:
# Flatten the per-user recommendation lists into one row per user, with the
# recommended movie ids rendered as a single ", "-separated string.
# Column 0 holds the user id and column 1 the list of recommendation records;
# each record exposes its movie id under the "movieId" key.
# One pass over each column and str.join replace the per-row .iloc lookups and
# the repeated string concatenation, both of which scale badly with user count.
user_list = movie_recommendation.iloc[:, 0].tolist()
recommendations = [
    ", ".join(str(item["movieId"]) for item in user_items)
    for user_items in movie_recommendation.iloc[:, 1]
]

# Convert the results into a two-column DataFrame.
recommendations_df = pd.DataFrame(data = list(zip(user_list, recommendations)), columns=["user", "MovieID"])

In [None]:
#Check the users and the top 10 movie recommendations for the first 10 users
recommendations_df.head(10)

Unnamed: 0,user,MovieID
0,148,"188925, 188923, 128536, 153002, 145893, 152043..."
1,463,"188925, 188923, 191203, 153002, 185519, 180293..."
2,471,"188925, 188923, 183185, 191203, 190707, 153002..."
3,496,"188925, 188923, 149508, 191203, 183185, 76816,..."
4,833,"183185, 188925, 30764, 85205, 166812, 188923, ..."
5,1088,"188925, 183185, 188923, 162436, 137423, 181811..."
6,1238,"142891, 176517, 162436, 139140, 188925, 125786..."
7,1342,"182521, 115685, 128324, 2826, 140807, 77344, 1..."
8,1580,"188925, 188923, 125297, 181405, 191203, 183947..."
9,1591,"188925, 188923, 118095, 86952, 180129, 73826, ..."


In [None]:
# Write the recommendations to a tab-separated text file (sep='\t'), without the
# pandas index column.
# NOTE(review): the target is under /content, not under the mounted /content/drive -
# confirm this is the intended location for the results.
recommendations_df.to_csv('/content/recommendation_results.txt', sep='\t', index=False)