In [2]:
from pyspark.sql import SparkSession

from google.colab import drive
drive.mount('/content/drive')

spark = SparkSession.builder.appName("ReloadTrainDF").getOrCreate()

train_df = spark.read.parquet("/content/drive/MyDrive/datasets/train_df_parquet")

train_df.printSchema()
print("Line :",train_df.count())
train_df.show(5)

Mounted at /content/drive
root
 |-- visitor_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- event_ts: timestamp (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- is_available: integer (nullable = true)
 |-- event_type: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- item_view: long (nullable = true)
 |-- item_addtocart: long (nullable = true)
 |-- item_transaction: long (nullable = true)
 |-- item_ctr: double (nullable = true)
 |-- user_view: long (nullable = true)
 |-- user_addtocart: long (nullable = true)
 |-- user_transaction: long (nullable = true)
 |-- user_conv_rate: double (nullable = true)

Line : 2751205
+----------+-------+-------------------+-----------+------------+----------+----+-------+---------+--------------+----------------+--------------------+---------+--------------+----------------+--------------+
|visitor_id|item_id|           event_ts|category_id|is_available|e

In [3]:
# Simple Recommender "popularity-based"
from pyspark.sql import functions as F

N=10
topN = (
    train_df.groupBy("item_id")
            .agg(F.sum("item_view").alias("views"))
            .orderBy("views", ascending=False)
            .limit(N)
    .select("item_id")
    .rdd.flatMap(lambda r: [r.item_id])
    .collect()
)

# Popularity recommender function
def recommend_popularity(user_id, k=N):
  return topN[:k]

# Example for an user
print("Recommendations for user 10 :", recommend_popularity(10))

Recommendations for user 10 : [187946, 461686, 5411, 370653, 219512, 298009, 96924, 309778, 257040, 384302]


In [6]:
# Train & Test split
seed = 42

train_df, test_df = train_df.randomSplit([0.8, 0.2], seed=seed)

print("Total events :", train_df.count() + test_df.count())
print("Train events :", train_df.count())
print("Test  events :", test_df.count())

Total events : 2751205
Train events : 2200937
Test  events : 550268


In [7]:
from pyspark.sql.types import DoubleType

# ground-truth
gt = (
    test_df.filter("event_type='transaction'")
           .groupBy("visitor_id")
           .agg(F.collect_set("item_id").alias("true_items"))
)
# Return the topN list
pred = gt.selectExpr("visitor_id", "array(%s) as pred_items" % ",".join(map(str, topN)))

recall_udf = F.udf(lambda t,p: float(len(set(t) & set(p))) /
                   len(t) if t else 0.0, DoubleType())

eval_df = (
    gt.join(pred, "visitor_id")
      .withColumn("recall_at_10", recall_udf("true_items","pred_items"))
)

eval_df = (
    gt.join(pred, "visitor_id")
      .withColumn("recall_at_10", recall_udf("true_items","pred_items"))
)

avg_recall = eval_df.agg(F.avg("recall_at_10")).first()[0]
print(f"Recall@10 mean (80/20 split) : {avg_recall:.4f}")

Recall@10 mean (80/20 split) : 0.0139


In [8]:
# Co-occurence recommender
from pyspark.sql import functions as F
from pyspark.sql.window import Window

views = (
    train_df
      .select("visitor_id","item_id")
      .distinct()
)

pairs = (
    views.alias("a")
         .join(views.alias("b"), on="visitor_id")
         .filter("a.item_id < b.item_id")
         .groupBy("a.item_id","b.item_id")
         .count()
         .withColumnRenamed("count","co_count")
)

sym_pairs = (
    pairs
      .select(F.col("a.item_id").alias("src"),
              F.col("b.item_id").alias("dst"),
              "co_count")
    .union(
      pairs
       .select(F.col("b.item_id").alias("src"),
               F.col("a.item_id").alias("dst"),
               "co_count")
    )
)


w = Window.partitionBy("src").orderBy(F.desc("co_count"))

item_sims = (
    sym_pairs
      .withColumn("rank", F.row_number().over(w))
      .filter("rank <= 100")
)


user_seen = views.alias("v")


recos = (
    user_seen
      .join(item_sims.alias("s"),
            user_seen.item_id == item_sims.src,
            how="left")
      .select("visitor_id","dst","co_count")
      .groupBy("visitor_id","dst")
      .agg(F.sum("co_count").alias("score"))
)


w2 = Window.partitionBy("visitor_id").orderBy(F.desc("score"))
user_recos = (
    recos
      .withColumn("rank", F.row_number().over(w2))
      .filter("rank <= 10")
      .select("visitor_id","dst","score")
      .orderBy("visitor_id","rank")
)


from pyspark.sql.functions import collect_list

pred_cos = (
    user_recos
      .groupBy("visitor_id")
      .agg(collect_list("dst").alias("pred_items"))
)

In [9]:
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_set, when, col, array, lit, array_intersect, expr


gt = (
    test_df
      .filter("event_type = 'transaction'")
      .groupBy("visitor_id")
      .agg(collect_set("item_id").alias("true_items"))
)

gt = gt.withColumn("true_items", col("true_items").cast("array<string>"))

k = 10
pred_cos = pred_cos.withColumn(
    "pred_items", when(col("pred_items").isNull(), array().cast("array<string>")).otherwise(col("pred_items"))
)

eval_df = (
    gt.join(pred_cos, on="visitor_id", how="left")
      .withColumn("pred_k", expr(f"slice(pred_items, 1, {k})"))
      .withColumn("recall_at_10",
                  when(F.size("true_items") > 0,
                       F.size(array_intersect("true_items", "pred_k")) / F.size("true_items"))
                  .otherwise(lit(0.0))
      )
)

avg_recall = eval_df.agg(F.avg("recall_at_10")).first()[0]
print(f"Recall@{k} co-occurrence : {avg_recall:.4f}")


Recall@10 co-occurrence : 0.2006


In [12]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, array, array_intersect, size, when, lit, expr, collect_set


rating_df = (
    train_df
      .withColumn("user",   col("visitor_id").cast("long"))
      .withColumn("item",   col("item_id").cast("int"))
      .withColumn("rating",
           F.when(col("event_type")=="view",        1.0)
            .when(col("event_type")=="addtocart",   3.0)
            .when(col("event_type")=="transaction", 5.0)
            .otherwise(0.0)
      )
      .select("user","item","rating")
)

sample_frac = 1.0
sampled_df = rating_df.sample(fraction=sample_frac, seed=42).cache()
sampled_df.count()

als = ALS(
    userCol="user", itemCol="item", ratingCol="rating",
    implicitPrefs=True,
    rank=20,
    maxIter=6,
    regParam=0.1,
    alpha=5.0,
    coldStartStrategy="drop"
)
model = als.fit(sampled_df)

gt = (
    test_df
      .filter(col("event_type")=="transaction")
      .groupBy("visitor_id")
      .agg(collect_set("item_id").alias("true_items"))
      .withColumn("true_items", col("true_items").cast("array<int>"))
)
gt = gt.cache()
gt.count()

k = 10
users = gt.select(col("visitor_id").alias("user"))
recs = model.recommendForUserSubset(users, k)
pred = (
    recs
      .select(
         col("user").alias("visitor_id"),
         expr("transform(recommendations, x -> x.item)").alias("pred_items")
      )
      .withColumn("pred_items", F.coalesce(col("pred_items"), array().cast("array<int>")))
      .withColumn("pred_k", expr(f"slice(pred_items,1,{k})"))
      .withColumn("pred_k", when(col("pred_k").isNull(), array().cast("array<int>")).otherwise(col("pred_k")))
      .cache()
)

eval_df = (
    gt
      .join(F.broadcast(pred), on="visitor_id", how="left")
      .withColumn("recall_at_10",
          when(
            (size("true_items") > 0) & (size("pred_k") > 0),
            size(array_intersect("true_items", "pred_k")).cast("double") /
            size("true_items").cast("double")
          ).otherwise(lit(0.0))
      )
)

eval_df = eval_df.withColumn(
    "recall_at_10",
    when(col("recall_at_10").isNull(), lit(0.0))
    .when(col("recall_at_10") < 0, lit(0.0))
    .when(col("recall_at_10") > 1, lit(1.0))
    .otherwise(col("recall_at_10"))
)

avg_recall = eval_df.agg(F.avg("recall_at_10").alias("mean_recall")) \
                   .first()["mean_recall"]

print(f"Recall@{k} ALS = {avg_recall:.4f}")

gt.unpersist()
pred.unpersist()

Recall@10 ALS = 0.1855


DataFrame[visitor_id: int, pred_items: array<int>, pred_k: array<int>]