# A Collaborative Filtering Project: A Recommender System Using the Alternating Least Squares (ALS) Algorithm

In [0]:
# Importing the necessary libraries

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col,explode

In [0]:
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Project_1")
    .getOrCreate()
)

In [0]:
# Locations of the two input CSV files (movies metadata, user ratings).

path1, path2 = (
    "/FileStore/tables/movies.csv",
    "/FileStore/tables/ratings.csv",
)

In [0]:
# Load both CSV files: the first row supplies the column names and the
# column types are inferred from the data.

movies = spark.read.csv(
    path=path1,
    inferSchema=True,
    header=True,
)
ratings = spark.read.csv(
    path=path2,
    inferSchema=True,
    header=True,
)

In [0]:
# Show the inferred schema of each input frame (movies first, then ratings).

for frame in (movies, ratings):
    frame.printSchema()

In [0]:
# Preview the first five rows of each input frame (movies first, then ratings).
for preview_source in (movies, ratings):
    preview_source.limit(5).display()

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy


userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931


In [0]:
# Right join on movieId: every ratings row is kept and the matching movie's
# title/genres columns are attached to it.
df = movies.join(ratings, on='movieId', how='right')

In [0]:
# Preview the first five rows of the joined movies/ratings frame.
df.limit(5).display()

movieId,title,genres,userId,rating,timestamp
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,1,4.0,964982703
3,Grumpier Old Men (1995),Comedy|Romance,1,4.0,964981247
6,Heat (1995),Action|Crime|Thriller,1,4.0,964982224
47,Seven (a.k.a. Se7en) (1995),Mystery|Thriller,1,5.0,964983815
50,"Usual Suspects, The (1995)",Crime|Mystery|Thriller,1,5.0,964982931


In [0]:
# Splitting the data into train and test sets (roughly 80%:20%).
# The weights are sampling probabilities, so the resulting sizes are approximate.
# A fixed seed keeps the split repeatable for the same input data, so the RMSE
# reported further down is comparable from one run to the next.

(train, test) = df.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Row counts of the full frame and of each split.

print(f"Dataframe size : {df.count()}")
print(f"Train size : {train.count()}")
print(f"Test size : {test.count()}")

In [0]:
# Configuring the ALS estimator (it is not fitted here; fitting happens when the
# cross-validator below is run on the training data).
#   implicitPrefs=False      -> ratings are treated as explicit feedback
#   coldStartStrategy="drop" -> rows with NaN predictions (users/movies unseen
#                               during fitting) are dropped, so RMSE stays defined
#   nonnegative=True         -> non-negativity constraint on the least-squares factors
#   seed=42                  -> fixed random seed so repeated runs are comparable

als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    implicitPrefs=False,
    nonnegative=True,
    coldStartStrategy="drop",
    seed=42,
)

In [0]:
# Hyper-parameter grid to search: 4 ranks x 4 regularisation strengths
# (16 combinations in total).

param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [0.01, 0.05, 0.1, 0.15])
    .build()
)

In [0]:
# Root Mean Square Error between the "rating" label and the model's
# "prediction" column; used both for model selection and for the final test score.

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

In [0]:
# 5-fold cross-validation over the parameter grid, scored with the RMSE evaluator.
# The seed fixes how rows are assigned to folds, so the selected parameters are
# repeatable for the same training data.

cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [0]:
# Training the model and scoring it on the held-out test set

# Run the cross-validated grid search on the training split.
model = cv.fit(train)
# Keep the model that the cross-validator selected as best.
best_model = model.bestModel
# Predict ratings for the test split, then compute RMSE with the evaluator.
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

In [0]:
# Root mean squared error (RMSE) of the best model on the test set

print(RMSE)

In [0]:
# Top three recommended movies for every user, produced by the best model.

top_recommendations = best_model.recommendForAllUsers(numItems=3)

In [0]:
# Preview the recommendations of the first five rows (one row per user).
top_recommendations.limit(5).display()

userId,recommendations
1,"List(List(33649, 5.675107), List(2563, 5.4604735), List(171495, 5.399735))"
2,"List(List(131724, 4.8121376), List(33649, 4.6480064), List(171495, 4.506824))"
3,"List(List(6835, 4.827012), List(5919, 4.7249675), List(5181, 4.7160034))"
4,"List(List(3851, 4.846816), List(4765, 4.771408), List(1733, 4.639248))"
5,"List(List(4495, 4.4474254), List(33649, 4.446929), List(7096, 4.444813))"


In [0]:
# Flatten the recommendations array: one row per (user, recommended movie),
# with the (movieId, rating) pair stored in the new "movieid_rating" column.

top_rec = top_recommendations.withColumn(
    "movieid_rating",
    explode(col("recommendations")),
)
top_rec.limit(5).display()

userId,recommendations,movieid_rating
1,"List(List(33649, 5.675107), List(2563, 5.4604735), List(171495, 5.399735))","List(33649, 5.675107)"
1,"List(List(33649, 5.675107), List(2563, 5.4604735), List(171495, 5.399735))","List(2563, 5.4604735)"
1,"List(List(33649, 5.675107), List(2563, 5.4604735), List(171495, 5.399735))","List(171495, 5.399735)"
2,"List(List(131724, 4.8121376), List(33649, 4.6480064), List(171495, 4.506824))","List(131724, 4.8121376)"
2,"List(List(131724, 4.8121376), List(33649, 4.6480064), List(171495, 4.506824))","List(33649, 4.6480064)"


In [0]:
# Recommended movie and predicted rating per user, unpacked into plain columns.

top_rec.select(
    "userId",
    "movieid_rating.movieId",
    "movieid_rating.rating",
).limit(5).display()

userId,movieId,rating
1,33649,5.675107
1,2563,5.4604735
1,171495,5.399735
2,131724,4.8121376
2,33649,4.6480064
