# **User-Based Collaborative Filtering Recommendation System using Apache Spark**

# **INITIALIZATION**

In [2]:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sqrt, sum as _sum, count as _count, avg, abs as _abs
from pyspark.sql.functions import countDistinct, broadcast

spark = SparkSession.builder \
    .appName("UserBasedCF") \
    .config("spark.driver.memory","4g") \
    .getOrCreate()
# spark = SparkSession.builder \
    # .appName("User-Based Collaborative Filtering") \
    # .getOrCreate()

print("=" * 70)
print("USER-BASED COLLABORATIVE FILTERING RECOMMENDATION SYSTEM")
print("=" * 70)
print(f"✓ Spark Session Initialized (Version {spark.version})")

USER-BASED COLLABORATIVE FILTERING RECOMMENDATION SYSTEM
✓ Spark Session Initialized (Version 3.5.1)


In [3]:
!git clone -q https://github.com/sankalpjain99/Movie-recommendation-system.git

# **STEP 1: DATA PREPARATION**

In [5]:
# ============================================================================
# STEP 1: DATA PREPARATION
# ============================================================================


print("\n" + "=" * 70)
print("STEP 1: DATA PREPARATION")
print("=" * 70)

ratings_df = spark.read.csv("Movie-recommendation-system/ratings.csv", header=True, inferSchema=True)
movies_df = spark.read.csv("Movie-recommendation-system/movies.csv", header=True, inferSchema=True)

print("\n1.1 Dataset Schema:")
print("root")
for field in ratings_df.schema.fields:
    print(f" |-- {field.name}: {field.dataType} (nullable = {field.nullable})")

print("\n1.2 Sample Data (first 10 rows):")
ratings_df.show(10)

# --- Data Cleaning ---
initial_count = ratings_df.count()
ratings_clean = ratings_df.na.drop(subset=["userId", "movieId", "rating"]) \
                            .dropDuplicates(["userId", "movieId"]) \
                            .filter((col("rating") >= 0.5) & (col("rating") <= 5.0))
ratings_clean.cache()
final_count = ratings_clean.count()

n_users = ratings_clean.select("userId").distinct().count()
n_movies = ratings_clean.select("movieId").distinct().count()
n_ratings = final_count
sparsity = (1.0 - (n_ratings / (n_users * n_movies))) * 100

print("\n1.3 Basic Statistics (Cleaned Data):")
print(f"   • Total Ratings: {n_ratings:,}")
print(f"   • Total Users: {n_users:,}")
print(f"   • Total Movies: {n_movies:,}")
print(f"   • Sparsity: {sparsity:.2f}%")

print("\n1.4 Rating Distribution:")
ratings_clean.groupBy("rating").count().orderBy("rating").show()

print("\n1.5 User Activity Statistics:")
user_activity_df = ratings_clean.groupBy("userId").agg(
    _count("rating").alias("num_ratings"),
    avg("rating").alias("avg_rating")
)
user_activity_df.cache()

user_activity_df.select(col("userId").cast("double"), "num_ratings", "avg_rating").describe().show()

print("\n1.6 Creating User-Item Rating Matrix...")
# Efficiently get user_mean_df from the already computed activity
user_mean_df = user_activity_df.select("userId", col("avg_rating").alias("mean_rating"))
user_mean_df.cache()
print(f"   ✓ Average ratings computed for {user_mean_df.count()} users")

movies_count = movies_df.count()
print(f"\n✓ Loaded metadata for {movies_count:,} movies")


STEP 1: DATA PREPARATION

1.1 Dataset Schema:
root
 |-- userId: IntegerType() (nullable = True)
 |-- movieId: IntegerType() (nullable = True)
 |-- rating: DoubleType() (nullable = True)
 |-- timestamp: IntegerType() (nullable = True)

1.2 Sample Data (first 10 rows):
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
+------+-------+------+---------+
only showing top 10 rows


1.3 Basic Statistics (Cleaned Data):
   • Total Ratings: 100,836
   • Total Users: 610
   • Total Movies: 9,724
   • Sparsity: 98.30%

1.4 Rating Distribution:
+------+-----+
|rating|count|
+------+-----+
|   0.5| 1370|
|   1.0|

# **STEP 2: SIMILARITY COMPUTATION**

In [6]:

print("\n" + "=" * 70)
print("STEP 2: SIMILARITY COMPUTATION")
print("=" * 70)

# --- Configuration ---
TARGET_USER_ID = 40
MIN_COMMON_USERS = 3

r1 = ratings_clean.alias("r1")
r2 = ratings_clean.alias("r2")

pairs = r1.join(r2, (col("r1.movieId") == col("r2.movieId")) & (col("r1.userId") < col("r2.userId"))) \
          .select(
              col("r1.userId").alias("userA"),
              col("r2.userId").alias("userB"),
              col("r1.movieId").alias("movieId"),
              col("r1.rating").alias("ratingA"),
              col("r2.rating").alias("ratingB")
          )

pairs_with_mean = pairs.join(broadcast(user_mean_df.withColumnRenamed("userId","userA").withColumnRenamed("mean_rating","meanA")), "userA") \
                       .join(broadcast(user_mean_df.withColumnRenamed("userId","userB").withColumnRenamed("mean_rating","meanB")), "userB")

centered = pairs_with_mean.withColumn("devA", col("ratingA") - col("meanA")) \
                          .withColumn("devB", col("ratingB") - col("meanB")) \
                          .withColumn("devAB", col("devA") * col("devB")) \
                          .withColumn("devA2", col("devA") * col("devA")) \
                          .withColumn("devB2", col("devB") * col("devB"))

epsilon = 1e-6
similarity_calc_df = centered.groupBy("userA","userB") \
    .agg(
        _sum("devAB").alias("num"),
        _sum("devA2").alias("denA"),
        _sum("devB2").alias("denB"),
        countDistinct("movieId").alias("n_common")
    ) \
    .withColumn("similarity", col("num") / ( sqrt(col("denA")) * sqrt(col("denB")) + epsilon ) )

similarity_calc_df = similarity_calc_df.na.drop(subset=["similarity"])

sim_uv = similarity_calc_df.select("userA","userB","similarity","n_common")
sim_vu = sim_uv.select(col("userB").alias("userA"), col("userA").alias("userB"), col("similarity"), col("n_common"))
sim_all = sim_uv.union(sim_vu).cache()

1
print("\n" + "=" * 30)
print("User-User Similarity Matrix (Sample)")
print("=" * 30)
sim_all.filter(col("n_common") >= 2).orderBy(col("similarity").desc()).show(20)

# --- Target User Similarity ---
print(f"\n2.1 Computing PEARSON similarity for user {TARGET_USER_ID}...")
print(f"    Minimum common items threshold: {MIN_COMMON_USERS}")


sim_with_target = sim_all.filter((col("userA") == TARGET_USER_ID) & (col("n_common") >= MIN_COMMON_USERS)) \
                         .select(col("userB").alias("userId"), col("similarity").alias("sim"), col("n_common"))

sim_with_target.cache()
sim_count = sim_with_target.count()
print(f"   ✓ Computed similarities with {sim_count} users")

print("\n2.2 Top 10 Most Similar Users:")
print("   User ID    Similarity Score     Common Items   ")
print("   " + "-" * 45)

top_10_similar = sim_with_target.orderBy(col("sim").desc()).limit(10).collect()
for row in top_10_similar:
    print(f"   {row['userId']:<11}{row['sim']:<21.4f}{row['n_common']:<14}")


STEP 2: SIMILARITY COMPUTATION

User-User Similarity Matrix (Sample)
+-----+-----+------------------+--------+
|userA|userB|        similarity|n_common|
+-----+-----+------------------+--------+
|  416|  598| 0.999999930715492|       2|
|  598|  416| 0.999999930715492|       2|
|  283|   22|0.9999998778664158|       2|
|   22|  283|0.9999998778664158|       2|
|  591|   77|0.9999998548387308|       4|
|   77|  591|0.9999998548387308|       4|
|  512|  331|0.9999998506454474|       4|
|  331|  512|0.9999998506454474|       4|
|  265|   77| 0.999999848513034|       4|
|   77|  265| 0.999999848513034|       4|
|  588|  295|0.9999998483587841|       3|
|  295|  588|0.9999998483587841|       3|
|  476|   22|0.9999998438914272|       2|
|   22|  476|0.9999998438914272|       2|
|  528|   13|0.9999998389209456|       3|
|   13|  528|0.9999998389209456|       3|
|  207|   82|0.9999998366923623|       2|
|   82|  207|0.9999998366923623|       2|
|  373|  278|0.9999998363004154|       3|
|  278

# **STEP 3 & 4: RATING PREDICTION AND RECOMMENDATION GENERATION**

In [7]:
# ============================================================================
# STEP 3 & 4: RATING PREDICTION AND RECOMMENDATION GENERATION
# ============================================================================


print("\n" + "=" * 70)
print("STEP 3 & 4: RATING PREDICTION AND RECOMMENDATION GENERATION")
print("=" * 70)

# --- Configuration ---
NUM_RECOMMENDATIONS = 5
MIN_CONTRIBUTORS = 20
MIN_RATING = 0.5
MAX_RATING = 5.0
# ---------------------

print(f"\n3.1 Generating recommendations for User {TARGET_USER_ID}...")
print(f"    Using formula: p(u,i) = r̄_u + [Σ sim(u,v)×(r_v,i - r̄_v)] / [Σ |sim(u,v)|]")

# 1. Get Target User Info
user_rated = ratings_clean.filter(col("userId") == TARGET_USER_ID).select("movieId")
rated_ids = [row["movieId"] for row in user_rated.collect()]
target_mean = user_mean_df.filter(col("userId") == TARGET_USER_ID).collect()[0]["mean_rating"]
unseen_movies_count = n_movies - len(rated_ids)

print(f"    • User has rated {len(rated_ids)} movies")
print(f"    • Found {unseen_movies_count} unseen movies to predict")

# 2. Find recommendation candidates
unseen_ratings = ratings_clean.filter(~col("movieId").isin(rated_ids))
contrib_mean = user_mean_df.withColumnRenamed("userId","userId_c").withColumnRenamed("mean_rating","mean_rating_c")

# This code now works because sim_with_target has the 'sim' column
candidates = unseen_ratings.join(broadcast(sim_with_target), on="userId", how="inner") \
                           .join(broadcast(contrib_mean), unseen_ratings.userId == contrib_mean.userId_c) \
                           .select("userId","movieId","rating","sim","mean_rating_c")
candidates.cache() # Cache for use in the next section

# 3. Compute predicted ratings
pred_df = candidates.withColumn("dev", col("rating") - col("mean_rating_c")) \
    .withColumn("sim_dev", col("sim") * col("dev")) \
    .groupBy("movieId") \
    .agg(
        _sum("sim_dev").alias("num"),                 # Numerator for breakdown
        _sum(_abs(col("sim"))).alias("den"),          # Denominator for breakdown
        countDistinct("userId").alias("num_contributors"),
        avg("sim").alias("avg_similarity")
    ) \
    .filter((col("den") > 0) & (col("num_contributors") >= MIN_CONTRIBUTORS)) \
    .withColumn("adjustment", col("num") / col("den")) \
    .withColumn("raw_pred", target_mean + col("adjustment")) \
    .withColumn("pred_rating", # This is the clipping fix
        F.when(col("raw_pred") > MAX_RATING, MAX_RATING)
         .when(col("raw_pred") < MIN_RATING, MIN_RATING)
         .otherwise(col("raw_pred"))
    )

# 4. Join movie titles for readability
final_recommendations_df = pred_df.join(movies_df.select("movieId","title","genres"), on="movieId") \
                                  .orderBy(col("pred_rating").desc())
final_recommendations_df.cache()

# Screenshot Output 2
print("\n" + "=" * 30)
print("Predicted Movie Ratings (Sample)")
print("=" * 30)
final_recommendations_df.select("title", "genres", "pred_rating", "num_contributors").show(20, truncate=False)


STEP 3 & 4: RATING PREDICTION AND RECOMMENDATION GENERATION

3.1 Generating recommendations for User 40...
    Using formula: p(u,i) = r̄_u + [Σ sim(u,v)×(r_v,i - r̄_v)] / [Σ |sim(u,v)|]
    • User has rated 103 movies
    • Found 9621 unseen movies to predict

Predicted Movie Ratings (Sample)
+---------------------------------------------------------+--------------------------------------------------+------------------+----------------+
|title                                                    |genres                                            |pred_rating       |num_contributors|
+---------------------------------------------------------+--------------------------------------------------+------------------+----------------+
|Wallace & Gromit: The Best of Aardman Animation (1996)   |Adventure|Animation|Comedy                        |4.7059631383964415|24              |
|Lawrence of Arabia (1962)                                |Adventure|Drama|War                               |4.6968

# **STEP 5: OUTPUT - TOP RECOMMENDATIONS**

In [8]:
# ============================================================================
# STEP 5: OUTPUT - TOP RECOMMENDATIONS
# ============================================================================



print("\n" + "=" * 70)
print("STEP 5: OUTPUT - TOP RECOMMENDATIONS")
print("=" * 70)

print("\nTarget User Statistics:")
print(f"   • User ID: {TARGET_USER_ID}")
print(f"   • Average Rating: {target_mean:.2f}")
print(f"   • Number of Rated Movies: {len(rated_ids)}")

print("\n" + "=" * 70)
print(f"TOP {NUM_RECOMMENDATIONS} RECOMMENDED MOVIES FOR USER {TARGET_USER_ID}")
print("=" * 70)

# Collect the (small) list of top recommendations to the driver
top_recs_list = final_recommendations_df.limit(NUM_RECOMMENDATIONS).collect()

# Loop through the Python list and print in the detailed format
for idx, rec_row in enumerate(top_recs_list, 1):
    movie_id = rec_row['movieId']

    print(f"\nRank #{idx}")
    print("─" * 70)
    print(f"Movie: {rec_row['title']}")
    print(f"Movie ID: {movie_id}")
    print(f"Predicted Rating: {rec_row['pred_rating']:.4f} ⭐")
    print(f"Number of Contributors: {rec_row['num_contributors']}")

    # Get top 3 contributors
    top_contributors = candidates.filter(col("movieId") == movie_id) \
                                 .orderBy(_abs(col("sim")).desc()) \
                                 .limit(3) \
                                 .collect()

    print("\nTop Contributing Similar Users:")
    print("   User ID    Similarity      Their Rating    Their Avg      ")
    print("   " + "─" * 58)
    for user_row in top_contributors:
        print(f"   {user_row['userId']:<11}{user_row['sim']:<16.4f}{user_row['rating']:<16.1f}{user_row['mean_rating_c']:.2f}")

    print("\nPrediction Calculation Breakdown:")
    base = target_mean
    numerator = rec_row['num']
    denominator = rec_row['den']
    adjustment = rec_row['adjustment']
    final_pred = base + adjustment

    print(f"   Base (r̄_u): {base:.4f}")
    print(f"   Numerator (Σ sim×(r_v,i - r̄_v)): {numerator:.4f}")
    print(f"   Denominator (Σ |sim|): {denominator:.4f}")
    print(f"   Adjustment: {adjustment:.4f}")
    print(f"   Final Prediction: {base:.4f} + {adjustment:.4f} = {final_pred:.4f}")
    if final_pred != rec_row['pred_rating']:
        print(f"   (Clipped to: {rec_row['pred_rating']:.4f})")

print("\n" + "=" * 70)
print("RECOMMENDATION SYSTEM COMPLETED SUCCESSFULLY")
print("=" * 70)


STEP 5: OUTPUT - TOP RECOMMENDATIONS

Target User Statistics:
   • User ID: 40
   • Average Rating: 3.77
   • Number of Rated Movies: 103

TOP 5 RECOMMENDED MOVIES FOR USER 40

Rank #1
──────────────────────────────────────────────────────────────────────
Movie: Wallace & Gromit: The Best of Aardman Animation (1996)
Movie ID: 720
Predicted Rating: 4.7060 ⭐
Number of Contributors: 24

Top Contributing Similar Users:
   User ID    Similarity      Their Rating    Their Avg      
   ──────────────────────────────────────────────────────────
   260        0.9676          5.0             3.76
   191        0.6369          5.0             3.74
   182        0.5946          4.5             3.51

Prediction Calculation Breakdown:
   Base (r̄_u): 3.7670
   Numerator (Σ sim×(r_v,i - r̄_v)): 6.8518
   Denominator (Σ |sim|): 7.2972
   Adjustment: 0.9390
   Final Prediction: 3.7670 + 0.9390 = 4.7060

Rank #2
──────────────────────────────────────────────────────────────────────
Movie: Lawrence of A