In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create (or reuse) the Spark session that every later cell reads from.
spark = (
    SparkSession.builder
    .appName('Collaborative Filtering')
    .getOrCreate()
)

In [2]:
# Both CSV files carry a header row; let Spark infer the column types.
csv_options = {'header': 'True', 'inferSchema': 'True'}

moviesDF = spark.read.options(**csv_options).csv("../data/Collaborative Filtering/movies.csv")
ratingsDF = spark.read.options(**csv_options).csv("../data/Collaborative Filtering/ratings.csv")

# Preview the first rows of each table.
moviesDF.show()
ratingsDF.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [3]:
# Show the inferred schema of the movies table (here display() prints the
# DataFrame repr, i.e. column names and types, not rows).
display(moviesDF)

DataFrame[movieId: int, title: string, genres: string]

In [4]:
# Show the inferred schema of the ratings table (column names and types only).
display(ratingsDF)

DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

In [5]:
# Attach title/genres to every rating. A left join keeps each rating row even
# if its movieId has no match in the movies table.
ratings = ratingsDF.join(moviesDF, on='movieId', how='left')
ratings.show()

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
|     70|     1|   3.0|964982400|From Dusk Till Da...|Action|Comedy|Hor...|
|    101|     1|   5.0|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|
|    110|     1|   4.0|964982176|   Braveheart (1995)|    Action|Drama|War|
|    151|     1|   5.0|964984041|      Rob Roy (1995)|Action|Drama|Roma...|
|    157|     1|   5.0|964984100|Canadian Bacon (1...|          Comedy|War|
|    163|   

In [6]:
# Hold out roughly 20% of the ratings for the final evaluation.
# A fixed seed keeps the split, and therefore every downstream count and
# metric, reproducible across kernel restarts; without it each run
# produces a different train/test partition.
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [7]:
# Total number of joined rating rows; the train and test counts below
# should add up to this value.
ratings.count()

100836

In [8]:
# Size of the training split, followed by a preview of its first rows.
print(train.count())
train.show()

80814
+-------+------+------+----------+----------------+--------------------+
|movieId|userId|rating| timestamp|           title|              genres|
+-------+------+------+----------+----------------+--------------------+
|      1|     1|   4.0| 964982703|Toy Story (1995)|Adventure|Animati...|
|      1|     5|   4.0| 847434962|Toy Story (1995)|Adventure|Animati...|
|      1|     7|   4.5|1106635946|Toy Story (1995)|Adventure|Animati...|
|      1|    15|   2.5|1510577970|Toy Story (1995)|Adventure|Animati...|
|      1|    17|   4.5|1305696483|Toy Story (1995)|Adventure|Animati...|
|      1|    18|   3.5|1455209816|Toy Story (1995)|Adventure|Animati...|
|      1|    19|   4.0| 965705637|Toy Story (1995)|Adventure|Animati...|
|      1|    21|   3.5|1407618878|Toy Story (1995)|Adventure|Animati...|
|      1|    27|   3.0| 962685262|Toy Story (1995)|Adventure|Animati...|
|      1|    31|   5.0| 850466616|Toy Story (1995)|Adventure|Animati...|
|      1|    32|   3.0| 856736119|Toy Story (

In [9]:
# Size of the held-out test split, followed by a preview of its first rows.
print(test.count())
test.show()

20022
+-------+------+------+----------+----------------+--------------------+
|movieId|userId|rating| timestamp|           title|              genres|
+-------+------+------+----------+----------------+--------------------+
|      1|    40|   5.0| 832058959|Toy Story (1995)|Adventure|Animati...|
|      1|    86|   4.0|1344082549|Toy Story (1995)|Adventure|Animati...|
|      1|    90|   3.0| 856353996|Toy Story (1995)|Adventure|Animati...|
|      1|    96|   5.0| 964772990|Toy Story (1995)|Adventure|Animati...|
|      1|    98|   4.5|1532457849|Toy Story (1995)|Adventure|Animati...|
|      1|   107|   4.0| 829322340|Toy Story (1995)|Adventure|Animati...|
|      1|   121|   4.0| 847656180|Toy Story (1995)|Adventure|Animati...|
|      1|   124|   4.0|1336584336|Toy Story (1995)|Adventure|Animati...|
|      1|   141|   4.0|1513130643|Toy Story (1995)|Adventure|Animati...|
|      1|   151|   5.0| 855947195|Toy Story (1995)|Adventure|Animati...|
|      1|   161|   4.0|1176751765|Toy Story (

In [10]:
# ALS estimator for explicit-feedback collaborative filtering.
# rank and regParam are left at their defaults here because they are tuned
# by the parameter grid / cross-validator defined in later cells.
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    implicitPrefs=False,       # ratings are explicit scores, not implicit counts
    nonnegative=True,          # constrain the learned factors to be >= 0
    coldStartStrategy='drop',  # drop predictions for unseen users/items (NaN) so RMSE stays defined
)

In [11]:
# Hyper-parameter search space: 4 ranks x 4 regularisation strengths = 16 combinations.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [0.01, 0.05, 0.1, 0.15])
    .build()
)

In [12]:
# Score predictions against the true 'rating' column using root-mean-square error.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [13]:
# Sanity check: number of hyper-parameter combinations that will be tried.
len(param_grid)

16

In [14]:
# 5-fold cross-validation over the parameter grid, scored with the RMSE evaluator.
# Every grid entry is fitted once per fold, so fitting this is expensive.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
)

### Hands-on: train and evaluate the model

In [15]:
# Run the cross-validated grid search on the training split (slow: every
# parameter combination is fitted for every fold).
model = cv.fit(train)
# Keep the best-scoring model chosen by the cross-validator.
best_model = model.bestModel
# Score the held-out test split, which was not used during the search.
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

### Recommendations

In [None]:
# Re-display the test-set RMSE computed by the training cell.
print(RMSE)

In [None]:
# Top-5 movie recommendations for every user known to the fitted model.
recommendations = best_model.recommendForAllUsers(numItems=5)

In [None]:
# show() truncates each cell to 20 characters by default, which cuts off the
# array of recommendations; disable truncation so each row is readable in full.
recommendations.show(truncate=False)

In [None]:
# Alias only: df refers to the same DataFrame as recommendations (no copy is made).
df = recommendations

In [None]:
# Inspect the recommendations DataFrame before it is flattened.
display(df)

In [None]:
# Turn the per-user array of recommendations into one row per recommended
# movie. Each new 'movieid_rating' value is a single element of the array;
# the original 'recommendations' column is kept alongside it.
# explode comes from pyspark.sql.functions and must be imported in the
# notebook's import cell, otherwise this cell raises NameError on a fresh kernel.
df2 = df.withColumn('movieid_rating', explode('recommendations'))

In [None]:
# Inspect the exploded DataFrame (one row per user/recommendation pair).
display(df2)

In [None]:
# Keep only the user id and the exploded recommendation struct.
# col comes from pyspark.sql.functions (imported in the notebook's import cell).
display(df2.select(col("userId"), col("movieid_rating")))

In [None]:
# Flatten the recommendation struct into plain columns. The item field is
# spelled 'movieId' to match the ALS itemCol exactly, so the lookup does not
# depend on Spark's case-insensitive name resolution.
# col comes from pyspark.sql.functions (imported in the notebook's import cell).
display(df2.select(col("userId"), col("movieid_rating.movieId"), col("movieid_rating.rating")))