In [1]:
# ============================================
# Notebook 3 — Model Input Builder (V2)
# Block 1 — Setup, Paths, Helpers, Snapshot
# ============================================

import os, json, gc, hashlib, warnings, textwrap
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, List

import numpy as np
import pandas as pd

# ---- Reproducibility & display
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
pd.set_option("display.max_columns", 120)
pd.set_option("display.width", 160)
pd.set_option("display.float_format", lambda x: f"{x:,.4f}")
# NOTE(review): this suppresses every warning category for the rest of the kernel
# session, including pandas/NumPy deprecation notices — confirm that is intended.
warnings.filterwarnings("ignore")

# ---- RUN_ID & paths
# RUN_ID is a UTC timestamp taken once per run; it tags temp files, drafts and audit JSON.
# datetime.utcnow() is deprecated since Python 3.12, so an aware UTC clock is used instead;
# the rendered string (e.g. 20250827T150559Z) is unchanged.
RUN_ID = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

DATA_DIR_IN   = "clean"                                # outputs from NB01/NB02
DATA_DIR_OUT  = os.path.join(DATA_DIR_IN, "model_input")
REPORT_DIR    = os.path.join("clean", "report", "nb3")
PLOTS_DIR     = os.path.join(REPORT_DIR, "plots")
TABLES_DIR    = os.path.join(REPORT_DIR, "tables")

# Create every output directory up front so later blocks can write without checking.
for d in [DATA_DIR_OUT, REPORT_DIR, PLOTS_DIR, TABLES_DIR]:
    os.makedirs(d, exist_ok=True)

# ---- Policy (consistent with NB02)
# How rows with a missing SalesDate are treated by the two families of metrics.
POLICY = dict(
    salesdate_missing_in_time_series="exclude",
    salesdate_missing_in_global_kpis="include",
)

# ---- Contract (official file names must not change)
# Logical artifact key -> full path under DATA_DIR_OUT. Insertion order is preserved
# because the manifest block iterates over OFFICIAL.values().
OFFICIAL = {
    key: os.path.join(DATA_DIR_OUT, file_name)
    for key, file_name in (
        ("daily",           "model_input_sales.parquet"),
        ("daily_csv",       "model_input_sales.csv"),
        ("tx_parquet",      "model_input_sales_transactions.parquet"),
        ("tx_sample_csv",   "model_input_sales_transactions_sample.csv"),
        # clustering inputs (not strict contracts, but we publish them)
        ("cust_parquet",    "model_input_customers.parquet"),
        ("cust_csv",        "model_input_customers.csv"),
        ("full_feat_pq",    "model_features_customers_full.parquet"),
        ("full_feat_csv",   "model_features_customers_full.csv"),
        ("rfm3_feat_pq",    "model_features_customers_rfm3.parquet"),
        ("rfm3_feat_csv",   "model_features_customers_rfm3.csv"),
        ("feature_catalog", "feature_catalog.json"),
        ("metadata",        "model_input_metadata.json"),
        ("manifest",        "manifest.json"),
        ("readme",          "README_ModelInputs.md"),
    )
}

# ---- Required inputs
# Logical input key -> full path under DATA_DIR_IN.
REQ = {
    key: os.path.join(DATA_DIR_IN, file_name)
    for key, file_name in (
        ("sales_enriched",    "sales_enriched.parquet"),
        ("customers",         "customers.parquet"),
        ("products",          "products.parquet"),
        ("cities",            "cities.parquet"),
        ("countries",         "countries.parquet"),
        # optional reuse
        ("customers_rfm_opt", "customers_rfm.parquet"),
    )
}

def assert_required(req: Dict[str, str]) -> None:
    """Fail fast when a mandatory input file is absent.

    Args:
        req: Mapping of logical input name to file path. The entry keyed
            ``"customers_rfm_opt"`` is optional and never checked.

    Raises:
        FileNotFoundError: If any non-optional path is not an existing file;
            the message lists the logical names that are missing.
    """
    absent = []
    for key, location in req.items():
        if key == "customers_rfm_opt":
            continue  # optional input
        if os.path.isfile(location):
            continue
        absent.append(key)
    if not absent:
        return
    raise FileNotFoundError(f"Missing required inputs in {DATA_DIR_IN}: {absent}")

def md5_of_file(path: str) -> str:
    """Return the hexadecimal MD5 digest of the file at ``path``.

    The file is read in 1 MiB chunks so large files are never loaded whole.
    """
    chunk_size = 1024 * 1024
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        while True:
            block = handle.read(chunk_size)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()

def atomic_write_df(df: pd.DataFrame, path: str, kind: str = "parquet") -> None:
    """Write ``df`` to ``path`` via a temp file and an atomic rename.

    The frame is first written to ``<path>.<RUN_ID>.tmp`` and then moved over
    ``path`` with ``os.replace``, so readers never see a half-written file.

    Args:
        df: Frame to persist (written without its index).
        path: Final destination path.
        kind: ``"parquet"`` or ``"csv"``.

    Raises:
        ValueError: If ``kind`` is not supported (raised before anything is written).
    """
    # Validate first so an unsupported kind never touches the filesystem.
    if kind not in ("parquet", "csv"):
        raise ValueError("Unsupported kind")

    tmp = path + f".{RUN_ID}.tmp"
    try:
        if kind == "parquet":
            df.to_parquet(tmp, index=False)
        else:
            df.to_csv(tmp, index=False)
        os.replace(tmp, path)
    finally:
        # After a successful replace the temp file is gone; if the write or the
        # rename failed, remove the partial file instead of leaving it behind.
        if os.path.exists(tmp):
            try:
                os.remove(tmp)
            except OSError:
                pass

def save_json(obj: Any, path: str) -> None:
    """Serialize ``obj`` as pretty-printed UTF-8 JSON at ``path``, atomically.

    The document is written to ``<path>.tmp`` and moved into place with
    ``os.replace``. If serialization fails (for example on a non-JSON-serializable
    value), the existing file at ``path`` is left untouched rather than truncated.
    This matters because the metadata JSON is re-read and rewritten by several blocks.

    Args:
        obj: JSON-serializable object.
        path: Destination file path.
    """
    tmp = f"{path}.tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(obj, f, indent=2, ensure_ascii=False)
        os.replace(tmp, path)
    finally:
        # Only present if the dump or the rename failed.
        if os.path.exists(tmp):
            try:
                os.remove(tmp)
            except OSError:
                pass

def dtype_string_map(s: pd.Series) -> str:
    """Return the dtype of ``s`` rendered as a plain string, for schema reports."""
    return str(s.dtype)

# ---- Snapshot inputs
# Record path, row/column counts and MD5 for every input so a run can be traced
# to the exact files it consumed. A file that exists but cannot be read or hashed
# is recorded with its error message instead of aborting the run.
assert_required(REQ)
snapshot = {}
for k, p in REQ.items():
    if not os.path.isfile(p):
        # Only the optional input can get here; assert_required covers the rest.
        snapshot[k] = {"path": p, "exists": False}
        continue
    try:
        df = pd.read_parquet(p)
        n_rows, n_cols = int(len(df)), int(df.shape[1])
        # Release the frame before hashing so it is not held if hashing fails.
        del df
        snapshot[k] = {"path": p, "exists": True, "rows": n_rows, "cols": n_cols, "md5": md5_of_file(p)}
    except Exception as ex:
        snapshot[k] = {"path": p, "exists": True, "error": str(ex)}

metadata = {
    "notebook": "Notebook 3 — Model Input Builder (V2)",
    "run_id": RUN_ID,
    # Aware UTC clock (datetime.utcnow() is deprecated in Python 3.12); tzinfo is
    # dropped so the stored string keeps its original naive ISO format.
    "generated_at_utc": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(timespec="seconds"),
    "random_seed": RANDOM_SEED,
    "policy": POLICY,
    "inputs_snapshot": snapshot,
}

# This write (re)creates the metadata file; later blocks read it and append to it.
save_json(metadata, OFFICIAL["metadata"])
print("✅ Block 1 OK — inputs snapshot & metadata initialized.")


✅ Block 1 OK — inputs snapshot & metadata initialized.


In [2]:
# ============================================
# Block 1b — Environment snapshot (versions)
# ============================================
import sys, platform

def _safe_ver(mod):
    try:
        return __import__(mod).__version__
    except Exception:
        return None

# Interpreter and platform first, then one entry per tracked package
# (report label -> importable module name).
env_versions = {
    "python": sys.version.split()[0],
    "platform": platform.platform(),
}
env_versions.update(
    (label, _safe_ver(module_name))
    for label, module_name in (
        ("numpy", "numpy"),
        ("pandas", "pandas"),
        ("pyarrow", "pyarrow"),
        ("scikit_learn", "sklearn"),
        ("matplotlib", "matplotlib"),
        ("seaborn", "seaborn"),
    )
)

# Attach to metadata (idempotent): the "versions" key is overwritten, not appended.
with open(OFFICIAL["metadata"], "r", encoding="utf-8") as f:
    _meta = json.loads(f.read())
_meta.setdefault("env", {}).update(versions=env_versions)
save_json(_meta, OFFICIAL["metadata"])

print("[env] versions:", env_versions)


[env] versions: {'python': '3.12.7', 'platform': 'Windows-11-10.0.26100-SP0', 'numpy': '1.26.4', 'pandas': '2.2.2', 'pyarrow': '16.1.0', 'scikit_learn': '1.5.1', 'matplotlib': '3.9.2', 'seaborn': '0.13.2'}


In [3]:
# ============================================
# Block 2 — Load, Normalize, Health Checks
# ============================================

import numpy as np
import pandas as pd
from datetime import datetime

np.random.seed(42)

def log(msg):
    """Print ``msg`` prefixed with the current local time as ``[YYYY-MM-DD HH:MM:SS]``."""
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{stamp}] {msg}")

log("Loading base tables…")
se = pd.read_parquet(REQ["sales_enriched"])
cu = pd.read_parquet(REQ["customers"])
pr = pd.read_parquet(REQ["products"])

log(f"sales_enriched: {se.shape[0]:,} rows × {se.shape[1]}")
log(f"customers:      {cu.shape[0]:,} rows × {cu.shape[1]}")
log(f"products:       {pr.shape[0]:,} rows × {pr.shape[1]}")

# -- Normalize key dtypes
# Unparseable dates become NaT and unparseable numbers become NaN (errors="coerce").
se["SalesDate"] = pd.to_datetime(se["SalesDate"], errors="coerce")

for c in [n for n in ("TotalPrice","Quantity","Discount","Price","UnitNetPrice","ComputedTotalPrice")
          if n in se.columns]:
    se[c] = pd.to_numeric(se[c], errors="coerce")

# Identifier columns use the nullable Int64 dtype so missing ids stay missing.
for c in [n for n in ("SalesID","CustomerID","ProductID") if n in se.columns]:
    se[c] = pd.to_numeric(se[c], errors="coerce").astype("Int64")

# -- Health
if "SalesDate" in se.columns:
    salesdate_null_pct = se["SalesDate"].isna().mean()*100
else:
    salesdate_null_pct = np.nan
min_date = se["SalesDate"].min()
max_date = se["SalesDate"].max()

neg_total = 0
if "TotalPrice" in se.columns:
    neg_total = int(se["TotalPrice"].lt(0).sum())

nonpos_qty = 0
if "Quantity" in se.columns:
    nonpos_qty = int(se["Quantity"].le(0).sum())

out_disc = 0
if "Discount" in se.columns:
    out_disc = int((se["Discount"].lt(0) | se["Discount"].gt(1)).sum())

print(f"- SalesDate missing: {salesdate_null_pct:.2f}%")
print(f"- SalesDate range:   {min_date} → {max_date}")
print(f"- Negative TotalPrice: {neg_total:,}")
print(f"- Non-positive Quantity: {nonpos_qty:,}")
print(f"- Discount out of [0,1]: {out_disc:,}")

# -- Policy view for time series
# SE_FULL keeps every row; SE_TIME keeps only rows with a usable SalesDate.
SE_FULL = se.copy()
SE_TIME = se[se["SalesDate"].notna()].copy()

# -- Persist checkpoint in metadata
with open(OFFICIAL["metadata"], "r", encoding="utf-8") as f:
    meta_so_far = json.load(f)

meta_so_far.setdefault("checkpoints", []).append(dict(
    block=2,
    rows_total=int(len(SE_FULL)),
    rows_time_series_view=int(len(SE_TIME)),
    salesdate_missing_pct=float(round(salesdate_null_pct, 4)),
    date_min=str(min_date) if pd.notna(min_date) else None,
    date_max=str(max_date) if pd.notna(max_date) else None,
))
save_json(meta_so_far, OFFICIAL["metadata"])

log("✅ Block 2 OK — normalized & checked.")


[2025-08-27 15:05:59] Loading base tables…
[2025-08-27 15:06:01] sales_enriched: 6,758,125 rows × 20
[2025-08-27 15:06:01] customers:      98,759 rows × 8
[2025-08-27 15:06:01] products:       452 rows × 15
- SalesDate missing: 1.00%
- SalesDate range:   2018-01-01 00:00:04.070000 → 2018-05-09 23:59:59.400000
- Negative TotalPrice: 0
- Non-positive Quantity: 0
- Discount out of [0,1]: 0
[2025-08-27 15:06:03] ✅ Block 2 OK — normalized & checked.


In [4]:
# =========================================================
# Block 3 — Build DAILY + TRANSACTION datasets (with guards)
# =========================================================

from datetime import datetime

def log(msg):
    """Print ``msg`` prefixed with the current local time as ``[YYYY-MM-DD HH:MM:SS]``."""
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}")

# ---------- A) DAILY aggregate
# Builds `daily`: one row per calendar day between the first and last sale in
# SE_TIME, with the summed TotalPrice and a 0/1 flag for whether any rows fell on that day.
log("Building DAILY aggregate…")
# Sum TotalPrice per calendar day; SE_TIME only holds rows with a non-null SalesDate.
daily_rev = (SE_TIME
             .set_index("SalesDate")
             .resample("D")["TotalPrice"]
             .sum()
             .rename("TotalRevenue"))

# Number of source rows per calendar day (timestamps floored to midnight).
observed_days = (SE_TIME["SalesDate"].dt.floor("D")
                 .value_counts()
                 .rename_axis("SalesDate")
                 .rename("__n_obs__")
                 .sort_index())

# Continuous daily calendar spanning the aggregated range; days without sales get revenue 0.0.
full_idx = pd.date_range(daily_rev.index.min(), daily_rev.index.max(), freq="D")
daily = pd.DataFrame(index=full_idx)
daily["TotalRevenue"] = daily_rev.reindex(full_idx, fill_value=0.0)
# Left join on the date index; days with no rows get a count of 0.
daily = daily.join(observed_days, how="left").fillna({"__n_obs__": 0})
# observed = 1 when at least one source row exists for the day, else 0.
daily["observed"] = (daily["__n_obs__"] > 0).astype("int8")
daily.index.name = "SalesDate"
# Turn the date index into a SalesDate column and drop the helper count.
daily = daily.reset_index().drop(columns=["__n_obs__"])

# ---------- B) TRANSACTION table (lean, ML-ready)
log("Building TRANSACTION-level table…")
# Candidate columns; only those actually present in SE_FULL are kept.
keep_cols = [
    "SalesID","SalesDate","CustomerID","ProductID","SalesPersonID","TransactionNumber",
    "TotalPrice","Quantity","Discount","Price","UnitNetPrice","ComputedTotalPrice",
    "Category","Class","Perishable","CityID","CityName","CountryID","CountryName"
]
keep_cols = [c for c in keep_cols if c in SE_FULL.columns]

tx = SE_FULL[keep_cols].copy()
# Domain fixes
# NOTE(review): Discount is clipped into [0, 1] here, so the out-of-range Discount
# test in check_tx_contract cannot fire on `tx` — confirm that masking is intended.
if "Discount" in tx.columns:
    tx["Discount"] = pd.to_numeric(tx["Discount"], errors="coerce").clip(0, 1)
tx["TotalPrice"] = pd.to_numeric(tx.get("TotalPrice"), errors="coerce")

# Drop rows lacking a date or a TotalPrice; the before/after counts feed the audit
# record written later in this cell (dropped_missing_target_or_date).
pre_rows = len(tx)
tx = tx.dropna(subset=["SalesDate","TotalPrice"])
post_rows = len(tx)

# ---------- C) CONTRACT CHECKS (abort overwrite if fail)
def check_daily_contract(df: pd.DataFrame) -> List[str]:
    """Validate the DAILY dataset against its data contract.

    Checks that ``SalesDate``, ``TotalRevenue`` and ``observed`` exist, that
    ``SalesDate`` has no missing values, and that ``observed`` only contains 0 or 1.

    Args:
        df: Daily aggregate frame to validate.

    Returns:
        A list of human-readable violation messages; empty when the contract holds.
    """
    errs = []
    must_have = ["SalesDate","TotalRevenue","observed"]
    missing = [c for c in must_have if c not in df.columns]
    if missing:
        errs.append(f"daily missing columns: {missing}")
    if "SalesDate" in df.columns and df["SalesDate"].isna().any():
        errs.append("daily has NaT SalesDate")
    if "observed" in df.columns:
        # Logical `not`, not bitwise `~`: on a plain Python bool `~True` is -2
        # (truthy), which would report a violation on valid data.
        all_binary = bool(df["observed"].isin([0, 1]).all())
        if not all_binary:
            errs.append("daily 'observed' not in {0,1}")
    return errs

def check_tx_contract(df: pd.DataFrame) -> List[str]:
    """Validate the TRANSACTION dataset against its data contract.

    Checks the minimal column set, then, for the columns that exist, that
    ``SalesDate`` has no missing values, ``TotalPrice`` is never negative and
    ``Discount`` never falls outside [0, 1].

    Args:
        df: Transaction-level frame to validate.

    Returns:
        A list of human-readable violation messages; empty when the contract holds.
    """
    problems = []
    cols = df.columns

    required = ("SalesID","SalesDate","CustomerID","ProductID","TotalPrice","Quantity","Discount")
    absent = [name for name in required if name not in cols]
    if absent:
        problems.append(f"transactions missing columns: {absent}")

    if "SalesDate" in cols:
        if df["SalesDate"].isna().any():
            problems.append("transactions contain NaT SalesDate")

    if "TotalPrice" in cols:
        if df["TotalPrice"].lt(0).any():
            problems.append("transactions have negative TotalPrice")

    if "Discount" in cols:
        disc = df["Discount"]
        if (disc.lt(0) | disc.gt(1)).any():
            problems.append("transactions have Discount outside [0,1]")

    return problems

# ---------- C0) Revenue reconciliation (guard)
# Make sure DAILY and TX agree on total revenue (prevents silent drift)
rev_daily = float(daily["TotalRevenue"].sum())
rev_tx    = float(tx["TotalPrice"].sum())

mismatch = []
if not np.isclose(rev_daily, rev_tx, rtol=1e-9, atol=1e-6):
    mismatch_msg = (f"revenue mismatch: sum(DAILY.TotalRevenue)={rev_daily:.6f} "
                    f"vs sum(TX.TotalPrice)={rev_tx:.6f}")
    mismatch = [mismatch_msg]

# ---------- C1) Run contract checks (abort overwrite if fail)
daily_errs = mismatch + check_daily_contract(daily)
tx_errs    = mismatch + check_tx_contract(tx)
# The reconciliation message is in both lists; de-duplicate (order preserved)
# so it is reported once in the audit file and in the exception.
all_errs   = list(dict.fromkeys(daily_errs + tx_errs))

if all_errs:
    # Write drafts to audit only, do NOT overwrite official artifacts
    draft_daily = os.path.join(REPORT_DIR, f"draft_daily_{RUN_ID}.parquet")
    draft_tx    = os.path.join(REPORT_DIR, f"draft_transactions_{RUN_ID}.parquet")
    daily.to_parquet(draft_daily, index=False)
    tx.to_parquet(draft_tx, index=False)
    save_json({"run_id": RUN_ID, "errors": all_errs}, os.path.join(REPORT_DIR, "contract_errors.json"))
    raise RuntimeError(f"[ABORT] Data contract failed. Drafts saved to report/nb3. Errors: {all_errs}")

# ---------- D) Atomic writes of OFFICIAL artifacts
atomic_write_df(daily, OFFICIAL["daily"], kind="parquet")
atomic_write_df(daily, OFFICIAL["daily_csv"], kind="csv")
atomic_write_df(tx,    OFFICIAL["tx_parquet"], kind="parquet")

# CSV sample for human inspection (capped; fixed random_state keeps it reproducible)
sample_cap = 200_000
tx_sample = tx.sample(n=min(sample_cap, len(tx)), random_state=42) if len(tx) > sample_cap else tx
atomic_write_df(tx_sample, OFFICIAL["tx_sample_csv"], kind="csv")

# ---------- E) Row-counts table (audit)
row_counts = pd.DataFrame([
    {"artifact":"model_input_sales.parquet",               "rows": len(daily), "cols": daily.shape[1]},
    {"artifact":"model_input_sales_transactions.parquet",  "rows": len(tx),    "cols": tx.shape[1]},
    {"artifact":"sales_enriched (SE_FULL)",                "rows": len(SE_FULL), "cols": SE_FULL.shape[1]},
    {"artifact":"sales_enriched (SE_TIME non-null dates)", "rows": len(SE_TIME), "cols": SE_TIME.shape[1]},
])
row_counts.to_csv(os.path.join(TABLES_DIR, "row_counts.csv"), index=False)

# ---------- F) Schema & hashes (audit)
schema = {
    "run_id": RUN_ID,
    "daily": {
        "columns": [{ "name": c, "dtype": dtype_string_map(daily[c]) } for c in daily.columns],
        "date_min": str(daily["SalesDate"].min()),
        "date_max": str(daily["SalesDate"].max())
    },
    "transactions": {
        "columns": [{ "name": c, "dtype": dtype_string_map(tx[c]) } for c in tx.columns],
        "date_min": str(tx["SalesDate"].min()),
        "date_max": str(tx["SalesDate"].max())
    }
}
save_json(schema, os.path.join(REPORT_DIR, "schema_out.json"))

hashes = {
    "run_id": RUN_ID,
    # Only inputs that were hashed in the Block 1 snapshot carry an "md5" key.
    "inputs": {k: v["md5"] for k, v in snapshot.items() if v.get("md5")},
    "outputs": {
        "model_input_sales.parquet":              md5_of_file(OFFICIAL["daily"]),
        "model_input_sales.csv":                  md5_of_file(OFFICIAL["daily_csv"]),
        "model_input_sales_transactions.parquet": md5_of_file(OFFICIAL["tx_parquet"]),
        "model_input_sales_transactions_sample.csv": md5_of_file(OFFICIAL["tx_sample_csv"]),
    }
}
save_json(hashes, os.path.join(REPORT_DIR, "hashes.json"))

last_run = {
    "run_id": RUN_ID,
    # Aware UTC clock (datetime.utcnow() is deprecated in Python 3.12); tzinfo is
    # dropped so the stored string keeps its original naive ISO format.
    "generated_at_utc": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(timespec="seconds"),
    "policy": POLICY,
    "daily": {
        "rows": int(len(daily)),
        "date_min": str(daily["SalesDate"].min()),
        "date_max": str(daily["SalesDate"].max()),
        "observed_days": int(daily["observed"].sum()),
        "zero_revenue_days": int((daily["TotalRevenue"] == 0).sum())
    },
    "transactions": {
        "rows": int(len(tx)),
        "dropped_missing_target_or_date": int(pre_rows - post_rows),
        "cols": int(tx.shape[1])
    }
}
save_json(last_run, os.path.join(REPORT_DIR, "last_run.json"))

print(f"Saved DAILY → {OFFICIAL['daily']} & {OFFICIAL['daily_csv']}")
print(f"Saved TX   → {OFFICIAL['tx_parquet']} (+ sample CSV)")
print("✅ Block 3 OK — contracts validated & audit written.")


[2025-08-27 15:06:03] Building DAILY aggregate…
[2025-08-27 15:06:08] Building TRANSACTION-level table…
Saved DAILY → clean\model_input\model_input_sales.parquet & clean\model_input\model_input_sales.csv
Saved TX   → clean\model_input\model_input_sales_transactions.parquet (+ sample CSV)
✅ Block 3 OK — contracts validated & audit written.


In [5]:
# ======================================================
# Block 4 — Customers RFM + Extras + Scaled (for NB04)
# ======================================================

from collections import Counter
from datetime import datetime

def log(msg):
    """Print ``msg`` prefixed with the current local time as ``[YYYY-MM-DD HH:MM:SS]``."""
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}")

log("Preparing customer-level features (RFM + extras)…")

# Optional dims
# Best-effort: without them the geography columns are filled with NaN further down.
# The failure is reported rather than swallowed, because these files passed
# assert_required and an unreadable one should be visible in the run log.
try:
    ci = pd.read_parquet(REQ["cities"])[["CityID","CityName","CountryID"]].drop_duplicates()
    co = pd.read_parquet(REQ["countries"])[["CountryID","CountryName"]].drop_duplicates()
except Exception as ex:
    log(f"[warn] city/country dims unavailable ({type(ex).__name__}: {ex}); "
        "top_city/top_country will be NaN.")
    ci, co = None, None

# Prefer NB02 RFM if schema ok
rfm = None
if os.path.isfile(REQ["customers_rfm_opt"]):
    cand = pd.read_parquet(REQ["customers_rfm_opt"])
    expect = {"CustomerID","last_date","frequency","monetary","recency_days"}
    if expect.issubset(cand.columns):
        rfm = cand.copy()
        rfm["CustomerID"]  = pd.to_numeric(rfm["CustomerID"], errors="coerce").astype("Int64")
        rfm["last_date"]   = pd.to_datetime(rfm["last_date"], errors="coerce")
        for c in ["frequency","monetary","recency_days"]:
            rfm[c] = pd.to_numeric(rfm[c], errors="coerce")
        print(f"- Reused customers_rfm.parquet: {len(rfm):,} rows")
    else:
        print("- customers_rfm.parquet schema unexpected; recomputing.")

if rfm is None:
    # Build from SE_TIME: collapse to one row per (SalesID, CustomerID), then per customer.
    txn = (SE_TIME.groupby(["SalesID","CustomerID"], as_index=False)
           .agg(SalesDate=("SalesDate","max"), TotalPrice=("TotalPrice","sum")))
    # Recency is measured against the latest sale date in the data, not wall-clock time.
    ref_date = SE_TIME["SalesDate"].max()
    rfm = (txn.groupby("CustomerID", as_index=False)
             .agg(last_date=("SalesDate","max"),
                  frequency=("SalesID","count"),
                  monetary=("TotalPrice","sum")))
    rfm["recency_days"] = (ref_date - rfm["last_date"]).dt.days.astype(float)

# Extras
# Average ticket; a zero frequency is turned into NaN first so the division yields 0.0, not inf.
rfm["avg_ticket"] = (rfm["monetary"] / rfm["frequency"].replace({0: np.nan})).fillna(0.0)

if "Discount" in SE_TIME.columns:
    # Share of a customer's sales that carried any discount (> 0).
    disc_txn = (SE_TIME.assign(discounted=(SE_TIME["Discount"] > 0).astype(int))
                        .groupby(["SalesID","CustomerID"], as_index=False)
                        .agg(discounted=("discounted","max")))
    pct_disc = (disc_txn.groupby("CustomerID", as_index=False)
                        .agg(pct_discounted=("discounted","mean")))
    rfm = rfm.merge(pct_disc, on="CustomerID", how="left")
else:
    rfm["pct_discounted"] = np.nan

if "Category" in SE_TIME.columns:
    # Number of distinct product categories bought by each customer.
    ncat = (SE_TIME.groupby("CustomerID")["Category"]
                  .nunique(dropna=True)
                  .rename("n_categories")
                  .reset_index())
    rfm = rfm.merge(ncat, on="CustomerID", how="left")
else:
    rfm["n_categories"] = np.nan

# top_city / top_country (mode)
geo = cu[["CustomerID","CityID"]].drop_duplicates()
if ci is not None and co is not None:
    # validate="many_to_one" makes pandas raise if a dim table has duplicate keys.
    geo = (geo.merge(ci, on="CityID", how="left", validate="many_to_one")
              .merge(co, on="CountryID", how="left", validate="many_to_one"))
    def _mode(s):
        """Return the most frequent non-null value of ``s`` as a string, or NaN if none."""
        s = s.dropna().astype(str)
        return Counter(s).most_common(1)[0][0] if not s.empty else np.nan
    top_geo = (geo.groupby("CustomerID", as_index=False)
                  .agg(top_city=("CityName", _mode),
                       top_country=("CountryName", _mode)))
    rfm = rfm.merge(top_geo, on="CustomerID", how="left")
else:
    rfm["top_city"] = np.nan
    rfm["top_country"] = np.nan

# Scaling
from sklearn.preprocessing import MinMaxScaler
base_feats = ["recency_days","frequency","monetary","avg_ticket","pct_discounted","n_categories"]
present   = [c for c in base_feats if c in rfm.columns]
scaler = MinMaxScaler()
# Missing values are scaled as 0.0; the unscaled columns keep their NaN.
scaled = scaler.fit_transform(rfm[present].fillna(0.0).values)
for i, c in enumerate(present):
    rfm[f"{c}_scaled"] = scaled[:, i]

# Save
atomic_write_df(rfm, OFFICIAL["cust_parquet"], kind="parquet")
atomic_write_df(rfm, OFFICIAL["cust_csv"],     kind="csv")

# Update metadata
# The fitted scaler's min/max/range are stored in the checkpoint, in `present` order.
with open(OFFICIAL["metadata"], "r", encoding="utf-8") as f:
    meta = json.load(f)
meta.setdefault("checkpoints", []).append({
    "block": 4,
    "model_input_customers": {
        "rows": int(len(rfm)),
        "cols": int(rfm.shape[1]),
        "features": present,
        "scaled_features": [f"{c}_scaled" for c in present]
    },
    "scaler": {
        "feature_order": present,
        "data_min_": [float(x) for x in scaler.data_min_],
        "data_max_": [float(x) for x in scaler.data_max_],
        "data_range_": [float(x) for x in scaler.data_range_],
    }
})
save_json(meta, OFFICIAL["metadata"])

print(f"Saved model_input_customers → {OFFICIAL['cust_parquet']} & {OFFICIAL['cust_csv']}")
print("✅ Block 4 OK — customer matrix ready.")


[2025-08-27 15:06:14] Preparing customer-level features (RFM + extras)…
- Reused customers_rfm.parquet: 98,759 rows
Saved model_input_customers → clean\model_input\model_input_customers.parquet & clean\model_input\model_input_customers.csv
✅ Block 4 OK — customer matrix ready.


In [6]:
# ============================================================
# Block 5 — Go/No-Go + Export Feature Matrices (FULL & RFM3)
# ============================================================

import pandas as pd
from datetime import datetime

def log(msg):
    """Print ``msg`` prefixed with the current local time as ``[YYYY-MM-DD HH:MM:SS]``."""
    now_txt = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{now_txt}] {msg}")

df = pd.read_parquet(OFFICIAL["cust_parquet"])

BASE   = ["recency_days","frequency","monetary","avg_ticket","pct_discounted","n_categories"]
# Scaled counterparts of BASE that are actually present in the customer matrix.
SCALED = [name for name in (f"{c}_scaled" for c in BASE) if name in df.columns]
RFM3   = [c for c in ("recency_days_scaled","frequency_scaled","monetary_scaled") if c in df.columns]
id_col = "CustomerID"

# Go/No-Go
checks = []
def add_check(name, cond):
    """Record one named Go/No-Go check together with its boolean outcome."""
    checks.append({"Check": name, "Pass": bool(cond)})

# With no scaled features the placeholder series force the NaN and bounds checks to fail.
if SCALED:
    nan_rates = df[SCALED].isna().mean()
    mins = df[SCALED].min()
    maxs = df[SCALED].max()
else:
    nan_rates = pd.Series([1.0])
    mins = pd.Series([1.0])
    maxs = pd.Series([0.0])
eps = 1e-9  # tolerance for floating-point noise at the [0, 1] boundaries

add_check("≥ 1,000 customers", len(df) >= 1000)
add_check("≥ 5 scaled features", len(SCALED) >= 5)
add_check("1 row per CustomerID", df[id_col].isna().sum()==0 and df[id_col].nunique()==len(df))
add_check("No NaNs in scaled features", float(nan_rates.max()) == 0.0)
add_check("Scaled features in [0,1]", (mins >= -eps).all() and (maxs <= 1+eps).all())
add_check("RFM3 present", len(RFM3) == 3)

go_df = pd.DataFrame(checks)
display(go_df)

# ---------- Enforce Go/No-Go
all_passed = bool(go_df["Pass"].all())
if not all_passed:
    # Persist failing checklist for audit
    fail_csv = os.path.join(TABLES_DIR, f"go_no_go_{RUN_ID}.csv")
    go_df.to_csv(fail_csv, index=False)

    # Log into metadata with explicit FAIL status
    with open(OFFICIAL["metadata"], "r", encoding="utf-8") as f:
        _meta = json.load(f)
    _meta.setdefault("checkpoints", []).append(dict(
        block=5,
        go_no_go=go_df.to_dict(orient="records"),
        status="FAIL",
    ))
    save_json(_meta, OFFICIAL["metadata"])

    raise RuntimeError(
        f"[ABORT] Go/No-Go failed; feature matrices NOT exported. See {fail_csv} and metadata."
    )

# If we reach here, all checks passed → proceed to export matrices

# Matrices: the id column followed by the selected scaled features
X_full = pd.concat([df[[id_col]], df[SCALED]], axis=1)
X_rfm3 = pd.concat([df[[id_col]], df[RFM3]], axis=1)

for matrix, pq_key, csv_key in (
    (X_full, "full_feat_pq", "full_feat_csv"),
    (X_rfm3, "rfm3_feat_pq", "rfm3_feat_csv"),
):
    atomic_write_df(matrix, OFFICIAL[pq_key], kind="parquet")
    atomic_write_df(matrix, OFFICIAL[csv_key], kind="csv")

catalog = {
    "row_count": int(len(df)),
    "id_column": id_col,
    "features_full": SCALED,
    "features_rfm3": RFM3,
    "nan_rates_scaled": {k: float(v) for k, v in nan_rates.to_dict().items()},
    "bounds_scaled": {c: {"min": float(mins[c]), "max": float(maxs[c])} for c in SCALED},
    "notes": [
        "Scaled with MinMax on current population.",
        "RFM3 = recency_days_scaled, frequency_scaled, monetary_scaled."
    ]
}
save_json(catalog, OFFICIAL["feature_catalog"])

# Update metadata
with open(OFFICIAL["metadata"], "r", encoding="utf-8") as f:
    meta = json.load(f)
meta.setdefault("checkpoints", []).append(dict(
    block=5,
    go_no_go=go_df.to_dict(orient="records"),
    features={"full": SCALED, "rfm3": RFM3, "rows": int(len(df))},
))
save_json(meta, OFFICIAL["metadata"])

print("✅ Block 5 OK — feature matrices exported.")


Unnamed: 0,Check,Pass
0,"≥ 1,000 customers",True
1,≥ 5 scaled features,True
2,1 row per CustomerID,True
3,No NaNs in scaled features,True
4,"Scaled features in [0,1]",True
5,RFM3 present,True


✅ Block 5 OK — feature matrices exported.


In [7]:
# ============================================
# Block 6 — Manifest + README + Final Checkpoint
# ============================================

import pyarrow.parquet as pq

def summarize_parquet(path: Path):
    """Return size and, when obtainable, row/column counts for a parquet file.

    Counts come from the parquet footer via pyarrow; if that fails for any reason
    the file is loaded with pandas instead. If both attempts fail, only
    ``size_bytes`` is reported.

    Args:
        path: Location of the file; it must exist, since ``path.stat()`` is called first.

    Returns:
        Dict with ``size_bytes`` and, when available, ``rows`` and ``cols``.
    """
    summary = {"size_bytes": path.stat().st_size}
    try:
        footer = pq.ParquetFile(path).metadata
        summary["rows"] = footer.num_rows
        summary["cols"] = footer.num_columns
        return summary
    except Exception:
        pass  # fall through to the pandas attempt
    try:
        n_rows, n_cols = pd.read_parquet(path).shape
        summary["rows"], summary["cols"] = int(n_rows), int(n_cols)
    except Exception:
        pass  # deliberate best-effort: report the size only
    return summary

def artifact_entry(p: Path):
    """Build the manifest record for one artifact path.

    Every record carries ``file``, ``name``, ``type`` (lower-cased extension
    without the dot) and ``exists``. Existing parquet files also get the fields
    from ``summarize_parquet``; existing csv/json/md files get ``size_bytes``.
    Other extensions and missing files get only the four base fields.
    """
    kind = p.suffix.lstrip(".").lower()
    present = p.exists()
    record = {"file": str(p), "name": p.name, "type": kind, "exists": present}
    if not present:
        return record
    if kind == "parquet":
        record.update(summarize_parquet(p))
    elif kind in ("csv", "json", "md"):
        record["size_bytes"] = p.stat().st_size
    return record

# Artifact list: every official path, plus any other data/doc file already
# present in the output folder (matched by file name to avoid duplicates).
expected = [Path(v) for v in OFFICIAL.values()]
seen = set(p.name for p in expected)
for p in Path(DATA_DIR_OUT).iterdir():
    if p.suffix.lower() in {".parquet",".csv",".json",".md"} and p.name not in seen:
        expected.append(p); seen.add(p.name)

# README
# Written BEFORE the manifest is built so the manifest records it as present with
# its real size (building the manifest first reports it missing on a fresh run).
readme_text = textwrap.dedent(f"""
# Model Inputs — Notebook 3 (V2)
Run: {RUN_ID}

## Official artifacts
- **model_input_sales.parquet/csv** — Daily `SalesDate`, `TotalRevenue`, `observed` (0/1).
- **model_input_sales_transactions.parquet** (+ sample CSV) — Transaction-level ML dataset.
- **model_input_customers.parquet/csv** — RFM + extras + scaled columns.
- **model_features_customers_full / _rfm3** — Tidy matrices for K-Means.
- **feature_catalog.json** — Feature list, NaNs, bounds.
- **model_input_metadata.json** — All checkpoints & snapshots.
- **manifest.json** — Presence, sizes, row/col counts.

## Data-contract guards
- DAILY must contain: `SalesDate` (no NaT), `TotalRevenue`, `observed` ∈ {{0,1}}.
- TRANSACTIONS must contain: `SalesID`, `SalesDate`, `CustomerID`, `ProductID`, `TotalPrice`, `Quantity`, `Discount`.
- If any guard fails, official files are not overwritten — drafts under `clean/report/nb3/`.

## Policy
- Exclude null `SalesDate` rows from time-based metrics; seed=42; run_id={RUN_ID}.
""").strip()
with open(OFFICIAL["readme"], "w", encoding="utf-8") as f:
    f.write(readme_text + "\n")
print(f"[OK] README written → {OFFICIAL['readme']}")

# Final checkpoint
# Also written before the manifest so the size recorded for the metadata file
# includes this last append. The artifact count is simply len(expected).
with open(OFFICIAL["metadata"], "r", encoding="utf-8") as f:
    meta = json.load(f)

final_ckpt = {
    "block": 6,
    "status": "complete",
    "artifacts_count": len(expected),
    "ready_for": {
        "forecasting_random_forest": "model_input_sales_transactions.parquet",
        "forecasting_timeseries_baseline": "model_input_sales.parquet",
        "clustering_full": "model_features_customers_full.parquet",
        "clustering_rfm3": "model_features_customers_rfm3.parquet",
    }
}
meta.setdefault("checkpoints", []).append(final_ckpt)
save_json(meta, OFFICIAL["metadata"])

# Manifest
# NOTE: the entry for manifest.json itself describes the file as it was before
# this write (absent on a first run), because it is inspected before being saved.
manifest = {
    "run_id": RUN_ID,
    # Aware UTC clock (datetime.utcnow() is deprecated in Python 3.12); same rendered format.
    "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
    "artifacts": [],
}
for p in expected:
    manifest["artifacts"].append(artifact_entry(Path(p)))

save_json(manifest, OFFICIAL["manifest"])
print(f"[OK] Manifest saved → {OFFICIAL['manifest']}")

# Console summary
missing = [a["file"] for a in manifest["artifacts"] if not a["exists"]]
print("\n=== SUMMARY ===")
print(f"Artifacts found: {len(manifest['artifacts'])} | Missing: {len(missing)}")
for a in manifest["artifacts"]:
    if a["exists"] and a["type"] == "parquet":
        print(f"- {a['file']}: rows={a.get('rows')}, cols={a.get('cols')}, size={a['size_bytes']} bytes")

print("\n✅ Notebook 3 (V2) complete — contracts validated & audit ready.")


[OK] Manifest saved → clean\model_input\manifest.json
[OK] README written → clean\model_input\README_ModelInputs.md

=== SUMMARY ===
Artifacts found: 15 | Missing: 0
- clean\model_input\model_input_sales.parquet: rows=129, cols=3, size=4694 bytes
- clean\model_input\model_input_sales_transactions.parquet: rows=6690599, cols=15, size=318517937 bytes
- clean\model_input\model_input_customers.parquet: rows=98759, cols=16, size=6090888 bytes
- clean\model_input\model_features_customers_full.parquet: rows=98759, cols=7, size=2935971 bytes
- clean\model_input\model_features_customers_rfm3.parquet: rows=98759, cols=4, size=1753534 bytes

✅ Notebook 3 (V2) complete — contracts validated & audit ready.


In [8]:
# --- NB03 add-on: hashes.json + row_counts.csv (audit hardening) ---
import os, json, hashlib
from pathlib import Path
import pandas as pd

BASE = Path("clean/model_input")
# Dedicated name for the audit tables folder. Rebinding the notebook-wide REPORT_DIR
# (the string "clean/report/nb3" set in Block 1) to this deeper Path would make a
# re-run of Block 3 write its drafts and audit JSON into the tables folder.
AUDIT_TABLES_DIR = Path("clean/report/nb3/tables")
AUDIT_TABLES_DIR.mkdir(parents=True, exist_ok=True)

# Parquet artifacts covered by this audit pass (file names relative to BASE).
files = [
    "model_input_sales.parquet",
    "model_input_sales_transactions.parquet",
    "model_input_customers.parquet",
    "model_features_customers_full.parquet",
    "model_features_customers_rfm3.parquet",
]

def md5sum(path, blocksize=1<<20):
    """Return the hex MD5 digest of ``path``, read in ``blocksize``-byte chunks (default 1 MiB)."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(blocksize), b""):
            h.update(chunk)
    return h.hexdigest()

# 1) hashes.json — file name -> MD5 for each listed artifact that exists
hashes = {}
for fname in files:
    p = BASE / fname
    if p.exists():
        hashes[fname] = md5sum(p)
with open(BASE / "hashes.json", "w", encoding="utf-8") as f:
    json.dump(hashes, f, indent=2)
print(f"[audit] wrote {BASE/'hashes.json'}")

# 2) row_counts.csv — rows/cols per artifact
# NOTE(review): this is the same path Block 3 writes its row-count table to, with
# different columns, so it replaces that file — confirm this is intended.
rows = []
for fname in files:
    p = BASE / fname
    if p.exists() and p.suffix == ".parquet":
        try:
            # Counts come from the parquet footer; no data is loaded.
            import pyarrow.parquet as pq
            m = pq.ParquetFile(p).metadata
            rows.append({"file": str(p), "rows": m.num_rows, "cols": m.num_columns})
        except Exception:
            # Fallback: load the file with pandas and count.
            df = pd.read_parquet(p)
            rows.append({"file": str(p), "rows": len(df), "cols": df.shape[1]})
rc = pd.DataFrame(rows)
rc.to_csv(AUDIT_TABLES_DIR / "row_counts.csv", index=False)
print(f"[audit] wrote {AUDIT_TABLES_DIR/'row_counts.csv'}")


[audit] wrote clean\model_input\hashes.json
[audit] wrote clean\report\nb3\tables\row_counts.csv
