In [23]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import udf,col,when
import numpy as np
from pyspark.sql.functions import explode

In [24]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("MovieRecommendationSystem") \
    .getOrCreate()

# Load the data
movies_path = "D:/Downloads/movie_data/movies.csv"
ratings_path = "D:/Downloads/movie_data/ratings.csv"

In [25]:
movies_df = spark.read.csv(movies_path, header=True, inferSchema=True)
ratings_df = spark.read.csv(ratings_path, header=True, inferSchema=True)

In [26]:
print("Movies Data:")
movies_df.show(5)

Movies Data:
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [27]:
print("Ratings Data:")
ratings_df.show(5)

Ratings Data:
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [28]:
ratings_df = ratings_df.dropna()
# Split data into training and test sets
(train_data, test_data) = ratings_df.randomSplit([0.8, 0.2], seed=42)

In [29]:
# Train ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",  # Handle cold-start problem
    nonnegative=True,          # Ensure non-negative ratings
    implicitPrefs=False        # Explicit feedback (ratings)
)

In [30]:
model = als.fit(train_data)

In [31]:
test_data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [32]:
# Evaluate the model
predictions = model.transform(test_data)
predictions = predictions.withColumn(
    "prediction",
    when(col("prediction") > 5, 5).when(col("prediction") < 1, 1).otherwise(col("prediction"))
)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 0.8696335473187065


In [33]:
user_recs = model.recommendForAllUsers(5)
print("Top 5 movie recommendations for each user:")
user_recs.show(5, truncate=False)

Top 5 movie recommendations for each user:
+------+----------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                     |
+------+----------------------------------------------------------------------------------------------------+
|1     |[{25771, 5.853434}, {3266, 5.727755}, {58301, 5.726351}, {8477, 5.6169987}, {132333, 5.569378}]     |
|2     |[{6380, 4.9912367}, {131724, 4.8971786}, {184245, 4.861794}, {134796, 4.861794}, {117531, 4.861794}]|
|3     |[{6835, 4.9096556}, {5746, 4.9096556}, {5181, 4.8525653}, {4518, 4.704792}, {2851, 4.700511}]       |
|4     |[{306, 5.471559}, {25771, 5.2317896}, {8477, 5.0207634}, {1262, 4.988554}, {2300, 4.9771957}]       |
|5     |[{25771, 5.3435073}, {8477, 5.2734475}, {3266, 5.1759133}, {58301, 5.126786}, {71899, 4.982245}]    |
+------+---------------------------------------------------------------------

In [34]:
movie_recs = model.recommendForAllItems(5)
print("Top 5 user recommendations for each movie:")
movie_recs.show(5, truncate=False)

Top 5 user recommendations for each movie:
+-------+----------------------------------------------------------------------------------------+
|movieId|recommendations                                                                         |
+-------+----------------------------------------------------------------------------------------+
|1      |[{53, 5.1670017}, {276, 5.158022}, {393, 5.0979676}, {543, 5.0946074}, {43, 4.952079}]  |
|12     |[{276, 3.621855}, {96, 3.6182528}, {12, 3.5330048}, {119, 3.5316632}, {392, 3.5087423}] |
|13     |[{393, 4.2046227}, {43, 4.0824556}, {543, 3.7520075}, {267, 3.750775}, {20, 3.6963384}] |
|22     |[{53, 4.404162}, {276, 4.2605085}, {548, 4.2102847}, {452, 4.1991887}, {543, 4.1539626}]|
|26     |[{43, 4.7795115}, {53, 4.521044}, {171, 4.476831}, {595, 4.387001}, {337, 4.3810954}]   |
+-------+----------------------------------------------------------------------------------------+
only showing top 5 rows



In [35]:
# Recommend movies for a specific user
user_id = 1 
user_recs = model.recommendForUserSubset(spark.createDataFrame([(user_id,)]).toDF("userId"), 5)
print(f"Top 5 movie recommendations for user {user_id}:")
user_recs.show(truncate=False)

Top 5 movie recommendations for user 1:
+------+-----------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                |
+------+-----------------------------------------------------------------------------------------------+
|1     |[{25771, 5.853434}, {3266, 5.727755}, {58301, 5.726351}, {8477, 5.6169987}, {132333, 5.569378}]|
+------+-----------------------------------------------------------------------------------------------+



In [36]:
# Extract movieId and rating from the recommendations array
user_recs_expanded = user_recs \
    .select("userId", explode("recommendations").alias("rec")) \
    .select("userId", col("rec.movieId"), col("rec.rating").alias("predicted_rating"))

# Join with movies_df to get movie titles
recommendations_with_titles = user_recs_expanded \
    .join(movies_df, on="movieId", how="inner") \
    .select("userId", "movieId", "title", "predicted_rating")

# Show the recommendations with titles
print(f"Top 5 movie recommendations for user {user_id} with titles:")
recommendations_with_titles.show(5, truncate=False)

Top 5 movie recommendations for user 1 with titles:
+------+-------+-----------------------------------------------------+----------------+
|userId|movieId|title                                                |predicted_rating|
+------+-------+-----------------------------------------------------+----------------+
|1     |25771  |Andalusian Dog, An (Chien andalou, Un) (1929)        |5.853434        |
|1     |3266   |Man Bites Dog (C'est arrivé près de chez vous) (1992)|5.727755        |
|1     |58301  |Funny Games U.S. (2007)                              |5.726351        |
|1     |8477   |Jetée, La (1962)                                     |5.6169987       |
|1     |132333 |Seve (2014)                                          |5.569378        |
+------+-------+-----------------------------------------------------+----------------+



In [37]:
spark.stop()