**Foreword for Synthetic Data Selection**

#### Synthetic Data Generated for Recommendation System Using CTGAN

To support large-scale training of the recommendation system, synthetic interaction data was generated using the CTGAN (Conditional Tabular GAN) model from the SDV framework. Only the structured interaction features (user_id, product_id, and rating) were used for synthetic generation. Textual fields such as review content were intentionally excluded from CTGAN training, because GAN-based tabular models are not designed to generate semantically meaningful natural language and tend to produce anonymized placeholder strings when applied to text data. Sentiment labels were therefore generated independently using a dedicated NLP pipeline.

Prior to CTGAN training, the original rating distribution was found to be severely imbalanced, with the majority of ratings clustered in the higher range (approximately 4.0–4.3). To mitigate this bias and enable more effective learning by the recommender model, ratings were discretized into four bins (Low, Medium, High, and Very High), and oversampling with replacement was applied to construct a balanced training dataset. Training on this balanced dataset lets CTGAN learn from a more even rating distribution rather than simply replicating the original skew.
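The binning and oversampling step can be sketched in plain Python as follows. This is only an illustration: the bin edges in `rating_bin`, the helper `oversample_balanced`, and the toy rows are assumptions, since the actual cut points and implementation are not given in this notebook.

```python
import random
from collections import defaultdict

# Hypothetical bin edges; the cut points actually used are not stated in the text.
def rating_bin(rating):
    if rating < 3.0:
        return "Low"
    if rating < 4.0:
        return "Medium"
    if rating < 4.5:
        return "High"
    return "Very High"

def oversample_balanced(rows, seed=42):
    """Pad every rating bin, sampling with replacement, up to the largest bin's size."""
    rng = random.Random(seed)
    by_bin = defaultdict(list)
    for row in rows:
        by_bin[rating_bin(row["rating"])].append(row)
    target = max(len(group) for group in by_bin.values())
    balanced = []
    for group in by_bin.values():
        balanced.extend(group)                                      # keep the original rows
        balanced.extend(rng.choices(group, k=target - len(group)))  # add resampled rows
    return balanced

# Toy interactions (made-up values), skewed toward the "High" bin
rows = [
    {"user_id": "u1", "product_id": "p1", "rating": 4.1},
    {"user_id": "u2", "product_id": "p1", "rating": 4.2},
    {"user_id": "u3", "product_id": "p2", "rating": 4.3},
    {"user_id": "u4", "product_id": "p3", "rating": 4.0},
    {"user_id": "u5", "product_id": "p2", "rating": 3.5},
    {"user_id": "u6", "product_id": "p4", "rating": 2.0},
    {"user_id": "u7", "product_id": "p5", "rating": 4.8},
]

balanced = oversample_balanced(rows)
print(len(balanced))  # 16: four bins with four rows each
```

Because minority bins are filled by repeating existing rows, the balanced set contains duplicates of rare interactions; it changes the rating mix but adds no new information.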

CTGAN was then trained on the balanced subset using only the three structured variables: user_id, product_id, and rating. Due to the high cardinality of user and product identifiers and the computational cost of GAN training, the model was trained with reduced epochs and batch size, with GPU acceleration enabled where available. After training, the model was used to generate a synthetic dataset approximately 50 times larger than the original dataset.

The resulting synthetic interaction dataset preserves:

- Realistic user–product interaction density

- A balanced rating distribution

- Valid referential structure between users and products

This synthetic dataset was subsequently used as the primary input for training and evaluating the ALS-based collaborative filtering recommender system in Databricks. By separating structured synthetic generation (CTGAN) from text-based sentiment modeling, the project ensures both computational efficiency and modeling correctness across different analytical modules.

**Setup & loading CSV**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# 0.1 Load your synthetic CSV
data_path = "/Workspace/Users/jayasricdsa@gmail.com/synthetic_ctgan_rating_balanced_50x.csv"

df_raw = spark.read.csv(data_path, header=True, inferSchema=True)

# selecting ONLY the columns we need, ignore review_content
df = df_raw.select("user_id", "product_id", "rating")

# Ensure the rating column is numeric (double)
df = df.withColumn("rating", F.col("rating").cast("double"))

df.printSchema()
df.show(5)
print("Rows:", df.count(),
      "| Users:", df.select("user_id").distinct().count(),
      "| Products:", df.select("product_id").distinct().count())


root
 |-- user_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- rating: double (nullable = true)

+--------------------+----------+------+
|             user_id|product_id|rating|
+--------------------+----------+------+
|AGWU5ZFZCOMADUNNZ...|B0941392C8|   4.1|
|AF7GOEYE5GJO744YP...|B086394NY5|   3.6|
|AHCIMCXVSX6LO3HH7...|B0BQ3K23Y1|   3.4|
|AFNGD6S7UIHBQ2FNX...|B08G1RW2Q3|   3.7|
|AH5SAORYVUN5MGIBL...|B08QW937WV|   3.7|
+--------------------+----------+------+
only showing top 5 rows
Rows: 442450 | Users: 5157 | Products: 1342


Since the recommendation task only requires user–item interactions and ratings, we exclude the review_content column and retain only the essential fields: user_id, product_id, and rating.
The rating column is explicitly cast to a numeric type to ensure compatibility with machine learning models such as ALS.

## Indexing users/items + train/val/test split

In [None]:
from pyspark.ml.feature import StringIndexer

# 1.1 Index user_id and product_id → user_index, item_index
user_indexer = StringIndexer(
    inputCol="user_id",
    outputCol="user_index",
    handleInvalid="skip"
)

item_indexer = StringIndexer(
    inputCol="product_id",
    outputCol="item_index",
    handleInvalid="skip"
)

# This transformation is required because matrix factorization models
# such as ALS operate only on numeric user and item IDs.

df_idx = user_indexer.fit(df).transform(df)
df_idx = item_indexer.fit(df_idx).transform(df_idx)

# Keep only numeric IDs + original
ratings_df = df_idx.select("user_id", "product_id",
                           "user_index", "item_index", "rating")

ratings_df.show(5)

# 1.2 Train / Val / Test split
train_df, val_df, test_df = ratings_df.randomSplit([0.6, 0.2, 0.2], seed=42)

# 20% validation set for hyperparameter tuning, 20% test set for final evaluation

print("Train:", train_df.count(),
      "| Val:", val_df.count(),
      "| Test:", test_df.count())


+--------------------+----------+----------+----------+------+
|             user_id|product_id|user_index|item_index|rating|
+--------------------+----------+----------+----------+------+
|AGWU5ZFZCOMADUNNZ...|B0941392C8|     447.0|     284.0|   4.1|
|AF7GOEYE5GJO744YP...|B086394NY5|     156.0|     738.0|   3.6|
|AHCIMCXVSX6LO3HH7...|B0BQ3K23Y1|      56.0|      11.0|   3.4|
|AFNGD6S7UIHBQ2FNX...|B08G1RW2Q3|      48.0|     301.0|   3.7|
|AH5SAORYVUN5MGIBL...|B08QW937WV|      86.0|     391.0|   3.7|
+--------------------+----------+----------+----------+------+
only showing top 5 rows
Train: 266112 | Val: 87981 | Test: 88357


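This three-way split keeps hyperparameter tuning (validation set) separate from the final performance estimate (test set), so the reported test error is not biased by model selection.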

**Popularity check**

In [None]:
product_popularity = (
    train_df.groupBy("product_id")
    .agg(
        F.count("*").alias("num_ratings"),   # how frequently a product is interacted with
        F.avg("rating").alias("avg_rating")  # overall user satisfaction with the product
    )
    .orderBy(F.col("num_ratings").desc(), F.col("avg_rating").desc())
)

product_popularity.show(10)


+----------+-----------+-----------------+
|product_id|num_ratings|       avg_rating|
+----------+-----------+-----------------+
|B0BFBNXS94|      63580|2.299999952316284|
|B0BPJBTB3F|      25476|              2.0|
|B0B53DS4TF|       4646|4.800000190734863|
|B0B23LW7NV|       4338|4.699999809265137|
|B09WN3SRC7|       4302|4.699999809265137|
|B09ZHCJDP1|       4268|              5.0|
|B0B244R4KB|       4154|4.599999904632568|
|B0BQ3K23Y1|       3988|4.800000190734863|
|B0BM9H2NY9|       3954|4.699999809265137|
|B078JT7LTD|       3881|4.599999904632568|
+----------+-----------+-----------------+
only showing top 10 rows


## ALS (tuning + RMSE + Top-N)


**Hyperparameter tuning with RMSE on validation set**

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# 2.1.1 Setup ALS and evaluator
als = ALS(
    userCol="user_index",
    itemCol="item_index",
    ratingCol="rating",
    nonnegative=True, # constrain the latent factors to be non-negative
    implicitPrefs=False, # explicit ratings
    coldStartStrategy="drop"   # drop NaN predictions
)

evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse"
)

# 2.1.2 Simple hyperparameter grid
ranks = [10, 20]
regParams = [0.01, 0.1]
maxIters = [10]

best_rmse = float("inf")
best_model = None
best_params = None

for r in ranks:
    for reg in regParams:
        for it in maxIters:
            print(f"Training ALS(rank={r}, regParam={reg}, maxIter={it})")
            als.setParams(rank=r, regParam=reg, maxIter=it)
            model = als.fit(train_df)

            # evaluate on validation set
            val_preds = model.transform(val_df)
            rmse = evaluator.evaluate(val_preds)
            print(f"  Validation RMSE = {rmse:.4f}")

            if rmse < best_rmse:
                best_rmse = rmse
                best_model = model
                best_params = (r, reg, it)

print("✅ Best ALS params (rank, regParam, maxIter):", best_params)
print("✅ Best validation RMSE:", best_rmse)

als_model = best_model


Training ALS(rank=10, regParam=0.01, maxIter=10)
  Validation RMSE = 0.4428
Training ALS(rank=10, regParam=0.1, maxIter=10)
  Validation RMSE = 0.4080
Training ALS(rank=20, regParam=0.01, maxIter=10)
  Validation RMSE = 0.4446
Training ALS(rank=20, regParam=0.1, maxIter=10)
  Validation RMSE = 0.4037
✅ Best ALS params (rank, regParam, maxIter): (20, 0.1, 10)
✅ Best validation RMSE: 0.4037005788402821


The model with the lowest validation RMSE is selected as the final ALS recommender.

**Final evaluation on test set**

In [None]:
test_preds = als_model.transform(test_df)
test_rmse = evaluator.evaluate(test_preds)

print("✅ Final ALS Test RMSE:", test_rmse)

✅ Final ALS Test RMSE: 0.40462193958086695


The test RMSE provides an unbiased estimate of the model’s generalization performance and indicates how accurately the ALS model predicts unseen user ratings.
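For reference, the RMSE reported by `RegressionEvaluator` is the square root of the mean squared difference between actual and predicted ratings. A minimal plain-Python sketch, with the helper `rmse` and the toy values being illustrative assumptions rather than part of the pipeline:

```python
import math

def rmse(actual, predicted):
    """Root-mean-square error between two equal-length sequences of ratings."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Toy ratings (made-up values): every prediction is off by exactly 0.5
actual = [4.0, 3.0, 5.0, 2.0]
predicted = [3.5, 3.5, 4.5, 2.5]

print(rmse(actual, predicted))  # 0.5
```

Since the ALS model uses `coldStartStrategy="drop"`, rows whose user or item was not seen during training receive no prediction and are excluded before the metric is computed, so the reported RMSE covers only the pairs the model can score.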

**Top-N ALS recommendations for all users**

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

N = 10  # Top N

# 1) Candidate (user, item) pairs for prediction
#    Easiest: use pairs from the test set (for evaluation)
candidates = test_df.select("user_index", "item_index").dropDuplicates()

# 2) Predict ratings for these (user, item) pairs
preds = als_model.transform(candidates)
# preds schema: user_index, item_index, prediction

# 3) For each user, rank items by predicted score and keep Top-N
w = Window.partitionBy("user_index").orderBy(F.desc("prediction"))

topN_preds = (
    preds
    .withColumn("rank", F.row_number().over(w))
    .filter(F.col("rank") <= N)
)

# 4) Map item_index -> product_id
item_lookup = ratings_df.select("item_index", "product_id").dropDuplicates()

als_recs_exploded = (
    topN_preds
    .join(item_lookup, on="item_index", how="left")
    .select(
        "user_index",
        "item_index",
        "product_id",
        F.col("prediction").alias("predicted_rating"),
        "rank"
    )
)

als_recs_exploded.show(10, truncate=False)


+----------+----------+----------+----------------+----+
|user_index|item_index|product_id|predicted_rating|rank|
+----------+----------+----------+----------------+----+
|0         |1117      |B09474JWN6|3.9762316       |1   |
|0         |1093      |B01MY839VW|3.9556901       |2   |
|0         |1123      |B08MWJTST6|3.955677        |3   |
|0         |1019      |B01F262EUU|3.939845        |4   |
|0         |1077      |B09MTLG4TP|3.939609        |5   |
|0         |394       |B08H5L8V1L|3.9383323       |6   |
|0         |882       |B08Y5KXR6Z|3.9334748       |7   |
|0         |676       |B01N90RZ4M|3.9280705       |8   |
|0         |631       |B009DA69W6|3.927261        |9   |
|0         |1245      |B09FPP3R1D|3.925707        |10  |
+----------+----------+----------+----------------+----+
only showing top 10 rows


This step generates personalized Top-N product recommendations for each user using the final tuned ALS model.

- Candidate user–item pairs are created from the test dataset to ensure that recommendations are evaluated only on unseen interactions.

- The trained ALS model predicts ratings for all candidate pairs.

- For each user, products are ranked by predicted rating using a window function, and only the Top-N highest-scoring items are retained.

- The numeric item_index is mapped back to the original product_id for interpretability.
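The ranking logic in the steps above can be illustrated without Spark. The sketch below mirrors the `row_number()` window in plain Python; the function `top_n_per_user` and the sample predictions are hypothetical and only show the idea.

```python
from collections import defaultdict

def top_n_per_user(predictions, n):
    """predictions: iterable of (user, item, score) triples.

    Returns {user: [(item, score, rank), ...]} with rank starting at 1.
    """
    by_user = defaultdict(list)
    for user, item, score in predictions:
        by_user[user].append((item, score))

    result = {}
    for user, scored in by_user.items():
        # Sort by predicted score, highest first, and keep the first n items
        ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)[:n]
        result[user] = [
            (item, score, rank)
            for rank, (item, score) in enumerate(ranked, start=1)
        ]
    return result

# Toy predictions (made-up values)
preds = [
    (0, 394, 3.94), (0, 1117, 3.98), (0, 1093, 3.96),
    (1, 1117, 4.05), (1, 955, 4.10),
]

top2 = top_n_per_user(preds, n=2)
print(top2[0])  # [(1117, 3.98, 1), (1093, 3.96, 2)]
print(top2[1])  # [(955, 4.1, 1), (1117, 4.05, 2)]
```

Note that the candidate pool determines what can be recommended: with candidates taken from the test set, each user's list can only contain items that the user actually rated in the test data.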

## Cosine-similarity recommender using ALS item factors


**Extract item factors and add L2 norms**

In [None]:
from pyspark.sql.types import FloatType
import math

# 3.1.1 ALS item factors (id = item_index)
item_factors = als_model.itemFactors.withColumnRenamed("id", "item_index")
item_factors.show(5, truncate=False)

# 3.1.2 L2 norm UDF
def l2_norm(vec):
    return float(math.sqrt(sum(x*x for x in vec)))

l2_norm_udf = F.udf(l2_norm, FloatType())

item_factors_norm = item_factors.withColumn(
    "norm",
    l2_norm_udf(F.col("features"))
)

item_factors_norm.show(5, truncate=False)


+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|item_index|features                                                                                                                                                                                                                                        |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0         |[0.34359035, 0.036627363, 0.22529846, 0.39129198, 0.3983644, 0.105124116, 0.42956218, 0.42076716, 0.4872733, 0.7026711, 0.50307924, 0.24117272, 0.4288853, 0.66388124, 0.4547917, 0.4131495, 0.13477689, 0.44661438, 0.36689937, 0

Each item is now represented by:

- A latent feature vector from ALS,

- Its corresponding vector magnitude (L2 norm),

which together enable efficient item–item cosine similarity calculations.


**Compute item-item cosine similarity**

In [None]:
# 3.2.1 Dot product UDF
from pyspark.sql.types import FloatType

def dot_product(a, b):
    return float(sum(x*y for x, y in zip(a, b)))

dot_product_udf = F.udf(dot_product, FloatType())

i = item_factors_norm.alias("i")
j = item_factors_norm.alias("j")

# Only pair i<j to avoid duplicates and self-pairs
item_sims = i.join(j, F.col("i.item_index") < F.col("j.item_index"))

item_sims = (
    item_sims
    .select(
        F.col("i.item_index").alias("item_i"),
        F.col("j.item_index").alias("item_j"),
        dot_product_udf(F.col("i.features"), F.col("j.features")).alias("dot_ij"),
        F.col("i.norm").alias("norm_i"),
        F.col("j.norm").alias("norm_j")
    )
    .withColumn(
        "cosine_sim",
        F.col("dot_ij") / (F.col("norm_i") * F.col("norm_j"))
    )
)

item_sims.show(5, truncate=False)

# 3.2.2 Make similarity symmetric: (i,j) and (j,i)
item_sims_i = item_sims.select("item_i", "item_j", "cosine_sim")
item_sims_j = item_sims.select(
    F.col("item_j").alias("item_i"),
    F.col("item_i").alias("item_j"),
    "cosine_sim"
)

item_sims_full = item_sims_i.union(item_sims_j)
item_sims_full.show(5, truncate=False)


+------+------+---------+---------+---------+------------------+
|item_i|item_j|dot_ij   |norm_i   |norm_j   |cosine_sim        |
+------+------+---------+---------+---------+------------------+
|0     |10    |3.2356787|1.8063382|1.8013742|0.9944029107091776|
|0     |20    |3.2850156|1.8063382|1.8212997|0.9985204478370173|
|0     |30    |3.2490106|1.8063382|1.8027614|0.9977317851290327|
|0     |40    |3.1855087|1.8063382|1.7735597|0.9943377562201776|
|0     |50    |3.2386692|1.8063382|1.7952691|0.998706691897117 |
+------+------+---------+---------+---------+------------------+
only showing top 5 rows
+------+------+------------------+
|item_i|item_j|cosine_sim        |
+------+------+------------------+
|0     |10    |0.9944029107091776|
|0     |20    |0.9985204478370173|
|0     |30    |0.9977317851290327|
|0     |40    |0.9943377562201776|
|0     |50    |0.998706691897117 |
+------+------+------------------+
only showing top 5 rows


### Item–Item Cosine Similarity Using ALS Latent Factors

This step computes the **cosine similarity between products** using the **latent feature vectors learned by ALS**.

- A **dot product UDF** is defined to compute the inner product between two product feature vectors.
- The item factor table is **self-joined** using a strict condition (`item_i < item_j`) to:
  - Avoid duplicate pair calculations,
  - Exclude self-comparisons.

The cosine similarity is calculated as:

$$
\text{Cosine Similarity} = \frac{\mathbf{v_i} \cdot \mathbf{v_j}}{\|\mathbf{v_i}\| \times \|\mathbf{v_j}\|}
$$

Since similarity is symmetric, the results are **mirrored** to generate both \((i, j)\) and \((j, i)\) pairs.

**Outcome:** A complete **item–item similarity matrix** in the ALS latent feature space, which forms the basis for similarity-based recommendations.
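As a small worked example of the formula and the mirroring step, the following plain-Python sketch computes cosine similarity for a few toy vectors. The helper `cosine_similarity` and the factor values are illustrative assumptions; they are not taken from the trained model.

```python
import math
from itertools import combinations

def cosine_similarity(v_i, v_j):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(v_i, v_j))
    norm_i = math.sqrt(sum(x * x for x in v_i))
    norm_j = math.sqrt(sum(y * y for y in v_j))
    if norm_i == 0.0 or norm_j == 0.0:
        return 0.0  # assumption: a zero vector is treated as similar to nothing
    return dot / (norm_i * norm_j)

# Toy item factors (made-up values)
factors = {
    0: [3.0, 4.0],
    1: [6.0, 8.0],  # same direction as item 0
    2: [4.0, 0.0],
}

sims = {}
for i, j in combinations(sorted(factors), 2):  # yields only pairs with i < j
    sim = cosine_similarity(factors[i], factors[j])
    sims[(i, j)] = sim
    sims[(j, i)] = sim                         # mirror to get the symmetric entry

print(sims[(0, 1)])  # 1.0
print(len(sims))     # 6
```

Because the ALS model was trained with `nonnegative=True`, every factor is non-negative and cosine similarity cannot fall below 0. This may partly explain why the similarities in the output above are all close to 1, which leaves little separation between items.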


**Build item-based cosine recommender (Top-N per user)**


In this step, we build a **neighborhood-based recommender** using the previously computed **item–item cosine similarity matrix**.

1. **Identify liked items**  
   - Using the training data, we select items where the user’s rating is at least `LIKE_THRESHOLD = 4.0`.  
   - These items represent each user’s **explicit positive preferences**.

2. **Expand to candidate items via similarity**  
   - For every liked item, we join with `item_sims_full` to find **similar items** based on cosine similarity in the ALS latent space.  
   - This produces candidate `(user, candidate_item)` pairs with associated `cosine_sim` scores.

3. **Aggregate similarity scores**  
   - For each user–candidate combination, we **sum the cosine similarity** over all liked items.  
   - This aggregated `score` reflects how strongly a candidate item is related to the user’s overall liked item set.

4. **Filter out already-rated items**  
   - Using a `left_anti` join with the user’s training interactions, we remove any items the user has **already rated** to ensure only **new recommendations** are suggested.

5. **Top-N selection**  
   - A window function ranks candidate items by descending `score` for each user.  
   - The top `N = 10` items per user are selected as the final cosine-based recommendations.

6. **Mapping back to product IDs**  
   - The numeric `candidate_item` index is mapped back to the original `product_id` to make the recommendations interpretable.
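For a single user, the six steps above reduce to a few lines of plain Python. This is a simplified sketch under stated assumptions: `cosine_recommend`, its arguments, and the toy similarity values are hypothetical, and the already-rated filter is applied while scoring rather than as a separate anti-join.

```python
from collections import defaultdict

def cosine_recommend(user_ratings, sims, like_threshold=4.0, n=10):
    """user_ratings: {item: rating} for one user (training data).
    sims: {(item_i, item_j): cosine_sim}, containing both (i, j) and (j, i).
    Returns up to n (candidate_item, score) pairs, highest score first.
    """
    liked = {item for item, rating in user_ratings.items() if rating >= like_threshold}
    scores = defaultdict(float)
    for (item_i, item_j), sim in sims.items():
        # Expand from liked items only, and skip items the user has already rated
        if item_i in liked and item_j not in user_ratings:
            scores[item_j] += sim
    ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[:n]

def symmetric(pairs):
    """Mirror {(i, j): sim} so that (j, i) is present as well."""
    full = dict(pairs)
    full.update({(j, i): s for (i, j), s in pairs.items()})
    return full

# Toy data (made-up values): the user likes items 1 and 2, dislikes item 3
user_ratings = {1: 4.5, 2: 4.0, 3: 2.0}
sims = symmetric({
    (1, 4): 0.9, (2, 4): 0.8,    # item 4 is close to both liked items
    (1, 5): 0.3, (2, 5): 0.95,
    (3, 5): 0.99,                # ignored: item 3 is not liked
    (1, 2): 0.7,                 # ignored: item 2 is already rated
})

recs = cosine_recommend(user_ratings, sims, n=2)
print([item for item, _ in recs])  # [4, 5]
```

Since the score is a sum rather than an average, it grows with the number of liked items. For users with many likes and similarities close to 1, candidates end up with nearly identical scores, so the final ordering depends on very small differences.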


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 3.3.1 Define "like" threshold (assuming ratings 1–5)
LIKE_THRESHOLD = 4.0

# Use "rating" (singular)
user_likes = (
    train_df
    .filter(F.col("rating") >= LIKE_THRESHOLD)
    .select("user_index", "item_index")
    .dropDuplicates()
)

user_likes.show(5, truncate=False)

# 3.3.2 For each liked item, get similar items
user_item_candidates = (
    user_likes
    .join(
        item_sims_full,                  # has columns: item_i, item_j, cosine_sim
        user_likes.item_index == item_sims_full.item_i,
        how="inner"
    )
    .select(
        "user_index",
        F.col("item_j").alias("candidate_item"),
        "cosine_sim"
    )
)

# 3.3.3 Aggregate similarity scores per user–candidate_item
user_item_scores = (
    user_item_candidates
    .groupBy("user_index", "candidate_item")
    .agg(F.sum("cosine_sim").alias("score"))
)

# 3.3.4 Remove items already rated by the user in train_df
#      (we'll use LEFT_ANTI so we don't need any boolean '&' logic)

user_train_items = (
    train_df
    .select("user_index", "item_index")
    .dropDuplicates()
    .withColumnRenamed("item_index", "candidate_item")
)

user_item_scores = (
    user_item_scores
    .join(
        user_train_items,
        on=["user_index", "candidate_item"],
        how="left_anti"   # keep only items NOT in user_train_items
    )
)

# 3.3.5 For each user, pick Top N by score
N = 10
w = Window.partitionBy("user_index").orderBy(F.desc("score"))

cosine_recs = (
    user_item_scores
    .withColumn("rank", F.row_number().over(w))
    .filter(F.col("rank") <= N)
    .drop("rank")
)

cosine_recs.show(10, truncate=False)

# Map candidate_item → product_id
cosine_recs_exploded = (
    cosine_recs
    .join(
        item_lookup,   # item_lookup: item_index, product_id
        cosine_recs.candidate_item == item_lookup.item_index,
        how="left"
    )
    .select(
        "user_index",
        "candidate_item",
        "product_id",
        "score"
    )
)

cosine_recs_exploded.show(10, truncate=False)


+----------+----------+
|user_index|item_index|
+----------+----------+
|4906.0    |348.0     |
|3598.0    |901.0     |
|2189.0    |265.0     |
|300.0     |16.0      |
|2189.0    |93.0      |
+----------+----------+
only showing top 5 rows
+----------+--------------+------------------+
|user_index|candidate_item|score             |
+----------+--------------+------------------+
|0.0       |391           |511.2184434043622 |
|0.0       |1082          |511.2180893290076 |
|0.0       |858           |511.2139601488166 |
|0.0       |984           |511.2122941952645 |
|0.0       |1307          |511.2110212462923 |
|0.0       |1224          |511.2102819084249 |
|0.0       |826           |511.21002341945734|
|0.0       |176           |511.20626819047027|
|0.0       |1206          |511.2026114057317 |
|0.0       |829           |511.2023581469073 |
+----------+--------------+------------------+
only showing top 10 rows
+----------+--------------+----------+------------------+
|user_index|candida

## Compare ALS vs cosine (Precision@N)

**Build ground truth from test set**

To evaluate the quality of the recommendations, a **ground truth set of relevant items** is created from the **test dataset**. For each user:

- Items with a **rating greater than or equal to the defined `LIKE_THRESHOLD`** are treated as **relevant (positive) items**.
- These relevant items are grouped into a **set per user** using `collect_set`, ensuring no duplicates.

This ground truth represents what the user actually liked in the unseen test data and serves as the **reference benchmark** for computing ranking metrics such as Precision@N.
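The grouping can be pictured with a small plain-Python equivalent of the `filter` plus `collect_set` logic. The function `build_relevant_items` and the sample rows are assumptions made for illustration.

```python
from collections import defaultdict

def build_relevant_items(test_rows, like_threshold=4.0):
    """test_rows: iterable of (user, item, rating) triples.

    Returns {user: set of items rated at or above the threshold}.
    """
    relevant = defaultdict(set)
    for user, item, rating in test_rows:
        if rating >= like_threshold:
            relevant[user].add(item)  # a set removes duplicate (user, item) pairs
    return dict(relevant)

# Toy test interactions (made-up values)
test_rows = [
    (0, 10, 4.5),
    (0, 10, 4.2),  # duplicate pair, kept once
    (0, 11, 3.0),  # below threshold, not relevant
    (1, 12, 4.0),
    (2, 13, 2.5),  # user 2 has no relevant items
]

relevant = build_relevant_items(test_rows)
print(relevant)  # {0: {10}, 1: {12}}
```

Users with no test rating at or above the threshold, such as user 2 here, do not appear in the ground truth at all. Any metric computed by joining onto this table therefore covers only users with at least one relevant test item.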


In [None]:
# 4.1 Ground truth: relevant items in TEST
relevant_test = test_df.filter(F.col("rating") >= LIKE_THRESHOLD) \
    .select("user_index", "item_index") \
    .groupBy("user_index") \
    .agg(F.collect_set("item_index").alias("relevant_items"))

relevant_test.show(5, truncate=False)


+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

**ALS Top-N list per user**

A per-user list of ALS-recommended items that can be compared directly with the ground-truth relevant items from the test data.

In [None]:
# ALS already exploded: als_recs_exploded (user_index, item_index, product_id, predicted_rating)

als_topN = als_recs_exploded.groupBy("user_index").agg(
    F.collect_list("item_index").alias("als_recommended_items")
)

als_topN.show(5, truncate=False)


+----------+---------------------------------------------------------+
|user_index|als_recommended_items                                    |
+----------+---------------------------------------------------------+
|0         |[1117, 1093, 1123, 1019, 1077, 394, 882, 676, 631, 1245] |
|1         |[955, 1117, 1093, 1320, 1077, 394, 611, 1016, 514, 676]  |
|2         |[1178, 1117, 1258, 657, 1320, 1016, 882, 1062, 631, 603] |
|3         |[1178, 858, 1093, 1073, 1020, 1258, 657, 1019, 1282, 611]|
|4         |[1320, 1282, 1062, 514, 603, 880, 836, 726, 1321, 1171]  |
+----------+---------------------------------------------------------+
only showing top 5 rows


**Cosine Top-N list per user**


This step aggregates the **item-based cosine similarity recommendations** at the **user level**.

These user-level recommendation lists are structured for direct comparison against:
- The ALS recommendation lists, and
- The ground-truth relevant items from the test dataset.


In [None]:
cosine_topN = cosine_recs.groupBy("user_index").agg(
    F.collect_list("candidate_item").alias("cosine_recommended_items")
)

cosine_topN.show(5, truncate=False)


+----------+-------------------------------------------------------+
|user_index|cosine_recommended_items                               |
+----------+-------------------------------------------------------+
|0.0       |[391, 1082, 858, 984, 1307, 1224, 826, 176, 1206, 829] |
|14.0      |[391, 1082, 952, 976, 1098, 606, 526, 984, 1307, 826]  |
|22.0      |[1088, 858, 606, 1098, 391, 1224, 976, 1171, 837, 1203]|
|23.0      |[606, 1088, 836, 391, 1098, 976, 952, 1019, 1224, 1276]|
|32.0      |[391, 271, 1082, 837, 836, 1088, 1098, 952, 1224, 1019]|
+----------+-------------------------------------------------------+
only showing top 5 rows


**Precision@N computation**

In this step, the *ground truth relevant items*, *ALS Top-N recommendations*, and *cosine-based Top-N recommendations* are combined into a single evaluation table for each user.

1. Data Integration: The three components (`relevant_test`, `als_topN`, and `cosine_topN`) are joined on `user_index` to enable direct, user-wise comparison.

2. Precision@N Metric Definition
    $$
    \text{Precision@N} = \frac{\text{Number of relevant items in Top-N}}{\text{Total number of Top-N recommendations}}
    $$

3. Model-wise Precision@N Computation: A UDF applies this definition to each user's ALS and cosine recommendation lists.

4. Aggregate Performance Comparison: Precision@N is averaged across all users for each model.
  

This evaluation framework directly measures the **ranking quality** of both recommender systems on unseen test data.


In [None]:
from pyspark.sql.types import DoubleType

# Join all together
eval_df = relevant_test \
    .join(als_topN, on="user_index", how="left") \
    .join(cosine_topN, on="user_index", how="left")

# UDF for Precision@N
def precision_at_k(recommended, relevant):
    if not recommended:
        return 0.0
    recommended_set = set(recommended)
    relevant_set = set(relevant or [])
    if len(recommended_set) == 0:
        return 0.0
    hits = len(recommended_set & relevant_set)
    return float(hits) / float(len(recommended))

precision_udf = F.udf(precision_at_k, DoubleType())

eval_df = eval_df.withColumn(
    "als_precision",
    precision_udf(F.col("als_recommended_items"), F.col("relevant_items"))
).withColumn(
    "cosine_precision",
    precision_udf(F.col("cosine_recommended_items"), F.col("relevant_items"))
)

eval_df.select("user_index", "als_precision", "cosine_precision").show(10, truncate=False)

metrics = eval_df.agg(
    F.avg("als_precision").alias("avg_als_precision"),
    F.avg("cosine_precision").alias("avg_cosine_precision")
).collect()[0]

print("Average Precision@N (ALS):", metrics["avg_als_precision"])
print("Average Precision@N (Cosine):", metrics["avg_cosine_precision"])


+----------+-------------+----------------+
|user_index|als_precision|cosine_precision|
+----------+-------------+----------------+
|0.0       |0.7          |0.2             |
|1.0       |0.9          |0.1             |
|6.0       |0.8          |0.1             |
|4.0       |0.9          |0.0             |
|5.0       |0.7          |0.0             |
|8.0       |0.9          |0.0             |
|10.0      |0.9          |0.1             |
|7.0       |0.8          |0.2             |
|2.0       |0.8          |0.0             |
|9.0       |0.7          |0.1             |
+----------+-------------+----------------+
only showing top 10 rows
✅ Average Precision@N (ALS): 0.658420226984789
✅ Average Precision@N (Cosine): 0.0037400752193898846


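**Key Insight:**
In this experiment, model-based collaborative filtering (ALS) clearly outperforms memory-based cosine similarity on CTGAN-generated synthetic data, with an average Precision@N of about 0.658 versus 0.004.
This suggests that latent factor models are more robust than neighborhood methods when working with statistically generated data. The comparison is not like-for-like, however: ALS candidates were drawn only from user–item pairs in the test set, whereas cosine candidates came from all items the user had not rated in training, so part of the gap reflects the candidate pool rather than the models themselves.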