# Purchase Propensity Data Preprocessing (PySpark)

**Objective**: Transform raw events into a training table where 1 row = 1 November cart event.

**Strategy**:
- Training samples: Cart events from November only
- Target: `is_purchased` = 1 if the cart event is the most recent cart before a purchase of the same product by the same user in the same session, otherwise 0

**Technology**: PySpark for distributed processing of large datasets

## 1. Setup and Imports

In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, TimestampType
import warnings
warnings.filterwarnings('ignore')

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Purchase Propensity Preprocessing") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")



Spark version: 4.1.1
Spark UI: http://192.168.1.5:4040


## 2. Load Data

In [2]:
# Define schema for data loading
schema = StructType([
    StructField("event_time", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", LongType(), True),
    StructField("category_id", LongType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("user_id", LongType(), True),
    StructField("user_session", StringType(), True)
])

# Load November data
print("Loading November data")
df = spark.read.csv(
    '../data/raw/2019-Nov.csv.gz',
    schema=schema,
    header=True
)

nov_count = df.count()
print(f"Total: {nov_count:,} rows")

# Display schema
df.printSchema()

Loading November data



Total: 67,501,979 rows
root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: long (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_session: string (nullable = true)


In [3]:
# Preview data
df.show(5, truncate=False)

+-------------------+----------+----------+-------------------+-------------------------+------+------+---------+------------------------------------+
|event_time         |event_type|product_id|category_id        |category_code            |brand |price |user_id  |user_session                        |
+-------------------+----------+----------+-------------------+-------------------------+------+------+---------+------------------------------------+
|2019-11-01 07:00:00|view      |1003461   |2053013555631882655|electronics.smartphone   |xiaomi|489.07|520088904|4d3b30da-a5e4-49df-b1a8-ba5943f1dd33|
|2019-11-01 07:00:00|view      |5000088   |2053013566100866035|appliances.sewing_machine|janome|293.65|530496790|8e5f4f83-366c-4f70-860e-ca7417414283|
|2019-11-01 07:00:01|view      |17302664  |2053013553853497655|NULL                     |creed |28.31 |561587266|755422e7-9040-477b-9bd2-6a6e8fd97387|
|2019-11-01 07:00:01|view      |3601530   |2053013563810775923|appliances.kitchen.washer|lg   

## 3. Data Profiling & Cleaning Steps

This block performs basic **EDA checks** and **data cleaning** to prepare the event log for downstream labeling/analysis.

**1) Dataset inspection**
- Prints total row count.
- Shows the distribution of `event_type` (counts per event type).
- Reports missing values per column by counting `NULL`s.
- Checks duplicates by comparing `df.count()` vs `df.distinct().count()`.

**2) Deduplication**
- Removes duplicate events using a key set:
  `user_id`, `user_session`, `event_time`, `event_type`, `product_id`
- Keeps only one record per unique key combination.

**3) Session validity filtering**
- Drops rows where `user_session` is missing or equals the string `"NULL"`.

**4) Normalization for categorical fields**
- Standardizes `brand` and `category_code` by:
  - converting to lowercase
  - filling missing values with `"unknown"`

**5) Basic cardinality + top categories**
- Computes the number of distinct `brand` and `category_code`.
- Displays the top 10 most frequent brands and category codes.

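**6) Ordering and caching**
- Sorts the data by `user_id`, `product_id`, `event_time`. Spark does not guarantee that this order survives later joins or window operations, which define their own ordering.
- Calls `df.count()` to force evaluation. This populates the cache only if `df.cache()` (or `persist()`) has been called on the DataFrame first.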
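Steps 3 and 4 above can be sketched without Spark. The snippet below is a minimal pure-Python illustration on made-up rows; the helper names `is_valid_session` and `normalize_label` are hypothetical and only mirror what the PySpark filter and `F.coalesce(F.lower(...), F.lit("unknown"))` expressions do.

```python
def is_valid_session(user_session):
    """Keep rows whose session is neither missing nor the literal string "NULL"."""
    return user_session is not None and user_session != "NULL"


def normalize_label(value):
    """Lowercase the value; fall back to "unknown" when it is missing."""
    return value.lower() if value is not None else "unknown"


# Made-up rows for illustration only
rows = [
    {"user_session": "a1", "brand": "Samsung", "category_code": None},
    {"user_session": "NULL", "brand": "apple", "category_code": "electronics.smartphone"},
    {"user_session": None, "brand": None, "category_code": None},
]

cleaned = [
    {
        **r,
        "brand": normalize_label(r["brand"]),
        "category_code": normalize_label(r["category_code"]),
    }
    for r in rows
    if is_valid_session(r["user_session"])
]

print(cleaned)
```

Only the first row survives the session filter; its brand is lowercased and its missing category becomes `"unknown"`.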

In [4]:
print("Initial dataset:")
print(f"Total rows: {df.count():,}")
print(f"\nEvent type distribution:")
df.groupBy('event_type').count().orderBy('count', ascending=False).show()

print(f"\nMissing values:")
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

Initial dataset:
Total rows: 67,501,979

Event type distribution:
+----------+--------+
|event_type|   count|
+----------+--------+
|      view|63556110|
|      cart| 3028930|
|  purchase|  916939|
+----------+--------+


Missing values:
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|     21898171|9218235|    0|      0|          10|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+


In [5]:
total_rows = df.count()

distinct_rows = df.distinct().count()

print(f"Total rows    : {total_rows}")
print(f"Distinct rows : {distinct_rows}")
print(f"Duplicate rows: {total_rows - distinct_rows}")



Total rows    : 67501979
Distinct rows : 67401460
Duplicate rows: 100519


In [6]:
# Remove duplicate rows (Deduplication)
dedup_keys = [
    "user_id",
    "user_session",
    "event_time",
    "event_type",
    "product_id"
]

df = df.dropDuplicates(dedup_keys)

In [7]:
print("After deduplication:")
print("Total rows:", df.count())

After deduplication:
Total rows: 67401449
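The count after key-based deduplication (67,401,449) is 11 lower than the distinct-row count reported earlier (67,401,460). This is expected: `dropDuplicates(dedup_keys)` also collapses rows that share the five key columns but differ in some other column. The pure-Python sketch below shows the difference on made-up rows. The helper names are hypothetical, and unlike Spark, which keeps an arbitrary row per key, this sketch keeps the first one.

```python
# Made-up rows: (user_id, user_session, event_time, event_type, product_id, price)
rows = [
    (1, "s1", "2019-11-01 07:00:00", "view", 100, 9.99),
    (1, "s1", "2019-11-01 07:00:00", "view", 100, 9.99),   # exact duplicate
    (1, "s1", "2019-11-01 07:00:00", "view", 100, 10.49),  # same key, different price
    (2, "s2", "2019-11-01 07:00:05", "cart", 200, 5.00),
]


def distinct_rows(rows):
    """Full-row distinct, analogous to df.distinct()."""
    return list(dict.fromkeys(rows))


def drop_duplicates(rows, key_idx):
    """Keep one row per key, analogous to df.dropDuplicates(keys)."""
    seen = {}
    for row in rows:
        key = tuple(row[i] for i in key_idx)
        seen.setdefault(key, row)  # first row wins in this sketch
    return list(seen.values())


KEY_IDX = (0, 1, 2, 3, 4)  # positions of the five key columns

n_distinct = len(distinct_rows(rows))
n_keyed = len(drop_duplicates(rows, KEY_IDX))
print(n_distinct, n_keyed)
```

Here full-row distinct keeps three rows, while key-based deduplication keeps two because the price-only variant shares a key with the first row.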


In [8]:
print("Dropping rows with missing user_session")
print("Normalizing for brand and category_code")

# Drop rows where user_session is null or string "NULL"
df = df.filter(
    (F.col("user_session").isNotNull()) & (F.col("user_session") != "NULL")
)

df = (
    df
    .withColumn(
        "brand",
        F.coalesce(F.lower(F.col("brand")), F.lit("unknown"))
    )
    .withColumn(
        "category_code",
        F.coalesce(F.lower(F.col("category_code")), F.lit("unknown"))
    )
)

# cache if reused many times
# df.cache()

brand_count = df.select("brand").distinct().count()
category_count = df.select("category_code").distinct().count()

print(f"Unique brands: {brand_count}")
print(f"Unique category_codes: {category_count}")

print("Top 10 brands:")
df.groupBy("brand").count().orderBy(F.desc("count")).show(10)

print("Top 10 category_codes:")
df.groupBy("category_code").count().orderBy(F.desc("count")).show(10)

Dropping rows with missing user_session
Normalizing for brand and category_code
Unique brands: 4202
Unique category_codes: 130
Top 10 brands:
+-------+-------+
|  brand|  count|
+-------+-------+
|unknown|9209175|
|samsung|7864082|
|  apple|6246469|
| xiaomi|4630595|
| huawei|1407728|
|lucente|1183398|
|     lg|1094788|
|  bosch| 973939|
|   oppo| 809948|
|   sony| 797670|
+-------+-------+
only showing top 10 rows
Top 10 category_codes:
+--------------------+--------+
|       category_code|   count|
+--------------------+--------+
|             unknown|21871420|
|electronics.smart...|16333971|
|electronics.video.tv| 2203947|
|  computers.notebook| 2178918|
|  electronics.clocks| 2085555|
|       apparel.shoes| 1885774|
|electronics.audio...| 1814146|
|appliances.enviro...| 1525426|
|appliances.kitche...| 1425046|
|appliances.kitche...| 1400187|
+--------------------+--------+
only showing top 10 rows


In [9]:
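# Sort by user_id, product_id, event_time.
# Note: Spark does not preserve this order through the joins and window
# shuffles below; those steps specify their own ordering.
print("Sorting data")
df = df.orderBy('user_id', 'product_id', 'event_time')
print("Data sorted by user_id, product_id, event_time")

df = df.cache()  # Mark for caching; without this, count() caches nothing
df.count()       # Action that materializes the cache
print("Data cached in memory")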

Sorting data
Data sorted by user_id, product_id, event_time
Data cached in memory


## 4. Label Engineering (`is_purchased`)

**Cart-to-Purchase Attribution Logic**

This pipeline attributes each **purchase** event to the **most recent preceding cart** event within the same **user**, **product**, and **session**.

**Method overview:**
- Split the data into `cart` and `purchase` events.
- Join carts to purchases where they share the same user, product, and session, and where the cart happened before the purchase.
- For each purchase, select the **nearest cart in time** before it using a window function.
- Label those carts as `is_purchased = 1`; all other carts receive `0`.
- Compute summary statistics: total carts, credited carts, and cart-to-purchase conversion rate.

This ensures a **last-cart-before-purchase** attribution model at session level.
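The attribution rule can be illustrated without Spark. The following is a minimal pure-Python sketch on made-up events, assuming integer timestamps for brevity; `label_carts` is a hypothetical helper, not part of the notebook. It also shows one edge case worth keeping in mind: when two purchases share the same nearest cart, that cart should still be counted once.

```python
from bisect import bisect_left
from collections import defaultdict

# Made-up events: (user_id, product_id, user_session, event_time, event_type)
events = [
    (1, 100, "s1", 10, "cart"),
    (1, 100, "s1", 20, "cart"),
    (1, 100, "s1", 30, "purchase"),
    (1, 100, "s1", 40, "purchase"),  # second purchase, same nearest cart (t=20)
    (2, 100, "s2", 15, "cart"),      # never purchased
    (3, 200, "s3", 50, "purchase"),  # purchase with no preceding cart
    (3, 200, "s3", 60, "cart"),      # cart after the purchase: not credited
]


def label_carts(events):
    """Return {(user, product, session, cart_time): is_purchased}."""
    carts = defaultdict(list)
    purchases = defaultdict(list)
    for user, product, session, t, etype in events:
        key = (user, product, session)
        if etype == "cart":
            carts[key].append(t)
        elif etype == "purchase":
            purchases[key].append(t)

    credited = set()  # a set, so a cart credited by several purchases counts once
    for key, cart_times in carts.items():
        cart_times.sort()
        for p_time in purchases.get(key, []):
            # cart_times[:i] are the carts strictly before the purchase
            i = bisect_left(cart_times, p_time)
            if i > 0:
                credited.add(key + (cart_times[i - 1],))

    return {
        key + (t,): int(key + (t,) in credited)
        for key, times in carts.items()
        for t in times
    }


labels = label_carts(events)
for cart, flag in sorted(labels.items()):
    print(cart, flag)
```

Only the cart at `t=20` is labeled 1: the earlier cart at `t=10` is not the nearest one, the cart in session `s2` has no purchase, and the cart in `s3` happens after the purchase.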

In [10]:
# 1. Split carts and purchases
carts = df.filter(F.col("event_type") == "cart")
purchases = df.filter(F.col("event_type") == "purchase")

# 2. Join carts to purchases with the same user, product, and session,
#    keeping only pairs where cart_time < purchase_time
cart_purchase = (
    carts.alias("c")
    .join(
        purchases.alias("p"),
        on=[
            F.col("c.user_id") == F.col("p.user_id"),
            F.col("c.product_id") == F.col("p.product_id"),
            F.col("c.user_session") == F.col("p.user_session"),
            F.col("c.event_time") < F.col("p.event_time"),
        ],
        how="inner"
    )
)

# 3. For each purchase, pick the nearest preceding cart
w_last_cart = (
    Window
    .partitionBy(
        F.col("p.user_id"),
        F.col("p.product_id"),
        F.col("p.user_session"),
        F.col("p.event_time")   # anchor = purchase
    )
    .orderBy(F.col("c.event_time").desc())
)

cart_purchase = cart_purchase.withColumn(
    "rank_to_purchase",
    F.row_number().over(w_last_cart)
)

# 4. Credited carts (rank = 1)
credited_carts = (
    cart_purchase
    .filter(F.col("rank_to_purchase") == 1)
    .select(
        F.col("c.user_id").alias("user_id"),
        F.col("c.product_id").alias("product_id"),
        F.col("c.user_session").alias("user_session"),
        F.col("c.event_time").alias("event_time")
    )
    # One cart can be the nearest cart for several purchases;
    # deduplicate so the left join below does not multiply rows
    .distinct()
    .withColumn("is_purchased", F.lit(1))
)

# 5. Attach the label back to the original DataFrame
df = (
    df.join(
        credited_carts,
        on=["user_id", "product_id", "user_session", "event_time"],
        how="left"
    )
    .withColumn("is_purchased", F.coalesce(F.col("is_purchased"), F.lit(0)))
)

# 6. Stats
cart_stats = (
    df.filter(F.col("event_type") == "cart")
      .agg(
          F.count("*").alias("total_carts"),
          F.sum("is_purchased").alias("credited_carts"),
          F.avg("is_purchased").alias("conversion_rate")
      )
      .collect()[0]
)

print(f"Total cart events: {cart_stats['total_carts']:,}")
print(f"Credited cart events: {cart_stats['credited_carts']:,}")
print(f"Conversion rate: {cart_stats['conversion_rate']:.2%}")


Total cart events: 2,933,439
Credited cart events: 763,334
Conversion rate: 26.02%


In [11]:
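# Drop helper columns.
# Neither column is created anywhere in this notebook, so this call is a no-op:
# DataFrame.drop() ignores column names that are not present.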
df = df.drop(
    "purchase_time",
    "has_future_purchase"
)

## 5. Split Category Code

- Split `category_code` into `category_code_level1` and `category_code_level2`

In [12]:
df = df.withColumn(
    "category_code_level1",
    F.when(F.col("category_code").isNull(), F.lit("unknown"))
     .otherwise(F.split(F.col("category_code"), r"\.")[0])
)

df = df.withColumn(
    "category_code_level2",
    F.when(F.col("category_code").isNull(), F.lit("unknown"))
     .otherwise(
         F.when(F.size(F.split(F.col("category_code"), r"\.")) > 1,
                F.split(F.col("category_code"), r"\.")[1]
         ).otherwise(F.lit("unknown"))
     )
)
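The splitting rule used above can be checked on plain strings. This is a minimal pure-Python sketch; `split_category` is a hypothetical helper that mirrors the `F.split(..., r"\.")` logic, including the `"unknown"` fallback when a second level is absent.

```python
def split_category(category_code):
    """Return (level1, level2) for a dot-separated category code."""
    if category_code is None:
        return ("unknown", "unknown")
    parts = category_code.split(".")
    level1 = parts[0]
    # Codes with a single segment have no second level
    level2 = parts[1] if len(parts) > 1 else "unknown"
    return (level1, level2)


examples = [
    "electronics.smartphone",
    "appliances.kitchen.washer",  # third level is discarded
    "unknown",                    # value produced by the earlier normalization
    None,
]
for code in examples:
    print(code, "->", split_category(code))
```

Because missing category codes were already replaced with `"unknown"` during cleaning, the `None` branch acts as a safeguard rather than a path taken in this pipeline.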

## 6. Create Event Weekday

- Create `event_weekday` column from `event_time`



In [13]:
# Monday = 0, Sunday = 6
df = df.withColumn(
    "event_weekday",
    F.dayofweek("event_time") - 2
)

# Fix Sunday (Spark: Sunday = 1 → -1)
df = df.withColumn(
    "event_weekday",
    F.when(F.col("event_weekday") == -1, 6)
     .otherwise(F.col("event_weekday"))
)
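The two-step conversion above is equivalent to a single modular expression, `(dayofweek + 5) % 7`. The pure-Python sketch below checks that equivalence against `datetime.date.weekday()`, which already uses Monday = 0. The function `spark_dayofweek` is a hypothetical stand-in that reproduces Spark's convention (1 = Sunday, ..., 7 = Saturday); in PySpark the same idea could be written as `(F.dayofweek("event_time") + 5) % 7`.

```python
from datetime import date, timedelta


def spark_dayofweek(d):
    """Reproduce Spark's dayofweek convention: 1 = Sunday, ..., 7 = Saturday."""
    return d.isoweekday() % 7 + 1


def monday_zero(dow):
    """Map Spark's 1..7 (Sunday first) to 0..6 (Monday first)."""
    return (dow + 5) % 7


# One full week starting on 2019-11-01
week = [date(2019, 11, 1) + timedelta(days=i) for i in range(7)]
converted = [monday_zero(spark_dayofweek(d)) for d in week]

for d, w in zip(week, converted):
    print(d.isoformat(), w)
```

The single-expression form avoids the intermediate `-1` value and the follow-up `when`/`otherwise` fix for Sunday.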

## 7. Create Activity Count (session-level engagement)

- Create `activity_count`: the running number of events in the session up to and including the current event, ordered by `event_time`

In [14]:
# Window per session, looking backward in time
w_session_past = (
    Window
    .partitionBy("user_session")
    .orderBy("event_time")
    .rowsBetween(Window.unboundedPreceding, 0)
)

# Count the events up to the current point in time
df = df.withColumn(
    "activity_count",
    F.count("*").over(w_session_past)
)
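The window above yields a running count per session. Here is a minimal pure-Python sketch of the same idea on made-up `(user_session, event_time)` pairs, with integer timestamps assumed for brevity; `running_activity_count` is a hypothetical helper.

```python
from collections import defaultdict

# Made-up events: (user_session, event_time)
events = [("s1", 10), ("s2", 11), ("s1", 12), ("s1", 15), ("s2", 20)]


def running_activity_count(events):
    """Return (session, event_time, activity_count) ordered by session and time."""
    counts = defaultdict(int)
    out = []
    for session, t in sorted(events, key=lambda e: (e[0], e[1])):
        counts[session] += 1  # includes the current event, never later ones
        out.append((session, t, counts[session]))
    return out


result = running_activity_count(events)
for row in result:
    print(row)
```

Each row sees only itself and earlier events in its session, which is what keeps the feature free of future information. When two events in a session share the same `event_time`, their relative order in a row-based frame is arbitrary, so their counts may swap between runs.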

## 8. Final Sample Definition

- Filter cart events

In [15]:
print("Filtering to cart events")

training_samples = df.filter((F.col("event_type") == "cart"))

training_stats = training_samples.agg(
    F.count("*").alias("total_samples"),
    F.sum("is_purchased").alias("positive_samples"),
    F.avg("is_purchased").alias("conversion_rate")
).collect()[0]

print(f"Training samples (Nov cart events): {training_stats['total_samples']:,}")
print(f"Positive samples (is_purchased=1): {training_stats['positive_samples']:,}")
print(f"Conversion rate: {training_stats['conversion_rate']:.2%}")

training_samples = training_samples.cache()
training_samples.count()
print("\nTraining samples cached")

Filtering to cart events

Training samples (Nov cart events): 2,933,439
Positive samples (is_purchased=1): 763,334
Conversion rate: 26.02%

Training samples cached


In [16]:
#Final dataset summary
print("Final dataset summary:")

final_stats = training_samples.agg(
    F.count('*').alias('total'),
    F.sum('is_purchased').alias('positive'),
    F.sum(F.when(F.col('is_purchased') == 0, 1).otherwise(0)).alias('negative'),
    F.avg('is_purchased').alias('positive_rate')
).collect()[0]

print(f"Total training samples: {final_stats['total']:,}")
print(f"Positive class (is_purchased=1): {final_stats['positive']:,} ({final_stats['positive_rate']:.2%})")
print(f"Negative class (is_purchased=0): {final_stats['negative']:,} ({1-final_stats['positive_rate']:.2%})")
print(f"\nFeatures available: {training_samples.columns}")

Final dataset summary:
Total training samples: 2,933,439
Positive class (is_purchased=1): 763,334 (26.02%)
Negative class (is_purchased=0): 2,170,105 (73.98%)

Features available: ['user_id', 'product_id', 'user_session', 'event_time', 'event_type', 'category_id', 'category_code', 'brand', 'price', 'is_purchased', 'category_code_level1', 'category_code_level2', 'event_weekday', 'activity_count']


## 9. Save Processed Data

In [17]:
# Select relevant columns for output
# event_time is required for Feast's event_timestamp
output_columns = [
    "event_time",
    "user_id",
    "product_id",
    "category_code_level1",
    "category_code_level2",
    "brand",
    "event_weekday",
    "price",
    "activity_count",
    "is_purchased"
]

# Convert to pandas for single CSV file output
# (collects all rows to the driver; no sampling is applied)
df_output = (
    training_samples
    .select(*output_columns)
    .toPandas()
)

# Save as single CSV file (not folder)
output_path = "../data/processed/df_processed_pyspark_v1.csv"
df_output.to_csv(output_path, index=False)
print(f"Saved {len(df_output):,} rows to {output_path}")

Saved 2,933,439 rows to ../data/processed/df_processed_pyspark_v1.csv


## Summary

✅ **Completed Steps**:
1. Initialized a PySpark session (8 GB driver memory, 200 shuffle partitions)
2. Loaded November 2019 data with an explicit schema
3. Profiled and cleaned the data (deduplication, removal of rows with missing `user_session`, `brand`/`category_code` normalization, sorting)
4. Engineered the `is_purchased` label using a join and a window function
5. Split `category_code` into level 1 and level 2
6. Created `event_weekday` and the session-level `activity_count` feature, which counts only events up to the current one to avoid leakage
7. Summarized the class balance of the final training set
8. Saved the processed training data as a single CSV file

**PySpark Advantages**:
- Distributed processing for large datasets
- Lazy evaluation for query optimization
- Automatic memory management
- Native support for window functions
- Parquet support for efficient storage and retrieval (not used here; this notebook writes CSV)

**Next Steps**:
- Perform EDA on processed data
- Engineer additional features (user history, product popularity, etc.)
- Train purchase propensity models using Spark MLlib
- Evaluate model performance

In [18]:
# Clean up and stop the Spark session
spark.stop()