# Hybrid Recommendation System

**Team Structure:**
- Member 1: Infrastructure, Data Loading, Fusion & Evaluation
- Member 2: Collaborative Filtering (ALS)
- Member 3: Content-Based Filtering (TF-IDF + LSH)

## 1. Setup

### 1.1 Imports

In [1]:
import os
import sys
import urllib.request
import zipfile
from math import log2

# Fix for Windows
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, FloatType, StructType, StructField

### 1.2 Download Data

In [2]:
DATA_URL = "https://files.grouplens.org/datasets/movielens/ml-1m.zip"
DATA_DIR = "data"
DATASET_DIR = os.path.join(DATA_DIR, "ml-1m")
ZIP_PATH = os.path.join(DATA_DIR, "ml-1m.zip")

In [3]:
if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR)

In [4]:
if not os.path.exists(DATASET_DIR):

    if not os.path.exists(ZIP_PATH):
        print("Downloading MovieLens ml-1m...")
        urllib.request.urlretrieve(DATA_URL, ZIP_PATH)

    print("Extracting...")
    with zipfile.ZipFile(ZIP_PATH, 'r') as zip_ref:
        zip_ref.extractall(DATA_DIR)

### 1.3 Spark Session

In [5]:
spark = SparkSession.builder.appName("MMDS").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

### 1.4 Load Data

In [6]:
users_df = spark.read.text(os.path.join(DATASET_DIR, "users.dat")).select(
    F.split(F.col("value"), "::").getItem(0).cast(IntegerType()).alias("user_id"),
    F.split(F.col("value"), "::").getItem(1).alias("gender"),
    F.split(F.col("value"), "::").getItem(2).cast(IntegerType()).alias("age"),
    F.split(F.col("value"), "::").getItem(3).cast(IntegerType()).alias("occupation"),
    F.split(F.col("value"), "::").getItem(4).alias("zip_code")
)
users_df.count()

6040

In [7]:
items_df = spark.read.text(os.path.join(DATASET_DIR, "movies.dat")).select(
    F.split(F.col("value"), "::").getItem(0).cast(IntegerType()).alias("item_id"),
    F.split(F.col("value"), "::").getItem(1).alias("title"),
    F.split(F.col("value"), "::").getItem(2).alias("genres")
)
items_df.count()

3883

In [8]:
ratings_df = spark.read.text(os.path.join(DATASET_DIR, "ratings.dat")).select(
    F.split(F.col("value"), "::").getItem(0).cast(IntegerType()).alias("user_id"),
    F.split(F.col("value"), "::").getItem(1).cast(IntegerType()).alias("item_id"),
    F.split(F.col("value"), "::").getItem(2).cast(FloatType()).alias("rating"),
    F.split(F.col("value"), "::").getItem(3).cast(IntegerType()).alias("timestamp")
)
ratings_df.count()

1000209

In [9]:
users_df.show(5)

+-------+------+---+----------+--------+
|user_id|gender|age|occupation|zip_code|
+-------+------+---+----------+--------+
|      1|     F|  1|        10|   48067|
|      2|     M| 56|        16|   70072|
|      3|     M| 25|        15|   55117|
|      4|     M| 45|         7|   02460|
|      5|     M| 25|        20|   55455|
+-------+------+---+----------+--------+
only showing top 5 rows


In [10]:
items_df.show(5)

+-------+--------------------+--------------------+
|item_id|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows


In [11]:
ratings_df.show(5)

+-------+-------+------+---------+
|user_id|item_id|rating|timestamp|
+-------+-------+------+---------+
|      1|   1193|   5.0|978300760|
|      1|    661|   3.0|978302109|
|      1|    914|   3.0|978301968|
|      1|   3408|   4.0|978300275|
|      1|   2355|   5.0|978824291|
+-------+-------+------+---------+
only showing top 5 rows


## 2. Exploratory Data Analysis

### 2.1 Rating matrix

In [12]:
num_users = users_df.count()
num_items = items_df.count()
num_ratings = ratings_df.count()
sparsity = (1 - (num_ratings / (num_users * num_items))) * 100

print(f"Users:            {num_users:,}")
print(f"Movies:           {num_items:,}")
print(f"Ratings:          {num_ratings:,}")
print(f"Sparsity:         {sparsity:.2f}%")
print(f"Avg ratings/user: {num_ratings/num_users:.1f}")
print(f"Avg ratings/movie:{num_ratings/num_items:.1f}")

Users:            6,040
Movies:           3,883
Ratings:          1,000,209
Sparsity:         95.74%
Avg ratings/user: 165.6
Avg ratings/movie:257.6


### 2.2 Rating distribution

In [13]:
ratings_df.groupBy("rating").count().orderBy("rating").show()

+------+------+
|rating| count|
+------+------+
|   1.0| 56174|
|   2.0|107557|
|   3.0|261197|
|   4.0|348971|
|   5.0|226310|
+------+------+



### 2.3 Genre distribution

In [14]:
items_df.select(F.explode(F.split(F.col("genres"), "\\|")).alias("genre")) \
    .groupBy("genre").count().orderBy(F.desc("count")).show()

+-----------+-----+
|      genre|count|
+-----------+-----+
|      Drama| 1603|
|     Comedy| 1200|
|     Action|  503|
|   Thriller|  492|
|    Romance|  471|
|     Horror|  343|
|  Adventure|  283|
|     Sci-Fi|  276|
| Children's|  251|
|      Crime|  211|
|        War|  143|
|Documentary|  127|
|    Musical|  114|
|    Mystery|  106|
|  Animation|  105|
|    Fantasy|   68|
|    Western|   68|
|  Film-Noir|   44|
+-----------+-----+



### 2.4 User gender distribution

In [15]:
users_df.groupBy("gender").count().show()

+------+-----+
|gender|count|
+------+-----+
|     F| 1709|
|     M| 4331|
+------+-----+



### 2.5 User age distribution

In [16]:
users_df.groupBy("age").count().orderBy("age").show()

+---+-----+
|age|count|
+---+-----+
|  1|  222|
| 18| 1103|
| 25| 2096|
| 35| 1193|
| 45|  550|
| 50|  496|
| 56|  380|
+---+-----+



## 3. Train/Test Split

In [17]:
train_df, test_df = ratings_df.randomSplit([0.8, 0.2], seed=42)
train_df = train_df.cache()
test_df = test_df.cache()

In [18]:
train_df.count()

800092

In [19]:
test_df.count()

200117

## 4. Collaborative Filtering (ALS)

Implement using `pyspark.ml.recommendation.ALS`

In [20]:
class CollaborativeFilter:

    def __init__(self, rank=10, regParam=0.1, maxIter=10):
        self.als = ALS(
            userCol="user_id",
            itemCol="item_id",
            ratingCol="rating",
            rank=rank,
            regParam=regParam,
            maxIter=maxIter,
            coldStartStrategy="drop",
            nonnegative=True
        )
        self.model = None

    def train(self, df):
        self.model = self.als.fit(df)

    def get_recommendations(self, df, k=10):
        """Get top-K recommendations"""
        if self.model is None:
            raise ValueError("Call train() first.")

        users = df.select("user_id").distinct()
        user_recs = self.model.recommendForUserSubset(users, k)

        return user_recs.select(
            F.col("user_id"),
            F.explode("recommendations").alias("rec")
        ).select(
            F.col("user_id"),
            F.col("rec.item_id").cast(IntegerType()).alias("item_id"),
            F.col("rec.rating").alias("prediction")
        )

    def predict(self, df):
        """Predict ratings for user-item pairs in test"""
        if self.model is None:
            raise ValueError("Train should be called first")
        return self.model.transform(df)

In [21]:
cf = CollaborativeFilter(rank=10, regParam=0.1, maxIter=10)
cf.train(train_df)

In [22]:
als_recs = cf.get_recommendations(test_df, k=10).withColumnRenamed("prediction", "als_score")

In [23]:
als_recs.show(10)

+-------+-------+---------+
|user_id|item_id|als_score|
+-------+-------+---------+
|     26|   1851|4.1776996|
|     26|   3233|4.0753975|
|     26|   3245| 3.925116|
|     26|   1741|3.8924155|
|     26|   3314|3.8353853|
|     26|   2332|3.8290246|
|     26|    811|3.7872531|
|     26|   2569|3.7839422|
|     26|    527| 3.777091|
|     26|    598|3.7752023|
+-------+-------+---------+
only showing top 10 rows


### Bonus: Hyperparameter Tuning

In [24]:
# TODO

## 5. Content-Based Filtering (TF-IDF + LSH)

Implement using `pyspark.ml.feature` (Tokenizer, HashingTF, IDF, BucketedRandomProjectionLSH)

In [25]:
content_recs = spark.range(0).select(
    F.lit(None).cast(IntegerType()).alias("user_id"),
    F.lit(None).cast(IntegerType()).alias("item_id"),
    F.lit(None).cast(FloatType()).alias("content_score")
).limit(0)

In [26]:
content_recs.show(10)

+-------+-------+-------------+
|user_id|item_id|content_score|
+-------+-------+-------------+
+-------+-------+-------------+



## 6.Fusion & Evaluation

In [27]:
ALPHA = 0.7
K = 10
RELEVANCE_THRESHOLD = 4.0

### 6.1 Normalization

In [28]:
def normalize(df, col_name):
    stats = df.agg(F.min(col_name).alias("min"), F.max(col_name).alias("max")).collect()[0]
    if stats["max"] == stats["min"]:
        return df.withColumn(col_name + "_norm", F.lit(0.5))
    return df.withColumn(col_name + "_norm", (F.col(col_name) - stats["min"]) / (stats["max"] - stats["min"]))

### 6.2 Hybrid Fusion

In [29]:
als_norm = normalize(als_recs, "als_score")
content_norm = normalize(content_recs, "content_score")

hybrid_recs = als_norm.select("user_id", "item_id", "als_score_norm") \
    .join(content_norm.select("user_id", "item_id", "content_score_norm"), ["user_id", "item_id"], "full_outer") \
    .fillna(0) \
    .withColumn("final_score", ALPHA * F.col("als_score_norm") + (1 - ALPHA) * F.col("content_score_norm"))

In [30]:
hybrid_recs.orderBy(F.desc("final_score")).show(10)

+-------+-------+------------------+------------------+------------------+
|user_id|item_id|    als_score_norm|content_score_norm|       final_score|
+-------+-------+------------------+------------------+------------------+
|   2155|   1796|               1.0|               0.0|               0.7|
|     46|   1796|0.9461164186370618|               0.0|0.6622814930459432|
|   2155|   3382|0.9440066924193304|               0.0|0.6608046846935313|
|   2867|   1360|0.9320735817948679|               0.0|0.6524515072564074|
|   4801|   1796|0.9063583148686332|               0.0|0.6344508204080431|
|   5099|   1796|0.8969736507904302|               0.0|0.6278815555533012|
|   3915|   1796| 0.896728774059216|               0.0|0.6277101418414511|
|    121|    557|0.8876177957394289|               0.0|0.6213324570176002|
|   3902|   1796|0.8870025995414138|               0.0|0.6209018196789896|
|    283|    598|0.8839311456841844|               0.0|0.6187518019789291|
+-------+-------+--------

### 6.3 Ground Truth

In [31]:
ground_truth = test_df.filter(F.col("rating") >= RELEVANCE_THRESHOLD) \
    .groupBy("user_id").agg(F.collect_list("item_id").alias("relevant_items"))

### Evaluation Functions

In [32]:
def get_top_k(recs_df, score_col, k):
    window = Window.partitionBy("user_id").orderBy(F.desc(score_col))
    return recs_df.withColumn("rank", F.row_number().over(window)) \
        .filter(F.col("rank") <= k) \
        .groupBy("user_id").agg(F.collect_list("item_id").alias("recommended_items"))

In [33]:
def precision_at_k(top_k_df, ground_truth_df, k):
    joined = top_k_df.join(ground_truth_df, "user_id")
    result = joined.withColumn("hits", F.size(F.array_intersect("recommended_items", "relevant_items"))) \
        .agg(F.avg(F.col("hits") / k)).collect()[0][0]
    return result or 0.0

In [34]:
def recall_at_k(top_k_df, ground_truth_df):
    joined = top_k_df.join(ground_truth_df, "user_id")
    result = joined.withColumn("hits", F.size(F.array_intersect("recommended_items", "relevant_items"))) \
        .withColumn("recall", F.when(F.size("relevant_items") > 0, F.col("hits") / F.size("relevant_items")).otherwise(0)) \
        .agg(F.avg("recall")).collect()[0][0]
    return result or 0.0

In [35]:
def ndcg_at_k(top_k_df, ground_truth_df, k):
    joined = top_k_df.join(ground_truth_df, "user_id")
    exploded = joined.select(
        "user_id",
        "relevant_items",
        F.posexplode("recommended_items").alias("pos", "item_id")
    ).withColumn("item_id", F.col("item_id").cast(IntegerType()))

    with_dcg = exploded \
        .withColumn("rel", F.when(F.array_contains("relevant_items", F.col("item_id")), 1.0).otherwise(0.0)) \
        .withColumn("dcg", F.col("rel") / F.log2(F.col("pos") + 2)) \
        .groupBy("user_id", "relevant_items").agg(F.sum("dcg").alias("dcg"))

    idcg_vals = [sum(1.0 / log2(i + 2) for i in range(n)) for n in range(k + 1)]
    idcg_map = F.create_map(*[x for i, v in enumerate(idcg_vals) for x in (F.lit(i), F.lit(v))])

    result = with_dcg \
        .withColumn("num_rel", F.least(F.size("relevant_items"), F.lit(k))) \
        .withColumn("idcg", idcg_map[F.col("num_rel")]) \
        .withColumn("ndcg", F.when(F.col("idcg") > 0, F.col("dcg") / F.col("idcg")).otherwise(0)) \
        .agg(F.avg("ndcg")).collect()[0][0]
    return result or 0.0

In [36]:
def evaluate(recs_df, score_col, name):
    if recs_df.count() == 0:
        print(f"{name}: No recommendations (not implemented)")
        return {"Precision@10": 0.0, "Recall@10": 0.0, "NDCG@10": 0.0}

    top_k = get_top_k(recs_df, score_col, K)
    p = precision_at_k(top_k, ground_truth, K)
    r = recall_at_k(top_k, ground_truth)
    n = ndcg_at_k(top_k, ground_truth, K)

    print(f"{name}: P@{K}={p:.4f}, R@{K}={r:.4f}, NDCG@{K}={n:.4f}")
    return {"Precision@10": p, "Recall@10": r, "NDCG@10": n}

### Evaluation

In [37]:
als_metrics = evaluate(als_recs, "als_score", "ALS")

ALS: P@10=0.0264, R@10=0.0200, NDCG@10=0.0237


In [38]:
content_metrics = evaluate(content_recs, "content_score", "Content-Based")

Content-Based: No recommendations (not implemented)


In [39]:
hybrid_metrics = evaluate(hybrid_recs, "final_score", "Hybrid")

Hybrid: P@10=0.0264, R@10=0.0200, NDCG@10=0.0237


### Bonus: GBT Re-Ranking

In [40]:
# TODO

## Results Summary

In [41]:
summary = [
    ("ALS", als_metrics["Precision@10"], als_metrics["Recall@10"], als_metrics["NDCG@10"]),
    ("Content-Based", content_metrics["Precision@10"], content_metrics["Recall@10"], content_metrics["NDCG@10"]),
    ("Hybrid", hybrid_metrics["Precision@10"], hybrid_metrics["Recall@10"], hybrid_metrics["NDCG@10"]),
]
spark.createDataFrame(summary, ["Model", "Precision@10", "Recall@10", "NDCG@10"]).show()

+-------------+-------------------+-------------------+--------------------+
|        Model|       Precision@10|          Recall@10|             NDCG@10|
+-------------+-------------------+-------------------+--------------------+
|          ALS|0.02639398998330497|0.01996941450571081|0.023734041831077992|
|Content-Based|                0.0|                0.0|                 0.0|
|       Hybrid|0.02639398998330497|0.01996941450571081|0.023734041831077992|
+-------------+-------------------+-------------------+--------------------+

