# Hybrid Recommendation System using MovieLens 1M Dataset with Spark ALS, Spark TF-IDF, FAISS (HNSW) and Cold-Start Handling


Load packages

In [15]:
#
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import avg, col, lit, expr, row_number, collect_list, size, array_contains, monotonically_increasing_id, udf, explode, min as spark_min, max as spark_max,countDistinct, when, isnan
from pyspark.sql.window import Window
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.types import FloatType, DoubleType
from pyspark.ml.linalg import SparseVector, Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from sklearn.preprocessing import normalize
import faiss
from collections import defaultdict

## Read data and initiate Spark session

In [4]:
# Initialize Spark
spark = SparkSession.builder.appName("HybridRecommender").getOrCreate()

# Load Ratings and Movies Data into Pandas, then convert to Spark DataFrames
ratings_pd = pd.read_csv('./sample_data/ratings.dat', sep='::', names=['user_id', 'movie_id', 'rating', 'timestamp'], engine='python')
movies_pd = pd.read_csv('./sample_data/movies.dat', sep='::', names=['movie_id', 'title', 'genres'], engine='python',encoding='latin-1')

# Remove special chars, else words will be broken into sub-words
movies_pd['genres'] = movies_pd['genres'].str.replace('Sci-Fi','SciFi')
movies_pd['genres'] = movies_pd['genres'].str.replace('Film-Noir','Noir')

ratings = spark.createDataFrame(ratings_pd[['user_id', 'movie_id', 'rating']])
movies = spark.createDataFrame(movies_pd)

# Split ratings into train and test
train_ratings, test_ratings = ratings.randomSplit([0.8,0.2])

## Collaborative Filtering RS

Cross Validation on ALS using Grid Search

In [5]:
als = ALS(userCol='user_id',itemCol='movie_id',ratingCol='rating',coldStartStrategy="drop",implicitPrefs=False,
          seed=32,maxIter=20)


# Define parameters to search on
paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, [1, 0.1, 0.01,0.05]) \
    .addGrid(als.rank, [10, 20,30]) \
    .build()


In [6]:
# Create CV
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="mae"),
                          numFolds=3,parallelism=4)

In [7]:
# Fit CV on training set of Ratings
cv_model = crossval.fit(train_ratings)

In [8]:
# Extract best performing parameters
best_rank = cv_model.bestModel._java_obj.parent().getRank()
best_regParam = cv_model.bestModel._java_obj.parent().getRegParam()
best_model_params = {'rank': best_rank, 'regParam': best_regParam}

In [9]:
print(f"Best rank: {best_rank}")
print(f"Best regParam: {best_regParam}")

Best rank: 10
Best regParam: 0.05


In [10]:
cv_model.avgMetrics #avg MAE across all model params combo

[np.float64(1.1645400338999485),
 np.float64(1.1645400336655698),
 np.float64(1.164540033476987),
 np.float64(0.6932183878525519),
 np.float64(0.6941108844502475),
 np.float64(0.6949633033091386),
 np.float64(0.7267621518424024),
 np.float64(0.7851407508974715),
 np.float64(0.8226668496567205),
 np.float64(0.690404391999493),
 np.float64(0.7011506360011271),
 np.float64(0.7068420881571496)]

In [20]:
# train best ALS
als = ALS(userCol='user_id',itemCol='movie_id',ratingCol='rating', \
          rank=best_model_params.get('rank'), \
          regParam=best_model_params.get('regParam'), \
          coldStartStrategy="drop",
          implicitPrefs=False)

model_cf = als.fit(train_ratings)

Calculate 100 recommendations for all users at one go

In [21]:
user_recs = model_cf.recommendForAllUsers(100)
user_recs = user_recs.selectExpr("user_id", "explode(recommendations) as rec")
user_recs = user_recs.selectExpr("user_id", "rec.movie_id as movie_id", "rec.rating as cf_score")


In [22]:
user_recs.groupBy("user_id").count().show(5)

+-------+-----+
|user_id|count|
+-------+-----+
|    148|  100|
|    463|  100|
|    471|  100|
|    496|  100|
|    833|  100|
+-------+-----+
only showing top 5 rows



## Content Based Filtering (CBF) RS

Using Spark ML TF-IDF vectorization technique on movie **Genre** and storing it into FAISS HNSW vector store

In [56]:
# Spark TF-IDF on Genres
movies = movies.withColumn("genres", expr("translate(genres, '|', ' ')"))
tokenizer = Tokenizer(inputCol="genres", outputCol="words")
words_data = tokenizer.transform(movies)

hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurized_data = hashing_tf.transform(words_data)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

# FAISS Index for scalable content-based retrieval
# Convert Spark TF-IDF features to NumPy

movie_features_pd = rescaled_data.select("movie_id", "features").rdd.map(lambda row: (row[0], row[1].toArray())).collect()
movie_ids, features = zip(*movie_features_pd)
features = np.vstack(features).astype('float32')

#Normalize features for cosine similarity
features_norm = normalize(features, axis=1, norm='l2').astype('float32')

# Create FAISS HNSW index using dot product (cosine similarity)
dimension = features_norm.shape[1]
index = faiss.IndexHNSWFlat(dimension, 32,faiss.METRIC_INNER_PRODUCT)  # 32 = #neighbors in HNSW
index.hnsw.efConstruction = 40
index.add(features_norm)

# Search top-k similar movies for each item
k_similar = 100
# Recommend similar movies using FAISS for new movies
faiss_neighbors = {}
# Compute average cosine similarity per movie (excluding self)
cbf_score_map = defaultdict(float)
for i, vec in enumerate(features_norm):
    D, I = index.search(np.array([vec]).astype('float32'), k_similar)
    faiss_neighbors[movie_ids[i]] = [movie_ids[j] for j in I[0] if movie_ids[j] != movie_ids[i]]
    sim_sum = 0.0
    count = 0
    for j, dist in zip(I[0], D[0]):
        if movie_ids[i] != movie_ids[j]:
            sim_sum += dist
            count += 1
    #avg cosine-similiarities for each movie
    if count > 0:
        cbf_score_map[movie_ids[i]] = sim_sum / count
    else:
      cbf_score_map[movie_ids[i]]=0.0

# Convert to Spark DataFrame
cbf_scores_pd = pd.DataFrame(list(cbf_score_map.items()), columns=["movie_id", "cbf_score"])
cbf_scores_sdf = spark.createDataFrame(cbf_scores_pd)


  sim_sum += dist


In [57]:
cbf_scores_pd['cbf_score'].describe()

Unnamed: 0,cbf_score
count,3883.0
mean,-inf
std,
min,-inf
25%,0.954243
50%,1.0
75%,1.0
max,1.0


In [59]:
# add guard-rails so score can Infinities/nulls can be converted to 0
cbf_scores_sdf = cbf_scores_sdf.withColumn(
    "cbf_score", when(col("cbf_score").isNull() |
                      isnan(col("cbf_score")) |
                      (col("cbf_score") == float("inf")) |
                      (col("cbf_score") == float("-inf")) |
                      (col("cbf_score") < float(-5e5)), 0.0).otherwise(col("cbf_score")))


In [60]:
# check Min and Max Cosine scores - should be between 0-1
cbf_scores_sdf.select(spark_min("cbf_score"),spark_max("cbf_score")).show(truncate=False)

+--------------+--------------+
|min(cbf_score)|max(cbf_score)|
+--------------+--------------+
|0.0           |1.0           |
+--------------+--------------+



### Check for similiar movies by picking any random movie

In [61]:
movie_to_chk=3091
movies_pd[movies_pd['movie_id']==movie_to_chk]

Unnamed: 0,movie_id,title,genres
3022,3091,Kagemusha (1980),Drama|War


In [62]:
movies_pd[movies_pd['movie_id'].isin(faiss_neighbors.get(movie_to_chk))]
#genres are very similar to chosen movie

Unnamed: 0,movie_id,title,genres
33,34,Babe (1995),Children's|Comedy|Drama
40,41,Richard III (1995),Drama|War
58,59,"Confessional, The (Le Confessionnal) (1995)",Drama|Mystery
111,113,Before and After (1996),Drama|Mystery
153,155,Beyond Rangoon (1995),Drama|War
...,...,...,...
3732,3801,Anatomy of a Murder (1959),Drama|Mystery
3739,3808,Two Women (La Ciociara) (1961),Drama|War
3742,3811,Breaker Morant (1980),Drama|War
3850,3920,"Faraway, So Close (In Weiter Ferne, So Nah!) (...",Drama|Fantasy


## Create Hybrid RecSys

Weights are set to 0.5 for both CF and CBF in Hybrid score

In [63]:
# Join CF and CBF scores
hybrid_scores = user_recs.join(cbf_scores_sdf, on="movie_id", how="left")

# Handle missing values in CBF scores if any
hybrid_scores = hybrid_scores.fillna({"cbf_score": 0.0})

# Normalize CF scores to [0, 1] so it can be added with CBF
cf_stats = hybrid_scores.select(spark_min("cf_score").alias("cf_min"), spark_max("cf_score").alias("cf_max")).first()

cf_min, cf_max = cf_stats["cf_min"], cf_stats["cf_max"]

hybrid_scores = hybrid_scores.withColumn("cf_score_norm", (col("cf_score") - lit(cf_min)) / (lit(cf_max) - lit(cf_min)))
hybrid_scores = hybrid_scores.withColumn("hybrid_score", expr("0.5 * cf_score_norm + 0.5 * cbf_score"))


In [64]:
hybrid_scores.show()

+--------+-------+---------+------------------+-------------------+-------------------+
|movie_id|user_id| cf_score|         cbf_score|      cf_score_norm|       hybrid_score|
+--------+-------+---------+------------------+-------------------+-------------------+
|      29|      5| 4.017146|0.9542426466941833| 0.3029198920979887|  0.628581269396086|
|    3091|     17|4.6508794|               0.0|0.37257921078673134|0.18628960539336567|
|      29|     22|3.8462017|0.9542426466941833| 0.2841298512470714| 0.6191862489706274|
|    3091|     27| 4.734081|               0.0| 0.3817246252270764| 0.1908623126135382|
|    3091|     40| 4.187794|               0.0| 0.3216773579854437|0.16083867899272186|
|    3091|     81|5.1175833|               0.0| 0.4238788293124197|0.21193941465620986|
|    2250|     85|4.7133617|               1.0|0.37944720811139127| 0.6897236040556957|
|    1950|     88| 4.831393|               0.0|0.39242105868395394|0.19621052934197697|
|     964|    101|5.2860756|    

### Evaluation on all 3 models

In [69]:
# Join entire Ratings dataset for evaluation by keeping high Rated movies only
relevant_set = ratings.filter(col("rating") > 3.0).groupBy("user_id").agg(collect_list("movie_id").alias("relevant"))


In [70]:
# Evaluate precision@k and ndcg@k using Spark SQL array functions
from pyspark.sql.functions import expr, collect_list, explode, size, array_intersect, array_position, array, sort_array

def evaluate_recommender_variant(score_column):
  # Generate recommendations
  topk = 10
  window = Window.partitionBy("user_id").orderBy(col(score_column).desc())
  top_recs = hybrid_scores.withColumn("rank", row_number().over(window)).filter(col("rank") <= topk).groupBy("user_id").agg(collect_list("movie_id").alias("recommended"))

  # Join with relevance info
  eval_df = top_recs.join(relevant_set, on="user_id", how="inner")
  eval_df = eval_df.withColumn("intersection", array_intersect("recommended", "relevant"))
  eval_df = eval_df.withColumn("precision", size("intersection") / lit(topk))

  eval_df = eval_df.withColumn("ordered_relevant", sort_array("relevant"))
  eval_df = eval_df.withColumn("dcg", expr("aggregate(sequence(0, size(recommended)-1), 0D, (acc, i) -> acc + IF(array_contains(relevant, recommended[i]), 1 / log2(i+2), 0))"))
  eval_df = eval_df.withColumn("idcg", expr("aggregate(sequence(0, least(size(relevant), {0})-1), 0D, (acc, i) -> acc + 1 / log2(i+2))".format(topk)))
  eval_df = eval_df.withColumn("ndcg", expr("IF(idcg > 0, dcg / idcg, 0)"))

  # Average metrics
  results = eval_df.select("user_id", "precision", "ndcg")
  results.cache()

  metrics = results.agg({"precision": "avg", "ndcg": "avg"}).first()
  avg_precision = metrics["avg(precision)"]
  avg_ndcg = metrics["avg(ndcg)"]

  return avg_precision, avg_ndcg


In [71]:
metrics_cf = evaluate_recommender_variant("cf_score_norm")
metrics_cbf = evaluate_recommender_variant("cbf_score")
metrics_hybrid = evaluate_recommender_variant("hybrid_score")


In [72]:
# Print Comparison
print("\n--- Recommendation Quality Comparison ---")
print(f"Model \t\t| {'Precision@k':<12} | {'NDCG@k'}")
print(f"Collabarative | {metrics_cf[0]:<12.4f} | {metrics_cf[1]:.4f}")
print(f"Content Based | {metrics_cbf[0]:<12.4f} | {metrics_cbf[1]:.4f}")
print(f"Hybrid \t\t| {metrics_hybrid[0]:<12.4f} | {metrics_hybrid[1]:.4f}")


--- Recommendation Quality Comparison ---
Model 		| Precision@k  | NDCG@k
Collabarative | 0.0926       | 0.0730
Content Based | 0.0558       | 0.0536
Hybrid 		| 0.0574       | 0.0453


## Cold start strategies
### For new users: Recommend top popular movies

In [74]:
topk=10
popular_movies = train_ratings.groupBy("movie_id").count().orderBy(col("count").desc()).limit(topk)
popular_movies.show()

+--------+-----+
|movie_id|count|
+--------+-----+
|    2858| 2714|
|    1196| 2407|
|     260| 2384|
|    1210| 2304|
|     480| 2127|
|    2028| 2118|
|     589| 2095|
|     593| 2076|
|    2571| 2074|
|    1270| 2059|
+--------+-----+



### For new items: Recommend to users with similar past rated genres

In [75]:
# Step 1: Build user genre profiles
train_with_genres = train_ratings.join(movies.select("movie_id", "genres"), on="movie_id")
user_genre_profile = train_with_genres.groupBy("user_id", "genres").count()

# Step 2: Recommend new items to users who rated similar genres in past
new_items = movies.join(train_ratings, on="movie_id", how="left_anti")  # unseen items
user_genre_profile = user_genre_profile.withColumnRenamed("genres", "user_genres")
new_item_candidates = new_items.join(user_genre_profile, new_items.genres == user_genre_profile.user_genres)

recommended_new_items = new_item_candidates.groupBy("user_id").agg(collect_list("movie_id").alias("new_recommendations"))
recommended_new_items.show(truncate=False)


+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Show New movies

In [76]:
new_items.show()

+--------+--------------------+--------------------+
|movie_id|               title|              genres|
+--------+--------------------+--------------------+
|      51|Guardian Angel (1...|Action Drama Thri...|
|     115|Happiness Is in t...|              Comedy|
|     139|       Target (1995)|        Action Drama|
|     142|Shadows (Cienie) ...|               Drama|
|     143|        Gospa (1995)|               Drama|
|     395| Desert Winds (1995)|               Drama|
|     399|Girl in the Cadil...|               Drama|
|     403|   Two Crimes (1995)|  Comedy Crime Drama|
|     629|         Rude (1995)|               Drama|
|     641|Little Indian, Bi...|              Comedy|
|     654|Und keiner weint ...|       Drama Romance|
|     675|Hostile Intention...|Action Drama Thri...|
|     683|Eye of Vichy, The...|         Documentary|
|     693|Under the Domin T...|               Drama|
|     699|To Cross the Rubi...|               Drama|
|     713|Of Love and Shado...|               

In [77]:
ratings.filter(col("movie_id")==51).show() #verify if its new movie, not rated in past

+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
+-------+--------+------+



In [78]:
spark.stop()