In [0]:
# Check Spark version and session
spark

print("Spark Version:", spark.version)

df = spark.range(5)
df.show()


Spark Version: 4.0.0
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [0]:
# Load movies table
movies_df = spark.table("default.movies")
print("Movies Table:")
movies_df.show(5)

# Load ratings table
ratings_df = spark.table("default.ratings")
print("Ratings Table:")
ratings_df.show(5)


Movies Table:
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows
Ratings Table:
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows


In [0]:
ratings_df.select("rating").where("rating is NULL").count()

0

In [0]:
ratings_df.where("userId is NULL").count()

0

In [0]:
ratings_df.where("movieId is NULL").count()

0

In [0]:
combined_df = ratings_df.join(movies_df, on = "movieId", how = "inner")
combined_df.show(5)

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
+-------+------+------+---------+--------------------+--------------------+
only showing top 5 rows


In [0]:
cleaned_df = combined_df.drop("timestamp")
cleaned_df.show(5)

+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|      1|     1|   4.0|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|Usual Suspects, T...|Crime|Mystery|Thr...|
+-------+------+------+--------------------+--------------------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import col
als_df = cleaned_df.select(col("userId").cast("int"), col("movieId").cast("int"), col("rating").cast("float"),"title","genres")
als_df.printSchema()
als_df.show(5)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+------+-------+------+--------------------+--------------------+
|userId|movieId|rating|               title|              genres|
+------+-------+------+--------------------+--------------------+
|     1|      1|   4.0|    Toy Story (1995)|Adventure|Animati...|
|     1|      3|   4.0|Grumpier Old Men ...|      Comedy|Romance|
|     1|      6|   4.0|         Heat (1995)|Action|Crime|Thri...|
|     1|     47|   5.0|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     1|     50|   5.0|Usual Suspects, T...|Crime|Mystery|Thr...|
+------+-------+------+--------------------+--------------------+
only showing top 5 rows


In [0]:
train_df, test_df = als_df.randomSplit([0.8,0.2], seed = 42)
print("Training rows: ", train_df.count())
print("Testing rows: ", test_df.count())

Training rows:  80481
Testing rows:  20355


In [0]:
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    rank=10,
    regParam=0.1,
    maxIter=10,
    coldStartStrategy="drop",  # avoids NaN predictions
    nonnegative=True
)


In [0]:
model = als.fit(train_df)


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

predictions = model.transform(test_df)

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print("Test RMSE:", rmse)


Test RMSE: 0.8761925712580132


In [0]:
from pyspark.sql.functions import col

all_user_movie = ratings_df.select("userId").distinct() \
    .crossJoin(ratings_df.select("movieId").distinct())
predicted = model.transform(all_user_movie)
rated = ratings_df.select("userId", "movieId")
predicted_filtered = predicted.join(
    rated,
    on=["userId", "movieId"],
    how="left_anti"
)
from pyspark.sql.window import Window
import pyspark.sql.functions as F

window = Window.partitionBy("userId").orderBy(F.desc("prediction"))

user_top10 = predicted_filtered \
    .withColumn("rank", F.row_number().over(window)) \
    .filter(col("rank") <= 10)
user_top10.show(20, truncate=False)


+------+-------+----------+----+
|userId|movieId|prediction|rank|
+------+-------+----------+----+
|15    |166534 |4.6786194 |1   |
|15    |26133  |4.5958548 |2   |
|15    |1272   |4.55286   |3   |
|15    |1251   |4.490462  |4   |
|15    |4437   |4.462873  |5   |
|15    |26131  |4.438374  |6   |
|15    |7481   |4.3897295 |7   |
|15    |39444  |4.359623  |8   |
|15    |26171  |4.35027   |9   |
|15    |2563   |4.3427744 |10  |
|20    |7096   |5.410513  |1   |
|20    |6375   |5.3067513 |2   |
|20    |87234  |5.2422423 |3   |
|20    |4649   |5.23726   |4   |
|20    |1172   |5.2108912 |5   |
|20    |26171  |5.1619473 |6   |
|20    |3677   |5.0507097 |7   |
|20    |51931  |5.038039  |8   |
|20    |56921  |5.035883  |9   |
|20    |290    |5.0316496 |10  |
+------+-------+----------+----+
only showing top 20 rows


In [0]:
final_recommendations = (
    user_top10
    .join(movies_df, on="movieId", how="left")
    .select("userId", "rank", "movieId", "title", "genres", "prediction")
    .orderBy("userId", "rank")
)

final_recommendations.show(20, truncate=False)


+------+----+-------+-----------------------------------------------------+---------------------+----------+
|userId|rank|movieId|title                                                |genres               |prediction|
+------+----+-------+-----------------------------------------------------+---------------------+----------+
|1     |1   |5490   |The Big Bus (1976)                                   |Action|Comedy        |5.7378874 |
|1     |2   |132333 |Seve (2014)                                          |Documentary|Drama    |5.7378874 |
|1     |3   |5915   |Victory (a.k.a. Escape to Victory) (1981)            |Action|Drama|War     |5.7378874 |
|1     |4   |33649  |Saving Face (2004)                                   |Comedy|Drama|Romance |5.7304664 |
|1     |5   |6818   |Come and See (Idi i smotri) (1985)                   |Drama|War            |5.5311627 |
|1     |6   |1172   |Cinema Paradiso (Nuovo cinema Paradiso) (1989)       |Drama                |5.5196133 |
|1     |7   |6375  