In [1]:
from pyspark.sql import SparkSession
# Obtain the Spark session used by every later cell (an already-running
# session is reused instead of creating a second one).
spark = (
    SparkSession.builder
    .appName("Collaborative Filtering")
    .getOrCreate()
)

In [2]:
# Load both CSV inputs; the first row of each file is the header and the
# column types are inferred from the data.
movies_df = spark.read.csv("movies.csv", header=True, inferSchema=True)
ratings_df = spark.read.csv("ratings.csv", header=True, inferSchema=True)

In [3]:
# Attach the movie columns to every rating row. The left join keeps each
# rating even if its movieId has no match in movies_df.
movie_ratings_df = ratings_df.join(movies_df, on="movieId", how="left")
movie_ratings_df.show()

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
|     70|     1|   3.0|964982400|From Dusk Till Da...|Action|Comedy|Hor...|
|    101|     1|   5.0|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|
|    110|     1|   4.0|964982176|   Braveheart (1995)|    Action|Drama|War|
|    151|     1|   5.0|964984041|      Rob Roy (1995)|Action|Drama|Roma...|
|    157|     1|   5.0|964984100|Canadian Bacon (1...|          Comedy|War|
|    163|   

In [4]:
# Random 80/20 train/test split of the joined ratings; the fixed seed keeps
# the split repeatable between runs.
training_df, test_df = movie_ratings_df.randomSplit(weights=[0.8, 0.2], seed=42)

# Preview both splits.
training_df.show()
test_df.show()

+-------+------+------+----------+----------------+--------------------+
|movieId|userId|rating| timestamp|           title|              genres|
+-------+------+------+----------+----------------+--------------------+
|      1|     1|   4.0| 964982703|Toy Story (1995)|Adventure|Animati...|
|      1|     5|   4.0| 847434962|Toy Story (1995)|Adventure|Animati...|
|      1|    15|   2.5|1510577970|Toy Story (1995)|Adventure|Animati...|
|      1|    17|   4.5|1305696483|Toy Story (1995)|Adventure|Animati...|
|      1|    18|   3.5|1455209816|Toy Story (1995)|Adventure|Animati...|
|      1|    21|   3.5|1407618878|Toy Story (1995)|Adventure|Animati...|
|      1|    31|   5.0| 850466616|Toy Story (1995)|Adventure|Animati...|
|      1|    32|   3.0| 856736119|Toy Story (1995)|Adventure|Animati...|
|      1|    33|   3.0| 939647444|Toy Story (1995)|Adventure|Animati...|
|      1|    40|   5.0| 832058959|Toy Story (1995)|Adventure|Animati...|
|      1|    44|   3.0| 869251860|Toy Story (1995)|

In [5]:
# ALS model
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Explicit-feedback ALS estimator over the (userId, movieId, rating) columns.
# - implicitPrefs=False: ratings are treated as explicit preferences.
# - nonnegative=True: the least-squares solves use non-negativity constraints.
# - coldStartStrategy="drop": rows whose prediction would be NaN are dropped
#   from the transform output.
# - seed=42: pins the estimator's random seed, matching the seed used for the
#   train/test split, so reruns of the notebook do not depend on an implicit
#   default seed.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=42,
)

In [6]:
# Hyper-parameter grid for ALS: 3 ranks x 3 iteration counts x 3
# regularisation strengths = 27 candidate settings.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100])
    .addGrid(als.maxIter, [5, 10, 20])
    .addGrid(als.regParam, [0.01, 0.1, 1.0])
    .build()
)


In [7]:
# Scores a predictions frame by RMSE between the "rating" column (label)
# and the "prediction" column.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)

In [8]:
# 5-fold cross-validation of `als` over every setting in `param_grid`,
# scored with `evaluator`.
# seed=42 pins the cross-validator's random seed (same value as the
# train/test split) so the fold assignment, and therefore the selected best
# model, does not vary between reruns.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [9]:
# Run the cross-validated grid search on the training split and keep the
# best-scoring fitted model.
model = cv.fit(dataset=training_df)
best_model = model.bestModel

# Score the selected model on the held-out test split and report its RMSE.
testPredictions = best_model.transform(dataset=test_df)
rmse = evaluator.evaluate(dataset=testPredictions)
print(f"Root-mean-square error = {rmse}")

KeyboardInterrupt: 

In [None]:
# Top-5 recommendations for every user known to the best model.
recommendations = best_model.recommendForAllUsers(numItems=5)

# `df` is the alias the following cell reads from.
df = recommendations
df.show()

NameError: name 'best_model' is not defined

In [None]:
from pyspark.sql.functions import explode, col
# Flatten the per-user "recommendations" array: explode() emits one row per
# array element, stored in the new "movieid_rating" column.
df2 = df.withColumn("movieid_rating", explode("recommendations"))

# Show the user alongside the movieId and rating fields of each element.
df2.select(
    "userId",
    col("movieid_rating.movieId"),
    col("movieid_rating.rating"),
).show()

NameError: name 'df' is not defined