# 1. Item to Item

In [None]:
import pandas as pd
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

# ─── 1) Start Spark Session ─────────────────────────────────────────────────────
# Build (or reuse) the session for the item-item CF job, one builder step at a
# time so each setting sits next to its explanation.
_builder = SparkSession.builder.appName("ItemItemCF")
# Number of partitions Spark SQL uses when shuffling for joins/aggregations.
_builder = _builder.config("spark.sql.shuffle.partitions", "200")
spark = _builder.getOrCreate()

# ─── A/B) Extract minimal (user, item) tables for train and validation ──────────
def _write_minimal_interactions(src_fp: str, dst_fp: str, item_col: str) -> pd.DataFrame:
    """Write a de-duplicated two-column copy of a clicks file.

    Reads only ``user_id`` and ``click_article_id`` from ``src_fp``, renames the
    article column to ``item_col``, drops repeated (user, item) rows and writes
    the result to ``dst_fp``.

    Returns the pandas DataFrame that was written.
    """
    frame = (
        pd.read_parquet(
            src_fp,
            columns=["user_id", "click_article_id"],
            engine="pyarrow",
        )
        .rename(columns={"click_article_id": item_col})
        # One row per (user, item): the co-click counts and item supports
        # computed later from this file treat interactions as binary, so
        # repeated clicks must not be counted more than once.
        .drop_duplicates(ignore_index=True)
    )
    frame.to_parquet(
        dst_fp,
        index=False,
        engine="pyarrow",
        # Only the two id columns are written, so these timestamp options are
        # kept purely as a safeguard.
        coerce_timestamps="ms",
        allow_truncated_timestamps=True,
    )
    print(f"Wrote minimal interactions to {dst_fp}")
    return frame


# Training interactions: (user_id, item_id)
interactions = _write_minimal_interactions(
    "../datasets/train_clicks.parquet",
    "../datasets/train_interactions.parquet",
    "item_id",
)

# Validation hold-out: (user_id, true_item)
val = _write_minimal_interactions(
    "../datasets/valid_clicks.parquet",
    "../datasets/valid_interactions.parquet",
    "true_item",
)


In [None]:
# ─── 2) Configuration ────────────────────────────────────────────────────────────
# How many most-similar neighbours are kept for every item.
TOP_K_NEIGH = 50
# Raw training click log.
TRAIN_FP = "../datasets/train_clicks.parquet"
# Destination of the item → neighbour table.
NEIGHBORS_FP = "../datasets/item_neighbors.parquet"

In [None]:
# ─── 3) Load only the minimal interactions DataFrame ────────────────────────────
# Rebinds `interactions` to a Spark DataFrame read from the minimal parquet file.
_interactions_fp = "../datasets/train_interactions.parquet"
interactions = spark.read.parquet(_interactions_fp)


In [None]:
# from pyspark.sql import SparkSession, Window
# import pyspark.sql.functions as F

# # ─── 1) Start Spark Session ─────────────────────────────────────────────────────
# spark = (
#     SparkSession
#     .builder
#     .appName("ItemItemCF")
#     .config("spark.sql.shuffle.partitions", "200")   # tune shuffle parallelism
#     .getOrCreate()
# )

# # ─── 2) Configuration ────────────────────────────────────────────────────────────
# TRAIN_FP     = "../datasets/train_clicks.parquet"
# NEIGHBORS_FP = "../datasets/item_neighbors.parquet"
# TOP_K_NEIGH  = 50   # number of neighbors to keep per item

# # ─── 3) Load train clicks DataFrame ─────────────────────────────────────────────
# clicks = spark.read.parquet(TRAIN_FP)

# # Keep only the fields we need for CF
# interactions = (
#     clicks
#     .select("user_id", F.col("click_article_id").alias("item_id"))
#     .distinct()  # one interaction per user‐item
# )

# ─── 4) Compute total clicks per item for normalization ─────────────────────────
# The similarity below treats interactions as binary, so collapse repeated
# clicks to one row per (user, item) before counting anything.
interactions = interactions.distinct()

item_counts = (
    interactions
    .groupBy("item_id")
    .agg(F.count("*").alias("n_i"))
)

# ─── 5) Generate co‐click counts via self‐join on user_id ────────────────────────
# Give the two sides explicit column names (item_a / item_b) instead of relying
# on "a.item_id" / "b.item_id" qualifiers surviving the groupBy and later joins.
_side_a = interactions.select("user_id", F.col("item_id").alias("item_a"))
_side_b = interactions.select("user_id", F.col("item_id").alias("item_b"))

pairs = (
    _side_a
    .join(_side_b, on="user_id")
    # item_a < item_b: count each unordered pair once and skip self-pairs
    .where(F.col("item_a") < F.col("item_b"))
    .groupBy("item_a", "item_b")
    .agg(F.count("*").alias("co_count"))
)

# ─── 6) Join item counts to compute cosine similarity ───────────────────────────
pairs = (
    pairs
    # support of item_a → n_i
    .join(
        item_counts.select(F.col("item_id").alias("item_a"), "n_i"),
        on="item_a",
    )
    # support of item_b → n_j
    .join(
        item_counts.select(F.col("item_id").alias("item_b"), F.col("n_i").alias("n_j")),
        on="item_b",
    )
    # cosine similarity = co_count / sqrt(n_i * n_j)
    .withColumn("sim", F.col("co_count") / F.sqrt(F.col("n_i") * F.col("n_j")))
)

# ─── 7) For each item, keep top‐TOP_K_NEIGH neighbors ────────────────────────────
# `pairs` holds each unordered pair once (item_a < item_b). Mirror it so every
# item can see all of its neighbours; otherwise an item would only ever be
# paired with items that have a larger id.
_forward = pairs.select(
    F.col("item_a").alias("item_id"),
    F.col("item_b").alias("neighbor_id"),
    "sim",
)
_backward = pairs.select(
    F.col("item_b").alias("item_id"),
    F.col("item_a").alias("neighbor_id"),
    "sim",
)

# neighbor_id is a tie-breaker so the top-K cut is deterministic.
window_spec = (
    Window
    .partitionBy("item_id")
    .orderBy(F.col("sim").desc(), F.col("neighbor_id"))
)

item_neighbors = (
    _forward
    .unionByName(_backward)
    .withColumn("rank", F.row_number().over(window_spec))
    .filter(F.col("rank") <= TOP_K_NEIGH)
    .select("item_id", "neighbor_id", "sim")
)

# ─── 8) Persist the neighbor table ──────────────────────────────────────────────
item_neighbors.write.mode("overwrite").parquet(NEIGHBORS_FP)

# ─── 9) Stop Spark ──────────────────────────────────────────────────────────────
spark.stop()


In [None]:
# from a terminal or notebook Python
# Recursively delete two recommendation output directories (IPython shell escape).
# NOTE(review): these paths have no ".parquet" suffix, while the writes later in
# this notebook target "../datasets/cf_item_recs_top5.parquet" and
# "../datasets/cf_item_recs_top10.parquet" using mode("overwrite") — confirm
# which directories are actually meant to be cleared here.
!rm -rf "../datasets/cf_item_recs_top5"
!rm -rf "../datasets/cf_item_recs_top10"


In [None]:
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

# ─── (Re)Start Spark ─────────────────────────────────────────────────────────────
# Session settings, applied in this order to the builder.
_SESSION_CONF = {
    "spark.sql.shuffle.partitions": "200",
    "spark.driver.memory": "32g",      # requested driver heap
    "spark.executor.memory": "32g",    # requested executor heap
    "spark.sql.autoBroadcastJoinThreshold": "-1",
}

_builder = SparkSession.builder.appName("ItemItemCF_and_Eval")
for _key, _value in _SESSION_CONF.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

# With a live session, load the minimal train / validation tables.
_datasets_dir = "../datasets"
train_int = spark.read.parquet(_datasets_dir + "/train_interactions.parquet")
val = spark.read.parquet(_datasets_dir + "/valid_interactions.parquet")


In [None]:

# # ─── 1) Read in the minimal train interactions ──────────────────────────────────
# #    (we wrote this earlier via pandas to drop timestamps)
# train_int = spark.read.parquet("../datasets/train_interactions.parquet")  
# # schema: (user_id: long, item_id: long)

# ─── 2) Read the precomputed item-neighbors ─────────────────────────────────────
neighbors = spark.read.parquet("../datasets/item_neighbors.parquet")
# columns: item_id, neighbor_id, sim

# # ─── 3) Read your validation hold-out (one click per user) ─────────────────────
# val = (
#     spark.read.parquet("../datasets/valid_clicks.parquet")
#          .select("user_id", F.col("click_article_id").alias("true_item"))
# )

# ─── 4) Remove the hold-out from your training set (just in case) ──────────────
# Anti-join on the (user, item) PAIR. Anti-joining on user_id alone would drop
# the entire history of every validation user, leaving nothing to build their
# recommendations from.
_held_out_pairs = val.select("user_id", F.col("true_item").alias("item_id"))
train_minus_val = (
    train_int
    .join(_held_out_pairs, on=["user_id", "item_id"], how="left_anti")
    # one row per (user, item) so a repeated click is not scored twice
    .distinct()
)

# ─── 5) Explode each user’s clicks into neighbor candidates ─────────────────────
cands = (
    train_minus_val
    .join(neighbors, on="item_id")
    .select(
        "user_id",
        F.col("neighbor_id").alias("candidate"),
        "sim",
    )
)

# ─── 6) Aggregate scores per (user, candidate) ─────────────────────────────────
scores = (
    cands
    .groupBy("user_id", "candidate")
    .agg(F.sum("sim").alias("score"))
)

# ─── 7) Filter out anything the user already clicked in TRAIN  ──────────────────
# `scores` is derived from `train_minus_val`, so join by column NAME on a renamed
# projection rather than through `scores.user_id == train_minus_val.user_id`,
# which refers to the same underlying column on both sides.
_seen = train_minus_val.select("user_id", F.col("item_id").alias("candidate"))
scores_filtered = scores.join(_seen, on=["user_id", "candidate"], how="left_anti")

# ─── 9) Rank and take top-5 / top-10 ────────────────────────────────────────────
# `candidate` breaks score ties so the ranking is deterministic.
window = (
    Window
    .partitionBy("user_id")
    .orderBy(F.col("score").desc(), F.col("candidate"))
)

recs = scores_filtered.withColumn("rank", F.row_number().over(window))

top5  = recs.filter("rank <= 5")
top10 = recs.filter("rank <= 10")

# ─── 10) Persist your recommendations ───────────────────────────────────────────
NUM_PARTS = 200
top5 \
    .repartition(NUM_PARTS) \
    .write \
    .mode("overwrite") \
    .parquet("../datasets/cf_item_recs_top5.parquet")

top10 \
    .repartition(NUM_PARTS) \
    .write \
    .mode("overwrite") \
    .parquet("../datasets/cf_item_recs_top10.parquet")

spark.stop()


In [None]:
# Stop the active Spark session. The preceding cell already ends with
# spark.stop(); this cell repeats the call on its own.
spark.stop()

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# ─── 1) (Re)start Spark ─────────────────────────────────────────────────────────
spark = SparkSession.builder.appName("EvalRecall").getOrCreate()

# ─── 2) Load validation true-item and CF recs ─────────────────────────────────
# Plain parquet reads: `header` / `inferSchema` are CSV reader options and have
# no place on a parquet read, which carries its own schema.
val    = spark.read.parquet("../datasets/valid_interactions.parquet")
recs5  = spark.read.parquet("../datasets/cf_item_recs_top5.parquet")
recs10 = spark.read.parquet("../datasets/cf_item_recs_top10.parquet")

# Ground truth keyed like the recommendation tables: (user_id, candidate).
_truth = val.select("user_id", F.col("true_item").alias("candidate")).distinct()


def _users_with_hit(recs):
    """Return the distinct user_ids whose recommendations contain a held-out item."""
    return (
        recs
        .join(_truth, on=["user_id", "candidate"], how="inner")
        .select("user_id")
        .distinct()
    )


# ─── 3) Compute hits per user ───────────────────────────────────────────────────
hits5  = _users_with_hit(recs5)
hits10 = _users_with_hit(recs10)

# ─── 4) Compute recall metrics ─────────────────────────────────────────────────
num_users = val.select("user_id").distinct().count()
if num_users == 0:
    # Fail with an explicit message instead of a bare ZeroDivisionError.
    raise ValueError("validation set contains no users; cannot compute recall")

# Recall@K here = share of validation users with at least one hit in their top K.
recall5   = hits5.count()  / num_users
recall10  = hits10.count() / num_users

print(f"Recall@5:  {recall5:.4f}")
print(f"Recall@10: {recall10:.4f}")

spark.stop()


In [1]:
from pyspark.sql import SparkSession

# Local session for the item-item CF job, assembled step by step.
_builder = SparkSession.builder.master("local[*]")   # local mode, all cores
_builder = _builder.appName("ItemItemCF")
for _key, _value in (
    ("spark.driver.memory", "24g"),
    ("spark.sql.shuffle.partitions", "200"),
    ("spark.sql.autoBroadcastJoinThreshold", "-1"),
    ("spark.speculation", "false"),
):
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

# Only log errors from here on.
spark.sparkContext.setLogLevel("ERROR")

25/04/30 23:58:37 WARN Utils: Your hostname, pysparkcf-ii58cents resolves to a loopback address: 127.0.0.1; using 10.0.0.4 instead (on interface eth0)
25/04/30 23:58:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/30 23:58:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, FloatType

# ─── 1) Load interactions (user↔item) ───────────────────────────────────────────
# We'll read only user_id and item_id (no timestamps) to avoid the Parquet timestamp issue.
clicks = (
    spark.read.parquet("../datasets/train_clicks.parquet")
    .select("user_id", F.col("click_article_id").alias("item_id"))
    .distinct()   # one record per (user, item)
)

# ─── 2) Compute item–item co-occurrence and cosine similarity ────────────────
#  2a) `clicks` already has one record per (user, item).
userItems = clicks

#  2b) Self-join on user_id to get all item–item pairs per user
pairs = (
    userItems.select("user_id", F.col("item_id").alias("item_i"))
    .join(userItems.select("user_id", F.col("item_id").alias("item_j")), on="user_id")
    .where(F.col("item_i") < F.col("item_j"))  # each unordered pair once
    .select("item_i", "item_j")
)

#  2c) Count co-occurrences
coCounts = pairs.groupBy("item_i", "item_j").count()

#  To get cosine sim, we need each item’s total occurrences:
itemCounts = clicks.groupBy("item_id").count().withColumnRenamed("count", "ni")

# Join the supports once per side. The second join must expose the count as
# "nj"; joining the same "ni" column twice leaves no "nj" to reference and
# makes "ni" ambiguous.
_sims_one_way = (
    coCounts
    .join(itemCounts.select(F.col("item_id").alias("item_i"), "ni"), on="item_i")
    .join(
        itemCounts.select(F.col("item_id").alias("item_j"), F.col("ni").alias("nj")),
        on="item_j",
    )
    .withColumn("cosine_sim", F.col("count") / F.sqrt(F.col("ni") * F.col("nj")))
    .select("item_i", "item_j", "cosine_sim")
)

# Mirror the pairs so every item sees all of its neighbours, not only those
# with a larger id.
sims = _sims_one_way.unionByName(
    _sims_one_way.select(
        F.col("item_j").alias("item_i"),
        F.col("item_i").alias("item_j"),
        "cosine_sim",
    )
)

# ─── 3) For each item, grab top-50 neighbors ─────────────────────────────────
# .desc() must be called: passing the bound method itself is not a sort column.
# item_j is a tie-breaker so the cut is deterministic.
window50 = Window.partitionBy("item_i").orderBy(F.col("cosine_sim").desc(), F.col("item_j"))
neighbors = (
    sims
    .withColumn("rank", F.row_number().over(window50))
    .where(F.col("rank") <= 50)
    .select("item_i", "item_j", "cosine_sim")
)
# NOTE(review): this writes columns (item_i, item_j, cosine_sim) to the same path
# that another cell of this notebook writes as (item_id, neighbor_id, sim).
neighbors.write.mode("overwrite").parquet("../datasets/item_neighbors.parquet")

# ─── 4) Scoring for validation users ─────────────────────────────────────────
# Load neighbors and your val interactions
neigh = spark.read.parquet("../datasets/item_neighbors.parquet")
val = (
    spark.read.parquet("../datasets/valid_clicks.parquet")
    .select("user_id", F.col("click_article_id").alias("true_item"))
)

# Build each user's candidate set by:
#   – Exploding their history
#   – Joining to neighbors
#   – Summing scores per candidate
recs = (
    clicks
    .join(neigh.withColumnRenamed("item_i", "item_id"), on="item_id")
    .groupBy("user_id", "item_j")
    .agg(F.sum("cosine_sim").alias("score"))
    .withColumnRenamed("item_j", "candidate_item")
)

# Remove already-seen. `recs` is derived from `clicks`, so join by column name
# on a renamed projection instead of comparing recs.user_id to clicks.user_id.
recs_filtered = recs.join(
    clicks.select("user_id", F.col("item_id").alias("candidate_item")),
    on=["user_id", "candidate_item"],
    how="left_anti",
)

# Rank and pick top-K
windowK = Window.partitionBy("user_id").orderBy(F.col("score").desc(), F.col("candidate_item"))
final = (
    recs_filtered
    .withColumn("rank", F.row_number().over(windowK))
    .where(F.col("rank") <= 10)  # you can do <=5 for top-5 separate if you like
)

# ─── 5) Evaluate Recall@5/10 ────────────────────────────────────────────────
# Keep only the recommendation rows that match a held-out (user, item) pair.
# Cached because both values of K reuse it.
_truth = val.select("user_id", F.col("true_item").alias("candidate_item")).distinct()
joined = final.join(_truth, on=["user_id", "candidate_item"], how="inner").cache()
totalUsers = val.select("user_id").distinct().count()

for K in (5, 10):
    # users with at least one hit within their top K
    hits = joined.where(F.col("rank") <= K).select("user_id").distinct().count()
    print(f"Recall@{K}: {hits/totalUsers:.4f}")


In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# 0) Imports & start Spark with sane configs
# ─────────────────────────────────────────────────────────────────────────────
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType

# Session settings, applied to the builder in this order.
_SPARK_SETTINGS = (
    # scratch directory for Spark's local files
    ("spark.local.dir", "/mnt/data/spark-local"),
    # -1 turns automatic broadcast joins off
    ("spark.sql.autoBroadcastJoinThreshold", "-1"),
    # no speculative task execution
    ("spark.speculation", "false"),
    # shuffle parallelism for joins/aggregations
    ("spark.sql.shuffle.partitions", "200"),
    # requested driver heap
    ("spark.driver.memory", "24g"),
)

_builder = SparkSession.builder.appName("ItemItemCF")
for _name, _setting in _SPARK_SETTINGS:
    _builder = _builder.config(_name, _setting)
spark = _builder.getOrCreate()

# Only log errors from here on.
spark.sparkContext.setLogLevel("ERROR")

# ─────────────────────────────────────────────────────────────────────────────
# 1) Load minimal train + validation interactions
# ─────────────────────────────────────────────────────────────────────────────
# Window is used further down but is not among this cell's own imports.
from pyspark.sql.window import Window

# only pull user_id and click_article_id (drop timestamps entirely)
train = (
    spark.read
         .parquet("../datasets/train_clicks.parquet")
         .select("user_id", "click_article_id")
         .withColumnRenamed("click_article_id", "item_id")
         .distinct()
)
val = (
    spark.read
         .parquet("../datasets/valid_clicks.parquet")
         .select("user_id", "click_article_id")
         .withColumnRenamed("click_article_id", "item_id")
         .distinct()
)

# held-out (user_id, item_id) pairs used as ground truth for recall
heldout = val.cache()

# ─────────────────────────────────────────────────────────────────────────────
# 2) Compute item–item co-occurrence & similarity
#    For scalability we only keep top-50 neighbors per item.
# ─────────────────────────────────────────────────────────────────────────────
# (a) join train to itself on user, filter item1 < item2 to avoid duplicates
pairs = (
    train.alias("A")
         .join(train.alias("B"), on="user_id")
         .where(F.col("A.item_id") < F.col("B.item_id"))
         .select(
             F.col("A.item_id").alias("i1"),
             F.col("B.item_id").alias("i2")
         )
)

# (b) count co-occurrence
co = (
    pairs
    .groupBy("i1", "i2")
    .count()
)

# (c) get individual item support counts
item_counts = (
    train.groupBy("item_id")
         .count()
         .withColumnRenamed("count", "ni")
)

# (d) join counts & compute cosine‐style similarity:
#     sim(i1,i2) = co_count / sqrt(count(i1)*count(i2))
#     The i2-side support is exposed as "nj"; joining the same "ni" column
#     twice would leave no "nj" to reference and make "ni" ambiguous.
sim = (
    co
    .join(item_counts.select(F.col("item_id").alias("i1"), "ni"), on="i1")
    .join(
        item_counts.select(F.col("item_id").alias("i2"), F.col("ni").alias("nj")),
        on="i2",
    )
    .withColumn(
        "sim",
        F.col("count") / F.sqrt(F.col("ni") * F.col("nj"))
    )
    .select("i1", "i2", "sim")
)

# (e) get top-50 neighbors per i1 and per i2 (symmetrically)
topN = (
    sim
    .unionByName(sim.select(F.col("i2").alias("i1"),
                             F.col("i1").alias("i2"),
                             "sim"))
    # window to pick top 50 per i1; i2 breaks ties deterministically
    .withColumn(
        "rank",
        F.row_number().over(
           Window.partitionBy("i1")
                 .orderBy(F.col("sim").desc(), F.col("i2"))
        )
    )
    .where(F.col("rank") <= 50)
    .select("i1", "i2", "sim")
    .cache()
)

# ─────────────────────────────────────────────────────────────────────────────
# 3) For each user, collect neighborhood scores from all items she’s seen
# ─────────────────────────────────────────────────────────────────────────────
# explode user’s train items → join to neighbors → aggregate score
cand = (
    train.alias("T")
         .join(topN.alias("N"), F.col("T.item_id") == F.col("N.i1"))
         .select(
             "user_id",
             F.col("i2").alias("candidate"),
             "sim"
         )
         # sum up similarities from all seed items
         .groupBy("user_id", "candidate")
         .agg(F.sum("sim").alias("score"))
)

# ─────────────────────────────────────────────────────────────────────────────
# 4) Filter out already‐seen items, pick top-K
# ─────────────────────────────────────────────────────────────────────────────
# Anti-join items the user already has in train. Held-out items are NOT removed:
# they are the ground truth recall is measured against, so filtering them out
# would make every hit impossible.
cand_filtered = (
    cand.join(
      train.withColumnRenamed("item_id", "candidate"),
      on=["user_id", "candidate"], how="left_anti"
    )
)

# pick top-5 & top-10 per user; candidate id breaks score ties deterministically
window = Window.partitionBy("user_id").orderBy(F.col("score").desc(), F.col("candidate"))
recs = (
    cand_filtered
      .withColumn("rank", F.row_number().over(window))
      .filter(F.col("rank") <= 10)
      .select("user_id", "candidate", "score", "rank")
      .withColumnRenamed("candidate", "article_id")
)

top5  = recs.filter(F.col("rank") <= 5)
top10 = recs.filter(F.col("rank") <= 10)

# ─────────────────────────────────────────────────────────────────────────────
# 5) Persist results (Parquet, coalesced to ~50 files)
# ─────────────────────────────────────────────────────────────────────────────
top5 .coalesce(50).write.mode("overwrite").parquet("../datasets/cf_item_recs_top5.parquet")
top10.coalesce(50).write.mode("overwrite").parquet("../datasets/cf_item_recs_top10.parquet")

# ─────────────────────────────────────────────────────────────────────────────
# 6) Compute Recall@5 / Recall@10 on held-out
# ─────────────────────────────────────────────────────────────────────────────
# `heldout` names its item column "item_id"; align it with recs' "article_id"
# so the two can be joined by name.
_truth = heldout.withColumnRenamed("item_id", "article_id")
total_users = heldout.select("user_id").distinct().count()

# Recommendation rows that match a held-out pair; cached because both K reuse it.
_hit_rows = (
    recs.join(_truth, ["user_id", "article_id"])
        .select("user_id", "rank")
        .cache()
)

for K in (5, 10):
    # users with at least one held-out item inside their top K
    hit_users = (
        _hit_rows.filter(F.col("rank") <= K)
                 .select("user_id")
                 .distinct()
                 .count()
    )
    # Divide by ALL held-out users: averaging over hit users only would ignore
    # every user without a hit.
    recall = hit_users / total_users if total_users else 0.0
    print(f"Recall@{K}: {recall:0.4f}")

spark.stop()
