# Global Superstore — End-to-End Analytics (Portfolio Edition)

**What this shows recruiters:** a disciplined pipeline from **raw Excel → cleaned fact/dims → DQ tests → EDA → SQL → cohorts/RFM → exports for BI**, with narrative and guardrails. Each section explains the why + how.

## 0) Project framing — objectives & business questions

**Why this exists:** reviewers should immediately see *what decisions this analysis enables* and *how the notebook is structured*.

**Objectives**
- Build a reliable, explainable dataset for sales analytics (clean types, consistent keys).
- Quantify performance by **Category** and **Market**; validate operational metrics.
- Segment customers (**RFM**) and analyze **cohort retention**.
- Export a **star schema** (fact + dims) for Power BI.

**Guiding questions**
1) Which **categories/markets** drive sales and profit?  
2) How do **discounts** relate to **profit margin**?  
3) What’s our **return rate** by category/market?  
4) Are we **shipping on time**? What are typical lead times (mean / p95)?  
5) Which **customer segments** are most valuable? Do they retain?

> **Portfolio note:** This section helps interviewers scan your work. Keep it lean but concrete.

## 1) Imports, paths & robust Excel helpers

**What we do**
- Import the analytics stack (`pandas`, `numpy`, `duckdb`, `matplotlib`, `plotly`).
- Create a small `Paths` dataclass to keep folder structure reproducible (`data/raw`, `data/processed`, `reports`).
- Define helpers:
  - `standardize_columns(df)`: trims/collapses spaces and *preserves* canonical column names we expect.
  - `read_sheet_any_case(path, name)`: reads a sheet even if the tab name varies in case or contains extra words.

**Why it matters**
- Prevents brittle failures when tab names change.
- Makes the notebook portable across machines.

In [None]:

from __future__ import annotations
import re, math, warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import duckdb
import matplotlib.pyplot as plt
import plotly.express as px

pd.set_option("display.max_columns", 120)
plt.rcParams["figure.figsize"] = (10, 5)

@dataclass
class Paths:
    root: Path = Path(".")
    raw: Path = Path("data/raw")
    processed: Path = Path("data/processed")
    reports: Path = Path("reports")

paths = Paths()
for d in (paths.raw, paths.processed, paths.reports):
    d.mkdir(parents=True, exist_ok=True)

EXCEL_FILE = paths.raw / "GlobalSuperstore.xls"
print("Expecting Excel at:", EXCEL_FILE.resolve())

def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    keep = {"Row ID","Order ID","Order Date","Ship Date","Ship Mode","Customer ID","Customer Name",
            "Segment","City","State","Country","Postal Code","Market","Region","Product ID",
            "Category","Sub-Category","Product Name","Sales","Quantity","Discount","Profit",
            "Shipping Cost","Order Priority","Returned","Person"}
    def clean(col: str) -> str:
        col = re.sub(r"\s+"," ",str(col).strip())
        if col in keep: return col
        return " ".join(w.capitalize() for w in col.split(" "))
    out = df.copy()
    out.columns = [clean(c) for c in out.columns]
    return out

def read_sheet_any_case(path: Path, name: str) -> pd.DataFrame:
    xls = pd.ExcelFile(path)
    target = name.lower()
    for n in xls.sheet_names:
        if n.lower() == target:
            return pd.read_excel(path, sheet_name=n)
    for n in xls.sheet_names:
        if target in n.lower():
            return pd.read_excel(path, sheet_name=n)
    raise ValueError(f"Sheet '{name}' not found. Available: {xls.sheet_names}")


## 2) Data intake & contracts (schema checks)
**Why:** Fail fast on missing columns or bad joins; make the pipeline **idempotent**.

In [None]:

from IPython.display import display

# Load raw sheets
orders  = standardize_columns(read_sheet_any_case(EXCEL_FILE, "Orders"))
returns = standardize_columns(read_sheet_any_case(EXCEL_FILE, "Returns"))
people  = standardize_columns(read_sheet_any_case(EXCEL_FILE, "People"))

# Contracts: required columns
req_orders = {"Order ID","Order Date","Ship Date","Ship Mode","Customer ID","Customer Name","Segment",
              "City","State","Country","Market","Region","Product ID","Category","Sub-Category",
              "Product Name","Sales","Quantity","Discount","Profit","Shipping Cost","Order Priority"}
missing = req_orders - set(orders.columns)
assert not missing, f"Orders missing columns: {missing}"
assert {"Order ID","Returned","Market"}.issubset(returns.columns), "Returns missing columns"
assert {"Person","Region"}.issubset(people.columns), "People missing columns"

# Normalize keys/dtypes
orders["Order ID"]  = orders["Order ID"].astype(str).str.strip()
returns["Order ID"] = returns["Order ID"].astype(str).str.strip()
returns["Returned"] = (returns["Returned"].astype(str).str.strip().str.lower()
                       .map({"yes": True, "y": True, "true": True, "1": True,
                             "no": False, "n": False, "false": False, "0": False}))

# Harmonize Markets in Returns
market_map = {
    "United States":"US","U.S.":"US","USA":"US","Usa":"US","us":"US",
    "Europe":"EU","European Union":"EU",
    "Asia Pacific":"APAC","ASIA PACIFIC":"APAC",
    "Latin America":"LATAM",
    "US":"US","EU":"EU","APAC":"APAC","LATAM":"LATAM","EMEA":"EMEA","Africa":"Africa","Canada":"Canada"
}
returns["Market"] = returns["Market"].astype(str).str.strip()
returns["Market_std"] = returns["Market"].map(lambda x: market_map.get(x, x))

# Harmonize Regions in People
people["Region"] = people["Region"].astype(str).str.strip()
people["Region_std"] = people["Region"].replace({"AMEA":"EMEA","emea":"EMEA","E.M.E.A":"EMEA"})

# QA: referential integrity & market mismatches
ret_not_in_orders = set(returns["Order ID"]) - set(orders["Order ID"])
print("Returns not in Orders:", len(ret_not_in_orders))

r = returns[["Order ID","Market_std"]].drop_duplicates("Order ID")
o = orders[["Order ID","Market"]].drop_duplicates("Order ID")
mk = o.merge(r, on="Order ID", how="left")
mk["match"] = (mk["Market"] == mk["Market_std"]) | mk["Market_std"].isna()
print("Market mismatches (pre):", int((~mk["match"]).sum()))

# Canonicalize Returns to Orders' market
market_map_orders = o.set_index("Order ID")["Market"]
returns["Market_std"] = returns["Order ID"].map(market_map_orders).fillna(returns["Market_std"])

# Re-check
r2 = returns[["Order ID","Market_std"]].drop_duplicates("Order ID")
mk2 = o.merge(r2, on="Order ID", how="left")
mk2["match"] = (mk2["Market"] == mk2["Market_std"]) | mk2["Market_std"].isna()
print("Market mismatches (post):", int((~mk2["match"]).sum()))

# Returned flag (idempotent) + Sales rep by Region
orders = orders.drop(columns=[c for c in orders.columns if c.lower().startswith("returned")], errors="ignore")
ret_flag = returns[["Order ID","Returned"]].drop_duplicates("Order ID")
orders = orders.merge(ret_flag, on="Order ID", how="left", validate="m:1", suffixes=("", "_ret"))
if "Returned_ret" in orders.columns and "Returned" not in orders.columns:
    orders = orders.rename(columns={"Returned_ret":"Returned"})
orders["Returned"] = orders["Returned"].astype("boolean").fillna(False).astype(bool)

people_clean = (people[["Region_std","Person"]].rename(columns={"Region_std":"Region"})
                .assign(Region=lambda d: d["Region"].astype(str).str.strip())
                .sort_values(["Region","Person"]).drop_duplicates("Region"))
orders["Region"] = orders["Region"].astype(str).str.strip()
orders = orders.drop(columns=[c for c in orders.columns if c.lower().startswith("person")], errors="ignore")
orders = orders.merge(people_clean, on="Region", how="left", validate="m:1")

display(orders.head(3)); display(returns.head(3)); display(people.head(3))
print({
    "orders_rows": len(orders),
    "returns_rows": len(returns),
    "distinct_orders": orders["Order ID"].nunique(),
    "returned_lines": int(orders["Returned"].sum()),
    "no_rep_rows": int(orders["Person"].isna().sum())
})


## 3) Cleaning & feature engineering (gold fact table)

**Types & categoricals**
- Parse date columns; coerce numeric columns (Sales/Profit/Discount/Shipping Cost/Quantity).
- Store `Postal Code` as **string** to preserve leading zeros and support non‑US formats.
- Normalize strings (trim spaces; Title Case for City/State).

**Deduplication**
- Prefer unique `Row ID` when available; otherwise use a composite (`Order ID`, `Product ID`, `Order Date`, `Customer ID`).

**Guardrails**
- Negative `Quantity` → absolute value (data capture issues).  
- Clip `Discount` to `[0,1]` to avoid invalid math.

**Derived fields**
- `Ship Lead Time (days)` = `Ship Date` − `Order Date`  
- `Year`, `Month` (timestamp for monthly trending)  
- `Profit Margin` = `Profit / Sales` (safe with zeros → NaN)

**Missingness snapshot**
- Display null% by column to justify imputation decisions (e.g., Postal Code often ~80% missing outside US).

> **Portfolio note:** Explain *why* each guardrail exists (e.g., to stabilize metrics / avoid divide‑by‑zero).

In [None]:

df = orders.copy()

# Types
for c in ["Order Date","Ship Date"]:
    df[c] = pd.to_datetime(df[c], errors="coerce")
for c in ["Sales","Profit","Discount","Shipping Cost","Quantity"]:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")
if "Postal Code" in df.columns:
    df["Postal Code"] = df["Postal Code"].astype("string")

# Categoricals
for c in ["Ship Mode","Segment","City","State","Country","Market","Region",
          "Category","Sub-Category","Order Priority","Customer Name","Person"]:
    if c in df.columns:
        df[c] = df[c].astype("string").str.strip().str.replace(r"\s+"," ", regex=True)
for c in ["City","State"]:
    if c in df.columns:
        df[c] = df[c].str.title()

# Deduplicate
before = len(df)
if "Row ID" in df.columns:
    df = df.drop_duplicates(subset=["Row ID"])
else:
    subset = [x for x in ["Order ID","Product ID","Order Date","Customer ID"] if x in df.columns]
    df = df.drop_duplicates(subset=subset)
print("Deduped rows:", before - len(df))

# Guardrails
neg_qty = df["Quantity"] < 0
if neg_qty.any():
    df.loc[neg_qty, "Quantity"] = df.loc[neg_qty, "Quantity"].abs()
if "Discount" in df.columns:
    df["Discount"] = df["Discount"].clip(0,1)

# Features
df["Ship Lead Time (days)"] = (df["Ship Date"] - df["Order Date"]).dt.days
df["Year"]  = df["Order Date"].dt.year
df["Month"] = df["Order Date"].dt.to_period("M").dt.to_timestamp()
df["Profit Margin"] = np.where(df["Sales"].ne(0), df["Profit"]/df["Sales"], np.nan)

# Missingness snapshot
nulls = df.isna().mean().sort_values(ascending=False)
display(nulls.head(12))

orders_clean = df


## 4) Data-quality flags + viz-friendly copy

In [None]:

def robust_group_outliers(frame, col, by=["Market","Category"], z=4.5, min_group=30):
    g = frame.groupby(by, dropna=False)
    out = pd.Series(False, index=frame.index)
    for _, part in g:
        if len(part) < min_group: 
            continue
        x = pd.to_numeric(part[col], errors="coerce")
        med = x.median()
        mad = np.median(np.abs(x - med))
        if mad == 0 or np.isnan(mad): 
            continue
        zrob = 0.6745 * (x - med) / mad
        out.loc[part.index] = zrob.abs() > z
    return out

dq_flags = pd.DataFrame(index=orders_clean.index)
for c in ["Sales","Profit","Shipping Cost"]:
    dq_flags[f"{c}_outlier"] = robust_group_outliers(orders_clean, c)
dq_flags["neg_lead"]  = orders_clean["Ship Lead Time (days)"] < 0
lt_99 = orders_clean["Ship Lead Time (days)"].quantile(0.99)
dq_flags["long_lead"] = orders_clean["Ship Lead Time (days)"] > lt_99
display(dq_flags.sum().to_frame("count").T)

def winsorize_group(frame, col, by=["Market","Category"], q_low=0.005, q_high=0.995, min_group=30):
    x = frame[col].copy()
    g = frame.groupby(by, dropna=False)
    for _, part in g:
        if len(part) < min_group: 
            continue
        lo = part[col].quantile(q_low)
        hi = part[col].quantile(q_high)
        x.loc[part.index] = part[col].clip(lo, hi)
    return x

orders_viz = orders_clean.copy()
for c in ["Sales","Profit","Shipping Cost"]:
    orders_viz[c] = winsorize_group(orders_viz, c)


## 5) Missing-value strategy (hierarchical, only when needed)

**Approach**
- Fill `Shipping Cost` / `Discount` via hierarchical medians: (`Ship Mode`,`Market`) → (`Ship Mode`) → global.  
- If `Ship Date` is missing, impute as `Order Date + median lead` for that (`Ship Mode`,`Market`). Recompute lead.

**Policy on Postal Code**
- Leave nulls as-is (very high missingness outside US). Do **not** fabricate postal codes.

**Auditability**
- Create `_imputed` flags so we can quantify where imputations were applied (transparency for stakeholders).

In [None]:

clean = orders_clean.copy()

def fill_by_hierarchy(df, value_col, levels):
    out = df[value_col].copy()
    imputed_from_null = out.isna()
    for groups in levels:
        if not imputed_from_null.any(): 
            break
        if groups:
            med = df.groupby(groups, dropna=False)[value_col].median()
            idx = imputed_from_null
            out.loc[idx] = df.loc[idx, groups].apply(lambda r: med.get(tuple(r.values), np.nan), axis=1)
            imputed_from_null = out.isna()
        else:
            out.loc[imputed_from_null] = df[value_col].median()
            imputed_from_null = out.isna()
    flag = df[value_col].isna() & out.notna()
    return out, flag

for col in ["Shipping Cost","Discount"]:
    if col in clean.columns and clean[col].isna().any():
        filled, flag = fill_by_hierarchy(clean, col, [["Ship Mode","Market"],["Ship Mode"],[]])
        clean[col] = filled.clip(0,1) if col=="Discount" else filled
        clean[f"{col.replace(' ','')}_imputed"] = False
        clean.loc[flag, f"{col.replace(' ','')}_imputed"] = True

ship_null = clean["Ship Date"].isna()
if ship_null.any():
    med_lead = (clean.assign(lead=(clean["Ship Date"]-clean["Order Date"]).dt.days)
                      .groupby(["Ship Mode","Market"], dropna=False)["lead"].median().to_dict())
    def impute_sd(r):
        if pd.isna(r["Ship Date"]) and pd.notna(r["Order Date"]):
            val = med_lead.get((r.get("Ship Mode", np.nan), r.get("Market", np.nan)), np.nan)
            if pd.notna(val):
                return r["Order Date"] + pd.Timedelta(days=int(val))
        return r["Ship Date"]
    clean.loc[ship_null,"Ship Date"] = clean.loc[ship_null].apply(impute_sd, axis=1)
    clean["Ship Lead Time (days)"] = (clean["Ship Date"] - clean["Order Date"]).dt.days
    clean["ShipDate_imputed"] = ship_null & clean["Ship Date"].notna()

clean["has_postal_code"] = clean.get("Postal Code", pd.Series(index=clean.index)).notna()

orders_clean = clean


## 6) Tests & data dictionary

**Lightweight tests**
- No missing `Order ID`.
- `Discount` is within `[0,1]`.
- `Quantity` is non-negative.

**Data dictionary**
- Short, human-friendly descriptions for key fields, alongside dtypes.

> **Portfolio note:** Small assertions show discipline. They’re simple to read and signal quality.

In [None]:

# Tests
assert orders_clean["Order ID"].isna().sum() == 0
assert (orders_clean["Discount"].between(0,1)).all()
assert orders_clean["Quantity"].ge(0).all()

# Data dictionary
descriptions = {
    "Order ID":"Order identifier (order-level)",
    "Row ID":"Unique line identifier (line-level)",
    "Order Date":"Date order was placed",
    "Ship Date":"Date order shipped",
    "Ship Mode":"Fulfillment speed/tier",
    "Customer ID":"Customer identifier",
    "Customer Name":"Customer full name",
    "Segment":"Customer segment",
    "City":"Ship-to city",
    "State":"Ship-to state/province",
    "Country":"Ship-to country",
    "Postal Code":"Ship-to postal/ZIP",
    "Market":"Market region code (US, EU, APAC, LATAM, etc.)",
    "Region":"Sales territory (used to map sales reps)",
    "Product ID":"Product SKU",
    "Category":"Product category",
    "Sub-Category":"Product sub-category",
    "Product Name":"Product description",
    "Sales":"Line revenue",
    "Quantity":"Units on the line",
    "Discount":"Fractional discount (0–1)",
    "Profit":"Line profit (can be negative)",
    "Shipping Cost":"Freight cost for line",
    "Order Priority":"Order urgency flag",
    "Returned":"True if order is in Returns tab",
    "Person":"Sales rep assigned to region",
    "Ship Lead Time (days)":"Days from order to ship",
    "Profit Margin":"Profit / Sales (line-level)",
    "Year":"Order year",
    "Month":"Order year-month (timestamp)",
}
dd = (pd.DataFrame({
        "column": orders_clean.columns,
        "dtype": [str(t) for t in orders_clean.dtypes],
        "description": [descriptions.get(c,"") for c in orders_clean.columns]
     }))
dd.head(20)


## 7) EDA — distributions & trends (uses `orders_viz`)

**Visuals**
- **Monthly Sales & Profit:** trend checks; seasonality; structural breaks.  
- **Top Sub-Categories by Profit:** where value concentrates.  
- **Discount vs Profit Margin:** sanity-check negative relationship; color by Category to see differing patterns.

**Reading tips**
- Use `orders_viz` for charts to avoid outliers dominating visuals.  
- Keep KPI computations on `orders_clean` (raw) to preserve truth.

In [None]:

# Monthly sales & profit
monthly = orders_viz.groupby("Month", as_index=False).agg(sales=("Sales","sum"), profit=("Profit","sum"))
ax = monthly.plot(x="Month", y=["sales","profit"], title="Monthly Sales & Profit (winsorized for viz)")
ax.set_xlabel("Month"); ax.set_ylabel("Value")
plt.tight_layout(); plt.show()

# Top sub-categories by profit
top_sub = (orders_viz.groupby("Sub-Category", as_index=False)
                .agg(profit=("Profit","sum"))
                .sort_values("profit", ascending=False)
                .head(15))
fig = px.bar(top_sub, x="Sub-Category", y="profit", title="Top 15 Sub-Categories by Profit (viz)")
fig.update_layout(xaxis_tickangle=-35); fig.show()

# Discount vs Profit Margin
tmp = orders_viz.copy()
tmp["Profit Margin"] = np.where(tmp["Sales"]!=0, tmp["Profit"]/tmp["Sales"], np.nan)
fig = px.scatter(tmp, x="Discount", y="Profit Margin", color="Category",
                 title="Discount vs Profit Margin (viz)", hover_data=["Sub-Category","Product Name"])
fig.show()


## 8) SQL analysis (DuckDB in-memory)

**Queries shown**
- **Category KPIs:** SUM(Sales), SUM(Profit), and Profit Margin.  
- **Return rate** by Market × Category (uses distinct order counts to avoid line duplication).  
- **Top product per sub-category** via window function (`ROW_NUMBER()` over profits).

**Why DuckDB**
- In-memory, zero setup; shows you’re comfortable switching between pandas and SQL idioms.

In [None]:

con = duckdb.connect(database=":memory:")
con.register("orders", orders_clean)
con.register("returns", returns)
con.register("people", people)
con.execute("CREATE OR REPLACE VIEW v_orders  AS SELECT * FROM orders;")
con.execute("CREATE OR REPLACE VIEW v_returns AS SELECT * FROM returns;")
con.execute("CREATE OR REPLACE VIEW v_people  AS SELECT * FROM people;")
print("Views ready.")

sql_category = '''
SELECT Category,
       SUM(Sales)  AS Sales,
       SUM(Profit) AS Profit,
       CASE WHEN SUM(Sales) <> 0 THEN SUM(Profit)/SUM(Sales) END AS Profit_Margin
FROM v_orders
GROUP BY 1
ORDER BY Sales DESC;
'''
df_category = con.execute(sql_category).df()
df_category.head()

sql_return_rate = '''
SELECT o.Market, o.Category,
       COUNT(DISTINCT o."Order ID") AS orders,
       COUNT(DISTINCT r."Order ID") AS returned_orders,
       COUNT(DISTINCT r."Order ID") * 1.0 / NULLIF(COUNT(DISTINCT o."Order ID"),0) AS return_rate
FROM v_orders o
LEFT JOIN v_returns r ON o."Order ID" = r."Order ID"
GROUP BY 1,2
ORDER BY return_rate DESC NULLS LAST;
'''
df_return_rate = con.execute(sql_return_rate).df()
df_return_rate.head()

sql_top_product = '''
WITH prod_profit AS (
  SELECT "Sub-Category", "Product ID", "Product Name",
         SUM(Profit) AS profit
  FROM v_orders
  GROUP BY 1,2,3
)
SELECT * FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY "Sub-Category" ORDER BY profit DESC) AS rn
  FROM prod_profit
)
WHERE rn = 1
ORDER BY profit DESC;
'''
df_top_product = con.execute(sql_top_product).df()
df_top_product.head(10)


## 9) Discount–margin elasticity (quick regression)

In [None]:

def discount_elasticity_by_subcat(df: pd.DataFrame, min_rows:int=100, min_unique:int=5) -> pd.DataFrame:
    out = []
    g = df.copy()
    g = g[g["Sales"] > 0].copy()
    g["margin"] = np.where(g["Sales"]!=0, g["Profit"]/g["Sales"], np.nan)
    g = g.replace([np.inf,-np.inf], np.nan).dropna(subset=["Discount","margin"])

    for subcat, part in g.groupby("Sub-Category"):
        if len(part) < min_rows or part["Discount"].nunique() < min_unique:
            continue
        x = part["Discount"].values
        y = part["margin"].values
        a, b = np.polyfit(x, y, 1)   # slope, intercept
        yhat = a*x + b
        ss_res = np.sum((y - yhat)**2)
        ss_tot = np.sum((y - y.mean())**2)
        r2 = 1 - ss_res/ss_tot if ss_tot != 0 else np.nan
        out.append({"Sub-Category": subcat, "slope": a, "r2": r2, "n": len(part)})
    return (pd.DataFrame(out).sort_values(["slope","r2"], ascending=[True, False]))

elasticity = discount_elasticity_by_subcat(orders_clean, min_rows=100, min_unique=5)
elasticity.head(10)



### 9.1) Intercepts & Break‑Even Discount (manager‑friendly table)

- We extend the Step 9 fit (`margin = a·discount + b`) with:
  - **intercept** `b` → expected margin at **0%** discount
  - **break‑even discount** `d* = −b / a` → where expected margin hits **0**
  - **margin drop per +5pp/+10pp** → slope scaled by 0.05 / 0.10
- We keep results even if `d*` falls outside [0,1] (that tells us break‑even is beyond typical discount ranges).
- Exported to `reports/discount_elasticity_by_subcat.csv`.


In [None]:

import numpy as np
import pandas as pd
from pathlib import Path

def fit_slope_intercept(x: np.ndarray, y: np.ndarray):
    """Fit y = a*x + b via OLS (numpy.polyfit) and return (a, b, r2)."""
    a, b = np.polyfit(x, y, 1)  # unweighted; could add weights via 'w=' if desired
    yhat = a*x + b
    ss_res = np.sum((y - yhat)**2)
    ss_tot = np.sum((y - y.mean())**2)
    r2 = 1 - ss_res/ss_tot if ss_tot != 0 else np.nan
    return a, b, r2

def elasticity_with_break_even(df: pd.DataFrame, min_rows:int=100, min_unique:int=5) -> pd.DataFrame:
    base = df.copy()
    base = base[base["Sales"] > 0].copy()
    base["margin"] = np.where(base["Sales"] != 0, base["Profit"]/base["Sales"], np.nan)
    base = base.replace([np.inf, -np.inf], np.nan).dropna(subset=["Discount","margin"])

    rows = []
    for subcat, part in base.groupby("Sub-Category"):
        if len(part) < min_rows or part["Discount"].nunique() < min_unique:
            continue
        x = part["Discount"].to_numpy()
        y = part["margin"].to_numpy()
        a, b, r2 = fit_slope_intercept(x, y)  # slope, intercept, r2
        dstar = -b/a if a != 0 else np.nan  # break-even discount
        rows.append({
            "Sub-Category": subcat,
            "slope": a,
            "intercept": b,
            "r2": r2,
            "n": len(part),
            "d_star": dstar,
            "d_star_clipped_0_1": np.clip(dstar, 0, 1) if np.isfinite(dstar) else np.nan,
            "margin_drop_per_5pp": a * 0.05,
            "margin_drop_per_10pp": a * 0.10,
            "slope_positive": bool(a > 0)
        })
    out = (pd.DataFrame(rows)
           .sort_values(["slope","r2"], ascending=[True, False])  # most negative slopes first
           .reset_index(drop=True))
    return out

elasticity_be = elasticity_with_break_even(orders_clean, min_rows=100, min_unique=5)

# Pretty view + export
display_cols = ["Sub-Category","slope","intercept","r2","n","margin_drop_per_5pp","margin_drop_per_10pp","d_star","d_star_clipped_0_1"]
display(elasticity_be[display_cols].head(15))

REPORTS = Path("reports"); REPORTS.mkdir(parents=True, exist_ok=True)
elasticity_be.to_csv(REPORTS / "discount_elasticity_by_subcat.csv", index=False)
print("Saved:", (REPORTS / "discount_elasticity_by_subcat.csv").resolve())


## 10) Customer acquisition cohorts & retention

**How it’s built**
- Cohort = customer’s **first order month**.  
- `months_since_cohort` measures elapsed months for each subsequent order.  
- Pivot to get a **retention matrix** (0..12 months).

**Interpreting the table**
- Row 0 shows cohort size.  
- Decay shape across columns indicates retention strength.  
- Compare cohorts to see if newer cohorts retain better/worse.

In [None]:

dfc = orders_clean.dropna(subset=["Customer ID","Order ID","Order Date"]).copy()
dfc["Order Date"] = pd.to_datetime(dfc["Order Date"])
dfc["cohort"] = dfc.groupby("Customer ID")["Order Date"].transform(lambda s: s.min().to_period("M").to_timestamp())
dfc["period"] = dfc["Order Date"].dt.to_period("M").dt.to_timestamp()
dfc["months_since_cohort"] = ((dfc["period"].dt.year - dfc["cohort"].dt.year) * 12
                              + (dfc["period"].dt.month - dfc["cohort"].dt.month))

active = (dfc.groupby(["cohort","months_since_cohort"])["Customer ID"]
              .nunique().rename("active_customers").reset_index())
cohort_size = active[active["months_since_cohort"]==0][["cohort","active_customers"]].rename(columns={"active_customers":"cohort_size"})
ret = active.merge(cohort_size, on="cohort", how="left")
ret["retention"] = ret["active_customers"] / ret["cohort_size"]
ret_pivot = ret.pivot(index="cohort", columns="months_since_cohort", values="retention").fillna(0.0)
ret_pivot.iloc[:,:13].round(3)


## 11) Operations — lead times & on-time performance

**KPIs**
- SLA = 5 days on-time definition.  
- Mean lead time, **P95** lead time, on-time %, and unique orders by `Ship Mode`.

**Why P95**
- More robust than max; reflects “typical worst case” experience.

In [None]:

ops = orders_clean[["Order ID","Ship Mode","Ship Lead Time (days)"]].dropna()
sla_days = 5
ops["on_time"] = (ops["Ship Lead Time (days)"] <= sla_days).astype(int)

ops_summary = (ops.groupby("Ship Mode", as_index=False)
                 .agg(avg_lead=("Ship Lead Time (days)","mean"),
                      p95_lead=("Ship Lead Time (days)", lambda s: np.percentile(s,95)),
                      on_time_rate=("on_time","mean"),
                      orders=("Order ID","nunique"))
                 .sort_values("on_time_rate", ascending=False))
ops_summary


## 12) Star-schema exports for BI

**What we export**
- **fact_order_lines.csv** (all analysis fields, including Returned/Rep/Lead-Time/Margin/Year/Month).  
- Dims: **dim_customer**, **dim_product**, **dim_region**, **dim_market** (deduplicated keys).

**How to model in Power BI**
- `fact_order_lines` joins to each dim on its key (`Customer ID`, `Product ID`, `Region`, `Market`).  
- Build simple measures (Sales, Profit, Return Rate) over the fact; slice by dims.

In [None]:

from pathlib import Path
DATA_OUT = Path("data/processed"); DATA_OUT.mkdir(parents=True, exist_ok=True)

fact_cols = ["Row ID","Order ID","Order Date","Ship Date","Ship Mode","Customer ID","Customer Name","Segment",
             "City","State","Country","Postal Code","Market","Region","Product ID","Category","Sub-Category",
             "Product Name","Sales","Quantity","Discount","Profit","Shipping Cost","Order Priority",
             "Returned","Person","Ship Lead Time (days)","Profit Margin","Year","Month"]
fact = orders_clean[fact_cols].copy()
fact.to_csv(DATA_OUT / "fact_order_lines.csv", index=False)

dim_customer = (orders_clean[["Customer ID","Customer Name","Segment","City","State","Country","Postal Code"]]
                .drop_duplicates().sort_values("Customer ID"))
dim_product  = (orders_clean[["Product ID","Category","Sub-Category","Product Name"]]
                .drop_duplicates().sort_values("Product ID"))
dim_region   = (orders_clean[["Region","Person"]].drop_duplicates().sort_values("Region"))
dim_market   = (orders_clean[["Market"]].drop_duplicates().sort_values("Market"))

dim_customer.to_csv(DATA_OUT / "dim_customer.csv", index=False)
dim_product.to_csv(DATA_OUT / "dim_product.csv", index=False)
dim_region.to_csv(DATA_OUT / "dim_region.csv", index=False)
dim_market.to_csv(DATA_OUT / "dim_market.csv", index=False)

print("Saved:", [str(DATA_OUT / n) for n in [
    "fact_order_lines.csv","dim_customer.csv","dim_product.csv","dim_region.csv","dim_market.csv"
]])
