In [None]:
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession

In [None]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('IMDb')
    .getOrCreate()
)

In [None]:
# Read the ratings CSV into a Spark DataFrame: the first row is treated as the
# header and column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./dataclean/ratings.csv')
)

In [None]:
# Seeded 95% / 5% random split of the ratings into training and hold-out sets.
train, test = df.randomSplit(weights=[0.95, 0.05], seed=27)

In [None]:
# Alternating Least Squares (ALS) estimator for collaborative filtering on
# explicit ratings (implicitPrefs=False) with non-negative factors.
# coldStartStrategy="drop" removes prediction rows that would be NaN (users or
# movies not seen during fitting) so they do not poison the RMSE.
# An explicit seed is set: the library default is derived from a Python string
# hash, which can differ between kernel sessions, so without it the fitted
# factors (and the reported RMSE) are not reproducible.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    seed=27,
)

In [None]:
# RMSE evaluator comparing the "prediction" column against the true "rating".
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)

In [None]:
# Fit the ALS estimator on the training split.
model = als.fit(dataset=train)

In [None]:
# Predict ratings for the held-out split.
pred = model.transform(dataset=test)

In [None]:
# Preview the first 20 prediction rows (the default row count of show()).
pred.show(20)

In [None]:
# Score the hold-out predictions with the RMSE evaluator and report the value.
rmse = evaluator.evaluate(dataset=pred)
print(f"Root-mean-square error = {rmse}")


In [None]:
# Second configuration: more iterations (10) and stronger regularisation (0.1),
# fitted on the training split and scored on the hold-out split.
# The seed is fixed so that a change in RMSE reflects the hyperparameters and
# not a different random initialisation of the factors.
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    seed=27,
)
model = als.fit(train)

pred = model.transform(test)

rmse = evaluator.evaluate(pred)
print("Root-mean-square error = " + str(rmse))

In [None]:
# Hyperparameter tuning: cross-validated grid search over rank, maxIter and regParam.
# NOTE: computationally intensive. The code is deliberately kept inside a string
# literal so that it is NOT executed when the notebook runs; remove the quotes
# to re-enable it.
# NOTE(review): the disabled code reads maxIter/regParam through the private
# `_java_obj.parent()` handle - confirm this works on the Spark version in use
# before re-enabling.

'''
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 50, 100]) \
    .addGrid(als.maxIter, [5, 50, 100]) \
    .addGrid(als.regParam, [0.01, 0.05, 0.1]) \
    .build()
    
tvs = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator)

model = tvs.fit(train)
    
best_model = model.bestModel

# Get the best hyperparameters
best_rank = best_model.rank
best_maxIter = best_model._java_obj.parent().getMaxIter()
best_regParam = best_model._java_obj.parent().getRegParam()

# Print the best hyperparameters
print("Best Rank: ", best_rank)
print("Best MaxIter: ", best_maxIter)
print("Best RegParam: ", best_regParam)
'''

maxIters = [5, 10, 15]
regParams = [0.01, 0.05, 0.1]

Root-mean-square error = 0.8885246521231005
Root-mean-square error = 0.8610997118672273
Root-mean-square error = 0.8390232613429933

Root-mean-square error = 0.8767621283015823
Root-mean-square error = 0.8446793441851539
Root-mean-square error = 0.8361191736825451

In [None]:
# Manual grid search: loop over maxIter x regParam, fit one ALS model per
# combination and keep the model with the lowest RMSE.
# NOTE: computationally intensive. The code is deliberately kept inside a string
# literal so that it is NOT executed when the notebook runs.
# NOTE(review): the disabled loop scores every candidate on `test` and picks the
# winner from those scores, so the winning RMSE is an optimistic estimate;
# consider a separate validation split if this is re-enabled.

'''
maxIters = [5, 10, 15]
regParams = [0.01, 0.05, 0.1]
lowest_rmse = 100
best_model = None

for maxIter in maxIters:
    for regParam in regParams:
        als = ALS(maxIter= maxIter, regParam= regParam, userCol= 'userId', itemCol= 'movieId', ratingCol= 'rating', \
            coldStartStrategy= "drop", nonnegative= True, implicitPrefs= False)
        model = als.fit(train)
        pred = model.transform(test)
        rmse = evaluator.evaluate(pred)
        print("Root-mean-square error = " + str(rmse))
        if rmse < lowest_rmse:
            lowest_rmse = rmse
            best_model = model
'''

With the hyperparameters chosen, the model can now be retrained on the whole dataset and used to generate predictions for all users.

In [None]:
# Final ALS configuration (maxIter=10, regParam=0.1) used to refit on all ratings.
# coldStartStrategy="drop" removes prediction rows that would be NaN.
# The seed is fixed so the final factors, and therefore the recommendations
# derived from them, can be reproduced across kernel sessions.
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    seed=27,
)

In [None]:
# Refit on every available rating, then predict for those same rows
# (in-sample predictions).
model = als.fit(dataset=df)
pred = model.transform(dataset=df)

In [None]:
# RMSE of the current `pred` frame. Because `pred` was produced from the same
# rows the model was fitted on, this is an in-sample (training) error and is
# not comparable with the hold-out RMSE values printed earlier.
rmse = evaluator.evaluate(dataset=pred)
print("RMSE: " + str(rmse))

In [None]:
# Top-10 movie recommendations for every user, collected to the driver as a
# pandas DataFrame.
users_rec = (
    model
    .recommendForAllUsers(numItems=10)
    .toPandas()
)

In [None]:
# Preview the first five users' recommendation lists.
users_rec.head(5)

In [None]:
# Load the ratings file into pandas as well, for ad-hoc inspection alongside
# the Spark results.
ratings = pd.read_csv('./dataclean/ratings.csv')

In [None]:
# Load the movie metadata and display its column names.
movies = pd.read_csv(filepath_or_buffer='./dataclean/metadata.csv')
movies.columns

In [None]:
# Look up the metadata row(s) for movie_id 104103.
movies.loc[movies['movie_id'].eq(104103)]

In [None]:
# First few ratings above 4 given to movieId 862.
ratings.loc[ratings.movieId.eq(862) & ratings.rating.gt(4)].head()


In [None]:
# Index the recommendations by userId so a user's list can be looked up directly.
rec_byId = users_rec.set_index(keys='userId')

In [None]:
# Show the metadata of the ten movies user 2103 rated highest - presumably to
# eyeball them against that user's recommendations.
# sort_values() sorts ascending by default, so slicing the first ten rows
# returned the user's ten LOWEST-rated movies; ascending=False selects the
# favourites instead.
lis = (
    ratings[ratings.userId == 2103]
    .sort_values(by='rating', ascending=False)
    .movieId
    .head(10)
    .tolist()
)
movies[movies.movie_id.isin(lis)]