<a href="https://colab.research.google.com/github/LAworkspace/retail-recommender-MLOPS/blob/main/retail_rocket_feature_engineering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🧠 Retail Rocket Dynamic Pricing: Feature Engineering Notebook

**Sections Covered:**
- 🧱 Section 1: Load & Preprocess Raw Data
- 📊 Section 2: Feature Engineering

📦 Dataset: [Retail Rocket E-commerce Dataset](https://www.kaggle.com/datasets/retailrocket/ecommerce-dataset)

🔧 Libraries used: `pandas`, `polars`, `datetime`, `numpy`


In [1]:
!pip install polars pandas



In [2]:
import pandas as pd
import polars as pl
import numpy as np
from datetime import datetime, timedelta
import os
import matplotlib.pyplot as plt
from tqdm import tqdm
tqdm.pandas()

## 🧱 Section 1: Load & Preprocess Raw Data

In [3]:
# Define schema to ensure correct parsing
schema = {
    'timestamp': pl.Int64,
    'itemid': pl.Int64,
    'property': pl.Utf8,
    'value': pl.Utf8,
}

# Load both parts with ragged line handling
item_props1 = pl.read_csv('/content/item_properties_part1.csv', schema=schema, truncate_ragged_lines=True)
item_props2 = pl.read_csv('/content/item_properties_part2.csv', schema=schema, truncate_ragged_lines=True)

# Merge and convert to pandas
item_properties = pl.concat([item_props1, item_props2]).to_pandas()

# Load events
events = pd.read_csv('/content/events.csv')


In [4]:
# Convert timestamps from ms to datetime
events['timestamp'] = pd.to_datetime(events['timestamp'], unit='ms')
events['timestamp_hour'] = events['timestamp'].dt.floor('h')

item_properties['timestamp'] = pd.to_datetime(item_properties['timestamp'], unit='ms')
item_properties['timestamp_hour'] = item_properties['timestamp'].dt.floor('h')


In [5]:
# Separate by event type
views = events[events['event'] == 'view']
carts = events[events['event'] == 'addtocart']
purchases = events[events['event'] == 'transaction']

# Group by item + hour
views_agg = views.groupby(['itemid', 'timestamp_hour']).size().reset_index(name='views')
carts_agg = carts.groupby(['itemid', 'timestamp_hour']).size().reset_index(name='carts')
purchases_agg = purchases.groupby(['itemid', 'timestamp_hour']).size().reset_index(name='purchases')


In [6]:
# Merge and fill missing values
agg_df = views_agg.merge(carts_agg, on=['itemid', 'timestamp_hour'], how='left')
agg_df = agg_df.merge(purchases_agg, on=['itemid', 'timestamp_hour'], how='left')
agg_df[['carts', 'purchases']] = agg_df[['carts', 'purchases']].fillna(0)

# Compute conversion metrics
agg_df['cart_conversion_rate'] = agg_df['carts'] / agg_df['views']
agg_df['purchase_conversion_rate'] = agg_df['purchases'] / agg_df['views']


In [7]:
import polars as pl

# Load both CSVs
part1 = pl.read_csv("/content/item_properties_part1.csv", has_header=True)
part2 = pl.read_csv("/content/item_properties_part2.csv", has_header=True)

# Make sure both have the same column names
part1.columns = ['timestamp', 'itemid', 'property', 'value']
part2.columns = ['timestamp', 'itemid', 'property', 'value']

# Concatenate them
item_properties = pl.concat([part1, part2])



In [8]:
item_properties = item_properties.with_columns([
    pl.col("timestamp").cast(pl.Datetime("ms")).alias("timestamp_hour")
])


In [9]:
import polars as pl

# Load the item properties file (if not already loaded)
item_props1 = pl.read_csv("/content/item_properties_part1.csv")
item_props2 = pl.read_csv("/content/item_properties_part2.csv")

# Merge both parts
item_properties = pl.concat([item_props1, item_props2])

# Ensure timestamp is in datetime format
item_properties = item_properties.with_columns([
    pl.col("timestamp").cast(pl.Datetime(time_unit="ms"))
])

# Optional: filter only relevant properties
relevant_props = ["categoryid", "brand", "price"]
item_properties = item_properties.filter(pl.col("property").is_in(relevant_props))

# Get the latest value for each item-property pair
latest_values = (
    item_properties.sort("timestamp")  # sort so latest comes last
    .group_by(["itemid", "property"])
    .agg(pl.last("value").alias("latest_value"))
)

# Pivot to wide format: one row per itemid, columns = properties
pivoted_item_properties = (
    latest_values.pivot(
        index="itemid",
        columns="property",
        values="latest_value"
    )
)

# Preview result
pivoted_item_properties.head()


  latest_values.pivot(


itemid,categoryid
i64,str
192559,"""1113"""
1884,"""330"""
21714,"""842"""
258802,"""1248"""
448227,"""1098"""


In [10]:
# Final: create category-only pivot and clean types
category_only = (
    item_properties
    .filter(pl.col("property") == "categoryid")
    .sort("timestamp")
    .group_by("itemid")
    .agg(pl.last("value").alias("categoryid"))
    .with_columns([
        pl.col("categoryid").cast(pl.Int64, strict=False)
    ])
)


In [11]:
SCHEMA = {
    "categoryid": {"dtype": pl.Int64, "clean": "strip_quotes"}
}

def pivot_item_properties(props_df, schema):
    # Filter only properties of interest
    props_df = props_df.filter(pl.col("property").is_in(list(schema.keys())))

    # Get the latest value for each item-property combination
    latest = (
        props_df.sort("timestamp", descending=True)
        .unique(subset=["itemid", "property"])
    )

    # Pivot so each property becomes a column
    pivoted = latest.pivot(index="itemid", on="property", values="value")

    # Sanitize and convert data types
    for key, cfg in schema.items():
        if key in pivoted.columns:
            col = pl.col(key)
            if cfg["clean"] == "strip_quotes":
                col = col.str.strip_chars('"')
            elif cfg["clean"] == "coerce_float":
                col = col.cast(pl.Float64, strict=False)
            col = col.cast(cfg["dtype"], strict=False)
            pivoted = pivoted.with_columns([col.alias(key)])
        else:
            print(f"⚠️ Warning: '{key}' not found in pivoted columns.")

    return pivoted

item_meta = pivot_item_properties(item_properties, SCHEMA).to_pandas()


In [12]:
# Check distinct properties in the combined item_properties
print(item_properties.select("property").unique().sort("property"))


shape: (1, 1)
┌────────────┐
│ property   │
│ ---        │
│ str        │
╞════════════╡
│ categoryid │
└────────────┘


In [13]:
events = events.merge(item_meta, how="left", on="itemid")

In [14]:
events["hour_of_day"] = events["timestamp"].dt.hour
events["day_of_week"] = events["timestamp"].dt.dayofweek

In [15]:
print(events.columns)


Index(['timestamp', 'visitorid', 'event', 'itemid', 'transactionid',
       'timestamp_hour', 'categoryid', 'hour_of_day', 'day_of_week'],
      dtype='object')


In [16]:
import polars as pl
from datetime import timedelta

# Load events
events = pl.read_csv("/content/events.csv")

# Convert timestamp to datetime
events = events.with_columns([
    pl.col("timestamp").cast(pl.Datetime("ms"))
])

# Add time-based features
events = events.with_columns([
    pl.col("timestamp").dt.hour().alias("hour_of_day"),
    pl.col("timestamp").dt.weekday().alias("day_of_week")
])

# Sort by visitor and time to define sessions
events = events.sort(["visitorid", "timestamp"])

# Define new session based on 30-min gap
events = events.with_columns([
    (pl.col("timestamp").diff().over("visitorid") > pl.duration(minutes=30)).fill_null(True).alias("is_new_session")
])

# Cumulative session number
events = events.with_columns([
    pl.col("is_new_session").cast(pl.Int32).cum_sum().over("visitorid").alias("session_number")
])

# Create session ID
events = events.with_columns([
    (pl.col("visitorid").cast(str) + "_" + pl.col("session_number").cast(str)).alias("sessionid")
])

# --- Feature: views_last_24h ---

# Calculate cutoff time
cutoff_time = events.select(pl.col("timestamp").max()).item() - timedelta(days=1)

# Filter recent views
recent_views = events.filter(
    (pl.col("timestamp") >= cutoff_time) & (pl.col("event") == "view")
)

# Count views in the last 24h per session
views_24h = recent_views.group_by("sessionid").agg(
    pl.len().alias("views_last_24h")

)

# Merge back into main events data
events = events.join(views_24h, on="sessionid", how="left").fill_null(0)

# Save or preview
events.write_parquet("/content/enriched_events.parquet")
events.head()


timestamp,visitorid,event,itemid,transactionid,hour_of_day,day_of_week,is_new_session,session_number,sessionid,views_last_24h
datetime[ms],i64,str,i64,str,i8,i8,bool,i32,str,u32
2015-09-11 20:49:49.439,0,"""view""",285930,,20,5,True,1,"""0_1""",0
2015-09-11 20:52:39.591,0,"""view""",357564,,20,5,False,1,"""0_1""",0
2015-09-11 20:55:17.175,0,"""view""",67045,,20,5,False,1,"""0_1""",0
2015-08-13 17:46:06.444,1,"""view""",72028,,17,4,True,1,"""1_1""",0
2015-08-07 17:51:44.567,2,"""view""",325215,,17,5,True,1,"""2_1""",0


In [17]:
print(type(recent_views))
print(recent_views.columns)



<class 'polars.dataframe.frame.DataFrame'>
['timestamp', 'visitorid', 'event', 'itemid', 'transactionid', 'hour_of_day', 'day_of_week', 'is_new_session', 'session_number', 'sessionid']


In [18]:
import polars as pl

# Event type splits
views = events.filter(pl.col("event") == "view")
carts = events.filter(pl.col("event") == "cart")
purchases = events.filter(pl.col("event") == "purchase")

# Count events by item
view_counts = views.group_by("itemid").agg(pl.len().alias("view_count"))
cart_counts = carts.group_by("itemid").agg(pl.len().alias("cart_count"))
purchase_counts = purchases.group_by("itemid").agg(pl.len().alias("purchase_count"))

# Join with custom suffix to avoid duplicate `itemid` columns
conversion_stats = (
    view_counts
    .join(cart_counts, on="itemid", how="full", suffix="_cart")
    .join(purchase_counts, on="itemid", how="full", suffix="_purchase")
    .fill_null(0)
)

# Compute conversion rates
conversion_stats = conversion_stats.with_columns([
    (pl.col("cart_count") / pl.col("view_count")).alias("cart_conversion_rate"),
    (pl.col("purchase_count") / pl.col("view_count")).alias("purchase_conversion_rate")
])

# Join back to event stream
events = events.join(
    conversion_stats.select(["itemid", "cart_conversion_rate", "purchase_conversion_rate"]),
    on="itemid",
    how="left"
).fill_null(0)

# Save result
events.write_parquet("/content/enriched_events_with_conversion.parquet")
print("✅ Done: Saved to enriched_events_with_conversion.parquet")


✅ Done: Saved to enriched_events_with_conversion.parquet


In [19]:
pl.read_csv("events.csv").columns


['timestamp', 'visitorid', 'event', 'itemid', 'transactionid']

In [20]:
import polars as pl

# Load in lazy mode
events = pl.read_csv("events.csv").lazy()

# Convert timestamp and add time-based features
events = events.with_columns([
    pl.col("timestamp").cast(pl.Datetime("ms")).alias("timestamp")
])

events = events.with_columns([
    pl.col("timestamp").dt.hour().alias("hour_of_day"),
    pl.col("timestamp").dt.weekday().alias("day_of_week")
])

# Preview 5 rows safely
events.head(5).collect()


timestamp,visitorid,event,itemid,transactionid,hour_of_day,day_of_week
datetime[ms],i64,str,i64,str,i8,i8
2015-06-02 05:02:12.117,257597,"""view""",355908,,5,2
2015-06-02 05:50:14.164,992329,"""view""",248676,,5,2
2015-06-02 05:13:19.827,111016,"""view""",318965,,5,2
2015-06-02 05:12:35.914,483717,"""view""",253185,,5,2
2015-06-02 05:02:17.106,951259,"""view""",367447,,5,2


In [21]:
import polars as pl
from datetime import timedelta

# Load events as LazyFrame
events = pl.read_csv("events.csv").lazy()

# Step 1: Cast timestamp to Datetime
events = events.with_columns([
    pl.col("timestamp").cast(pl.Datetime("ms")).alias("timestamp")
])

# Step 2: Add hour_of_day and day_of_week
events = events.with_columns([
    pl.col("timestamp").dt.hour().alias("hour_of_day"),
    pl.col("timestamp").dt.weekday().alias("day_of_week"),
])


In [22]:
events.collect().head(5)



timestamp,visitorid,event,itemid,transactionid,hour_of_day,day_of_week
datetime[ms],i64,str,i64,str,i8,i8
2015-06-02 05:02:12.117,257597,"""view""",355908,,5,2
2015-06-02 05:50:14.164,992329,"""view""",248676,,5,2
2015-06-02 05:13:19.827,111016,"""view""",318965,,5,2
2015-06-02 05:12:35.914,483717,"""view""",253185,,5,2
2015-06-02 05:02:17.106,951259,"""view""",367447,,5,2


In [23]:
from datetime import timedelta
import polars as pl

# Step 1: Sort and detect new sessions
# Ensure the data is sorted by visitor and timestamp within the lazy context
events = events.sort(["visitorid", "timestamp"])

# Step 2: Calculate time difference and detect new sessions
events = events.with_columns([
    (pl.col("timestamp").diff().over("visitorid") > pl.duration(minutes=30))
    .fill_null(True)
    .alias("is_new_session")
])

# Step 3: Assign session_number per visitor using a window function
events = events.with_columns([
    pl.col("is_new_session").cast(pl.Int32).cum_sum().over("visitorid").alias("session_number")
])

# Step 4: Create sessionid column
events = events.with_columns([
    (pl.col("visitorid").cast(pl.Utf8) + "_" + pl.col("session_number").cast(pl.Utf8)).alias("sessionid")
])

# Preview the result (collecting only a small portion)
print(events.head(5))


naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

SLICE[offset: 0, len: 5]
   WITH_COLUMNS:
   [col("visitorid").strict_cast(String).str.concat_horizontal([String(_), col("session_number").strict_cast(String)]).alias("sessionid")] 
     WITH_COLUMNS:
     [col("is_new_session").strict_cast(Int32).cum_sum().over([col("visitorid")]).alias("session_number")] 
       WITH_COLUMNS:
       [[(col("timestamp").diff().over([col("visitorid")])) > (30m.cast(Duration(Milliseconds)))].fill_null([true]).alias("is_new_session")] 
        SORT BY [col("visitorid"), col("timestamp")]
           WITH_COLUMNS:
           [col("timestamp").dt.hour().alias("hour_of_day"), col("timestamp").dt.weekday().alias("day_of_week")] 
             WITH_COLUMNS:
             [col("timestamp").strict_cast(Datetime(Milliseconds, None)).alias("timestamp")] 
              DF ["timestamp", "visitorid", "event", "itemid", ...]; PROJECT */5 COLUMNS
