In [0]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook; getOrCreate() reuses a session
# that is already running instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Collaborative_filtering")
    .getOrCreate()
)

In [0]:
# Input files were uploaded to:
#   /FileStore/tables/movies.csv
#   /FileStore/tables/ratings.csv
from pyspark.sql.functions import col

# Both files are read with a header row and with column types inferred.
csv_options = {"header": "True", "inferSchema": "True"}

# Movie catalogue. The "_c3" column (presumably a stray, unnamed fourth
# column picked up by the CSV reader) is removed before the frame is used.
movie_df = (
    spark.read.options(**csv_options)
    .csv("/FileStore/tables/movies.csv")
    .drop(col("_c3"))
)
display(movie_df)

# Ratings given by users to movies.
rating_df = (
    spark.read.options(**csv_options)
    .csv("/FileStore/tables/ratings.csv")
)
display(rating_df)

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
1,70,3.0,964982400
1,101,5.0,964980868
1,110,4.0,964982176
1,151,5.0,964984041
1,157,5.0,964984100


In [0]:
# Enrich every rating with the movie's title and genres. A left join keeps
# each rating row even if its movieId has no match in movie_df.
ratings = rating_df.join(movie_df, on = "movieId", how = "left")
display(ratings)

movieId,userId,rating,timestamp,title,genres
1,1,4.0,964982703,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
3,1,4.0,964981247,Grumpier Old Men (1995),Comedy|Romance
6,1,4.0,964982224,Heat (1995),Action|Crime|Thriller
47,1,5.0,964983815,Seven (a.k.a. Se7en) (1995),Mystery|Thriller
50,1,5.0,964982931,"Usual Suspects, The (1995)",Crime|Mystery|Thriller
70,1,3.0,964982400,From Dusk Till Dawn (1996),Action|Comedy|Horror|Thriller
101,1,5.0,964980868,Bottle Rocket (1996),Adventure|Comedy|Crime|Romance
110,1,4.0,964982176,Braveheart (1995),Action|Drama|War
151,1,5.0,964984041,Rob Roy (1995),Action|Drama|Romance|War
157,1,5.0,964984100,Canadian Bacon (1995),Comedy|War


In [0]:
# Sanity check: show the Python type of the joined `ratings` object.
type(ratings)

In [0]:
# Build the training and test sets for the recommender: a random 80/20 split
# of the joined ratings with a fixed seed (1), followed by a row count of
# each piece and of the full dataset.
train, test = ratings.randomSplit([0.8, 0.2], seed = 1)
for label, frame in (
    ("Training data: ", train),
    ("Testing data: ", test),
    ("Total records: ", ratings),
):
    print(label, frame.count())

In [0]:
#Collaborative filtering using the ALS (Alternating Least Squares) model
from pyspark.ml.recommendation import ALS

In [0]:
# ALS (Alternating Least Squares) estimator trained on explicit ratings.
# rank and regParam are left unset here and are expected to be supplied by a
# later tuning step.
als = ALS(userCol = "userId",
          itemCol = "movieId",
          ratingCol = "rating",
          nonnegative = True,          # use the non-negative constraint for least squares
          implicitPrefs = False,       # ratings are explicit scores, not implicit feedback
          coldStartStrategy = "drop",  # drop rows whose prediction is NaN (unseen user/movie)
          seed = 1                     # pin ALS initialisation so repeated runs are comparable
          )

# Other ALS parameters that can be set in this definition include maxIter,
# regParam and rank (for example maxIter=5, regParam=0.09, rank=25).

In [0]:
#Hyperparameter tuning and CV
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Search grid: 4 ranks x 4 regularisation strengths = 16 candidate settings.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [0.01, 0.05, 0.1, 0.15])
    .build()
)

# Candidates are scored by RMSE between the "rating" and "prediction" columns.
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = "rating", predictionCol = "prediction")

# 5-fold cross-validation over the grid. The seed pins the fold assignment so
# that repeated runs evaluate the candidates on the same folds.
cv = CrossValidator(estimator = als,
                    estimatorParamMaps = param_grid,
                    evaluator = evaluator,
                    numFolds = 5,
                    seed = 1)

# Expensive step: 16 settings x 5 folds = 80 ALS fits before the best setting
# is chosen.
model = cv.fit(train)

In [0]:
# Keep the model that cross-validation selected as best.
best_model = model.bestModel

In [0]:
# Score the held-out test set with the selected model.
test_prediction = best_model.transform(test)

In [0]:
# RMSE of the selected model on the held-out test predictions.
rmse = evaluator.evaluate(test_prediction)
print(rmse)

In [0]:
# Recommendations per user: ask the selected model for the 5 top-scoring
# movies for every user. Both names refer to the same DataFrame.
df = recommendation = best_model.recommendForAllUsers(numItems = 5)
display(recommendation)

userId,recommendations
1,"List(List(96004, 5.6862903), List(7748, 5.560649), List(5490, 5.5578995), List(132333, 5.5578995), List(6442, 5.3912907))"
2,"List(List(131724, 4.8124914), List(6818, 4.64887), List(25771, 4.5552087), List(8477, 4.5491595), List(78836, 4.514745))"
3,"List(List(5746, 4.859357), List(6835, 4.859357), List(5181, 4.7700753), List(7991, 4.6315627), List(4518, 4.5883965))"
4,"List(List(3365, 4.6935506), List(2204, 4.6691527), List(5490, 4.668449), List(132333, 4.668449), List(1046, 4.65646))"
5,"List(List(96004, 4.8375897), List(7748, 4.7979736), List(6818, 4.6952467), List(132333, 4.6804085), List(5490, 4.6804085))"
6,"List(List(6732, 4.885961), List(74282, 4.6656804), List(5490, 4.623512), List(132333, 4.623512), List(147382, 4.5985613))"
7,"List(List(96004, 4.6249475), List(7748, 4.5262527), List(132333, 4.4611087), List(5490, 4.4611087), List(5466, 4.4474516))"
8,"List(List(96004, 4.8405256), List(7748, 4.7271595), List(4495, 4.6650023), List(6201, 4.6650023), List(177593, 4.584171))"
9,"List(List(96004, 4.9432187), List(7748, 4.9114113), List(6818, 4.7821155), List(27156, 4.750377), List(132333, 4.7423153))"
10,"List(List(8869, 4.468843), List(71579, 4.4413395), List(113275, 4.3430114), List(3086, 4.2957177), List(94070, 4.286806))"


In [0]:
from pyspark.sql.functions import split, explode

# Flatten the per-user recommendation array: one output row per recommended
# item, with the exploded element placed in a new "movieID_rating" column
# next to the original columns.
df2 = df.select(
    "*",
    explode("recommendations").alias("movieID_rating"),
)
display(df2)

userId,recommendations,movieID_rating
1,"List(List(96004, 5.6862903), List(7748, 5.560649), List(5490, 5.5578995), List(132333, 5.5578995), List(6442, 5.3912907))","List(96004, 5.6862903)"
1,"List(List(96004, 5.6862903), List(7748, 5.560649), List(5490, 5.5578995), List(132333, 5.5578995), List(6442, 5.3912907))","List(7748, 5.560649)"
1,"List(List(96004, 5.6862903), List(7748, 5.560649), List(5490, 5.5578995), List(132333, 5.5578995), List(6442, 5.3912907))","List(5490, 5.5578995)"
1,"List(List(96004, 5.6862903), List(7748, 5.560649), List(5490, 5.5578995), List(132333, 5.5578995), List(6442, 5.3912907))","List(132333, 5.5578995)"
1,"List(List(96004, 5.6862903), List(7748, 5.560649), List(5490, 5.5578995), List(132333, 5.5578995), List(6442, 5.3912907))","List(6442, 5.3912907)"
2,"List(List(131724, 4.8124914), List(6818, 4.64887), List(25771, 4.5552087), List(8477, 4.5491595), List(78836, 4.514745))","List(131724, 4.8124914)"
2,"List(List(131724, 4.8124914), List(6818, 4.64887), List(25771, 4.5552087), List(8477, 4.5491595), List(78836, 4.514745))","List(6818, 4.64887)"
2,"List(List(131724, 4.8124914), List(6818, 4.64887), List(25771, 4.5552087), List(8477, 4.5491595), List(78836, 4.514745))","List(25771, 4.5552087)"
2,"List(List(131724, 4.8124914), List(6818, 4.64887), List(25771, 4.5552087), List(8477, 4.5491595), List(78836, 4.514745))","List(8477, 4.5491595)"
2,"List(List(131724, 4.8124914), List(6818, 4.64887), List(25771, 4.5552087), List(8477, 4.5491595), List(78836, 4.514745))","List(78836, 4.514745)"


In [0]:
# Compact view of the recommendations: keep the user plus the movieId and
# rating fields pulled out of the exploded "movieID_rating" struct.
reco_df = df2.select(
    "userId",
    "movieID_rating.movieId",
    "movieID_rating.rating",
)


movieId,userId,rating,title,genres
96004,1,5.6862903,Dragon Ball Z: The History of Trunks (Doragon b�ru Z: Zetsub� e no hank�!! Nokosareta ch� senshi - Gohan to Torankusu) (1993),Action|Adventure|Animation
7748,1,5.560649,Pierrot le fou (1965),Crime|Drama
5490,1,5.5578995,The Big Bus (1976),Action|Comedy
132333,1,5.5578995,Seve (2014),Documentary|Drama
6442,1,5.3912907,Belle �poque (1992),Comedy|Romance
131724,2,4.8124914,The Jinx: The Life and Deaths of Robert Durst (2015),Documentary
6818,2,4.64887,Come and See (Idi i smotri) (1985),Drama|War
25771,2,4.5552087,"Andalusian Dog, An (Chien andalou, Un) (1929)",Fantasy
8477,2,4.5491595,"Jet�e, La (1962)",Romance|Sci-Fi
78836,2,4.514745,Enter the Void (2009),Drama


In [0]:
# Look up title and genres for each recommended movie. The left join keeps
# every recommendation even when movie_df has no matching movieId.
final_reco_df = reco_df.join(movie_df, on = "movieId", how = "left")
display(final_reco_df)