In [1]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import json
import os
import time


In [2]:

spark = SparkSession.builder \
    .appName("MovieRecommendation") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

In [3]:
df = spark.read.csv("ratings.csv", header=True, inferSchema=True)

In [4]:
(training, test) = df.randomSplit([0.8, 0.2], seed=42)

In [5]:
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", nonnegative=True)

In [6]:
# Currently using a fixed set of hyperparameters but multiple values can be set for each parameter ideally!
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [11]) \
    .addGrid(als.maxIter, [11]) \
    .addGrid(als.regParam, [0.17]) \
    .build()

In [7]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [8]:
tvs = TrainValidationSplit(estimator=als, 
                           estimatorParamMaps=param_grid,
                           evaluator=evaluator
                           )

In [9]:
try:
    # Fit the model
    model = tvs.fit(training)
    
    # Get the best model
    best_model = model.bestModel
    print(f"Best model rank: {best_model._java_obj.parent().getRank()}")
    print(f"Best model maxIter: {best_model._java_obj.parent().getMaxIter()}")
    print(f"Best model regParam: {best_model._java_obj.parent().getRegParam()}")
    
    # Make predictions
    predictions = best_model.transform(test)
    
    # Evaluate the model
    rmse = evaluator.evaluate(predictions)
    print(f"Root-mean-square error = {rmse}")
except Exception as e:
    print(f"Error during model training or evaluation: {e}")


Best model rank: 11
Best model maxIter: 11
Best model regParam: 0.17
Root-mean-square error = 0.8711006005664664


In [10]:
def calculate_precision_at_k(predictions_df, k=10, rating_threshold=4.0):
    """
    Calculate precision@k using Spark operations
    """
    # Window spec for top-k recommendations per user
    window_spec = Window.partitionBy("userId").orderBy(F.desc("prediction"))
    
    # Add rank column to get top k predictions per user
    ranked_predictions = predictions_df.withColumn("rank", F.rank().over(window_spec))
    
    # Filter to only top k predictions and relevant ratings
    top_k_predictions = ranked_predictions.filter(F.col("rank") <= k)
    
    # Mark if a recommendation was relevant (actual rating >= threshold)
    marked_predictions = top_k_predictions.withColumn(
        "is_relevant", 
        F.when(F.col("rating") >= rating_threshold, 1).otherwise(0)
    )
    
    # Calculate precision for each user
    user_precision = marked_predictions.groupBy("userId").agg(
        F.sum("is_relevant").alias("relevant_count"),
        F.count("*").alias("total_count")
    ).withColumn("precision", F.col("relevant_count") / F.col("total_count"))
    
    # Calculate average precision across all users
    avg_precision = user_precision.select(F.avg("precision")).collect()[0][0]
    
    return avg_precision

# Calculate and print precision@10
precision_10 = calculate_precision_at_k(predictions, k=10, rating_threshold=4.0)
print(f"Precision@10 = {precision_10:.4f}")

Precision@10 = 0.6540


In [11]:
# Create models directory first
if not os.path.exists('models'):
    os.makedirs('models')

# Save metrics
metrics = {
    "rmse": rmse,
    "precision_at_10": precision_10,
    "best_rank": best_model._java_obj.parent().getRank(),
    "best_maxIter": best_model._java_obj.parent().getMaxIter(),
    "best_regParam": best_model._java_obj.parent().getRegParam(),
    "training_date": time.strftime("%Y-%m-%d")
}

with open("models/model_metrics.json", "w") as f:
    json.dump(metrics, f)


In [12]:
# Load the movies data
movies_df = spark.read.csv("movies.csv", header=True, inferSchema=True)

# Function to display highly rated movies and top recommendations for a specific user
def show_user_recommendations(user_id, ratings_df, movies_df, model, k=10, high_rating_threshold=4.0):

    # Get movies the user has already rated
    user_ratings = ratings_df.filter(f"userId = {user_id}")
    
    # Get highly rated movies by this user
    high_rated = user_ratings.filter(f"rating >= {high_rating_threshold}")
    
    # Join with movies_df to get movie titles
    high_rated_with_info = high_rated.join(
        movies_df, 
        on="movieId", 
        how="inner"
    ).select("movieId", "title", "genres", "rating").orderBy("rating", ascending=False)
    
    # Display the user's highly rated movies
    print(f"\n===== User {user_id}'s Highly Rated Movies (Rating >= {high_rating_threshold}) =====")
    high_rated_with_info.show(10, truncate=False)
    
    # Get all movies not rated by this user for recommendations
    user_unrated_movies = movies_df.join(
        user_ratings, 
        on="movieId", 
        how="left_anti" 
    )
    
    # Convert to the format expected by the model
    unrated_movies_df = user_unrated_movies.select("movieId").withColumn("userId", F.lit(user_id))
    
    # Get recommendations
    recommendations = model.transform(unrated_movies_df)
    
    # Join with movie info and show top k recommendations
    top_recommendations = recommendations.join(
        movies_df, 
        on="movieId", 
        how="inner"
    ).select("movieId", "title", "genres", "prediction").orderBy("prediction", ascending=False).limit(k)
    
    # Display top recommendations
    print(f"\n===== Top {k} Recommended Movies for User {user_id} =====")
    top_recommendations.show(k, truncate=False)
    
    
    print("\n===== Genre Analysis =====")
    
    # For liked movies
    print("Genres in highly-rated movies:")
    high_rated_with_info.withColumn("genre", F.explode(F.split("genres", "\\|"))) \
        .groupBy("genre") \
        .count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)
    
    # For recommended movies
    print("Genres in recommended movies:")
    top_recommendations.withColumn("genre", F.explode(F.split("genres", "\\|"))) \
        .groupBy("genre") \
        .count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)
    
    return high_rated_with_info, top_recommendations

user_counts = df.groupBy("userId").count().orderBy("count", ascending=False)
sample_user_id = user_counts.first()["userId"]

print(f"Selected user {sample_user_id} for recommendation analysis (user with most ratings)")

# Show the comparison for this user
high_rated_movies, recommended_movies = show_user_recommendations(
    user_id=sample_user_id, 
    ratings_df=df, 
    movies_df=movies_df, 
    model=best_model,
    k=10, 
    high_rating_threshold=4.0
)



Selected user 414 for recommendation analysis (user with most ratings)

===== User 414's Highly Rated Movies (Rating >= 4.0) =====
+-------+-----------------------------------------+---------------------------+------+
|movieId|title                                    |genres                     |rating|
+-------+-----------------------------------------+---------------------------+------+
|94     |Beautiful Girls (1996)                   |Comedy|Drama|Romance       |5.0   |
|318    |Shawshank Redemption, The (1994)         |Crime|Drama                |5.0   |
|110    |Braveheart (1995)                        |Action|Drama|War           |5.0   |
|223    |Clerks (1994)                            |Comedy                     |5.0   |
|296    |Pulp Fiction (1994)                      |Comedy|Crime|Drama|Thriller|5.0   |
|260    |Star Wars: Episode IV - A New Hope (1977)|Action|Adventure|Sci-Fi    |5.0   |
|34     |Babe (1995)                              |Children|Drama             |5.0   |