In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Movie Recommendation App").getOrCreate()

Importing csv files

In [2]:
movies_df = spark.read.csv("./data/movies.csv",header = True)
ratings_df = spark.read.csv("./data/ratings.csv",header= True)

movies_df.show(5)
ratings_df.show(5)


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [3]:
numonitor = ratings_df.count()
print("total Count: ",numonitor)
col = ratings_df.columns
print("Rating table columns: ",col)
user = ratings_df.select("userId").distinct().count()
print("Total Users: ",user)
movie = ratings_df.select("movieId").distinct().count()
print("Total movies: ",movie)

total Count:  100836
Rating table columns:  ['userId', 'movieId', 'rating', 'timestamp']
Total Users:  610
Total movies:  9724


In [4]:
#check sparsity of data : the condition where a large percentage of data within a 
# dataset is missing or is set to zero.
denominator = user * movie
sparsity = (1.0 - (numonitor *1.0)/denominator)* 100
print(sparsity)

98.30003169443864


In [5]:
# drop rows with missing values
movies_df = movies_df.dropna()
ratings_df = ratings_df.dropna()
print(movies_df.count())
print(ratings_df.count())

9742
100836


In [6]:
#check data schema
ratings_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [7]:
#convert string to integer and double
ratings_df = ratings_df.select(ratings_df.userId.cast("integer"), ratings_df.movieId.cast("integer"), 
                               ratings_df.rating.cast("double"))
ratings_df.printSchema()
ratings_df.show(5)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [36]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

(train, test) = ratings_df.randomSplit([0.8,0.2], seed=42)

als = ALS(regParam= 0.1,maxIter=20, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative= True,coldStartStrategy="drop")

model = als.fit(train)

prediction = model.transform(test)
prediction.show()



+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   463|   1088|   3.5| 3.9745536|
|   580|   3175|   2.5|  3.567155|
|   580|  44022|   3.5|  3.616548|
|   362|   1645|   5.0|  4.039502|
|   597|   1959|   4.0| 4.2054834|
|   155|   3175|   4.0| 3.7744727|
|   368|   2122|   2.0| 2.1551127|
|   115|   1645|   4.0| 2.7881613|
|   115|   3175|   4.0| 3.8374147|
|    28|   1645|   2.5| 2.9010844|
|    28|   3175|   1.5| 2.9563844|
|   587|   1580|   4.0| 3.7444701|
|   332|   1645|   3.5| 3.0612357|
|   332|   2366|   3.5| 3.5639892|
|   577|   1580|   3.0| 3.3973541|
|   577|   1959|   4.0| 3.5664096|
|   271|   6658|   2.0| 2.5463045|
|   606|   1088|   3.0|   3.39345|
|    91|   1580|   3.5| 3.3952494|
|    91|   6620|   3.5| 2.9556072|
+------+-------+------+----------+
only showing top 20 rows



In [35]:
# Calculate Root Mean Squared Error 
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(prediction)

print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 0.874706776453848


In [19]:

user_id = 1
user_ratings = ratings_df.filter(ratings_df.userId == user_id)

recommendations = model.recommendForUserSubset(user_ratings, 10)


from pyspark.sql.functions import explode

# Extract movieId from the recommendations (as it's in an array)
recommended_movie_ids = recommendations.select(explode("recommendations.movieId").alias("movieId"))

# Step 3: Join the exploded movieIds with movies_df to get the movie titles
recommended_movies = recommended_movie_ids.join(movies_df, on="movieId", how="left_outer")

# Step 4: Show the recommended movie titles
recommended_movies.select("title").show(truncate=False)


+-----------------------------------------------------------------------------------------------------------------------------+
|title                                                                                                                        |
+-----------------------------------------------------------------------------------------------------------------------------+
|Dragon Ball Z: The History of Trunks (Doragon bôru Z: Zetsubô e no hankô!! Nokosareta chô senshi - Gohan to Torankusu) (1993)|
|On the Beach (1959)                                                                                                          |
|Yojimbo (1961)                                                                                                               |
|Cosmos                                                                                                                       |
|Band of Brothers (2001)                                                                                

In [37]:
spark.stop()