
# Chapter 1d: Event Aggregation (Event Bronze Track → Entity Bronze Track)

**Purpose:** Aggregate event-level data to entity-level, producing a dataset ready for standard exploration.

**When to use this notebook:**
- After completing 01a, 01b, 01c (temporal exploration)
- Your dataset has EVENT_LEVEL granularity
- You want to create entity-level features from events

**What this notebook produces:**
- Aggregated parquet file (one row per entity)
- New findings file for the aggregated data
- Updated original findings with aggregation metadata

**Aggregation Strategy:**

| Feature Type | Examples | Purpose |
|--------------|----------|--------|
| **Event Counts** | event_count_7d, event_count_30d | Activity level |
| **Value Aggregations** | amount_sum_30d, clicks_mean_7d | Behavior magnitude |
| **Recency** | days_since_last_event | Recent engagement |
| **Tenure** | days_since_first_event | Customer lifecycle |

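As a rough illustration of the four feature types in the table, the sketch below computes one feature of each type with plain pandas on a toy event table. The column names (`customer_id`, `event_time`, `amount`), the reference date, and the exclusive lower window bound are assumptions made for this example; `TimeWindowAggregator` may compute or name its features differently.

```python
import pandas as pd

# Toy event log; column names and values are illustrative assumptions.
events = pd.DataFrame({
    "customer_id": ["A", "A", "A", "B"],
    "event_time": pd.to_datetime(
        ["2024-01-01", "2024-01-05", "2024-01-10", "2024-01-03"]
    ),
    "amount": [10.0, 20.0, 30.0, 5.0],
})
reference_date = pd.Timestamp("2024-01-10")

# Events in the 7 days up to the reference date (lower bound exclusive).
in_7d = events[events["event_time"] > reference_date - pd.Timedelta(days=7)]

grouped = events.groupby("customer_id")
features = pd.DataFrame({
    # Event count and value aggregation over the 7-day window.
    "event_count_7d": in_7d.groupby("customer_id").size(),
    "amount_sum_7d": in_7d.groupby("customer_id")["amount"].sum(),
    # Recency and tenure use each entity's full history.
    "days_since_last_event": (reference_date - grouped["event_time"].max()).dt.days,
    "days_since_first_event": (reference_date - grouped["event_time"].min()).dt.days,
})

# Customer B has no events inside the window, so its windowed
# features come out as NaN while recency and tenure are still defined.
print(features)
```
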
---

## Understanding the Shape Transformation

```
EVENT-LEVEL (input)              ENTITY-LEVEL (output)
┌─────────────────────┐          ┌─────────────────────────────────────┐
│ customer │ date     │          │ customer │ events_7d │ events_30d │ ...
├──────────┼──────────┤    →     ├──────────┼───────────┼────────────┤
│ A        │ Jan 1    │          │ A        │ 3         │ 12         │
│ A        │ Jan 5    │          │ B        │ 1         │ 5          │
│ A        │ Jan 10   │          │ C        │ 0         │ 2          │
│ B        │ Jan 3    │          └──────────┴───────────┴────────────┘
│ ...      │ ...      │
└──────────┴──────────┘
Many rows per entity           One row per entity
```

## 1d.1 Load Findings and Data

In [1]:
from customer_retention.analysis.auto_explorer import ExplorationFindings, DataExplorer
from customer_retention.analysis.visualization import ChartBuilder, display_figure, display_table
from customer_retention.core.config.column_config import ColumnType, DatasetGranularity
from customer_retention.stages.profiling import TimeWindowAggregator
from datetime import datetime
from pathlib import Path
import pandas as pd
import numpy as np

<span id="papermill-error-cell" style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">Execution using papermill encountered an exception here and stopped:</span>

In [2]:
# === CONFIGURATION ===
FINDINGS_DIR = Path("../experiments/findings")

# Find findings files (exclude multi_dataset and already-aggregated)
findings_files = [
    f for f in FINDINGS_DIR.glob("*_findings.yaml") 
    if "multi_dataset" not in f.name and "_aggregated" not in f.name
]
if not findings_files:
    raise FileNotFoundError(f"No findings files found in {FINDINGS_DIR}. Run notebook 01 first.")

findings_files.sort(key=lambda f: f.stat().st_mtime, reverse=True)
FINDINGS_PATH = str(findings_files[0])

print(f"Using: {FINDINGS_PATH}")
findings = ExplorationFindings.load(FINDINGS_PATH)
print(f"Loaded findings for {findings.column_count} columns from {findings.source_path}")

FileNotFoundError: No findings files found in ../experiments/findings. Run notebook 01 first.

In [None]:
# Verify this is event-level data
if not findings.is_time_series:
    print("\u26a0\ufe0f This dataset is NOT event-level. Aggregation not needed.")
    print("   Proceed directly to 02_column_deep_dive.ipynb")
    raise SystemExit("Skipping aggregation - data is already entity-level")

ts_meta = findings.time_series_metadata
ENTITY_COLUMN = ts_meta.entity_column
TIME_COLUMN = ts_meta.time_column

print(f"\u2705 Dataset confirmed as EVENT-LEVEL")
print(f"   Entity column: {ENTITY_COLUMN}")
print(f"   Time column: {TIME_COLUMN}")
print(f"   Unique entities: {ts_meta.unique_entities:,}")
print(f"   Avg events/entity: {ts_meta.avg_events_per_entity:.1f}")

In [None]:
from customer_retention.stages.temporal import load_data_with_snapshot_preference, TEMPORAL_METADATA_COLS

# Load source data (prefers snapshots over raw files)
df, data_source = load_data_with_snapshot_preference(findings, output_dir=str(FINDINGS_DIR))
df[TIME_COLUMN] = pd.to_datetime(df[TIME_COLUMN])
charts = ChartBuilder()

print(f"Loaded {len(df):,} events x {len(df.columns)} columns")
print(f"Data source: {data_source}")
print(f"Date range: {df[TIME_COLUMN].min()} to {df[TIME_COLUMN].max()}")

## 1d.2 Configure Aggregation

Configure the time windows and aggregation functions to use.

In [None]:
# === AGGREGATION CONFIGURATION ===

# Time windows (from findings or defaults)
DEFAULT_WINDOWS = ["7d", "30d", "90d", "180d", "365d", "all_time"]
WINDOWS = ts_meta.suggested_aggregations if ts_meta.suggested_aggregations else DEFAULT_WINDOWS

# Reference date for window calculations
# Options: use max date in data, or a fixed date
REFERENCE_DATE = df[TIME_COLUMN].max()
# REFERENCE_DATE = pd.Timestamp("2024-01-01")  # Uncomment to use fixed date

# Value columns to aggregate (numeric columns excluding entity/time)
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
VALUE_COLUMNS = [c for c in numeric_cols if c not in (ENTITY_COLUMN, TIME_COLUMN)]

# Aggregation functions
AGG_FUNCTIONS = ["sum", "mean", "max", "count"]

# Include recency and tenure features
INCLUDE_RECENCY = True
INCLUDE_TENURE = True

print("Aggregation Configuration:")
print(f"   Windows: {WINDOWS}")
print(f"   Reference date: {REFERENCE_DATE}")
print(f"   Value columns: {VALUE_COLUMNS[:5]}{'...' if len(VALUE_COLUMNS) > 5 else ''}")
print(f"   Aggregation functions: {AGG_FUNCTIONS}")
print(f"   Include recency: {INCLUDE_RECENCY}")
print(f"   Include tenure: {INCLUDE_TENURE}")

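Window labels such as `"7d"` and `"all_time"` eventually have to become date cutoffs relative to `REFERENCE_DATE`. How `TimeWindowAggregator` does this internally is not shown in this notebook; the sketch below is one plausible interpretation, and `window_cutoff` is a hypothetical helper, not part of the library.

```python
from datetime import datetime, timedelta
from typing import Optional


def window_cutoff(label: str, reference_date: datetime) -> Optional[datetime]:
    """Return the earliest timestamp covered by a window, or None for all_time.

    Hypothetical helper: assumes labels are either "all_time" or "<N>d".
    """
    if label == "all_time":
        return None  # no lower bound, use the full history
    if not label.endswith("d") or not label[:-1].isdigit():
        raise ValueError(f"Unsupported window label: {label!r}")
    return reference_date - timedelta(days=int(label[:-1]))


reference = datetime(2024, 1, 31)
cutoffs = {w: window_cutoff(w, reference) for w in ["7d", "30d", "all_time"]}
for label, cutoff in cutoffs.items():
    print(label, cutoff)
```

Because every cutoff is derived from the reference date, switching `REFERENCE_DATE` from the maximum date in the data to a fixed date shifts all windows together.
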
## 1d.3 Preview Aggregation Plan

See what features will be created before executing.

In [None]:
# Initialize aggregator
aggregator = TimeWindowAggregator(
    entity_column=ENTITY_COLUMN,
    time_column=TIME_COLUMN
)

# Generate plan
plan = aggregator.generate_plan(
    windows=WINDOWS,
    value_columns=VALUE_COLUMNS,
    agg_funcs=AGG_FUNCTIONS,
    include_event_count=True,
    include_recency=INCLUDE_RECENCY,
    include_tenure=INCLUDE_TENURE
)

print("\n" + "="*60)
print("AGGREGATION PLAN")
print("="*60)
print(f"\nEntity column: {plan.entity_column}")
print(f"Time column: {plan.time_column}")
print(f"Windows: {[w.name for w in plan.windows]}")
print(f"\nFeatures to be created ({len(plan.feature_columns)}):")
for feat in plan.feature_columns[:20]:
    print(f"   - {feat}")
if len(plan.feature_columns) > 20:
    print(f"   ... and {len(plan.feature_columns) - 20} more")

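The plan can grow quickly, because each value column gets one feature per aggregation function per window. The sketch below enumerates names under the `<column>_<func>_<window>` and `event_count_<window>` pattern suggested by the examples in the Aggregation Strategy table. The exact names and count that `generate_plan` produces may differ, and `expected_feature_names` is a hypothetical helper.

```python
from itertools import product


def expected_feature_names(windows, value_columns, agg_funcs,
                           include_recency=True, include_tenure=True):
    """List feature names under an assumed <column>_<func>_<window> convention."""
    names = [f"event_count_{w}" for w in windows]
    names += [
        f"{col}_{func}_{w}"
        for col, func, w in product(value_columns, agg_funcs, windows)
    ]
    if include_recency:
        names.append("days_since_last_event")
    if include_tenure:
        names.append("days_since_first_event")
    return names


names = expected_feature_names(["7d", "30d"], ["amount", "clicks"], ["sum", "mean"])
# 2 event counts + 2 columns * 2 functions * 2 windows + recency + tenure
print(len(names))
```

With the default six windows and four aggregation functions, every extra value column adds 24 features under this convention, so trimming `VALUE_COLUMNS` is usually the cheapest way to keep the output narrow.
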
## 1d.4 Execute Aggregation

In [None]:
print("Executing aggregation...")
print(f"   Input: {len(df):,} events")
print(f"   Expected output: {df[ENTITY_COLUMN].nunique():,} entities")

df_aggregated = aggregator.aggregate(
    df,
    windows=WINDOWS,
    value_columns=VALUE_COLUMNS,
    agg_funcs=AGG_FUNCTIONS,
    reference_date=REFERENCE_DATE,
    include_event_count=True,
    include_recency=INCLUDE_RECENCY,
    include_tenure=INCLUDE_TENURE
)

print(f"\n\u2705 Aggregation complete!")
print(f"   Output: {len(df_aggregated):,} entities x {len(df_aggregated.columns)} features")
print(f"   Memory: {df_aggregated.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

In [None]:
# Preview aggregated data
print("\nAggregated Data Preview:")
display(df_aggregated.head(10))

In [None]:
# Summary statistics
print("\nFeature Summary Statistics:")
display(df_aggregated.describe().T)

## 1d.5 Quality Check on Aggregated Data

Quick validation of the aggregated output.

In [None]:
print("="*60)
print("AGGREGATED DATA QUALITY CHECK")
print("="*60)

# Check for nulls
null_counts = df_aggregated.isnull().sum()
cols_with_nulls = null_counts[null_counts > 0]

if len(cols_with_nulls) > 0:
    print(f"\n\u26a0\ufe0f Columns with null values ({len(cols_with_nulls)}):")
    for col, count in cols_with_nulls.items():
        pct = count / len(df_aggregated) * 100
        print(f"   {col}: {count:,} ({pct:.1f}%)")
    print("\n   Note: Nulls in aggregated features typically mean no events in that window.")
    print("   Consider filling with 0 for count/sum features.")
else:
    print("\n\u2705 No null values in aggregated data")

# Check entity count matches
original_entities = df[ENTITY_COLUMN].nunique()
aggregated_entities = len(df_aggregated)

if original_entities == aggregated_entities:
    print(f"\n\u2705 Entity count matches: {aggregated_entities:,}")
else:
    print(f"\n\u26a0\ufe0f Entity count mismatch!")
    print(f"   Original: {original_entities:,}")
    print(f"   Aggregated: {aggregated_entities:,}")

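Following the note printed by the check above, one way to handle the nulls is to fill count and sum features with 0 while leaving mean and max as nulls, since an average over zero events is undefined rather than zero. This sketch selects columns by name and assumes the `_count_` / `_sum_` naming pattern from the Aggregation Strategy table; the toy frame stands in for `df_aggregated`.

```python
import numpy as np
import pandas as pd

# Toy stand-in for df_aggregated; column names follow an assumed pattern.
agg = pd.DataFrame({
    "customer_id": ["A", "B"],
    "event_count_7d": [2.0, np.nan],
    "amount_sum_7d": [50.0, np.nan],
    "amount_mean_7d": [25.0, np.nan],
})

# Counts and sums are 0 when there were no events in the window.
zero_fill_cols = [c for c in agg.columns if "_count_" in c or "_sum_" in c]
filled = agg.copy()
filled[zero_fill_cols] = filled[zero_fill_cols].fillna(0)

# Means stay null: there is no meaningful average over zero events.
remaining_nulls = filled.isnull().sum()
print(remaining_nulls[remaining_nulls > 0])
```
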
## 1d.6 Save Aggregated Data and Findings

In [None]:
# Generate output paths
findings_name = Path(FINDINGS_PATH).stem.replace("_findings", "")

# Save aggregated data as parquet
AGGREGATED_DATA_PATH = FINDINGS_DIR / f"{findings_name}_aggregated.parquet"
df_aggregated.to_parquet(AGGREGATED_DATA_PATH, index=False)

print(f"\u2705 Aggregated data saved to: {AGGREGATED_DATA_PATH}")
print(f"   Size: {AGGREGATED_DATA_PATH.stat().st_size / 1024:.1f} KB")

In [None]:
# Create new findings for aggregated data using DataExplorer
print("\nGenerating findings for aggregated data...")

explorer = DataExplorer(output_dir=str(FINDINGS_DIR))
aggregated_findings = explorer.explore(
    str(AGGREGATED_DATA_PATH),
    dataset_name=f"{findings_name}_aggregated"
)

AGGREGATED_FINDINGS_PATH = explorer.last_findings_path
print(f"\u2705 Aggregated findings saved to: {AGGREGATED_FINDINGS_PATH}")

In [None]:
# Update original findings with aggregation metadata
findings.time_series_metadata.aggregation_executed = True
findings.time_series_metadata.aggregated_data_path = str(AGGREGATED_DATA_PATH)
findings.time_series_metadata.aggregated_findings_path = str(AGGREGATED_FINDINGS_PATH)
findings.time_series_metadata.aggregation_windows_used = WINDOWS
findings.time_series_metadata.aggregation_timestamp = datetime.now().isoformat()

findings.save(FINDINGS_PATH)
print(f"\u2705 Original findings updated with aggregation metadata: {FINDINGS_PATH}")

In [None]:
# Summary of outputs
print("\n" + "="*60)
print("AGGREGATION COMPLETE - OUTPUT SUMMARY")
print("="*60)
print(f"\n\U0001f4c1 Files created:")
print(f"   1. Aggregated data: {AGGREGATED_DATA_PATH}")
print(f"   2. Aggregated findings: {AGGREGATED_FINDINGS_PATH}")
print(f"   3. Updated original findings: {FINDINGS_PATH}")

print(f"\n\U0001f4ca Aggregation stats:")
print(f"   Input events: {len(df):,}")
print(f"   Output entities: {len(df_aggregated):,}")
print(f"   Features created: {len(df_aggregated.columns)}")
print(f"   Windows used: {WINDOWS}")

---

## Summary: What We Did

In this notebook, we transformed event-level data to entity-level:

1. **Loaded event data** with entity and time columns
2. **Configured aggregation** windows and functions
3. **Previewed the aggregation plan** to see which features would be created
4. **Executed aggregation** using TimeWindowAggregator
5. **Quality checked** the aggregated output
6. **Saved outputs** - data file, findings, and metadata

## Output Files

| File | Purpose | Next Use |
|------|---------|----------|
| `*_aggregated.parquet` | Entity-level data | Input for notebooks 02-04 |
| `*_aggregated_findings.yaml` | Auto-profiled findings | Loaded by 02_column_deep_dive |
| Original findings (updated) | Aggregation tracking | Reference |

---

## Next Steps

**Event Bronze Track complete!** Continue with the **Entity Bronze Track** on the aggregated data:

1. **02_column_deep_dive.ipynb** - Profile the aggregated feature distributions
2. **03_quality_assessment.ipynb** - Run quality checks on entity-level data
3. **04_relationship_analysis.ipynb** - Analyze feature correlations and target relationships

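The notebooks auto-discover findings files by modification time. Note that this notebook re-saves the original findings file after writing the aggregated findings, so the original file may end up with the later timestamp. Confirm which file the next notebook loads (this notebook prints it as `Using: ...`) and set `FINDINGS_PATH` explicitly if it is not the aggregated findings.

```python
# Notebooks 02-04 locate findings via the standard discovery pattern
# (most recently modified file). Verify that the file they load is the
# aggregated findings file, not the re-saved original.
```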
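
The standard discovery pattern is the one used at the top of this notebook: glob for `*_findings.yaml` and take the most recently modified file. A minimal standalone version is sketched below. `latest_findings` is a hypothetical helper name, the file names in the demo are made up, and the downstream notebooks may apply extra filters that are not shown here.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional


def latest_findings(findings_dir: Path) -> Optional[Path]:
    """Return the most recently modified *_findings.yaml file, or None."""
    candidates = [
        f for f in findings_dir.glob("*_findings.yaml")
        if "multi_dataset" not in f.name
    ]
    return max(candidates, key=lambda f: f.stat().st_mtime, default=None)


# Demo in a temporary directory with explicit modification times.
with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)
    older = tmp_dir / "events_findings.yaml"
    newer = tmp_dir / "events_aggregated_findings.yaml"
    older.write_text("{}")
    newer.write_text("{}")
    os.utime(older, (1_000, 1_000))
    os.utime(newer, (2_000, 2_000))
    picked_name = latest_findings(tmp_dir).name

print(picked_name)
```

Since the choice depends only on modification time, touching or re-saving any other findings file changes which one is picked.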