In [0]:
# Optional one-off cleanup of previously uploaded files.
# WARNING: dbutils.fs.rm(..., True) recursively deletes everything under /FileStore/tables/,
# including movies.csv and ratings.csv, which the next cells load. Running it unconditionally
# makes "Run All" destroy the notebook's own inputs, so it is disabled by default.
# Set the flag to True only when you intend to wipe the folder before re-uploading the CSVs.
CLEAR_UPLOADED_TABLES = False
if CLEAR_UPLOADED_TABLES:
    dbutils.fs.rm('/FileStore/tables/', True)

Out[1]: True

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col,explode
# Obtain the Spark session for this notebook; getOrCreate() returns the already-running
# session if one exists, otherwise it starts a new one under this application name.
spark = (
    SparkSession.builder
    .appName("Collaborative filtering")
    .getOrCreate()
)

In [0]:
def _read_csv(path):
    """Load a CSV with a header row, letting Spark infer column types."""
    return spark.read.options(header="True", inferSchema="True").csv(path)

# movies.csv: one row per movie (movieId, title, genres).
moviesDF = _read_csv("/FileStore/tables/movies.csv")
# ratings.csv: one row per user rating (userId, movieId, rating, timestamp).
ratingsDF = _read_csv("/FileStore/tables/ratings.csv")

In [0]:
# display(moviesDF)  # uncomment to render the full movies DataFrame as an interactive table

In [0]:
# Peek at the first five rows of each input table.
for frame in (moviesDF, ratingsDF):
    frame.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [0]:
# Confirm the column types that inferSchema chose for each table.
for frame in (moviesDF, ratingsDF):
    frame.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [0]:
# Left join on movieId: every rating is kept and gains the movie's title and genres
# (those columns are null if a rated movieId is missing from movies.csv).
ratings = ratingsDF.join(moviesDF, on="movieId", how="left")
ratings.show(5)

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
+-------+------+------+---------+--------------------+--------------------+
only showing top 5 rows



In [0]:
# Hold out roughly 20% of the ratings for the final test evaluation.
# A fixed seed makes the split (and therefore the counts and the reported RMSE) reproducible across runs.
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Total number of ratings before splitting.
total_ratings = ratings.count()
total_ratings

Out[11]: 100836

In [0]:
# Number of ratings in the training split.
train_count = train.count()
train_count

Out[12]: 80559

In [0]:
# Number of ratings in the held-out test split.
test_count = test.count()
# randomSplit assigns every row to exactly one split, so together the two parts
# must account for all ratings. This catches a lost or duplicated row early.
assert train.count() + test_count == ratings.count(), "train/test split lost or duplicated rows"
test_count

Out[13]: 20277

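## ALS model

Alternating Least Squares (ALS) is a popular matrix-factorization algorithm for collaborative filtering in recommendation systems. It learns a latent-factor vector for every user and every movie, such that their dot product approximates the observed rating.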

In [0]:
# Explicit-feedback ALS estimator over the (userId, movieId, rating) columns.
als = ALS(
    userCol="userId",          # column identifying the user
    itemCol="movieId",         # column identifying the item to recommend
    ratingCol="rating",        # explicit rating the model learns to predict
    nonnegative=True,          # constrain the learned latent factors to be >= 0 (non-negative least squares)
    implicitPrefs=False,       # ratings are explicit scores, not implicit signals such as clicks or views
    coldStartStrategy="drop",  # drop prediction rows for users/movies unseen during training (their
                               # prediction would otherwise be NaN and make RMSE undefined)
    seed=42,                   # fix the random factor initialisation so results are reproducible
)

## Hyperparameter tuning and cross-validation

In [0]:
# Hyperparameters are model settings that must be chosen before training rather than learned from the data (here: the ALS rank and regParam).
# Tuning searches for the combination of hyperparameters that gives the best model performance. Cross-validation makes that comparison reliable by scoring each combination on several train/validation folds instead of on a single split.

In [0]:
# Candidate hyperparameters for the ALS estimator. ParamGridBuilder expands the lists into
# their Cartesian product: 4 ranks x 4 regParams = 16 configurations. The CrossValidator
# then compares them and keeps the best one.
#   rank     - number of latent factors learned for each user and each movie
#   regParam - regularisation strength; larger values shrink the factors to curb overfitting
ranks = [10, 50, 100, 150]
reg_params = [.01, .05, .1, .15]
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, ranks)
    .addGrid(als.regParam, reg_params)
    .build()
)

In [0]:
# Scores predictions by root-mean-square error (RMSE, lower is better) between the true
# "rating" column and the model's "prediction" column. The CrossValidator uses it to rank
# the parameter grid, and it is reused for the final test score.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [0]:
# 5-fold cross-validation over every configuration in param_grid. Each configuration is
# trained on 4 folds and scored with the RMSE evaluator on the 5th. The best configuration
# is then refit on all of the data passed to fit().
# A fixed seed makes the fold assignment (and hence the chosen parameters) reproducible.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [0]:
# Run the grid search on the training split only; the test split stays untouched until the end.
model = cv.fit(train)

# The single ALS model refit with the best-scoring hyperparameters.
best_model = model.bestModel

# Final generalisation check on the held-out test split. Rows for users/movies unseen in
# training are dropped because the estimator uses coldStartStrategy="drop".
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

In [0]:
# Top 5 movies (by predicted rating) for every user. The result has one row per userId
# with a "recommendations" array of (movieId, rating) structs.
recommendations = best_model.recommendForAllUsers(5)
display(recommendations)
df = recommendations  # alias kept for later cells

In [0]:
# df2 was previously never defined (NameError). recommendForAllUsers returns a
# "recommendations" array per user; explode it into one row per (user, recommended movie)
# so that the struct fields movieId and rating can be selected as ordinary columns.
df2 = recommendations.withColumn("movieid_rating", explode("recommendations"))
df2.select("userId", col("movieid_rating.movieId"), col("movieid_rating.rating")).show(5)