On each processing day, enrich the impressions dataset by joining it with each customer's latest 1000 actions prior to the impression date, using a minimum one-year lookback window.
For the initial full data load (extracting actions and processing impressions), run the following notebooks in order:
- preprocessing (only if inputs are in CSV format)
- impressions_pipeline
For subsequent daily incremental runs (incremental actions extraction and impressions processing), run:
- impressions_pipeline_incremental
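For orientation, here is a minimal sketch of how the two run modes could be orchestrated from a driver notebook, assuming a Databricks-style environment where `dbutils.notebook.run` is available; the notebook paths, timeouts, and the `run_mode` flag are illustrative, not part of the repository.

```python
# Hypothetical driver snippet; paths and flags are assumptions, not repository code.
run_mode = "full"        # or "incremental" for daily runs
inputs_are_csv = True

if run_mode == "full":
    if inputs_are_csv:
        # Convert CSV inputs to parquet with the defined schemas first.
        dbutils.notebook.run("preprocessing", 3600)
    dbutils.notebook.run("impressions_pipeline", 7200)
else:
    # Daily incremental actions extraction + impressions processing.
    dbutils.notebook.run("impressions_pipeline_incremental", 7200)
```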
### Preprocessing
- Transforms input CSV files to parquet format with defined schemas.
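A minimal PySpark sketch of this conversion, assuming hypothetical paths and a hypothetical schema for a clicks dataset; the actual preprocessing notebook defines its own schema per dataset.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Hypothetical schema and paths, for illustration only.
clicks_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("item_id", StringType(), False),
    StructField("action_ts", TimestampType(), False),
])

(spark.read
    .option("header", True)
    .schema(clicks_schema)
    .csv("/data/csv/clicks")            # csv_data_location
    .write.mode("overwrite")
    .parquet("/data/parquet/clicks"))   # input_base_path
```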
### Action Data Extraction
- Extract past customer actions from the clicks, add-to-carts, and previous-orders tables. Each table accumulates large volumes over time (150M, 15M, and 2M rows per 7-day period, respectively), making full historical scans over 1–5 years extremely costly in compute and latency.
- Extract a configurable number of recent actions (e.g., the last 1000 per customer per action type) into `clicks_extract`, `carts_extract`, and `orders_extract` (see the sketch after this list).
- Subsequent runs only need to process new daily partitions, avoiding full scans while preserving correct action history for impressions.
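A minimal sketch of the per-customer extraction using a window function, assuming hypothetical column names (`customer_id`, `action_ts`) and paths; the real notebooks define the actual schemas and read `past_actions_count` from the configuration.

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
past_actions_count = 1000  # spark_config.past_actions_count

clicks = spark.read.parquet("/data/parquet/clicks")  # hypothetical path

# Keep only the most recent N clicks per customer.
w = Window.partitionBy("customer_id").orderBy(F.col("action_ts").desc())
clicks_extract = (clicks
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") <= past_actions_count)
    .drop("rn"))

clicks_extract.write.mode("overwrite").parquet("/data/parquet/clicks_extract")
```

The same pattern applies to the carts and orders extracts.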
### Action Union
- Union the three extracted tables into a consolidated actions dataset, which contains up to the 1000 latest actions per customer per action type, ready for downstream processing.
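A sketch of the union step, assuming the three extracts share the same action columns and tagging each row with an `action_type` label (an illustrative column name).

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

def load_extract(name, action_type):
    # Hypothetical paths; action_type distinguishes rows after the union.
    return (spark.read.parquet(f"/data/parquet/{name}")
            .withColumn("action_type", F.lit(action_type)))

actions = (load_extract("clicks_extract", "click")
           .unionByName(load_extract("carts_extract", "add_to_cart"))
           .unionByName(load_extract("orders_extract", "order")))

actions.write.mode("overwrite").parquet("/data/output/actions")
```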
### Main Transformation
- Join the consolidated actions with impressions to enrich each impression with the customer's most recent 1000 actions prior to the impression date, ordered from most recent to oldest.
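A minimal sketch of the enrichment join, again with hypothetical column names (`impression_id`, `impression_date`, `action_ts`) and paths.

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
past_actions_count = 1000  # spark_config.past_actions_count

impressions = spark.read.parquet("/data/parquet/impressions")  # hypothetical path
actions = spark.read.parquet("/data/output/actions")           # consolidated actions

# Pair each impression with the same customer's earlier actions, rank those
# actions newest-first, and keep at most `past_actions_count` of them.
joined = (impressions
    .join(actions, on="customer_id", how="inner")
    .filter(F.col("action_ts") < F.col("impression_date")))

w = Window.partitionBy("impression_id").orderBy(F.col("action_ts").desc())
enriched = (joined
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") <= past_actions_count)
    .groupBy("impression_id", "customer_id", "impression_date")
    .agg(F.sort_array(
            F.collect_list(F.struct("action_ts", "action_type", "item_id")),
            asc=False).alias("past_actions")))  # newest action first

enriched.write.mode("overwrite").parquet("/data/output/impressions_enriched")
```

With an inner join as shown, customers that have no prior actions are dropped; the `zero_actions_padding` setting described below controls whether such customers should instead be kept and padded.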
1. Incremental Actions Extraction
- Query the max processed date from the extracted tables and load only new daily partitions since then, appending to `clicks_extract`, `carts_extract`, and `orders_extract` (see the sketch after this list).
2. Action Union
- Union the updated extracted tables into a consolidated dataset holding up to the 1000 latest actions per customer per action type.
3. Main Transformation
- Join the consolidated actions with impressions to attach each customer's most recent 1000 actions prior to the impression date, ordered most recent → oldest.
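A sketch of the incremental load in step 1, assuming the source tables are partitioned by a date column (here a hypothetical `action_date`) and that the extracts already exist from the full run.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

clicks_extract_path = "/data/parquet/clicks_extract"  # hypothetical path

# 1. Find the most recent date already present in the extract.
max_date = (spark.read.parquet(clicks_extract_path)
            .agg(F.max("action_date").alias("max_date"))
            .collect()[0]["max_date"])

# 2. Read only the daily partitions newer than that date.
new_clicks = (spark.read.parquet("/data/parquet/clicks")
              .filter(F.col("action_date") > F.lit(max_date)))

# 3. Append them to the extract (actions_incremental_output_mode = "append").
new_clicks.write.mode("append").parquet(clicks_extract_path)
```

Steps 2 and 3 then reuse the union and main-transformation logic sketched above.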
👉 The sample data used for running the pipelines is available at tests/data.
- `input_base_path`: Location of parquet input datasets.
  - If inputs are in CSV format, place them under a parallel `csv` folder (see `spark_config.csv_data_location`); a preprocessing step will convert them to parquet and place them here.
- `output_base_path`: Location where pipeline-generated parquet outputs are written.
- `datasets`: Names of input parquet datasets expected in `input_base_path`.
- `tables`: Input tables available directly from the warehouse (if applicable).
- `outputs`: Names of output parquet datasets, which will be written to `output_base_path`.
- `actions_lookback_days`: Number of consecutive past days of actions to load when processing impressions.
- `actions_start_date`: Alternative to `actions_lookback_days`; process actions starting from a specific date instead of consecutive recent days.
- `actions_normalize_output_mode`: Write mode for normalized action tables.
  - Options: `overwrite` (replace existing data) or `append` (incrementally add data).
- `actions_incremental_output_mode`: Write mode for incremental action datasets (typically `append`).
- `impressions_lookback_days`: Number of consecutive past days of impressions to load (same as or less than `actions_lookback_days`).
- `impressions_date`: Alternative to `impressions_lookback_days`; process impressions for a specific date instead of a consecutive range.
- `impressions_output_mode`: Write mode for impressions datasets.
  - Options: `overwrite` or `append`.
- `past_actions_count`: Number of previous actions per customer to retain when extracting actions and enriching impressions.
- `zero_actions_padding`: Controls whether to pad customers with zero actions (`True`) or skip them (`False`).
- `csv_data_location`: Location of CSV input datasets (if provided instead of parquet).
- `catalog_schema`: Target schema where views (tables) are created over the output parquet datasets.
- `skip_tables`: If `True`, skips creating views in the catalog and only generates parquet datasets.
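To show how these settings fit together, here is a hypothetical example configuration, written as a plain Python dict; every value is illustrative and the repository may store the configuration in a different format.

```python
# Hypothetical spark_config values, for illustration only.
spark_config = {
    "input_base_path": "/data/parquet",
    "output_base_path": "/data/output",
    "csv_data_location": "/data/csv",
    "datasets": ["clicks", "carts", "orders", "impressions"],
    "tables": [],                        # no warehouse tables in this example
    "outputs": ["actions", "impressions_enriched"],
    "actions_lookback_days": 365,        # at least one year of actions
    "actions_normalize_output_mode": "overwrite",
    "actions_incremental_output_mode": "append",
    "impressions_lookback_days": 7,
    "impressions_output_mode": "overwrite",
    "past_actions_count": 1000,
    "zero_actions_padding": True,
    "catalog_schema": "analytics",
    "skip_tables": False,
}
```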