# Live customer state: days since last purchase & replacement “due” signals

Goal:
- Build a "live" customer-product table that answers: **as of a chosen date, how long has it been since customer X last purchased product group Y?**
- Extend the logic for consumables (filters) using **stock-up-aware** and **equipment-aware** due-date estimates.

Key principles:
- Live state includes **all customers**, including one-time buyers.
- Replacement benchmarks (typical cycles) come from historical repeat-purchase intervals (previous notebook outputs).
- We build state "as-of" a selected date (`ASOF_DATE`) so the logic can be refreshed daily.


In [30]:
import pandas as pd
import numpy as np

# ---- Paths (adjust if your folder differs) ----
CUST_DAY_PATH = "../data/interim/cust_day_group.parquet"
BENCH_PATH = "../data/interim/retention_benchmarks.parquet"  # optional but recommended

cust_day = pd.read_parquet(CUST_DAY_PATH)

# Parse dates (important for correct sorting + deltas)
cust_day["date"] = pd.to_datetime(cust_day["date"], errors="coerce", dayfirst=True)
cust_day = cust_day.dropna(subset=["date"])

# Choose "as-of" date for the live snapshot
# (Later you can set this to pd.Timestamp.today().normalize())
ASOF_DATE = cust_day["date"].max()

print("Rows:", len(cust_day))
print("Customers:", cust_day["anon"].nunique())
print("Date range:", cust_day["date"].min().date(), "→", cust_day["date"].max().date())
print("ASOF_DATE:", ASOF_DATE.date())

# Load benchmarks if available (we’ll use them for filter due-date logic)
try:
    benchmarks = pd.read_parquet(BENCH_PATH)
    print("Benchmarks loaded:", benchmarks.shape)
except Exception as e:
    benchmarks = None
    print("Benchmarks not loaded (OK for now):", e)



Rows: 677481
Customers: 333901
Date range: 2022-01-12 → 2025-12-11
ASOF_DATE: 2025-12-11
Benchmarks loaded: (35, 4)


## Baseline live state: last purchase date and recency (all products)

We create the simplest "live" customer-product table:
- For each customer and product group, find the most recent purchase date.
- Compute `days_since_last_purchase` as of `ASOF_DATE`.

This baseline works for most durable / non-consumable products.
Consumables (filters) will be extended later with equipment-aware "due" logic.
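
As a minimal sketch of the baseline idea on toy data: the customers, the `group` column, and the helper name `recency_as_of` are hypothetical and exist only for illustration. The helper also drops events after the snapshot date, which is an assumption that matters only when the as-of date is earlier than the newest event in the data.

```python
import pandas as pd

# Toy purchase events (hypothetical customers and groups, not taken from the real data)
toy_events = pd.DataFrame(
    {
        "anon": ["A", "A", "A", "B"],
        "group": ["filters", "filters", "bottles", "filters"],
        "date": pd.to_datetime(["2025-01-10", "2025-03-01", "2025-01-10", "2025-02-15"]),
    }
)


def recency_as_of(events: pd.DataFrame, asof: pd.Timestamp) -> pd.DataFrame:
    """Last purchase date and days since it, per customer x group, as of `asof`."""
    # Ignore anything after the snapshot date so older snapshots stay reproducible
    visible = events[events["date"] <= asof]
    out = (
        visible.groupby(["anon", "group"], as_index=False)
        .agg(last_purchase_date=("date", "max"))
    )
    out["days_since_last_purchase"] = (asof - out["last_purchase_date"]).dt.days
    return out


toy_recency = recency_as_of(toy_events, pd.Timestamp("2025-03-31"))
print(toy_recency)
```

Calling the helper with an earlier date, for example `pd.Timestamp("2025-02-01")`, returns only the pairs that had already purchased by then.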


In [31]:
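# Only purchases count as "events" for recency.
# Sort by date so the "last" aggregation below picks the name from the most recent purchase.
p = cust_day[cust_day["is_purchase"]].sort_values("date").copy()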

# last purchase per customer x product group
live_last_purchase = (
    p.groupby(["anon", "MATRIX GRUPA PRODUKTOWA"], as_index=False)
     .agg(
         last_purchase_date=("date", "max"),
         # optional context columns - can be useful later for debugging
         last_matrix_name=("MATRIX NAZWA", "last"),
     )
)

# recency as of snapshot date
live_last_purchase["days_since_last_purchase"] = (
    ASOF_DATE - live_last_purchase["last_purchase_date"]
).dt.days

# quick sanity checks
print("Rows (customer x product group):", len(live_last_purchase))
print("Unique customers covered:", live_last_purchase["anon"].nunique())
live_last_purchase.tail()


Rows (customer x product group): 606555
Unique customers covered: 333901


index,anon,MATRIX GRUPA PRODUKTOWA,last_purchase_date,last_matrix_name,days_since_last_purchase
606550,ANON_0336612,03_butelki filtrujące SOLID,2025-11-30,"SOLID 0,7 1F",11
606551,ANON_0336612,05_butelki i kubki termiczne,2025-11-30,"EASY 0,49",11
606552,ANON_0336612,06_filtry do butelek Soft i Solid,2025-11-30,1 FILTR BUTELKOWY,11
606553,ANON_0336612,10_dzbanki filtrujące Crystal,2025-11-30,CRYSTAL 2L LED CLASSIC 1F,11
606554,ANON_0336612,13_filtry do dzbanków standard,2025-11-30,1 FILTR CLASSIC,11


## Bottle equipment + filter inventory ledger (stock-up aware)

For bottle filters, "days since last purchase" is not enough because:
- customers can stock up (buy multiple filters at once),
- and consumption scales with equipment (number of bottles owned).

Important business rule:
- Each purchased filtering bottle includes **1 filter**.
  Therefore, bottle purchases increase both:
  - equipment state (`bottles_owned`)
  - consumable inventory (`filters_delta += bottles_bought`)
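
The business rule can be illustrated on a toy ledger. This is a sketch under simplifying assumptions: the `kind` and `qty` columns are hypothetical stand-ins for the product-group and quantity columns, and only one customer is shown.

```python
import pandas as pd

# Hypothetical purchase rows for one customer; "kind" stands in for the product group
toy = pd.DataFrame(
    {
        "anon": ["A", "A", "A"],
        "date": pd.to_datetime(["2024-01-05", "2024-01-05", "2024-06-01"]),
        "kind": ["bottle", "filter", "filter"],
        "qty": [1, 3, 2],
    }
)

is_bottle = toy["kind"].eq("bottle")
toy["bottles_delta"] = toy["qty"].where(is_bottle, 0)
# Every bottle ships with one filter, so bottle rows also add filter units
toy["filters_delta"] = toy["qty"].where(~is_bottle, 0) + toy["bottles_delta"]

# Collapse to one row per customer-day, then accumulate into running state
toy_ledger = (
    toy.groupby(["anon", "date"], as_index=False)[["bottles_delta", "filters_delta"]]
    .sum()
    .sort_values(["anon", "date"])
)
toy_ledger["bottles_owned"] = toy_ledger.groupby("anon")["bottles_delta"].cumsum()
toy_ledger["filters_acquired_cum"] = toy_ledger.groupby("anon")["filters_delta"].cumsum()
print(toy_ledger)
```

On the first day the customer buys 1 bottle and 3 filters, so the ledger records 4 filter units (3 explicit plus 1 bundled); the later purchase of 2 filters raises the cumulative total to 6 while `bottles_owned` stays at 1.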


In [32]:
# --- Define groups (explicit + readable) ---
BOTTLE_GROUPS_WITH_INCLUDED_FILTER = [
    "02_butelki filtrujące SOFT",
    "03_butelki filtrujące SOLID",
]

BOTTLE_FILTER_GROUP = "06_filtry do butelek Soft i Solid"

# Only purchase events
p = cust_day[cust_day["is_purchase"]].copy()

# Ensure qty columns numeric
p["matrix_qty"] = pd.to_numeric(p["matrix_qty"], errors="coerce").fillna(0)
p["ILOŚĆ FILTRÓW"] = pd.to_numeric(p["ILOŚĆ FILTRÓW"], errors="coerce").fillna(0)

# --- Build per-row deltas ---
# equipment: bottles purchased (only for filtering bottles)
p["bottles_delta"] = np.where(
    p["MATRIX GRUPA PRODUKTOWA"].isin(BOTTLE_GROUPS_WITH_INCLUDED_FILTER),
    p["matrix_qty"],
    0.0
)

# consumable: filters bought explicitly (from filter group)
# prefer ILOŚĆ FILTRÓW when present, else fallback to matrix_qty
p["filters_bought_delta"] = np.where(
    p["MATRIX GRUPA PRODUKTOWA"].eq(BOTTLE_FILTER_GROUP),
    np.where(p["ILOŚĆ FILTRÓW"] > 0, p["ILOŚĆ FILTRÓW"], p["matrix_qty"]),
    0.0
)

# consumable: filters included with bottles (1 per bottle)
p["filters_included_with_bottles_delta"] = p["bottles_delta"]

# total filter units added on that row
p["filters_delta"] = p["filters_bought_delta"] + p["filters_included_with_bottles_delta"]

# --- Aggregate to customer-day ledger ---
ledger_day = (
    p.groupby(["anon", "date"], as_index=False)
     .agg(
         bottles_delta=("bottles_delta", "sum"),
         filters_delta=("filters_delta", "sum"),
     )
     .sort_values(["anon", "date"])
     .reset_index(drop=True)
)

# --- Convert ledger into cumulative state ---
ledger_day["bottles_owned"] = ledger_day.groupby("anon")["bottles_delta"].cumsum()
ledger_day["filters_acquired_cum"] = ledger_day.groupby("anon")["filters_delta"].cumsum()

print("Ledger days:", len(ledger_day))
ledger_day.head(10)


Ledger days: 410261


index,anon,date,bottles_delta,filters_delta,bottles_owned,filters_acquired_cum
0,ANON_0000001,2022-09-11,1.0,4.0,1.0,4.0
1,ANON_0000002,2022-09-11,2.0,2.0,2.0,2.0
2,ANON_0000003,2022-09-11,0.0,0.0,0.0,0.0
3,ANON_0000004,2022-09-11,0.0,0.0,0.0,0.0
4,ANON_0000005,2022-09-11,0.0,3.0,0.0,3.0
5,ANON_0000006,2022-09-11,1.0,1.0,1.0,1.0
6,ANON_0000007,2022-09-11,2.0,4.0,2.0,4.0
7,ANON_0000008,2022-09-11,0.0,0.0,0.0,0.0
8,ANON_0000009,2022-10-11,0.0,0.0,0.0,0.0
9,ANON_0000009,2025-11-19,1.0,4.0,1.0,4.0


## Attach equipment state to the latest bottle-filter purchase

To compute "due" signals for bottle filters, we need context at the moment of the last filter-related event:
- how many bottles the customer had at that time (`bottles_owned`)
- how many filter units they had acquired in total (`filters_acquired_cum`)

We attach this context using an as-of merge (`merge_asof`) on customer and date.
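
A small sketch of how `merge_asof` behaves with `by` and `direction="backward"`, using made-up customers and dates (none of these values come from the real ledger):

```python
import pandas as pd

# Hypothetical ledger: cumulative equipment state per customer-day
toy_state = pd.DataFrame(
    {
        "anon": ["A", "A", "B"],
        "date": pd.to_datetime(["2024-01-05", "2024-06-01", "2024-02-10"]),
        "bottles_owned": [1, 2, 1],
    }
)
# Hypothetical lookup dates, one per customer; "C" has no ledger history
toy_lookup = pd.DataFrame(
    {
        "anon": ["A", "B", "C"],
        "date": pd.to_datetime(["2024-03-15", "2024-02-10", "2024-04-01"]),
    }
)

# merge_asof needs the `on` key sorted on both sides (globally, not per customer)
toy_matched = pd.merge_asof(
    toy_lookup.sort_values("date"),
    toy_state.sort_values("date"),
    on="date",
    by="anon",
    direction="backward",  # latest ledger row at or before the lookup date
)
print(toy_matched)
```

Customer A is matched to the 2024-01-05 ledger row (the later 2024-06-01 row is in the future relative to the lookup date), B gets an exact-date match, and C comes back with a missing `bottles_owned` because no ledger row exists for that customer.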


In [33]:
# 1) Identify bottle-filter purchase days (group 06)
filter_events = (
    cust_day[
        (cust_day["is_purchase"]) &
        (cust_day["MATRIX GRUPA PRODUKTOWA"] == BOTTLE_FILTER_GROUP)
    ]
    .copy()
)

# ensure numeric filter qty per row (prefer ILOŚĆ FILTRÓW, else fall back to matrix_qty)
explicit_units = pd.to_numeric(filter_events["ILOŚĆ FILTRÓW"], errors="coerce").fillna(0)
fallback_units = pd.to_numeric(filter_events["matrix_qty"], errors="coerce").fillna(0)
filter_events["filter_units"] = np.where(explicit_units > 0, explicit_units, fallback_units)

# aggregate to customer-day (in case multiple lines same day)
filter_day = (
    filter_events.groupby(["anon", "date"], as_index=False)
    .agg(filters_bought=("filter_units", "sum"))
)

# last filter purchase day per customer
last_filter = (
    filter_day.sort_values(["anon", "date"])
    .groupby("anon", as_index=False)
    .tail(1)
    .rename(columns={"date": "last_filter_purchase_date"})
    .sort_values(["anon", "last_filter_purchase_date"])
    .reset_index(drop=True)
)

print("Customers with bottle-filter purchases:", last_filter["anon"].nunique())
last_filter.head()


Customers with bottle-filter purchases: 119602


index,anon,last_filter_purchase_date,filters_bought
0,ANON_0000001,2022-09-11,3.0
1,ANON_0000005,2022-09-11,3.0
2,ANON_0000007,2022-09-11,2.0
3,ANON_0000009,2025-11-19,3.0
4,ANON_0000012,2024-09-26,3.0


In [34]:
# --- Make both sides use the same on-key name: 'date' ---
last_filter_for_asof = (
    last_filter
    .rename(columns={"last_filter_purchase_date": "date"})
    .copy()
)

# IMPORTANT: merge_asof requires the `on` key to be sorted globally on both sides,
# so sort by ['date', 'anon'] rather than ['anon', 'date']
last_filter_for_asof = last_filter_for_asof.sort_values(["date", "anon"]).reset_index(drop=True)
ledger_for_asof = ledger_day.sort_values(["date", "anon"]).reset_index(drop=True)

last_filter_state = pd.merge_asof(
    last_filter_for_asof,
    ledger_for_asof[["anon", "date", "bottles_owned", "filters_acquired_cum"]],
    on="date",
    by="anon",
    direction="backward",
    allow_exact_matches=True,
)

# Rename back for clarity in downstream code
last_filter_state = last_filter_state.rename(columns={"date": "last_filter_purchase_date"})

last_filter_state[["anon", "last_filter_purchase_date", "filters_bought", "bottles_owned", "filters_acquired_cum"]].head(10)


index,anon,last_filter_purchase_date,filters_bought,bottles_owned,filters_acquired_cum
0,ANON_0001869,2022-01-12,3.0,1.0,4.0
1,ANON_0001872,2022-01-12,3.0,2.0,5.0
2,ANON_0001874,2022-01-12,4.0,1.0,5.0
3,ANON_0001892,2022-01-12,2.0,0.0,2.0
4,ANON_0001921,2022-01-12,6.0,1.0,7.0
5,ANON_0001924,2022-01-12,6.0,0.0,6.0
6,ANON_0001929,2022-01-12,3.0,2.0,5.0
7,ANON_0001935,2022-01-12,6.0,2.0,8.0
8,ANON_0001938,2022-01-12,3.0,0.0,3.0
9,ANON_0001943,2022-01-12,2.0,0.0,2.0


## Bottle-filter due logic (stock-up + equipment aware)

For bottle filters we estimate a customer's "coverage" from their latest filter-related event:

- Filters added at the last event =
  - explicit filters purchased that day (`filters_bought`)
  - + filters included with any filtering bottles bought on that same day (1 per bottle)

- Coverage is consumed across the customer’s equipment base.
  We estimate coverage duration as:

    coverage_days ≈ benchmark_days_per_unit × (filters_added / bottles_owned)

This yields an estimated `due_date` and `days_to_due` as of `ASOF_DATE`.

Note: some customers have `bottles_owned = 0` in the recorded history (e.g. the bottle was purchased before the dataset begins).
We handle this with a conservative fallback: treat them as owning 1 bottle, which also avoids division by zero.
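
The coverage formula can be checked by hand with a few scalar cases. This is a sketch: the helper name `coverage_days` is hypothetical, and `245 / 6` is used only because it is approximately the 40.83-day median that this notebook reads from the benchmarks table.

```python
def coverage_days(filters_added: float, bottles_owned: float, days_per_unit: float) -> float:
    """Estimated days of filter coverage after the last filter-related event."""
    # Fallback: customers with no recorded bottle are treated as owning 1 bottle
    effective_bottles = max(bottles_owned, 1)
    return days_per_unit * filters_added / effective_bottles


# Assumption for illustration: roughly the bottle-filter median (about 40.83 days per unit)
DAYS_PER_UNIT = 245 / 6

print(round(coverage_days(4, 1, DAYS_PER_UNIT), 2))  # 4 filters, 1 bottle -> 163.33
print(round(coverage_days(5, 2, DAYS_PER_UNIT), 2))  # 5 filters shared by 2 bottles -> 102.08
print(round(coverage_days(2, 0, DAYS_PER_UNIT), 2))  # no recorded bottle, treated as 1 -> 81.67
```

Doubling the number of bottles halves the coverage for the same number of filters, which is the equipment-aware part of the estimate.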


In [35]:
# 1) Bottles bought on the same day as the last filter purchase
# We will attach this as "bottles_bought_that_day" (which equals included filters that day)

bottles_day = (
    cust_day[
        (cust_day["is_purchase"]) &
        (cust_day["MATRIX GRUPA PRODUKTOWA"].isin(BOTTLE_GROUPS_WITH_INCLUDED_FILTER))
    ]
    .copy()
)

bottles_day["bottles_bought_that_day"] = pd.to_numeric(bottles_day["matrix_qty"], errors="coerce").fillna(0)

bottles_day = (
    bottles_day.groupby(["anon", "date"], as_index=False)
    .agg(bottles_bought_that_day=("bottles_bought_that_day", "sum"))
)

# merge onto last_filter_state by anon + last_filter_purchase_date
tmp = last_filter_state.merge(
    bottles_day,
    left_on=["anon", "last_filter_purchase_date"],
    right_on=["anon", "date"],
    how="left"
).drop(columns=["date"])

tmp["bottles_bought_that_day"] = tmp["bottles_bought_that_day"].fillna(0)

# filters added on last filter day = explicit + included (1 per bottle bought that day)
tmp["filters_added_last_event"] = tmp["filters_bought"] + tmp["bottles_bought_that_day"]

tmp[["anon", "last_filter_purchase_date", "filters_bought", "bottles_bought_that_day", "filters_added_last_event", "bottles_owned"]].head(10)


index,anon,last_filter_purchase_date,filters_bought,bottles_bought_that_day,filters_added_last_event,bottles_owned
0,ANON_0001869,2022-01-12,3.0,1.0,4.0,1.0
1,ANON_0001872,2022-01-12,3.0,2.0,5.0,2.0
2,ANON_0001874,2022-01-12,4.0,1.0,5.0,1.0
3,ANON_0001892,2022-01-12,2.0,0.0,2.0,0.0
4,ANON_0001921,2022-01-12,6.0,1.0,7.0,1.0
5,ANON_0001924,2022-01-12,6.0,0.0,6.0,0.0
6,ANON_0001929,2022-01-12,3.0,2.0,5.0,2.0
7,ANON_0001935,2022-01-12,6.0,2.0,8.0,2.0
8,ANON_0001938,2022-01-12,3.0,0.0,3.0,0.0
9,ANON_0001943,2022-01-12,2.0,0.0,2.0,0.0


## Estimate due date for bottle filters using population benchmarks

We estimate when a customer will be "due" for their next bottle-filter purchase.

Inputs:
- `filters_added_last_event`: explicit filters bought + filters included with bottles purchased that day
- `bottles_owned`: equipment base at the time of last filter purchase
- `benchmark_days_per_unit` (median): typical "days per unit" from historical repeat-purchase intervals

Formula:
- coverage_days_est = benchmark_days_per_unit × (filters_added_last_event / bottles_owned_effective)
- due_date = last_filter_purchase_date + coverage_days_est

We use a conservative fallback for customers with `bottles_owned == 0` in the recorded history:
- bottles_owned_effective = max(bottles_owned, 1)
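
One detail of the date arithmetic is worth seeing on toy values: a fractional coverage estimate produces a due date with a time-of-day component, and `.dt.days` rounds the difference down to whole days. The sketch below uses hypothetical coverage values and an optional `floor("D")` variant that is an assumption for illustration, not something this notebook applies.

```python
import pandas as pd

asof = pd.Timestamp("2025-12-11")
last_purchase = pd.Series(pd.to_datetime(["2025-10-02", "2025-09-01"]))
coverage_days_est = pd.Series([80.0, 60.5])  # hypothetical coverage estimates

# Fractional days are kept, so the second due date lands at 12:00 rather than midnight
due_date = last_purchase + pd.to_timedelta(coverage_days_est, unit="D")

# .dt.days is the whole-day component, rounded down (so -40.5 days becomes -41)
days_to_due = (due_date - asof).dt.days

# Optional variant (assumption): snap due dates to midnight for cleaner display
due_date_floor = due_date.dt.floor("D")

print(pd.DataFrame({"due_date": due_date, "days_to_due": days_to_due, "floored": due_date_floor}))
```

A negative `days_to_due` means the estimated due date has already passed as of the snapshot.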


In [36]:
# tmp is the table we just built (last filter purchase + bottles bought that day + filters_added_last_event)

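# 1) Extract the benchmark median for bottle filters
# Benchmarks are optional at load time but required from this point on.
if benchmarks is None:
    raise RuntimeError(f"Benchmarks are required for due-date logic; could not load {BENCH_PATH}")
# (Adjust column name if your benchmarks table uses something slightly different.)
display(benchmarks.head())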

# try common column names
possible_median_cols = ["median", "median_adj_retention_days", "median_adj", "median_adj_retention"]
median_col = next((c for c in possible_median_cols if c in benchmarks.columns), None)
if median_col is None:
    raise ValueError(f"Couldn't find a median column in benchmarks. Columns: {benchmarks.columns.tolist()}")

# find product-group column name
possible_group_cols = ["MATRIX GRUPA PRODUKTOWA", "product_group", "group"]
group_col = next((c for c in possible_group_cols if c in benchmarks.columns), None)
if group_col is None:
    raise ValueError(f"Couldn't find product group column in benchmarks. Columns: {benchmarks.columns.tolist()}")

bf_benchmark = benchmarks.loc[benchmarks[group_col] == BOTTLE_FILTER_GROUP, median_col]
if bf_benchmark.empty:
    raise ValueError(f"No benchmark found for {BOTTLE_FILTER_GROUP} in benchmarks.")
benchmark_days_per_unit = float(bf_benchmark.iloc[0])

print("Bottle-filter benchmark (median days per unit):", benchmark_days_per_unit)


index,MATRIX GRUPA PRODUKTOWA,p25,median,p75
0,01_bidony,26.0,71.5,150.625
1,02_butelki filtrujące SOFT,30.5,91.0,218.833333
2,03_butelki filtrujące SOLID,55.0,146.25,291.0
3,04_termiczna butelka filtrująca SOLID,30.5,102.0,246.0
4,05_butelki i kubki termiczne,45.75,138.333333,276.0


Bottle-filter benchmark (median days per unit): 40.833333333333336


In [37]:
bf_live = tmp.copy()

# 2) Effective bottles owned (fallback for edge cases)
bf_live["bottles_owned_effective"] = bf_live["bottles_owned"].clip(lower=1)

# 3) Coverage estimate (days)
bf_live["coverage_days_est"] = (
    benchmark_days_per_unit *
    (bf_live["filters_added_last_event"] / bf_live["bottles_owned_effective"])
)

# 4) Due date + days to due as of ASOF_DATE
bf_live["due_date"] = bf_live["last_filter_purchase_date"] + pd.to_timedelta(bf_live["coverage_days_est"], unit="D")
bf_live["days_to_due"] = (bf_live["due_date"] - ASOF_DATE).dt.days
bf_live["days_since_last_filter_purchase"] = (ASOF_DATE - bf_live["last_filter_purchase_date"]).dt.days

# 5) Simple status buckets (tune thresholds later)
bf_live["status"] = np.select(
    [
        bf_live["days_to_due"] < 0,
        (bf_live["days_to_due"] >= 0) & (bf_live["days_to_due"] <= 14),
    ],
    [
        "overdue",
        "due_soon_14d",
    ],
    default="ok"
)

bf_live[[
    "anon",
    "last_filter_purchase_date",
    "filters_added_last_event",
    "bottles_owned",
    "bottles_owned_effective",
    "coverage_days_est",
    "due_date",
    "days_to_due",
    "status"
]].head(10)


index,anon,last_filter_purchase_date,filters_added_last_event,bottles_owned,bottles_owned_effective,coverage_days_est,due_date,days_to_due,status
0,ANON_0001869,2022-01-12,4.0,1.0,1.0,163.333333,2022-06-24 07:59:59.999999997,-1266,overdue
1,ANON_0001872,2022-01-12,5.0,2.0,2.0,102.083333,2022-04-24 01:59:59.999999997,-1327,overdue
2,ANON_0001874,2022-01-12,5.0,1.0,1.0,204.166667,2022-08-04 04:00:00.000000002,-1225,overdue
3,ANON_0001892,2022-01-12,2.0,0.0,1.0,81.666667,2022-04-03 16:00:00.000000002,-1348,overdue
4,ANON_0001921,2022-01-12,7.0,1.0,1.0,285.833333,2022-10-24 20:00:00.000000005,-1144,overdue
5,ANON_0001924,2022-01-12,6.0,0.0,1.0,245.0,2022-09-14 00:00:00.000000000,-1184,overdue
6,ANON_0001929,2022-01-12,5.0,2.0,2.0,102.083333,2022-04-24 01:59:59.999999997,-1327,overdue
7,ANON_0001935,2022-01-12,8.0,2.0,2.0,163.333333,2022-06-24 07:59:59.999999997,-1266,overdue
8,ANON_0001938,2022-01-12,3.0,0.0,1.0,122.5,2022-05-14 12:00:00.000000000,-1307,overdue
9,ANON_0001943,2022-01-12,2.0,0.0,1.0,81.666667,2022-04-03 16:00:00.000000002,-1348,overdue


In [38]:
bf_live["status"].value_counts(dropna=False).to_frame("customers")


status,customers
overdue,94962
ok,22544
due_soon_14d,2096


## Assemble the live customer-product table (baseline + filter due signals)

We combine:
- Baseline recency for all customer × product-group pairs (`days_since_last_purchase`)
with
- Bottle-filter specific due signals (`due_date`, `days_to_due`, `status`, equipment context)

The result is a single "live" table that can power:
- marketing triggers (overdue / due soon)
- dashboards
- future extension to other consumables (pitcher filters, Mg+, etc.)
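
The combination step can be sketched on toy tables. The column name `group`, the label `FILTER_GROUP`, and all values below are hypothetical; `validate="m:1"` is an optional safeguard added here as an assumption, not something the notebook's own merge uses.

```python
import numpy as np
import pandas as pd

FILTER_GROUP = "filters"  # hypothetical stand-in for the bottle-filter group label

# Baseline: one row per customer x product group
toy_base = pd.DataFrame(
    {
        "anon": ["A", "A", "B"],
        "group": ["bottles", FILTER_GROUP, "bottles"],
        "days_since_last_purchase": [300, 120, 45],
    }
)
# Due signals exist only for customers who bought filters
toy_due = pd.DataFrame(
    {"anon": ["A"], "group": [FILTER_GROUP], "days_to_due": [-10], "status": ["overdue"]}
)

# validate="m:1" raises if the due table has duplicate keys, which would
# otherwise silently multiply baseline rows
toy_live = toy_base.merge(toy_due, on=["anon", "group"], how="left", validate="m:1")

# Non-filter rows carry no due signal, so they fall back to a recency-only label
toy_live["actionable_status"] = np.where(
    toy_live["group"].eq(FILTER_GROUP), toy_live["status"], "recency_only"
)
print(toy_live)
```

The left merge keeps the baseline row count unchanged, and only the filter row receives `days_to_due` and `status`.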


In [39]:
# 1) Start from baseline live_last_purchase (all products)
live = live_last_purchase.copy()

# 2) Prepare bottle-filter live due table to merge (rename group key to match)
bf_cols = [
    "anon",
    "last_filter_purchase_date",
    "due_date",
    "days_to_due",
    "status",
    "filters_added_last_event",
    "bottles_owned",
    "bottles_owned_effective",
    "coverage_days_est",
]
bf_merge = bf_live[bf_cols].copy()

bf_merge["MATRIX GRUPA PRODUKTOWA"] = BOTTLE_FILTER_GROUP
bf_merge = bf_merge.rename(columns={"last_filter_purchase_date": "last_purchase_date"})

# 3) Merge onto the baseline table (only bottle-filter rows will receive due columns)
live = live.merge(
    bf_merge,
    on=["anon", "MATRIX GRUPA PRODUKTOWA", "last_purchase_date"],
    how="left"
)

# For non-filter groups, status is empty; for filters it is overdue/due soon/ok
# You may want a unified "actionable_status" that defaults to "recency_only"
live["actionable_status"] = np.where(
    live["MATRIX GRUPA PRODUKTOWA"].eq(BOTTLE_FILTER_GROUP),
    live["status"].fillna("ok"),  # should rarely be NaN for filter buyers
    "recency_only"
)

print("Live rows:", len(live))
live.head()


Live rows: 606555


index,anon,MATRIX GRUPA PRODUKTOWA,last_purchase_date,last_matrix_name,days_since_last_purchase,due_date,days_to_due,status,filters_added_last_event,bottles_owned,bottles_owned_effective,coverage_days_est,actionable_status
0,ANON_0000001,03_butelki filtrujące SOLID,2022-09-11,"SOLID 0,7 1F",1187,NaT,,,,,,,recency_only
1,ANON_0000001,06_filtry do butelek Soft i Solid,2022-09-11,1 FILTR BUTELKOWY,1187,2023-02-21 07:59:59.999999997,-1024.0,overdue,4.0,1.0,1.0,163.333333,overdue
2,ANON_0000001,07_akcesoria do Soft/Solid,2022-09-11,"RURKA TRITANOWA DO SOLID 0,7",1187,NaT,,,,,,,recency_only
3,ANON_0000002,02_butelki filtrujące SOFT,2022-09-11,"SOFT 0,5 1F",1187,NaT,,,,,,,recency_only
4,ANON_0000003,26_podgrzewacze przepływowe,2022-09-11,PRZEPŁYWOWY PODGRZEWACZ - NADUMYLAWKOWY,1187,NaT,,,,,,,recency_only


In [40]:
OUT_PATH = "../data/interim/live_customer_product_state.parquet"
live.to_parquet(OUT_PATH, index=False)

print("Saved:", OUT_PATH)


Saved: ../data/interim/live_customer_product_state.parquet


In [41]:
# sanity: bottle-filter rows should have due_date for customers who bought filters
bf_rows = live["MATRIX GRUPA PRODUKTOWA"].eq(BOTTLE_FILTER_GROUP)
print("Bottle-filter rows:", bf_rows.sum())
print("Bottle-filter due_date missing rate:", live.loc[bf_rows, "due_date"].isna().mean())

# how many customers are overdue for bottle filters?
print(live.loc[bf_rows, "actionable_status"].value_counts())


Bottle-filter rows: 119602
Bottle-filter due_date missing rate: 0.0
actionable_status
overdue         94962
ok              22544
due_soon_14d     2096
Name: count, dtype: int64


## Summary and outputs

In this notebook we built a **live customer–product state table** designed for operational use
(e.g. marketing triggers, reminders, dashboards).

Key steps:
- Constructed a baseline live view showing **days since last purchase** for every customer
  and product group (including one-time buyers).
- Designed **equipment- and stock-aware due logic** for bottle filters:
  - accounted for filters bought explicitly,
  - included filters bundled with bottle purchases,
  - adjusted consumption by the number of bottles owned,
  - anchored expected replacement timing using historical population benchmarks.
- Produced actionable signals:
  - `due_date`
  - `days_to_due`
  - `status` (overdue / due soon / ok)

Output:
- `data/interim/live_customer_product_state.parquet`

This table is intended to be refreshed regularly (e.g. daily) and serves as the foundation
for dashboards, targeting, and future extensions to other consumable products.
