In [1]:
import findspark
import streamlit
import setuptools

findspark.init()

import pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello")

/usr/local/lib/python3.12/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/25 09:16:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load the Data

In [2]:
from pyspark.sql.functions import col

ratings = spark.read.csv("ratings.csv",header=True)
movies = spark.read.csv("movies.csv",header=True)


ratings = ratings \
            .withColumn("userId", col("userId").cast('integer'))\
            .withColumn("movieId", col("movieId").cast("integer"))\
            .withColumn("rating", col("rating").cast("float"))\
            .drop('timestamp')

user_ratings = ratings.join(movies, on='movieId')
user_ratings = user_ratings.toPandas()
user_ratings = user_ratings.pivot_table(index=['userId'], columns=['title'], values=['rating'])
user_ratings.head()

Unnamed: 0_level_0,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating,rating
title,"""11'09""""01 - September 11 (2002)""",'71 (2014),'Hellboy': The Seeds of Creation (2004),'Round Midnight (1986),'Salem's Lot (2004),'Til There Was You (1997),'Tis the Season for Love (2015),"'burbs, The (1989)",'night Mother (1986),(500) Days of Summer (2009),...,Zulu (2013),[REC] (2007),[REC]² (2009),[REC]³ 3 Génesis (2012),anohana: The Flower We Saw That Day - The Movie (2013),eXistenZ (1999),xXx (2002),xXx: State of the Union (2005),¡Three Amigos! (1986),À nous la liberté (Freedom for Us) (1931)
userId,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2,Unnamed: 14_level_2,Unnamed: 15_level_2,Unnamed: 16_level_2,Unnamed: 17_level_2,Unnamed: 18_level_2,Unnamed: 19_level_2,Unnamed: 20_level_2,Unnamed: 21_level_2
1,,,,,,,,,,,...,,,,,,,,,4.0,
2,,,,,,,,,,,...,,,,,,,,,,
3,,,,,,,,,,,...,,,,,,,,,,
4,,,,,,,,,,,...,,,,,,,,,,
5,,,,,,,,,,,...,,,,,,,,,,


# Model Based Recommendation 

In real-world problems, the utility matrix (or rating matrix from Step 8) is expected to be very sparse,
as each user only encounters a small fraction of items among the vast pool of options available

In [4]:
# Calculate Sparsity 

# total number of ratings 
numerator = ratings.select("rating").count()

# distinct users
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()

denominator = num_users * num_movies

sparcity = (1.0 - (numerator * 1.0)/denominator)*100 
print("The ratings dataframe is", "%.2f" % sparcity + "% empty")

The ratings dataframe is 98.30% empty


$$sparcity = 1 - \frac{numberOfNonZeroEntries}{(numberOfUsers \times numberOfItems)}$$

Real-world datasets may suffer from a greater extent of sparsity and have been a long-standing challenge
in building recommender systems. A viable solution is to use additional side information such as user/item
features to alleviate the sparsity by using, for example, hyperparameter tuning

In [9]:
from pyspark.ml.recommendation import ALS 
from pyspark.ml.evaluation import RegressionEvaluator

# split the data 
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True, implicitPrefs=False, coldStartStrategy="drop")

ALSModel = als.fit(train)

# make predictions and display
pred = ALSModel.transform(test)
pred.show()

# evaluate the model
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol='prediction')
RMSE = evaluator.evaluate(pred)

print(f"RMSE = {RMSE}")

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|   4896|   4.0| 3.4620237|
|   148|   5618|   3.0| 3.7984607|
|   148|   7153|   3.0| 3.3978703|
|   148|  40629|   5.0| 3.0824335|
|   148|  40815|   4.0| 3.5715442|
|   148|  60069|   4.5| 3.7576718|
|   148|  68954|   4.0|  3.608876|
|   148|  69844|   4.0| 3.5291357|
|   148|  79132|   1.5| 3.8258843|
|   148|  79702|   4.0|   3.50854|
|   148|  81834|   4.0|  3.849213|
|   148|  81847|   4.5| 3.2300253|
|   148|  98243|   4.5| 3.5974226|
|   148|  98491|   5.0| 3.4292328|
|   148| 108932|   4.0|  3.276753|
|   463|   1088|   3.5| 3.5674582|
|   463|   1221|   4.5| 3.9980366|
|   463|   2028|   4.5|  4.199782|
|   463|   2167|   3.0| 3.6339138|
|   463|   3448|   3.0| 3.7760227|
+------+-------+------+----------+
only showing top 20 rows

RMSE = 0.8784977775665759


In [12]:
from pyspark.sql.functions import explode

# Make recommendations
nrecommendations = ALSModel.recommendForAllUsers(10)

nrecommendations = nrecommendations\
        .withColumn("rec_exp", explode("recommendations"))\
        .select("userId", col("rec_exp.movieId"), col("rec_exp.rating"))

nrecommendations.limit(10).show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|     1| 132333|5.7562394|
|     1|  96004| 5.724287|
|     1|   3379| 5.724287|
|     1| 177593|5.6645894|
|     1|   5915|5.6053195|
|     1|   8542| 5.602462|
|     1|  60943|5.5927606|
|     1|  59018|5.5927606|
|     1|   5075| 5.583153|
|     1| 171495|5.5218754|
+------+-------+---------+



In [13]:
nrecommendations.join(movies, on='movieId').filter('userId = 100').show()

ratings.join(movies, on='movieId').filter('userId = 100').sort('rating', ascending=False).limit(10).show()

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|  33649|   100| 5.343287|  Saving Face (2004)|Comedy|Drama|Romance|
|   5075|   100| 5.208376|  Waydowntown (2000)|              Comedy|
| 183897|   100| 5.174079| Isle of Dogs (2018)|    Animation|Comedy|
|  67618|   100| 5.076163|Strictly Sexual (...|Comedy|Drama|Romance|
|  25771|   100| 5.044877|Andalusian Dog, A...|             Fantasy|
|    945|   100| 5.022914|      Top Hat (1935)|Comedy|Musical|Ro...|
|  74282|   100| 5.001731|Anne of Green Gab...|Children|Drama|Ro...|
|    171|   100| 4.984367|      Jeffrey (1995)|        Comedy|Drama|
|   1066|   100|4.9647713|Shall We Dance (1...|Comedy|Musical|Ro...|
| 104875|   100|4.9545116|History of Future...|Adventure|Comedy|...|
+-------+------+---------+--------------------+--------------------+

+-------+------+------+----------

The movie recommended to the 100th user primarily belongs to comedy, drama, romance genres, and
the movies preferred by the user as seen in the above table, match closely with these genres.