A Movie Recommendation Service

Source: https://www.codementor.io/@jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw

In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.appName("Pumpkinmeter").getOrCreate()




Load MovieLens Full Dataset (25M Ratings)

Load Ratings and Movies

In [18]:
ratings = spark.read.csv("/home/ubuntu/ratings.csv", header=True, inferSchema=True)
movies = spark.read.csv("/home/ubuntu/movies.csv", header=True, inferSchema=True)

ratings.show(10)
movies.show(10)




+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
+------+-------+------+---------+
only showing top 10 rows
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)| 

My Own Ratings (User 1: Aashma)

In [9]:
from pyspark.sql import Row

new_user_id = 999999

# Sample ratings from Aashma
new_user_ratings = [
    Row(userId=new_user_id, movieId=1, rating=4.5),
    Row(userId=new_user_id, movieId=2, rating=4.0),
    Row(userId=new_user_id, movieId=3, rating=5.0),
    Row(userId=new_user_id, movieId=4, rating=3.5),
    Row(userId=new_user_id, movieId=5, rating=4.0),
    Row(userId=new_user_id, movieId=6, rating=5.0),
    Row(userId=new_user_id, movieId=7, rating=3.0),
    Row(userId=new_user_id, movieId=8, rating=4.0),
    Row(userId=new_user_id, movieId=9, rating=2.5),
    Row(userId=new_user_id, movieId=10, rating=4.5),
]

# Convert to DataFrame
new_user_ratings_df = spark.createDataFrame(new_user_ratings)

# Show the DataFrame
new_user_ratings_df.show()



+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|999999|      1|   4.5|
|999999|      2|   4.0|
|999999|      3|   5.0|
|999999|      4|   3.5|
|999999|      5|   4.0|
|999999|      6|   5.0|
|999999|      7|   3.0|
|999999|      8|   4.0|
|999999|      9|   2.5|
|999999|     10|   4.5|
+------+-------+------+



                                                                                

Merge Custom Ratings with Full Dataset
Scenario 1

In [13]:
# Step 1: Remove the timestamp column from the original ratings
ratings_trimmed = ratings.select("userId", "movieId", "rating")

# Step 2: Combine the trimmed ratings with the new user's ratings
complete_ratings = ratings_trimmed.union(new_user_ratings_df)

# Step 3: Show a sample of the combined dataset
complete_ratings.show(5)


+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows


Train the ALS Model

In [15]:
from pyspark.ml.recommendation import ALS

# Configure ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    maxIter=10,
    regParam=0.1,
    rank=10,
    coldStartStrategy="drop",  # Drop rows with NaN predictions (users/movies unseen in training)
    nonnegative=True
)

# Train the model on the full dataset with my user included
als_model = als.fit(complete_ratings)


Top 15 Movie Recommendations for Aashma

In [31]:
from pyspark.sql import Row

# Create a DataFrame with just Aashma's userId
aashma_user_df = spark.createDataFrame([Row(userId=999999)])

# Generate top 15 recommendations for Aashma
aashma_recommendations = als_model.recommendForUserSubset(aashma_user_df, 15)

# Show recommendations
aashma_recommendations.show(truncate=False)


+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                                                                                                                  |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|999999|[{3925, 6.025569}, {26171, 5.8630714}, {60943, 5.8064995}, {59018, 5.8064995}, {7842, 5.7942743

Join Recommendations with Movie Titles

In [32]:
from pyspark.sql.functions import explode

# Explode the nested 'recommendations' column
exploded_recs = aashma_recommendations.selectExpr("userId", "explode(recommendations) as rec") \
                                     .selectExpr("userId", "rec.movieId as movieId", "rec.rating as predicted_rating")

# Join with movies to get titles
recommended_movies = exploded_recs.join(movies, on="movieId")

# Show final recommendations with titles
recommended_movies.select("movieId", "title", "predicted_rating").show(truncate=False)


+-------+-----------------------------------------------+----------------+
|movieId|title                                          |predicted_rating|
+-------+-----------------------------------------------+----------------+
|3925   |Stranger Than Paradise (1984)                  |6.025569        |
|26171  |Play Time (a.k.a. Playtime) (1967)             |5.8630714       |
|60943  |Frozen River (2008)                            |5.8064995       |
|59018  |Visitor, The (2007)                            |5.8064995       |
|7842   |Dune (2000)                                    |5.7942743       |
|33649  |Saving Face (2004)                             |5.5779705       |
|6086   |I, the Jury (1982)                             |5.560388        |
|3379   |On the Beach (1959)                            |5.5535126       |
|171495 |Cosmos                                         |5.54708         |
|156726 |Hush (2016)                                    |5.515937        |
|102217 |Bill Hicks: Reve

Scenario 2 — Filter Movies with ≥100 Ratings

In [20]:
from pyspark.sql.functions import count

# Count the number of ratings per movie
movie_rating_counts = ratings.groupBy("movieId").agg(count("rating").alias("rating_count"))

# Filter movies with at least 100 ratings
popular_movies = movie_rating_counts.filter("rating_count >= 100")

# Join with ratings to keep only those movies
ratings_100 = ratings.join(popular_movies, on="movieId").select("userId", "movieId", "rating")


Merge my ratings again

In [21]:
# Reuse Aashma's ratings and merge again
complete_ratings_100 = ratings_100.union(new_user_ratings_df)


Retrain ALS on this filtered dataset

In [22]:
# Train ALS on movies with ≥100 ratings
als_model_100 = als.fit(complete_ratings_100)


Generate recommendations again for Aashma

In [23]:
# Reuse Aashma's user DataFrame (aashma_user_df, created in the Scenario 1 cell)
aashma_recommendations_100 = als_model_100.recommendForUserSubset(aashma_user_df, 15)

# Explode and map to titles
exploded_100 = aashma_recommendations_100.selectExpr("userId", "explode(recommendations) as rec") \
                                         .selectExpr("userId", "rec.movieId as movieId", "rec.rating as predicted_rating")

# Join with movies for titles
recommended_100 = exploded_100.join(movies, on="movieId")

# Show results
recommended_100.select("movieId", "title", "predicted_rating").show(truncate=False)


+-------+------------------------------------------------------------------------------+----------------+
|movieId|title                                                                         |predicted_rating|
+-------+------------------------------------------------------------------------------+----------------+
|3      |Grumpier Old Men (1995)                                                       |4.8226166       |
|2571   |Matrix, The (1999)                                                            |4.8085594       |
|318    |Shawshank Redemption, The (1994)                                              |4.8035283       |
|79132  |Inception (2010)                                                              |4.7923355       |
|1198   |Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)|4.651085        |
|48516  |Departed, The (2006)                                                          |4.6207833       |
|58559  |Dark Knight, The (2008)              

Add Ratings for User 2 (My Friend)

In [24]:
userId = 888888  # for my friend


1. Add 10 ratings for User 2

In [26]:
# Sample ratings for User 2
user2_ratings = [
    Row(userId=888888, movieId=1, rating=3.5),
    Row(userId=888888, movieId=2, rating=4.0),
    Row(userId=888888, movieId=5, rating=2.5),
    Row(userId=888888, movieId=6, rating=4.0),
    Row(userId=888888, movieId=10, rating=3.5),
    Row(userId=888888, movieId=11, rating=5.0),
    Row(userId=888888, movieId=15, rating=4.0),
    Row(userId=888888, movieId=17, rating=3.0),
    Row(userId=888888, movieId=20, rating=4.5),
    Row(userId=888888, movieId=22, rating=4.0)
]

user2_ratings_df = spark.createDataFrame(user2_ratings)


Merge User 2 Ratings with the Dataset

A. Scenario 1 — Full Dataset (No filter)

In [27]:
# Drop timestamp to match schema
ratings_trimmed_user2 = ratings.select("userId", "movieId", "rating")

# Combine with User 2's ratings
complete_ratings_user2 = ratings_trimmed_user2.union(user2_ratings_df)


B. Train ALS Model for User 2

In [28]:
# Train ALS model with full dataset + User 2
als_model_user2 = als.fit(complete_ratings_user2)


C. Get Top 15 Recommendations for User 2

In [29]:
# Create DataFrame for User 2
user2_df = spark.createDataFrame([Row(userId=888888)])

# Generate recommendations
recommendations_user2 = als_model_user2.recommendForUserSubset(user2_df, 15)

# Map to titles
recs_user2 = recommendations_user2.selectExpr("userId", "explode(recommendations) as rec") \
                                  .selectExpr("userId", "rec.movieId as movieId", "rec.rating as predicted_rating")

recs_user2_with_titles = recs_user2.join(movies, on="movieId")

# Show result
recs_user2_with_titles.select("movieId", "title", "predicted_rating").show(truncate=False)


+-------+------------------------------------------------------------------+----------------+
|movieId|title                                                             |predicted_rating|
+-------+------------------------------------------------------------------+----------------+
|32892  |Ivan's Childhood (a.k.a. My Name is Ivan) (Ivanovo detstvo) (1962)|5.357841        |
|159858 |The Conjuring 2 (2016)                                            |5.2922397       |
|7121   |Adam's Rib (1949)                                                 |5.282956        |
|42730  |Glory Road (2006)                                                 |5.1748304       |
|93008  |Very Potter Sequel, A (2010)                                      |5.1359167       |
|77846  |12 Angry Men (1997)                                               |5.1359167       |
|25906  |Mr. Skeffington (1944)                                            |5.1359167       |
|5867   |Thief (1981)                                       

Scenario 2 — Filter Movies with ≥100 Ratings

In [30]:
# Reuse earlier filtered data: ratings_100
complete_ratings_100_user2 = ratings_100.union(user2_ratings_df)

# Train ALS again
als_model_100_user2 = als.fit(complete_ratings_100_user2)

# Get recommendations
recommendations_user2_100 = als_model_100_user2.recommendForUserSubset(user2_df, 15)

# Map to titles
recs_user2_100 = recommendations_user2_100.selectExpr("userId", "explode(recommendations) as rec") \
                                          .selectExpr("userId", "rec.movieId as movieId", "rec.rating as predicted_rating") \
                                          .join(movies, on="movieId")

# Show recommendations for Scenario 2
recs_user2_100.select("movieId", "title", "predicted_rating").show(truncate=False)


+-------+--------------------------------+----------------+
|movieId|title                           |predicted_rating|
+-------+--------------------------------+----------------+
|11     |American President, The (1995)  |4.7985916       |
|20     |Money Train (1995)              |4.3187323       |
|79132  |Inception (2010)                |4.1376877       |
|2028   |Saving Private Ryan (1998)      |4.099926        |
|356    |Forrest Gump (1994)             |4.093837        |
|318    |Shawshank Redemption, The (1994)|4.07812         |
|3147   |Green Mile, The (1999)          |4.036998        |
|110    |Braveheart (1995)               |4.021549        |
|912    |Casablanca (1942)               |4.0184          |
|2571   |Matrix, The (1999)              |3.9839067       |
|4995   |Beautiful Mind, A (2001)        |3.976591        |
|527    |Schindler's List (1993)         |3.9654548       |
|3578   |Gladiator (2000)                |3.9590144       |
|1270   |Back to the Future (1985)      

Insights and Foresights: Scenario Comparison

Scenario 1: Full Dataset (No Rating Threshold)

In this scenario, the ALS model was trained on every movie in the dataset, including those with very few ratings. The candidate pool was therefore wider and included niche and less mainstream titles. For both users (Aashma, that is me, and my friend), the top 15 lists were dominated by lesser-known titles such as Stranger Than Paradise (1984) and Ivan's Childhood (1962). This scenario shows strong personalization potential, particularly when users have rated a mix of mainstream and less common movies. However, because rarely rated movies were included, the model has few collaborative signals for them and its predictions are less reliable: several predicted ratings exceed the 5.0 maximum of the rating scale (for example, about 6.03 for Stranger Than Paradise).
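
The reliability concern can be checked directly against the predicted ratings printed in the result tables. MovieLens ratings run from 0.5 to 5.0, so a prediction above 5.0 is an extrapolation rather than a plausible rating. The plain-Python sketch below (no Spark needed) counts such predictions in the visible rows of Aashma's two tables. The helper names `count_out_of_range` and `clip_to_scale` are illustrative choices, not Spark functions.

```python
# Predicted ratings copied from the visible rows of Aashma's result tables.
scenario1_preds = [6.025569, 5.8630714, 5.8064995, 5.8064995, 5.7942743,
                   5.5779705, 5.560388, 5.5535126, 5.54708, 5.515937]
scenario2_preds = [4.8226166, 4.8085594, 4.8035283, 4.7923355, 4.651085, 4.6207833]


def count_out_of_range(preds, low=0.5, high=5.0):
    """Return how many predictions fall outside the [low, high] rating scale."""
    return sum(1 for p in preds if p < low or p > high)


def clip_to_scale(pred, low=0.5, high=5.0):
    """Clamp a single prediction to the rating scale, e.g. for display."""
    return max(low, min(high, pred))


s1_bad = count_out_of_range(scenario1_preds)
s2_bad = count_out_of_range(scenario2_preds)
print(f"Scenario 1: {s1_bad} of {len(scenario1_preds)} visible predictions exceed the scale")
print(f"Scenario 2: {s2_bad} of {len(scenario2_preds)} visible predictions exceed the scale")
```

Clipping only tidies the display. It does not make a prediction for a rarely rated movie any more trustworthy, which is the motivation for the rating-count filter in Scenario 2.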

Scenario 2: Filtered Dataset (Movies with ≥100 Ratings)

In the second scenario, I filtered out all movies with fewer than 100 ratings before training the ALS model. This approach removed less popular or obscure titles and focused only on movies with strong viewer engagement. As expected, the recommendations leaned heavily toward popular, mainstream titles that have been consistently rated by many users. While this ensured a higher degree of recommendation reliability, it somewhat reduced the diversity and personalization of the suggestions. The model was more cautious and less exploratory, offering safe and broadly liked options rather than niche content.
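
One detail in the Scenario 2 output is worth handling before showing results to a user: the list can contain movies the user has already rated. Aashma's top row is movieId 3 (Grumpier Old Men), which she rated 5.0, and User 2's list starts with movieIds 11 and 20, both of which are in that user's input ratings. The sketch below shows the filtering logic in plain Python on the visible rows of Aashma's table; `drop_already_rated` is a hypothetical helper name. In Spark, the same idea is usually expressed as a `left_anti` join of the exploded recommendations against the user's ratings on `userId` and `movieId`.

```python
# (movieId, predicted_rating) pairs from the visible rows of Aashma's Scenario 2 table.
aashma_recs_100 = [(3, 4.8226166), (2571, 4.8085594), (318, 4.8035283),
                   (79132, 4.7923355), (1198, 4.651085), (48516, 4.6207833)]

# movieIds Aashma rated in new_user_ratings (1 through 10).
aashma_rated_ids = set(range(1, 11))


def drop_already_rated(recs, rated_ids):
    """Keep only recommendations whose movieId the user has not rated yet."""
    return [(movie_id, score) for movie_id, score in recs if movie_id not in rated_ids]


unseen_recs = drop_already_rated(aashma_recs_100, aashma_rated_ids)
print(unseen_recs)
```

Because filtering shortens the list, it may make sense to request more than 15 recommendations from the model and trim to 15 after the filter.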



Final Foresights and Business Implications

For a movie recommendation startup like Ripe Pumpkins, it may be best to use Scenario 1 (full dataset) for long-term users with well-established profiles and a history of diverse tastes, and Scenario 2 (filtered dataset) for new users with limited ratings, to ensure trust in recommendations.
We can also consider a hybrid model that starts with popular content (Scenario 2) and gradually incorporates more niche titles (Scenario 1) as user preference data grows. Pumpkinmeter scores could then highlight how "mainstream" or "personalized" a recommendation is, improving transparency and user trust.
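
The hybrid idea can be sketched as a simple slot-allocation rule: the more ratings a user has, the more of the top-k slots go to Scenario 1 (full dataset) picks, with the rest filled from Scenario 2 (popular movies). Everything in this sketch is an assumption for illustration: the function name `hybrid_top_k`, the linear ramp, and the `ramp=100` threshold were not tested in this notebook, and the "mainstream" / "personalized" labels only stand in for a Pumpkinmeter-style tag.

```python
def hybrid_top_k(popular, niche, n_ratings, k=5, ramp=100):
    """Blend two ranked title lists into k (title, label) picks.

    The number of niche slots grows linearly with n_ratings and reaches k
    once the user has `ramp` ratings (hypothetical rule, integer arithmetic).
    """
    n_niche = min(k, max(n_ratings, 0) * k // ramp)
    picks = [(title, "personalized") for title in niche[:n_niche]]
    chosen = {title for title, _ in picks}
    # Fill the remaining slots from the popular list, skipping duplicates.
    for title in popular:
        if len(picks) >= k:
            break
        if title not in chosen:
            picks.append((title, "mainstream"))
            chosen.add(title)
    return picks


# Titles taken from Aashma's Scenario 2 and Scenario 1 result tables.
popular = ["Matrix, The (1999)", "Shawshank Redemption, The (1994)",
           "Inception (2010)", "Departed, The (2006)"]
niche = ["Stranger Than Paradise (1984)", "Play Time (a.k.a. Playtime) (1967)",
         "Frozen River (2008)"]

new_user_picks = hybrid_top_k(popular, niche, n_ratings=10, k=4)
regular_picks = hybrid_top_k(popular, niche, n_ratings=50, k=4)
print(new_user_picks)
print(regular_picks)
```

With these inputs, a user with 10 ratings gets only mainstream picks, while a user with 50 ratings gets two personalized picks followed by two mainstream ones. A real system would need to tune the ramp against held-out ratings rather than fix it by hand.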