In [29]:
import os
import yaml

# -----------------------------------------------------------------------------
# CONFIGURATION DICTIONARY
# -----------------------------------------------------------------------------
# Defines all the inputs, column mappings, cost assumptions, and modeling
# parameters needed for the pricing & shipping optimization pipeline.
# -----------------------------------------------------------------------------

CONFIG = {
    # -------------------------------------------------------------------------
    # File paths for inputs
    # -------------------------------------------------------------------------
    "paths": {
        "amazon_sales": "Amazon Sale Report.csv",              # Amazon transactions
        "intl_sales": "International sale Report.csv",         # International transactions
        "price_catalogs": ["May-2022.csv", "P  L March 2021.csv"],  # SKU cost catalogs
        "warehouse_costs": "Cloud Warehouse Compersion Chart.csv",  # Storage & warehouse info
        "inventory": "Sale Report.csv",                        # Local inventory/sales
    },

    # -------------------------------------------------------------------------
    # Column alias mapping
    # -------------------------------------------------------------------------
    # Handles messy/inconsistent headers across different CSV exports.
    # Each key maps to a list of possible header names we might encounter.
    "columns": {
        "date": ["Date", "DATE"],
        "sku": ["SKU", "SKU Code", "Sku"],
        "style": ["Style", "Style Id", "Design No."],
        "category": ["Category"],
        "channel": ["Sales Channel ", "CUSTOMER"],
        "qty": ["Qty", "PCS"],
        "amount": ["Amount", "GROSS AMT"],  # used to back-calc price if missing
        "price": [],                        # empty → derive from amount/qty if needed
        "size": ["Size"],
        "status": ["Status"],
        "city": ["ship-city"],
        "state": ["ship-state"],
        "country": ["ship-country"],
        "asin": ["ASIN"],
        "b2b": ["B2B"],
    },

    # -------------------------------------------------------------------------
    # Cost model assumptions
    # -------------------------------------------------------------------------
    # Used when deriving per-unit cost and margins.
    "cost_model": {
        "default_unit_cost": 250.0,          # fallback cost per unit (₹)
        "outbound_pick_pack": 7.0,           # outbound logistics (₹/unit)
        "inbound_per_unit": 4.0,             # inbound logistics (₹/unit)
        "storage_per_cft_per_day": 0.15,     # storage cost (₹ per cubic foot/day)
        "return_qc_per_unit": 15.5,          # quality check cost per return (₹/unit)
        "shipping_flat": 40.0,               # flat shipping cost (₹/unit)
        "return_rate": 0.05,                 # expected return rate (5%)
    },

    # -------------------------------------------------------------------------
    # Modeling parameters
    # -------------------------------------------------------------------------
    "modeling": {
        "min_obs_per_sku": 20,               # minimum observations required per SKU
        "min_price_cv": 0.05,                # minimum price variation (CV) to trust elasticity
        "group_level": "sku",                # modeling granularity: "sku" or "category"
        "price_grid_pct": [                  # candidate price change grid (% deltas)
            -0.3, -0.2, -0.1, 0.0,
             0.05, 0.1, 0.2, 0.3
        ],
        "top_k_plots": 5,                    # how many top categories to plot
    }
}

# -----------------------------------------------------------------------------
# Write CONFIG to disk for reproducibility
# -----------------------------------------------------------------------------
os.makedirs("artifacts", exist_ok=True)  # ensure artifacts folder exists
with open("config_notebook.yaml", "w") as fh:
    yaml.safe_dump(CONFIG, fh)

print("Wrote config_notebook.yaml")


Wrote config_notebook.yaml


In [30]:
from __future__ import annotations

from typing import Dict, List, Optional, Iterable, Union
from pathlib import Path
import warnings

import numpy as np
import pandas as pd

# Silence a noisy, known deprecation warning
warnings.filterwarnings("ignore", message="The argument 'infer_datetime_format' is deprecated")


PathLike = Union[str, Path]


def safe_read_csv(
    path: PathLike,
    *,
    na_values: Iterable[str] = ("", "NA", "NaN", "null", "None"),
    usecols: Optional[Iterable[str]] = None,
) -> pd.DataFrame:
    """
    Read *messy* CSVs by progressively relaxing parser options.

    Strategy:
      1) Try common encodings (utf-8 / utf-8-sig) with C engine
      2) Fall back to python engine (sniffer, custom quote/escape)
      3) Last resort: skip bad lines to salvage data

    Parameters
    ----------
    path : str | Path
        File path to CSV.
    na_values : Iterable[str], optional
        Additional strings to treat as NA (merged with pandas defaults).
    usecols : Iterable[str], optional
        Subset of columns to read (faster, memory friendly).

    Returns
    -------
    pd.DataFrame
        Parsed dataframe; may be empty if the file is malformed.
    """
    path = str(path)

    attempts = [
        dict(low_memory=False, encoding="utf-8",     usecols=usecols, na_values=na_values),
        dict(low_memory=False, encoding="utf-8-sig", usecols=usecols, na_values=na_values),
        dict(low_memory=False, engine="python", encoding="utf-8",     usecols=usecols, na_values=na_values),
        dict(low_memory=False, engine="python", encoding="utf-8-sig", usecols=usecols, na_values=na_values),
        dict(low_memory=False, engine="python", encoding="utf-8", quotechar='"', escapechar="\\",
             usecols=usecols, na_values=na_values),
        # sep=None triggers the csv sniffer (python engine only)
        dict(low_memory=False, engine="python", encoding="utf-8", sep=None, dtype=str,
             usecols=usecols, na_values=na_values),
        dict(low_memory=False, engine="python", encoding="utf-8", lineterminator="\n",
             usecols=usecols, na_values=na_values),
        # last resort: skip bad lines to salvage good rows
        dict(low_memory=False, engine="python", encoding="utf-8", on_bad_lines="skip",
             usecols=usecols, na_values=na_values),
    ]

    last_err: Optional[Exception] = None
    for kw in attempts:
        try:
            return pd.read_csv(path, **kw)
        except Exception as e:
            last_err = e
            continue

    warnings.warn(f"[safe_read_csv] Falling back to skip bad lines for {path} due to: {last_err!r}")
    return pd.read_csv(path, engine="python", on_bad_lines="skip", low_memory=False,
                       usecols=usecols, na_values=na_values)


def _first_present(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
    """
    Return the first column name from `candidates` that exists in `df`,
    else None. Useful for messy header aliases.
    """
    for c in candidates or []:
        if c in df.columns:
            return c
    return None


def _parse_dates(series: pd.Series) -> pd.Series:
    """
    Parse date-ish strings robustly.

    Approach:
      - Try pandas default parsing
      - If too many NaT (>40%), attempt a few common explicit formats
      - Return the best-performing parse

    Returns
    -------
    pd.Series[datetime64[ns]]
    """
    s0 = pd.to_datetime(series, errors="coerce")
    best = s0
    best_good = s0.notna().sum()

    # If default parse succeeds for >=60% rows, accept it
    if best_good >= 0.6 * len(series):
        return best

    # Otherwise try explicit formats commonly seen in exports
    fmt_try = ["%m-%d-%y", "%d-%m-%y", "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y"]
    for fmt in fmt_try:
        s1 = pd.to_datetime(series, errors="coerce", format=fmt)
        good = s1.notna().sum()
        if good > best_good:
            best, best_good = s1, good
    return best


def _clean_numeric(series: pd.Series) -> pd.Series:
    """
    Coerce currency/amount-like strings to numeric.

    Steps:
      - Strip common currency signs and spaces (₹, $, commas)
      - Remove all non [0-9 . -]
      - Convert to float; invalid → NaN
    """
    s = series.astype(str)
    s = s.str.replace(r"[₹$, ]", "", regex=True)
    s = s.str.replace(r"[^0-9\.\-]", "", regex=True)
    return pd.to_numeric(s, errors="coerce")


def _boolify(series: pd.Series) -> pd.Series:
    """
    Convert common truthy strings to booleans.

    Recognized truthy: {"true","1","yes","y"} (case/space-insensitive)
    Everything else becomes False (including NaN).
    """
    s = series.astype(str).str.strip().str.lower()
    return s.isin({"true", "1", "yes", "y"})


In [31]:
from typing import Dict, List, Optional
import pandas as pd

def derive_costs(sales: pd.DataFrame, cfg: Dict, catalogs: Optional[List[pd.DataFrame]] = None) -> pd.DataFrame:
    """
    Enrich a sales transaction DataFrame with cost information.

    Parameters
    ----------
    sales : pd.DataFrame
        Transaction-level sales data. Must contain a 'sku' column.
    cfg : dict
        Configuration dictionary containing 'cost_model' assumptions.
    catalogs : list[pd.DataFrame], optional
        Optional list of catalog DataFrames with SKU-level cost columns
        (e.g., 'TP', 'TP1', 'TP2'). Used to override default cost per SKU.

    Returns
    -------
    pd.DataFrame
        Copy of `sales` with added columns:
        - unit_cost   : per-SKU cost (from catalogs if available, else default)
        - ship_cost   : flat shipping cost per unit
        - return_rate : expected return probability
        - return_cost : expected per-unit return QC cost × return_rate
    """
    # --- Extract cost assumptions from config ---
    cm = cfg.get("cost_model", {})
    default_unit_cost = float(cm.get("default_unit_cost", 250.0))
    ship_flat        = float(cm.get("shipping_flat", 40.0))
    return_rate      = float(cm.get("return_rate", 0.05))
    return_qc_cost   = float(cm.get("return_qc_per_unit", 15.5))

    # Start with all SKUs using default cost
    unit_cost = pd.Series(default_unit_cost, index=sales.index, dtype=float)

    # --- Override with catalog costs if provided ---
    if catalogs:
        # Merge all catalog files
        cat = pd.concat(catalogs, ignore_index=True, sort=False)

        # Try to find SKU column in catalogs (case-insensitive)
        sku_candidates = [c for c in cat.columns if str(c).strip().lower() == "sku"]
        sku_col = sku_candidates[0] if sku_candidates else None

        # Look for cost columns (TP, TP1, TP2, case-insensitive, space-insensitive)
        cost_cols = [
            c for c in cat.columns
            if str(c).strip().upper().replace(" ", "") in {"TP", "TP1", "TP2"}
        ]
        if not cost_cols:
            cost_cols = [
                c for c in cat.columns
                if str(c).strip().lower().replace(" ", "") in {"tp", "tp1", "tp2"}
            ]

        if sku_col and cost_cols:
            # Convert cost columns to numeric, take min across them
            cc = cat[cost_cols].apply(pd.to_numeric, errors="coerce")
            cat["_cost"] = cc.min(axis=1)

            # Build lookup table: SKU → min cost
            lut = (
                cat[[sku_col, "_cost"]]
                .dropna()
                .assign(sku=lambda x: x[sku_col].astype(str).str.strip())
                .groupby("sku", as_index=True)["_cost"]
                .min()  # unique cost per SKU
            )

            # Map catalog costs to sales SKUs, fall back to default
            mapped = sales["sku"].astype(str).str.strip().map(lut)
            unit_cost = mapped.fillna(default_unit_cost)

    # --- Final enriched DataFrame ---
    out = sales.copy()
    out["unit_cost"]   = unit_cost
    out["ship_cost"]   = ship_flat
    out["return_rate"] = return_rate
    out["return_cost"] = return_qc_cost * return_rate

    return out


In [32]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional
import numpy as np
import pandas as pd
import statsmodels.api as sm


@dataclass
class ModelConfig:
    """
    Configuration for elasticity estimation & filtering.
    """
    min_obs_per_sku: int = 20         # minimum rows per group (SKU/category)
    min_price_cv: float = 0.05        # minimum price variability (coefficient of variation)
    group_level: str = "sku"          # 'sku' or 'category'


def _cv(x: pd.Series) -> float:
    """
    Coefficient of Variation (std/mean) with safe coercion.
    Returns 0.0 if mean <= 0 or not enough data.
    """
    x = pd.to_numeric(x, errors="coerce").dropna()
    if len(x) < 2:
        return 0.0
    m = float(x.mean())
    if m <= 0:
        return 0.0
    s = float(x.std(ddof=1))
    return float(s / m) if m > 0 else 0.0


def estimate_group_elasticity(df: pd.DataFrame, group: str) -> pd.DataFrame:
    """
    Estimate log-log price elasticity per group using OLS:

        ln(qty) ~ ln(price) + weekday_dummies + month_dummies + const

    Parameters
    ----------
    df : DataFrame
        Must include columns: ['date','qty','price', group]
    group : str
        Column name to group by (e.g., 'sku' or 'category').

    Returns
    -------
    DataFrame with columns:
        [group, n_obs, price_cv, elasticity, tstat, r2, avg_price, avg_qty]
    """
    rows = []

    # Guard: keep positive qty/price and valid dates
    d = df.copy()
    d = d[(pd.to_numeric(d["qty"], errors="coerce") > 0) &
          (pd.to_numeric(d["price"], errors="coerce") > 0)].dropna(subset=["date", "qty", "price"])
    if d.empty:
        return pd.DataFrame(rows)

    # Core regressors
    d["ln_q"] = np.log(pd.to_numeric(d["qty"], errors="coerce"))
    d["ln_p"] = np.log(pd.to_numeric(d["price"], errors="coerce"))

    # Calendar controls (weekday, month)
    d["weekday"] = d["date"].dt.weekday
    d["month"] = d["date"].dt.month

    for g_val, part in d.groupby(group, dropna=False):
        part = part.dropna(subset=["ln_q", "ln_p"])
        n = len(part)
        if n < 5:
            continue

        # Require some price variation within the group
        pcv = _cv(part["price"])
        if pcv < 1e-9:
            continue

        # Design matrix: ln_p + dummies
        X = pd.get_dummies(
            part[["ln_p", "weekday", "month"]].astype(float),
            columns=["weekday", "month"],
            drop_first=True
        )
        X = sm.add_constant(X, has_constant="add")
        y = part["ln_q"].astype(float)

        try:
            model = sm.OLS(y, X, missing="drop").fit()

            # Extract key stats safely
            beta = float(model.params.get("ln_p", np.nan))
            tstat = float(model.tvalues.get("ln_p", np.nan))
            r2 = float(model.rsquared)

            rows.append({
                group: g_val,
                "n_obs": int(n),
                "price_cv": float(pcv),
                "elasticity": beta,         # price elasticity (β on ln(price))
                "tstat": tstat,             # t-stat of elasticity
                "r2": r2,                   # model fit
                "avg_price": float(part["price"].mean()),
                "avg_qty": float(part["qty"].mean()),
            })
        except Exception:
            # Skip pathological groups (e.g., singular matrix after dummies)
            continue

    return pd.DataFrame(rows)


def filter_quality_safe(df: Optional[pd.DataFrame], cfg: ModelConfig) -> pd.DataFrame:
    """
    Apply conservative quality filters to an elasticity table.
    - Keep groups with at least `min_obs_per_sku` observations
    - Keep groups with price CV >= `min_price_cv`
    - Sort by elasticity (most negative first) if present
    """
    if df is None or len(df) == 0:
        print("[info] Elasticity table is empty; skipping quality filters.")
        return pd.DataFrame(columns=[
            cfg.group_level, "n_obs", "price_cv", "elasticity", "tstat", "r2", "avg_price", "avg_qty"
        ])

    out = df.copy()
    if "n_obs" in out.columns:
        out = out[out["n_obs"] >= cfg.min_obs_per_sku]
    if "price_cv" in out.columns:
        out = out[out["price_cv"] >= cfg.min_price_cv]

    return out.sort_values(["elasticity"]) if "elasticity" in out.columns else out


In [33]:

from __future__ import annotations

from pathlib import Path
from typing import Iterable, Optional, Dict, List
import numpy as np
import pandas as pd

REQUIRED_COLS = ["date", "sku", "category", "qty", "price"]

def _normalize_sales(df: pd.DataFrame, CFG: Dict, *, source_name: Optional[str] = None) -> pd.DataFrame:
    """
    Normalize a raw sales export into a canonical schema:
      ['date','sku','category','qty','price'].

    - Resolves messy headers via CFG['columns'] aliases
    - Coerces date/qty/price
    - Derives price from Amount/Qty when explicit price is missing
    - Drops rows with non-positive qty/price and NaT dates
    """
    cols = CFG["columns"]

    # (1) Make headers robust: strip and keep original case for matching via alias list
    df = df.copy()
    df.columns = [str(c).strip() for c in df.columns]

    def pick(name: str) -> Optional[str]:
        # Return the first candidate that appears in df columns
        cands = cols.get(name, []) or []
        for c in cands:
            if c in df.columns:
                return c
        return None

    out = pd.DataFrame(index=df.index).copy()

    # DATE
    c = pick("date")
    out["date"] = _parse_dates(df[c]) if c is not None else pd.NaT

    # SKU
    c = pick("sku")
    out["sku"] = df[c].astype(str).str.strip() if c is not None else ""

    # CATEGORY
    c = pick("category")
    out["category"] = df[c].astype(str).str.strip() if c is not None else "Uncategorized"
    out["category"] = (
        out["category"]
        .replace({"": "Uncategorized", "nan": "Uncategorized", "None": "Uncategorized"})
        .fillna("Uncategorized")
    )

    # QTY
    c = pick("qty")
    out["qty"] = _clean_numeric(df[c]).fillna(0.0) if c is not None else 0.0

    # PRICE (prefer explicit; else derive from Amount / Qty)
    price_col = pick("price")
    amount_col = pick("amount")

    price_raw = _clean_numeric(df[price_col]) if price_col is not None else pd.Series(np.nan, index=df.index)
    amount    = _clean_numeric(df[amount_col]) if amount_col is not None else pd.Series(np.nan, index=df.index)

    price = price_raw.copy()
    needs_deriv = price.isna() | (price <= 0)
    with np.errstate(divide="ignore", invalid="ignore"):
        derived = amount / out["qty"].replace({0: np.nan})
    price = price.where(~needs_deriv, derived)
    out["price"] = price

    # Keep only required cols and basic filters
    out = out[REQUIRED_COLS].copy()
    out["date"]  = pd.to_datetime(out["date"], errors="coerce")
    out["qty"]   = pd.to_numeric(out["qty"], errors="coerce")
    out["price"] = pd.to_numeric(out["price"], errors="coerce")

    out = out.dropna(subset=["date"])
    out = out[(out["qty"] > 0) & (out["price"] > 0)]

    # attach source for lineage/debug
    if source_name is not None:
        out["source"] = source_name

    return out


def load_all(CFG: Dict) -> pd.DataFrame:
    """
    Load all configured sales sources, normalize, and concatenate.
    Returns a well-typed DataFrame with REQUIRED_COLS (and optional 'source').
    """
    paths = CFG["paths"]
    sales_sources = [
        ("amazon_sales",   paths.get("amazon_sales")),
        ("intl_sales",     paths.get("intl_sales")),
        ("inventory",      paths.get("inventory")),  # include only if transactional
    ]

    frames: List[pd.DataFrame] = []
    for name, p in sales_sources:
        if not p:
            continue
        fp = Path(p)
        if not fp.exists():
            print(f"[info] File not found, skipping: {p}")
            continue
        try:
            df_raw  = safe_read_csv(fp)
            df_norm = _normalize_sales(df_raw, CFG, source_name=name)
            if len(df_norm):
                frames.append(df_norm)
            else:
                print(f"[info] No usable rows after normalization: {p}")
        except Exception as e:
            print(f"[warn] Skipped {p}: {e}")

    if not frames:
        # Well-typed empty frame so downstream code doesn't crash
        empty = pd.DataFrame(columns=REQUIRED_COLS)
        empty["date"] = pd.to_datetime(empty["date"])
        for c in ["sku", "category"]:
            empty[c] = empty[c].astype("string")
        for c in ["qty", "price"]:
            empty[c] = empty[c].astype("float")
        return empty

    sales = pd.concat(frames, ignore_index=True, sort=False)

    #  tiny dedup pass: same date/sku/price/qty rows
    sales = sales.drop_duplicates(subset=["date", "sku", "category", "qty", "price"])

    return sales


def load_catalogs(CFG: Dict):
    """
    Load price catalog files (for unit_cost inference).
    Looks for TP/TP1/TP2 columns.
    Returns: list[pd.DataFrame] or None if none found.
    """
    catalogs = []
    for p in CFG["paths"].get("price_catalogs", []) or []:
        if not p:
            continue
        fp = Path(p)
        if not fp.exists():
            print(f"[info] Catalog not found, skipping: {p}")
            continue
        try:
            catalogs.append(safe_read_csv(fp))
        except Exception as e:
            print(f"[warn] Skipped catalog {p}: {e}")

    return catalogs if catalogs else None




In [34]:
# ========================= DIAGNOSTICS =========================
import pandas as pd
import numpy as np
import sys

def _exists(name: str) -> bool:
    return name in globals() and globals()[name] is not None

# 1) Ensure sales_costed exists (recreate if missing)
if not _exists("sales_costed"):
    try:
        with open("config_notebook.yaml", "r") as fh:
            CFG = yaml.safe_load(fh)
    except FileNotFoundError:
        raise FileNotFoundError(
            "[fatal] config_notebook.yaml not found. Run the CONFIG cell that writes it."
        )

    # Check loader availability
    missing = [fn for fn in ("load_all", "load_catalogs", "derive_costs") if fn not in globals()]
    if missing:
        raise NameError(
            f"[fatal] Missing functions: {', '.join(missing)}. "
            f"Define/import the minimal loaders and derive_costs before running diagnostics."
        )

    # Load data & derive costs
    sales = load_all(CFG)
    catalogs = load_catalogs(CFG)
    sales_costed = derive_costs(sales, CFG, catalogs)

# 2) Guard: empty dataset -> short-circuit with helpful message
if sales_costed is None or len(sales_costed) == 0:
    print("[info] sales_costed is empty. Check that source CSVs exist and header aliases match your CONFIG.")
    display(pd.DataFrame({
        "checklist": [
            "Do input CSVs exist in the working directory?",
            "Do column aliases in CONFIG['columns'] match your headers?",
            "Does load_all() return any rows after normalization filters?",
        ]
    }))
else:
    # 3) Basic shape / quality
    print("Rows:", len(sales_costed))
    # Ensure required cols exist
    required = ["date","sku","category","qty","price"]
    missing_cols = [c for c in required if c not in sales_costed.columns]
    if missing_cols:
        raise KeyError(f"[fatal] sales_costed missing columns: {missing_cols}")

    print("Date null %:", sales_costed["date"].isna().mean().round(4))
    print("Qty>0 & Price>0 rows:", ((pd.to_numeric(sales_costed["qty"], errors="coerce")>0) &
                                    (pd.to_numeric(sales_costed["price"], errors="coerce")>0)).sum())
    print("Unique SKUs:", sales_costed["sku"].nunique())
    print("Unique Categories:", sales_costed["category"].nunique())

    # 4) Helper: coefficient of variation
    def cv(x: pd.Series):
        x = pd.to_numeric(x, errors="coerce").dropna()
        if len(x) < 2:
            return np.nan
        m = x.mean();
        if m <= 0:
            return np.nan
        s = x.std(ddof=1)
        return float(s/m) if m > 0 else np.nan

    # 5) CV by SKU / Category (price variation tells if elasticity is estimable)
    sku_cv = (sales_costed.groupby("sku", dropna=False)["price"]
              .apply(cv).rename("price_cv").reset_index())
    cat_cv = (sales_costed.groupby("category", dropna=False)["price"]
              .apply(cv).rename("price_cv").reset_index())

    # Share of groups with any price variation
    sku_any = (sku_cv["price_cv"].fillna(0) > 0).mean().round(3)
    cat_any = (cat_cv["price_cv"].fillna(0) > 0).mean().round(3)
    print(f"\n% SKUs with any price variation (cv>0): {sku_any}")
    print(f"% Categories with any price variation: {cat_any}")

    # 6) Show the 10 lowest-CV SKUs/Categories (useful to spot flat prices)
    #    (Guarded display to avoid errors in non-notebook envs)
    try:
        display(sku_cv.sort_values("price_cv", ascending=True).head(10))
        display(cat_cv.sort_values("price_cv", ascending=True).head(10))
    except Exception:
        print("[info] display() not available; printing heads instead.")
        print(sku_cv.sort_values("price_cv", ascending=True).head(10).to_string(index=False))
        print(cat_cv.sort_values("price_cv", ascending=True).head(10).to_string(index=False))

    # 7) Optional: quick counts of zero/negative qty/price rows (should be 0 post-clean)
    bad_rows = sales_costed[(pd.to_numeric(sales_costed["qty"], errors="coerce")<=0) |
                            (pd.to_numeric(sales_costed["price"], errors="coerce")<=0)]
    if len(bad_rows):
        print(f"[warn] Found {len(bad_rows)} rows with non-positive qty/price after cleaning.")


Rows: 132336
Date null %: 0.0
Qty>0 & Price>0 rows: 132336
Unique SKUs: 8039
Unique Categories: 10

% SKUs with any price variation (cv>0): 0.794
% Categories with any price variation: 0.9


Unnamed: 0,sku,price_cv
4280,JNE3652-TP-L,0.0
4245,JNE3645-TP-XL,0.0
3302,JNE3441-KR-XS,0.0
6392,SET075-KR-DH-XL,0.0
6390,SET075-KR-DH-M,0.0
4966,JNE3766-KR-XXL,0.0
4964,JNE3766-KR-XL,0.0
4957,JNE3765-KR-XL,0.0
2700,JNE2294-KR-A-XXXL,0.0
2696,JNE2294-KR-A-L,0.0


Unnamed: 0,category,price_cv
2,Dupatta,0.0
8,Western Dress,0.167247
4,Saree,0.174835
6,Top,0.23965
1,Bottom,0.245396
9,kurta,0.282369
0,Blouse,0.284205
5,Set,0.309584
3,Ethnic Dress,0.319227
7,Uncategorized,0.455336


In [35]:
# pooled elasticities with strict numeric coercion (fixes "dtype of object" error)
import warnings
from typing import Tuple, Dict

import numpy as np
import pandas as pd
import statsmodels.api as sm

warnings.filterwarnings("ignore", message="Could not infer format, so each element will be parsed individually")


def fit_pooled_elasticities(
    df: pd.DataFrame,
    topN: int = 50,
    weekly_fallback: bool = True,
    *,
    robust_se: bool = False,           # set True to report HC1-robust R²/t-stats (elasticities unaffected)
) -> Tuple[pd.DataFrame, Dict[str, float]]:
    """
    Fit a pooled log-log elasticity model with category interactions:

        ln(qty) ~ ln(price) + weekday_dummies + month_dummies
                  + (ln(price) x category_top_dummies) + const

    - Uses transaction-level data first; if insufficient, falls back to weekly aggregates.
    - Returns a table of per-category elasticities (includes a 'BASE' row)
      and a dict with model metadata (R², n, level).

    Parameters
    ----------
    df : pd.DataFrame
        Must include ['qty','price'] and ideally ['date','category'].
    topN : int
        Keep the top-N most frequent categories; all others pooled into "_OTHER_".
    weekly_fallback : bool
        If txn-level fit fails (too few rows), aggregate to weekly level and retry.
    robust_se : bool, default False
        If True, compute HC1-robust covariance for the label-refit when reporting R²/t-stats.

    Returns
    -------
    (elastic_table, model_info)
        elastic_table : DataFrame with columns ['category','elasticity']
                        (includes 'BASE' row for the overall slope)
        model_info    : dict with keys {'level','R2','n'}
                        where 'level' ∈ {'txn','weekly','none'}
    """

    def _prep(d: pd.DataFrame) -> pd.DataFrame:
        """Minimal cleaning + feature engineering for modeling."""
        # Keep positive qty/price
        d = d[(pd.to_numeric(d["qty"], errors="coerce") > 0) &
              (pd.to_numeric(d["price"], errors="coerce") > 0)].copy()

        # Core logs (coerce then log)
        d["ln_q"] = np.log(pd.to_numeric(d["qty"], errors="coerce"))
        d["ln_p"] = np.log(pd.to_numeric(d["price"], errors="coerce"))

        # Calendar controls if date exists and is datetime-like
        if "date" in d.columns and pd.api.types.is_datetime64_any_dtype(d["date"]):
            d["weekday"] = d["date"].dt.weekday
            d["month"] = d["date"].dt.month
        else:
            d["weekday"] = 0
            d["month"] = 1

        # Compact category set for stability
        if "category" not in d.columns:
            d["category"] = "_MISSING_"
        top_cats = d["category"].astype(str).value_counts().head(int(topN)).index.tolist()
        d["cat_top"] = np.where(d["category"].astype(str).isin(top_cats),
                                d["category"].astype(str),
                                "_OTHER_")

        # Final clean: drop any rows with NA logs
        d = d.dropna(subset=["ln_q", "ln_p"])

        return d

    def _run(d: pd.DataFrame):
        """Fit pooled model and recover per-category elasticities."""
        if len(d) < 50:
            return None, {"n": len(d), "note": "too few rows"}

        base = d[["ln_p", "cat_top", "weekday", "month"]].copy()
        base["ln_p"] = pd.to_numeric(base["ln_p"], errors="coerce")

        # Dummies
        X = pd.get_dummies(base, columns=["cat_top", "weekday", "month"], drop_first=True)

        # Interactions: ln_p * each category dummy
        cat_cols = [c for c in X.columns if c.startswith("cat_top_")]
        for ccol in cat_cols:
            X[f"lnp_x_{ccol}"] = pd.to_numeric(X["ln_p"], errors="coerce") * pd.to_numeric(X[ccol], errors="coerce")

        # Strict numeric matrix
        X = X.apply(pd.to_numeric, errors="coerce")
        X = X.replace([np.inf, -np.inf], np.nan).fillna(0.0).astype(float)
        X = sm.add_constant(X, has_constant="add")

        y = pd.to_numeric(d["ln_q"], errors="coerce").replace([np.inf, -np.inf], np.nan)

        # Align & drop any remaining NaNs
        mask = y.notna() & np.isfinite(y)
        if "const" in X.columns:
            mask = mask & X["const"].notna()
        X = X.loc[mask]
        y = y.loc[mask].astype(float)

        # Convert to numpy (avoids pandas object dtype surprises)
        X_np = np.asarray(X, dtype=float)
        y_np = np.asarray(y, dtype=float)

        # Fit (numeric arrays)
        model = sm.OLS(y_np, X_np, missing="drop").fit()

        # Refit with labels (now that X/y are guaranteed numeric) to read param names
        model_lbl = sm.OLS(y, X, missing="drop").fit()
        if robust_se:
            # Use robust covariance for reporting (does not change params)
            model_lbl = model_lbl.get_robustcov_results(cov_type="HC1")

        base_beta = float(model_lbl.params.get("ln_p", np.nan))
        rows = [{"category": "BASE", "elasticity": base_beta}]

        for ccol in cat_cols:
            inter = model_lbl.params.get(f"lnp_x_{ccol}", np.nan)
            cat_name = ccol.replace("cat_top_", "")
            beta_cat = base_beta + (float(inter) if pd.notna(inter) else 0.0)
            rows.append({"category": cat_name, "elasticity": beta_cat})

        etab = pd.DataFrame(rows).dropna()
        info = {"R2": float(model_lbl.rsquared), "n": int(model_lbl.nobs)}
        return etab, info

    # ---------- 1) Transaction-level pooled ----------
    d1 = _prep(df.copy())
    etab, info = _run(d1)
    if etab is not None and len(etab):
        print(f"[pooled] Transaction-level OK — R²={info['R2']:.3f}, n={info['n']}")
        return etab, {"level": "txn", **info}

    # ---------- 2) Weekly aggregation fallback ----------
    if weekly_fallback and "date" in df.columns and pd.api.types.is_datetime64_any_dtype(df["date"]):
        d2 = df.copy()
        d2 = d2[(pd.to_numeric(d2["qty"], errors="coerce") > 0) &
                (pd.to_numeric(d2["price"], errors="coerce") > 0)].dropna(subset=["qty", "price", "date"]).copy()
        d2["week"] = d2["date"].dt.to_period("W").dt.start_time
        d2 = (d2.groupby(["category", "week"], as_index=False)
                .agg(price=("price", "mean"), qty=("qty", "sum")))
        d2["date"] = pd.to_datetime(d2["week"])
        d2 = _prep(d2)
        etab2, info2 = _run(d2)
        if etab2 is not None and len(etab2):
            print(f"[pooled] Weekly fallback OK — R²={info2['R2']:.3f}, n={info2['n']}")
            return etab2, {"level": "weekly", **info2}

    # ---------- 3) Failure ----------
    print("[pooled] Could not fit pooled model (even with fallback).")
    return pd.DataFrame(columns=["category", "elasticity"]), {"level": "none", "R2": np.nan, "n": 0}


In [36]:
# ========= Minimal simulator + plotting utilities (refined) =========
from __future__ import annotations

from typing import Tuple, Dict, Optional, Sequence, Any
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


def simulate_prices(
    sales_costed: pd.DataFrame,
    elastic_for_sim: pd.DataFrame,
    CFG: Dict[str, Any],
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Simulate revenue/profit across a price-change grid using per-group elasticities.

    Parameters
    ----------
    sales_costed : DataFrame
        Transactions with at least:
        ['price','qty','unit_cost','ship_cost','return_cost', group_col, ...]
    elastic_for_sim : DataFrame
        Per-group elasticities with columns: [group_col, 'elasticity'].
        May also contain a 'BASE' row under either column 'category' or group_col.
    CFG : dict
        Must include:
          CFG["modeling"]["group_level"] ∈ {'sku','category'}
          CFG["modeling"]["price_grid_pct"] : Sequence[float], e.g. [-0.3,...,0.3]

    Returns
    -------
    sim_df : DataFrame
        One row per (group, delta_pct) with:
        [group_col, delta_pct, price_new, qty_new, revenue, profit, elasticity, base_price, base_qty]
    best_df : DataFrame
        One row per group with the max-profit candidate:
        [group_col, best_delta_pct, best_price, best_qty, best_revenue, best_profit, elasticity]
    """
    group_col: str = CFG["modeling"].get("group_level", "category")
    grid: Sequence[float] = CFG["modeling"].get(
        "price_grid_pct", [-0.3, -0.2, -0.1, 0.0, 0.05, 0.1, 0.2, 0.3]
    )

    # --- aggregate baseline stats per group ---
    # Use TOTAL qty (sum) for realistic scale; mean costs/prices as proxies
    base = (
        sales_costed.groupby(group_col, as_index=False)
        .agg(
            base_price=("price", "mean"),
            base_qty=("qty", "sum"),
            unit_cost=("unit_cost", "mean"),
            ship_cost=("ship_cost", "mean"),
            return_cost=("return_cost", "mean"),
        )
    )

    # Merge in elasticities
    ef = elastic_for_sim[[group_col, "elasticity"]].copy() if group_col in elastic_for_sim.columns else None
    if ef is None:
        # Try to fall back to 'category' if the user passed that instead
        if "category" in elastic_for_sim.columns:
            ef = elastic_for_sim[["category", "elasticity"]].rename(columns={"category": group_col}).copy()
        else:
            ef = pd.DataFrame({group_col: [], "elasticity": []})

    merged = base.merge(ef, on=group_col, how="left")

    # Fallback elasticity if group missing: use 'BASE' if present; else mild prior
    base_el: Optional[float] = None
    # Look for a BASE row in either the group_col or 'category' column
    if group_col in elastic_for_sim.columns and "BASE" in elastic_for_sim[group_col].astype(str).values:
        base_el = float(
            elastic_for_sim.loc[elastic_for_sim[group_col] == "BASE", "elasticity"].iloc[0]
        )
    elif "category" in elastic_for_sim.columns and "BASE" in elastic_for_sim["category"].astype(str).values:
        base_el = float(
            elastic_for_sim.loc[elastic_for_sim["category"] == "BASE", "elasticity"].iloc[0]
        )

    merged["elasticity"] = merged["elasticity"].astype(float)
    merged["elasticity"] = merged["elasticity"].fillna(base_el if base_el is not None else -0.25)

    # Guardrails on beta (no upward-sloping demand; cap extremes)
    merged["elasticity"] = merged["elasticity"].clip(lower=-5.0, upper=0.0)

    # Simulate
    rows = []
    for _, r in merged.iterrows():
        g = r[group_col]
        p0 = float(max(np.float64(r["base_price"]), 1e-9))
        q0 = float(max(np.float64(r["base_qty"]), 1e-9))
        c = float(np.float64(r["unit_cost"]))
        s = float(np.float64(r["ship_cost"]))
        rc = float(np.float64(r["return_cost"]))
        beta = float(np.float64(r["elasticity"]))

        # If nearly flat demand, restrict moves to a conservative grid
        local_grid = [-0.05, 0.0, 0.05] if beta >= -0.02 and any(abs(x) > 0.05 for x in grid) else grid

        for pct in local_grid:
            p1 = p0 * (1.0 + float(pct))
            # Demand response (isoelastic)
            q1 = q0 * (p1 / p0) ** beta
            # Economics
            rev = p1 * q1
            prof = (p1 - c - s) * q1 - rc * q1  # equals (p1 - c - s - rc) * q1

            rows.append(
                {
                    group_col: g,
                    "delta_pct": float(pct),
                    "price_new": float(p1),
                    "qty_new": float(q1),
                    "revenue": float(rev),
                    "profit": float(prof),
                    "elasticity": float(beta),
                    "base_price": float(p0),
                    "base_qty": float(q0),
                }
            )

    sim_df = pd.DataFrame(rows)

    if sim_df.empty:
        # Return well-typed empties to avoid downstream crashes
        return (
            pd.DataFrame(
                columns=[
                    group_col,
                    "delta_pct",
                    "price_new",
                    "qty_new",
                    "revenue",
                    "profit",
                    "elasticity",
                    "base_price",
                    "base_qty",
                ]
            ),
            pd.DataFrame(
                columns=[
                    group_col,
                    "best_delta_pct",
                    "best_price",
                    "best_qty",
                    "best_revenue",
                    "best_profit",
                    "elasticity",
                ]
            ),
        )

    best_df = (
        sim_df.sort_values("profit", ascending=False)
        .groupby(group_col, as_index=False)
        .first()
        .rename(
            columns={
                "delta_pct": "best_delta_pct",
                "price_new": "best_price",
                "qty_new": "best_qty",
                "revenue": "best_revenue",
                "profit": "best_profit",
            }
        )
    )

    return sim_df, best_df


def plot_elasticity_hist(elastic_table: pd.DataFrame, out_path: str) -> None:
    """
    Save a histogram of elasticities to `out_path`.
    Expects column 'elasticity'; safely no-ops if not present/empty.
    """
    if "elasticity" not in elastic_table.columns or elastic_table["elasticity"].dropna().empty:
        print("[info] No elasticity values to plot.")
        return

    plt.figure(figsize=(6, 4))
    elastic_table["elasticity"].dropna().astype(float).plot(kind="hist", bins=30)
    plt.title("Elasticity Distribution")
    plt.xlabel("Elasticity (β)")
    plt.ylabel("Count")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
    print(f"[ok] Saved histogram: {out_path}")


def plot_profit_surface(sim_df: pd.DataFrame, group_value: Any, group_col: str, out_path: str) -> None:
    """
    For a given group (e.g., category/SKU), plot profit vs. price change (%).
    Safely no-ops if no rows for that group.
    """
    sub = sim_df[sim_df[group_col] == group_value].copy()
    if sub.empty:
        print(f"[info] No sim rows for {group_col}={group_value}; skipping plot.")
        return
    sub = sub.sort_values("delta_pct")

    plt.figure(figsize=(6, 4))
    plt.plot(sub["delta_pct"] * 100.0, sub["profit"])
    plt.title(f"Profit vs Price Change — {group_value}")
    plt.xlabel("Price change (%)")
    plt.ylabel("Profit")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
    print(f"[ok] Saved surface: {out_path}")


def print_recommendations(best_df: pd.DataFrame, k: int = 10, group: str = "category") -> None:
    """
    Console-friendly summary of top-k groups by best_profit.
    """
    if best_df.empty:
        print("[info] No recommendations to print.")
        return

    top = best_df.sort_values("best_profit", ascending=False).head(k)
    print(f"\nTop {len(top)} {group} recommendations:")
    for _, r in top.iterrows():
        dpct = f"{100 * r['best_delta_pct']:+.1f}%"
        print(
            f" - {str(r[group]):<24} | Δp={dpct:>7} | "
            f"best_price={r['best_price']:.2f} | "
            f"best_profit={r['best_profit']:.2f}"
        )


# ---quick portfolio summary helper ---
def summarize_portfolio(best_df: pd.DataFrame, sales_costed: pd.DataFrame, group_col: str = "category") -> pd.DataFrame:
    """
    Build a 1-row summary comparing baseline vs. best across all groups.
    Assumes you've already created row-level baseline revenue/profit columns
    OR will compute them here quickly.

    Returns a DataFrame with:
      ['baseline_revenue','baseline_profit','best_revenue','best_profit',
       'revenue_lift_abs','profit_lift_abs','revenue_lift_pct','profit_lift_pct']
    """
    # Baselines from actual data (compute if missing)
    if not {"revenue_row", "profit_row"}.issubset(set(sales_costed.columns)):
        tmp = sales_costed.copy()
        tmp["revenue_row"] = tmp["price"] * tmp["qty"]
        tmp["profit_row"] = ((tmp["price"] - tmp["unit_cost"] - tmp["ship_cost"]) * tmp["qty"]) - (tmp["return_cost"] * tmp["qty"])
    else:
        tmp = sales_costed

    base_rev = float(tmp["revenue_row"].sum())
    base_prof = float(tmp["profit_row"].sum())

    best_rev = float(best_df["best_revenue"].sum()) if "best_revenue" in best_df.columns else np.nan
    best_prof = float(best_df["best_profit"].sum()) if "best_profit" in best_df.columns else np.nan

    lift_rev = best_rev - base_rev
    lift_prof = best_prof - base_prof

    out = pd.DataFrame(
        [
            {
                "baseline_revenue": base_rev,
                "baseline_profit": base_prof,
                "best_revenue": best_rev,
                "best_profit": best_prof,
                "revenue_lift_abs": lift_rev,
                "profit_lift_abs": lift_prof,
                "revenue_lift_pct": (lift_rev / base_rev) if base_rev > 0 else np.nan,
                "profit_lift_pct": (lift_prof / base_prof) if base_prof > 0 else np.nan,
            }
        ]
    )
    return out



In [37]:
# --- Fit pooled elasticities
elastic_pooled, pooled_info = fit_pooled_elasticities(sales_costed, topN=50, weekly_fallback=True)
display(elastic_pooled.head(10))

# --- Prep per-category features for simulation
def _cv_safe(x: pd.Series) -> float:
    x = pd.to_numeric(x, errors="coerce").dropna()
    if len(x) < 2:
        return 0.0
    m = x.mean()
    if m <= 0:
        return 0.0
    s = x.std(ddof=1)
    return float(s/m)

elastic_for_sim = (
    sales_costed.groupby("category", as_index=False)
    .agg(
        n_obs=("price","count"),
        price_cv=("price", _cv_safe)
    )
    .merge(elastic_pooled, on="category", how="left")
)

# Fallback elasticity = BASE (or mild prior)
base_el = (
    elastic_pooled.loc[elastic_pooled["category"]=="BASE","elasticity"].iloc[0]
    if "BASE" in elastic_pooled["category"].values else -0.25
)
elastic_for_sim["elasticity"] = elastic_for_sim["elasticity"].fillna(base_el)

#  filter out very small categories from simulation (too noisy)
MIN_N = 50
elastic_for_sim_sim = elastic_for_sim.query("n_obs >= @MIN_N").copy()
if elastic_for_sim_sim.empty:
    print("[warn] No categories pass MIN_N filter; using all categories instead.")
    elastic_for_sim_sim = elastic_for_sim.copy()

# --- Configure and simulate
CFG_run = CFG.copy()
CFG_run["modeling"]["group_level"] = "category"

sim, best = simulate_prices(sales_costed, elastic_for_sim_sim, CFG_run)

# Hard safety cap on deltas, just in case (should already be enforced inside simulate_prices)
best["best_delta_pct"] = best["best_delta_pct"].clip(-0.30, 0.30)

# --- Save artifacts
sim.to_csv("artifacts/simulated_pricing_category_pooled.csv", index=False)
best.to_csv("artifacts/best_price_recos_category_pooled.csv", index=False)

plot_elasticity_hist(elastic_for_sim.rename(columns={"elasticity":"elasticity"}),
                     "artifacts/elasticity_hist_category_pooled.png")

top_k = int(CFG_run["modeling"]["top_k_plots"])
for cat in best.sort_values("best_profit", ascending=False).head(top_k)["category"]:
    plot_profit_surface(sim, cat, "category", f"artifacts/profit_surface_category_{cat}.png")

print(f"Artifacts saved (pooled={pooled_info['level']}): ./artifacts")
print("Top recommendations:")
print_recommendations(best, k=10, group="category")

# 1-row portfolio summary
portfolio = summarize_portfolio(best, sales_costed, group_col="category")
display(portfolio)


[pooled] Transaction-level OK — R²=0.157, n=132336


Unnamed: 0,category,elasticity
0,BASE,-0.021854
1,Bottom,-0.007923
2,Dupatta,-0.025124
3,Ethnic Dress,-0.010194
4,Saree,-0.030447
5,Set,-0.002121
6,Top,-0.003911
7,Uncategorized,-0.141421
8,Western Dress,-0.002852
9,kurta,-0.000912


[ok] Saved histogram: artifacts/elasticity_hist_category_pooled.png
[ok] Saved surface: artifacts/profit_surface_category_Set.png
[ok] Saved surface: artifacts/profit_surface_category_Uncategorized.png
[ok] Saved surface: artifacts/profit_surface_category_kurta.png
[ok] Saved surface: artifacts/profit_surface_category_Western Dress.png
[ok] Saved surface: artifacts/profit_surface_category_Top.png
Artifacts saved (pooled=txn): ./artifacts
Top recommendations:

Top 10 category recommendations:
 - Set                      | Δp=  +5.0% | best_price=894.66 | best_profit=26696051.72
 - Uncategorized            | Δp= +30.0% | best_price=891.10 | best_profit=13892732.40
 - kurta                    | Δp=  +5.0% | best_price=486.83 | best_profit=8648114.78
 - Western Dress            | Δp=  +5.0% | best_price=814.22 | best_profit=7174344.86
 - Top                      | Δp=  +5.0% | best_price=561.82 | best_profit=2635440.05
 - Ethnic Dress             | Δp=  +5.0% | best_price=783.47 | best_pro

Unnamed: 0,baseline_revenue,baseline_profit,best_revenue,best_profit,revenue_lift_abs,profit_lift_abs,revenue_lift_pct,profit_lift_pct
0,91170668.19,50998357.29,99933520.0,60020530.0,8762851.0,9022177.0,0.096115,0.176911


In [38]:
import pandas as pd
import numpy as np

# -------------------- 0) Preconditions & small helpers --------------------
def _as_category_safe(s: pd.Series) -> pd.Series:
    return (s.astype(str)
             .replace({"nan": "Uncategorized", "None": "Uncategorized", "": "Uncategorized"})
             .fillna("Uncategorized"))

def _grid_for_beta(beta: float) -> np.ndarray:
    # Conservative grid when elasticity ~ 0; wider grid otherwise
    return np.array([-0.05, 0.0, 0.05]) if beta >= -0.02 else np.array([-0.30, -0.20, -0.10, 0.0, 0.05, 0.10, 0.20, 0.30])

# -------------------- 1) Clean category labels --------------------
sales_costed = sales_costed.copy()
elastic_for_sim = elastic_for_sim.copy()

sales_costed["category"] = _as_category_safe(sales_costed["category"])
elastic_for_sim["category"] = _as_category_safe(elastic_for_sim["category"])

# -------------------- 2) Guardrails on elasticities --------------------
elastic_guard = elastic_for_sim.copy()
# Demand shouldn’t increase with price in this isoelastic toy model; cap extremes
elastic_guard["elasticity"] = pd.to_numeric(elastic_guard["elasticity"], errors="coerce").clip(lower=-5.0, upper=0.0)

# -------------------- 3) Baseline aggregates (use TOTAL qty for scale) --------------------
# Mean price approximates current price level; total qty sets the scale realistically
base = (sales_costed.groupby("category", as_index=False)
        .agg(base_price=("price","mean"),
             base_qty=("qty","sum"),          # <-- total qty (not mean) for correct scaling
             unit_cost=("unit_cost","mean"),
             ship_cost=("ship_cost","mean"),
             return_cost=("return_cost","mean")))

merged = elastic_guard.merge(base, on="category", how="inner")

# Optional: drop very small categories (too noisy)
MIN_N = 50
n_per_cat = sales_costed.groupby("category")["qty"].size().rename("n_orders").reset_index()
merged = merged.merge(n_per_cat, on="category", how="left")
merged = merged[merged["n_orders"].fillna(0) >= MIN_N].copy()
if merged.empty:
    print(f"[warn] No categories pass MIN_N={MIN_N}; continuing with all categories.")
    merged = elastic_guard.merge(base, on="category", how="inner")

# -------------------- 4) Simulate per-category guarded grids --------------------
rows = []
for _, row in merged.iterrows():
    beta = float(row["elasticity"])
    p0   = float(row["base_price"]) if row["base_price"] > 0 else 1e-6
    q0   = float(max(row["base_qty"], 1e-6))
    c, s, r = float(row["unit_cost"]), float(row["ship_cost"]), float(row["return_cost"])

    for pct in _grid_for_beta(beta):
        p1 = p0 * (1.0 + pct)
        q1 = q0 * (p1 / p0) ** beta
        rev = p1 * q1
        prof = (p1 - c - s) * q1 - r * q1
        rows.append({
            "category": row["category"],
            "delta_pct": float(pct),
            "price_new": float(p1),
            "qty_new": float(q1),
            "revenue": float(rev),
            "profit": float(prof),
            "elasticity": beta
        })

sim_guard = pd.DataFrame(rows)

best_guard = (sim_guard.sort_values(["profit"], ascending=False)
                        .groupby("category", as_index=False)
                        .first()
                        .rename(columns={
                            "delta_pct":"best_delta_pct",
                            "price_new":"best_price",
                            "qty_new":"best_qty",
                            "revenue":"best_revenue",
                            "profit":"best_profit"
                        }))

# Final safety cap on recommended moves (just in case)
best_guard["best_delta_pct"] = best_guard["best_delta_pct"].clip(-0.30, 0.30)

# -------------------- 5) Baseline actuals (row-level, consistent with costs) --------------------
sales_costed = sales_costed.assign(
    revenue_row = pd.to_numeric(sales_costed["price"], errors="coerce") * pd.to_numeric(sales_costed["qty"], errors="coerce"),
    profit_row  = ((pd.to_numeric(sales_costed["price"], errors="coerce")
                   - pd.to_numeric(sales_costed["unit_cost"], errors="coerce")
                   - pd.to_numeric(sales_costed["ship_cost"], errors="coerce"))
                   * pd.to_numeric(sales_costed["qty"], errors="coerce"))
                   - pd.to_numeric(sales_costed["return_cost"], errors="coerce") * pd.to_numeric(sales_costed["qty"], errors="coerce")
)

baseline_cat = (sales_costed.groupby("category", as_index=False)
                .agg(baseline_revenue=("revenue_row","sum"),
                     baseline_profit=("profit_row","sum"),
                     baseline_avg_price=("price","mean"),
                     baseline_avg_qty=("qty","mean"),
                     n_orders=("qty","size")))

# -------------------- 6) Memo table with safe fallbacks & lifts --------------------
memo = (baseline_cat.merge(best_guard, on="category", how="left")
                    .sort_values("best_profit", ascending=False))

memo_cols = ["category","n_orders",
             "baseline_revenue","baseline_profit",
             "best_delta_pct","best_price","best_revenue","best_profit"]
memo_out = memo[memo_cols].copy()

# Safe defaults for categories with no sim row
memo_out["best_delta_pct"] = memo_out["best_delta_pct"].fillna(0.0)
memo_out["best_price"]     = memo_out["best_price"].fillna(memo["baseline_avg_price"])
memo_out["best_revenue"]   = memo_out["best_revenue"].fillna(memo["baseline_revenue"])
memo_out["best_profit"]    = memo_out["best_profit"].fillna(memo["baseline_profit"])

# Lifts
memo_out["revenue_lift_abs"] = memo_out["best_revenue"] - memo_out["baseline_revenue"]
memo_out["profit_lift_abs"]  = memo_out["best_profit"]  - memo_out["baseline_profit"]
memo_out["revenue_lift_pct"] = np.where(memo_out["baseline_revenue"]>0,
                                        memo_out["revenue_lift_abs"]/memo_out["baseline_revenue"], 0.0)
memo_out["profit_lift_pct"]  = np.where(memo_out["baseline_profit"]>0,
                                        memo_out["profit_lift_abs"]/memo_out["baseline_profit"], 0.0)

# -------------------- 7) Save + preview --------------------
memo_out.to_csv("artifacts/memo_category_recos_guarded.csv", index=False)
print("Saved: artifacts/memo_category_recos_guarded.csv")
display(memo_out.head(10))

# -------------------- 8) Optional: plots --------------------
plot_elasticity_hist(elastic_guard[["elasticity"]], "artifacts/elasticity_hist_category_guarded.png")
for cat in memo_out.sort_values("profit_lift_abs", ascending=False).head(int(CFG_guard["modeling"]["top_k_plots"]))["category"]:
    if isinstance(cat, str) and cat:
        plot_profit_surface(sim_guard, cat, "category", f"artifacts/profit_surface_category_guarded_{cat}.png")


Saved: artifacts/memo_category_recos_guarded.csv


Unnamed: 0,category,n_orders,baseline_revenue,baseline_profit,best_delta_pct,best_price,best_revenue,best_profit,revenue_lift_abs,profit_lift_abs,revenue_lift_pct,profit_lift_pct
5,Set,44077,37662424.0,24806680.0,0.05,894.656625,39550470.0,26696050.0,1888042.0,1889372.0,0.050131,0.076164
7,Uncategorized,18635,15768162.19,8784619.0,0.3,891.095113,20621910.0,13892730.0,4853745.0,5108113.0,0.307819,0.581484
9,kurta,43905,20452141.0,7625474.0,0.05,486.832757,21474210.0,8648115.0,1022070.0,1022641.0,0.049974,0.134108
8,Western Dress,13663,10629096.0,6643152.0,0.05,814.217017,11159730.0,7174345.0,530638.0,531192.6,0.049923,0.079961
6,Top,9692,5203733.0,2375946.0,0.05,561.823127,5462687.0,2635440.0,258954.3,259493.9,0.049763,0.109217
3,Ethnic Dress,1018,760711.0,463829.7,0.05,783.469155,799524.3,502790.6,38813.26,38960.88,0.051022,0.083998
0,Blouse,815,434751.0,195734.0,0.3,689.411534,563456.3,325805.8,128705.3,130071.8,0.296044,0.664534
4,Saree,147,118509.0,74601.98,0.3,1022.012245,153096.0,109538.3,34586.99,34936.33,0.291851,0.468303
1,Bottom,381,140226.0,28277.62,0.05,382.806299,147323.5,35418.36,7097.463,7140.731,0.050614,0.252522
2,Dupatta,3,915.0,42.675,0.0,305.0,915.0,42.675,0.0,0.0,0.0,0.0


[ok] Saved histogram: artifacts/elasticity_hist_category_guarded.png
[ok] Saved surface: artifacts/profit_surface_category_guarded_Uncategorized.png
[ok] Saved surface: artifacts/profit_surface_category_guarded_Set.png
[ok] Saved surface: artifacts/profit_surface_category_guarded_kurta.png
[ok] Saved surface: artifacts/profit_surface_category_guarded_Western Dress.png
[ok] Saved surface: artifacts/profit_surface_category_guarded_Top.png


In [39]:
# ✅ Guarded simulation with TOTAL qty instead of mean qty (refined)

import pandas as pd
import numpy as np

# ---------- 0) Small helpers ----------
def _clean_cat(s: pd.Series) -> pd.Series:
    return (s.astype(str)
             .replace({"nan": "Uncategorized", "None": "Uncategorized", "": "Uncategorized"})
             .fillna("Uncategorized"))

def grid_for_beta(beta: float) -> np.ndarray:
    # Conservative grid if elasticity is ~0; wider otherwise
    return np.array([-0.05, 0.0, 0.05]) if beta >= -0.02 else np.array([-0.30, -0.20, -0.10, 0.0, 0.05, 0.10, 0.20, 0.30])

# ---------- 1) Clean labels & guard elasticities ----------
sales_costed = sales_costed.copy()
elastic_for_sim = elastic_for_sim.copy()

sales_costed["category"] = _clean_cat(sales_costed["category"])
elastic_for_sim["category"] = _clean_cat(elastic_for_sim["category"])

elastic_guard = elastic_for_sim.copy()
elastic_guard["elasticity"] = pd.to_numeric(elastic_guard["elasticity"], errors="coerce").clip(lower=-5.0, upper=0.0)

# ---------- 2) Baseline aggregates (TOTAL qty for scale) ----------
# Cast to numeric to avoid 'object' surprises
for col in ["price", "qty", "unit_cost", "ship_cost", "return_cost"]:
    sales_costed[col] = pd.to_numeric(sales_costed[col], errors="coerce")

base = (sales_costed
        .groupby("category", as_index=False)
        .agg(base_price=("price","mean"),
             base_qty=("qty","sum"),      # total qty, not mean
             unit_cost=("unit_cost","mean"),
             ship_cost=("ship_cost","mean"),
             return_cost=("return_cost","mean")))

merged = elastic_guard.merge(base, on="category", how="inner")

#  drop very small categories (noisy signal)
MIN_N = 50
n_per_cat = sales_costed.groupby("category")["qty"].size().rename("n_orders").reset_index()
merged = merged.merge(n_per_cat, on="category", how="left")
candidate = merged[merged["n_orders"].fillna(0) >= MIN_N].copy()
if candidate.empty:
    print(f"[warn] No categories pass MIN_N={MIN_N}; using all categories.")
    candidate = merged.copy()

# ---------- 3) Simulate guarded grids ----------
rows = []
for _, row in candidate.iterrows():
    beta = float(row["elasticity"])
    p0   = float(row["base_price"]) if float(row["base_price"]) > 0 else 1e-6
    q0   = float(max(row["base_qty"], 1e-6))
    c, s, r = float(row["unit_cost"]), float(row["ship_cost"]), float(row["return_cost"])

    for pct in grid_for_beta(beta):
        p1 = p0 * (1.0 + float(pct))
        q1 = q0 * (p1 / p0) ** beta
        rev = p1 * q1
        prof = (p1 - c - s) * q1 - r * q1
        rows.append({
            "category": row["category"],
            "delta_pct": float(pct),
            "price_new": float(p1),
            "qty_new": float(q1),
            "revenue": float(rev),
            "profit": float(prof),
            "elasticity": beta
        })

sim_guard = pd.DataFrame(rows)

best_guard = (sim_guard.sort_values("profit", ascending=False)
                        .groupby("category", as_index=False)
                        .first()
                        .rename(columns={
                            "delta_pct":"best_delta_pct",
                            "price_new":"best_price",
                            "qty_new":"best_qty",
                            "revenue":"best_revenue",
                            "profit":"best_profit"
                        }))

# Final safety cap on recommended moves
best_guard["best_delta_pct"] = best_guard["best_delta_pct"].clip(-0.30, 0.30)

# ---------- 4) Baseline actuals (row-level) ----------
sales_costed["revenue_row"] = sales_costed["price"] * sales_costed["qty"]
sales_costed["profit_row"]  = (
    (sales_costed["price"] - sales_costed["unit_cost"] - sales_costed["ship_cost"]) * sales_costed["qty"]
    - sales_costed["return_cost"] * sales_costed["qty"]
)

baseline_cat = (sales_costed.groupby("category", as_index=False)
                .agg(baseline_revenue=("revenue_row","sum"),
                     baseline_profit=("profit_row","sum"),
                     baseline_avg_price=("price","mean"),
                     n_orders=("qty","size")))

# ---------- 5) Join + lifts ----------
memo = (baseline_cat.merge(best_guard, on="category", how="left")
                     .sort_values("best_profit", ascending=False))

# Safe fallbacks for categories with no sim row
memo["best_delta_pct"] = memo["best_delta_pct"].fillna(0.0)
memo["best_price"]     = memo["best_price"].fillna(memo["baseline_avg_price"])
memo["best_revenue"]   = memo["best_revenue"].fillna(memo["baseline_revenue"])
memo["best_profit"]    = memo["best_profit"].fillna(memo["baseline_profit"])

memo["revenue_lift_abs"] = memo["best_revenue"] - memo["baseline_revenue"]
memo["profit_lift_abs"]  = memo["best_profit"]  - memo["baseline_profit"]
memo["revenue_lift_pct"] = np.where(memo["baseline_revenue"]>0,
                                    memo["revenue_lift_abs"]/memo["baseline_revenue"], 0.0)
memo["profit_lift_pct"]  = np.where(memo["baseline_profit"]>0,
                                    memo["profit_lift_abs"]/memo["baseline_profit"], 0.0)

# ---------- 6) Save + preview ----------
memo.to_csv("artifacts/memo_category_recos_guarded_FIXED.csv", index=False)
print("Saved: artifacts/memo_category_recos_guarded_FIXED.csv")
display(memo.head(10))


Saved: artifacts/memo_category_recos_guarded_FIXED.csv


Unnamed: 0,category,baseline_revenue,baseline_profit,baseline_avg_price,n_orders,best_delta_pct,best_price,best_qty,best_revenue,best_profit,elasticity,revenue_lift_abs,profit_lift_abs,revenue_lift_pct,profit_lift_pct
5,Set,37662424.0,24806680.0,852.053928,44077,0.05,894.656625,44207.425146,39550470.0,26696050.0,-0.002121,1888042.0,1889372.0,0.050131,0.076164
7,Uncategorized,15768162.19,8784619.0,685.457779,18635,0.3,891.095113,23142.207154,20621910.0,13892730.0,-0.141421,4853745.0,5108113.0,0.307819,0.581484
9,kurta,20452141.0,7625474.0,463.650245,43905,0.05,486.832757,44110.036291,21474210.0,8648115.0,-0.000912,1022070.0,1022641.0,0.049974,0.134108
8,Western Dress,10629096.0,6643152.0,775.444778,13663,0.05,814.217017,13706.092807,11159730.0,7174345.0,-0.002852,530638.0,531192.6,0.049923,0.079961
6,Top,5203733.0,2375946.0,535.069645,9692,0.05,561.823127,9723.144278,5462687.0,2635440.0,-0.003911,258954.3,259493.9,0.049763,0.109217
3,Ethnic Dress,760711.0,463829.7,746.1611,1018,0.05,783.469155,1020.492326,799524.3,502790.6,-0.010194,38813.26,38960.88,0.051022,0.083998
0,Blouse,434751.0,195734.0,530.316564,815,0.3,689.411534,817.300376,563456.3,325805.8,-0.021854,128705.3,130071.8,0.296044,0.664534
4,Saree,118509.0,74601.98,786.163265,147,0.3,1022.012245,149.798584,153096.0,109538.3,-0.030447,34586.99,34936.33,0.291851,0.468303
1,Bottom,140226.0,28277.62,364.577428,381,0.05,382.806299,384.851199,147323.5,35418.36,-0.007923,7097.463,7140.731,0.050614,0.252522
2,Dupatta,915.0,42.675,305.0,3,0.0,305.0,,915.0,42.675,,0.0,0.0,0.0,0.0


In [40]:
# ✅ Add global totals across categories

global_summary = {
    "category": "ALL_CATEGORIES",
    "baseline_revenue": float(memo["baseline_revenue"].sum()),
    "baseline_profit": float(memo["baseline_profit"].sum()),
    "best_revenue": float(memo["best_revenue"].sum()),
    "best_profit": float(memo["best_profit"].sum()),
}

# Absolute lifts
global_summary["revenue_lift_abs"] = global_summary["best_revenue"] - global_summary["baseline_revenue"]
global_summary["profit_lift_abs"]  = global_summary["best_profit"] - global_summary["baseline_profit"]

# Percentage lifts (guard divide-by-zero)
global_summary["revenue_lift_pct"] = (
    global_summary["revenue_lift_abs"] / global_summary["baseline_revenue"]
    if global_summary["baseline_revenue"] > 0 else 0.0
)
global_summary["profit_lift_pct"] = (
    global_summary["profit_lift_abs"] / global_summary["baseline_profit"]
    if global_summary["baseline_profit"] > 0 else 0.0
)

# Wrap as DataFrame
global_df = pd.DataFrame([global_summary])

# Append to the memo
memo_with_total = pd.concat([memo, global_df], ignore_index=True)

# Save and preview
memo_with_total.to_csv("artifacts/memo_category_recos_with_total.csv", index=False)
print("Saved: artifacts/memo_category_recos_with_total.csv")
display(memo_with_total.tail(1))  # just show the ALL_CATEGORIES row


Saved: artifacts/memo_category_recos_with_total.csv


Unnamed: 0,category,baseline_revenue,baseline_profit,baseline_avg_price,n_orders,best_delta_pct,best_price,best_qty,best_revenue,best_profit,elasticity,revenue_lift_abs,profit_lift_abs,revenue_lift_pct,profit_lift_pct
10,ALL_CATEGORIES,91170668.19,50998357.29,,,,,,99933320.0,60020280.0,,8762652.0,9021922.0,0.096113,0.176906
