# 010_distributional_enhancements.ipynb

## Part A — What we are doing (and the questions this answers)

This notebook extends the distributional analysis to address four specific requests:

1. **Percent-change in after-tax income by income decile**  
   We compute after-tax income (ATI) under **baseline** and **reform** and report percent changes by **household-weighted AGI deciles**, plus **Top 5%**, **Top 1%**, and **All**.

2. **Baseline marginal tax rates (MTRs) on AGI before repeal**  
   We measure marginal tax rates using an **exact +$1 in wages** method, recomputing household taxes via PolicyEngine with a **person-level input mapping**.

3. **VAT distribution by factor income**  
   We extend the distributional tables to show **total wages** and **total capital income (AGI – wages)** by **equivalized income decile**, plus **Top 5% / Top 1% / All**.

4. **Totals consistency checks**  
   We verify that decile totals reconcile to statewide totals. As a magnitude check, the weighted baseline **CA income tax** (the revenue eliminated by repeal) should be close to **California income tax revenue** (target **~$130B**, tolerance **±10%**).

---

## Part B — Inputs & dependencies

- **Panel** (from Step 01): `../intermediate/ca_panel_2024.parquet` (or `.csv`)  
  Columns used:
  - `household_size`, `household_weight` (normalized to `weight`)
  - `household_agi`, `employment_income`
  - `fed_income_tax`, `ca_income_tax` *(net of credits)*
  - `filing_status`, `is_married_couple`, `size_bucket`
  - `consumption_allowance`, `rebate_after_phaseout`
- **Config**: `../config/columns.yaml`
- **Helpers**: `../policy/vat_rebate.py` (deciles, allowance, phaseout)
- **Baseline recompute**: `../policy/baseline_taxes.py` (**person-level** `set_input` version)

**Assumptions**
- Step 01 already filtered to **CA households** and **AGI ≥ 0**, and applied the **weight deflator**.
- `fed_income_tax` and `ca_income_tax` are **net of credits** in the panel.
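The column list above can be turned into an up-front check. A minimal sketch, assuming the panel is a pandas DataFrame; `check_panel_columns` is a hypothetical helper (not part of `vat_rebate.py` or `baseline_taxes.py`). It leaves out `consumption_allowance` and `rebate_after_phaseout` because the notebook recomputes them when they are absent.

```python
import pandas as pd

# Columns listed in Part B; allowance/rebate columns are omitted because the
# notebook recomputes them when missing.
REQUIRED = [
    "household_size", "household_agi", "employment_income",
    "fed_income_tax", "ca_income_tax",
    "filing_status", "is_married_couple", "size_bucket",
]
WEIGHT_ALIASES = ("household_weight", "weight", "hh_weight")


def check_panel_columns(df: pd.DataFrame) -> list:
    """Return a list of problems; an empty list means the expected columns are present."""
    problems = [f"missing column: {c}" for c in REQUIRED if c not in df.columns]
    if not any(str(c).lower() in WEIGHT_ALIASES for c in df.columns):
        problems.append("no weight column (household_weight/weight/hh_weight)")
    return problems


# Usage on a one-row toy panel (made-up values)
toy = pd.DataFrame({c: [0.0] for c in REQUIRED})
toy["household_weight"] = 1.0
print(check_panel_columns(toy))                                # []
print(check_panel_columns(toy.drop(columns="ca_income_tax")))  # ['missing column: ca_income_tax']
```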

---

## Part C — Key definitions

### Deciles and top groups
- **AGI deciles (`agi_decile`)**: used for **after-tax income** comparisons and the **MTR** tables.
- **Equivalized income deciles (`equiv_decile`)**: used for **VAT wage/capital** distribution:
  \[
  \text{equiv\_income} = \frac{\text{AGI}}{\max(\text{household\_size}, 1)}
  \]
- **Top 5% / Top 1%**: thresholds computed from **equivalized income** with household weights.
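The weighted cutoffs above can be sketched as follows. This is an assumption about how `vr.add_weighted_deciles` behaves (its source is not shown here, and its tie handling may differ); `weighted_threshold` follows the same `searchsorted` rule as the notebook's `weighted_percentile_threshold`. Both function names are illustrative.

```python
import numpy as np
import pandas as pd


def weighted_deciles(x: pd.Series, w: pd.Series) -> pd.Series:
    """Deciles 1..10 from cumulative weight; each holds roughly 10% of total weight."""
    xv, wv = x.to_numpy(dtype=float), w.to_numpy(dtype=float)
    order = np.argsort(xv, kind="stable")
    cum = np.cumsum(wv[order])
    # Place each household at the midpoint of its own weight block
    mid_share = (cum - wv[order] / 2.0) / cum[-1]
    dec_sorted = np.minimum((mid_share * 10).astype(int) + 1, 10)
    out = np.empty(len(xv), dtype=int)
    out[order] = dec_sorted  # undo the sort
    return pd.Series(out, index=x.index, name="decile")


def weighted_threshold(x: pd.Series, w: pd.Series, q: float) -> float:
    """First sorted x whose cumulative weight reaches q * total."""
    s = pd.DataFrame({"x": x, "w": w}).sort_values("x", kind="stable")
    cw = s["w"].cumsum().to_numpy()
    idx = int(np.searchsorted(cw, q * cw[-1], side="left"))
    return float(s["x"].iloc[min(idx, len(s) - 1)])


# Toy data (made-up): 20 equally weighted two-person households
agi = pd.Series(np.arange(1, 21) * 10_000.0)
size = pd.Series(np.full(20, 2.0))
w = pd.Series(np.ones(20))
equiv = agi / np.maximum(size, 1.0)  # equiv_income = AGI / max(size, 1)

dec = weighted_deciles(equiv, w)
p95 = weighted_threshold(equiv, w, 0.95)
p99 = weighted_threshold(equiv, w, 0.99)
print(dec.tolist())  # two households per decile
print(p95, p99)      # 95000.0 100000.0
```

With coarse data the top groups can hold more than their nominal share: here "top 5%" (`equiv >= p95`) contains 2 of 20 households, because the household sitting at the threshold is included.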

### After-tax income (ATI)
- **Baseline** (taxes are net of credits in the panel):
  \[
  \text{ATI}_{\text{base}} = \text{AGI} - (\text{fed\_income\_tax} + \text{ca\_income\_tax})
  \]
- **Reform** (**no income tax + VAT rebate**, no other changes):
  \[
  \text{ATI}_{\text{reform}} = \text{AGI} + \text{rebate\_after\_phaseout}
  \]
- **Percent change** (computed on each group's weighted mean ATI, not household by household):
  \[
  \%\Delta = 100 \times \frac{\text{ATI}_{\text{reform}} - \text{ATI}_{\text{base}}}{|\text{ATI}_{\text{base}}|}
  \]
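A worked example of the three formulas on two made-up households in one group. As in the notebook's decile loop, the percent change is taken between the group's weighted mean ATIs rather than by averaging household-level percent changes.

```python
import numpy as np
import pandas as pd

# Two made-up households in one group; taxes are net of credits, as in the panel
g = pd.DataFrame({
    "household_agi":         [40_000.0, 60_000.0],
    "fed_income_tax":        [2_000.0, 5_000.0],
    "ca_income_tax":         [500.0, 1_500.0],
    "rebate_after_phaseout": [1_000.0, 0.0],
    "weight":                [3.0, 1.0],
})
g["ati_base"] = g["household_agi"] - (g["fed_income_tax"] + g["ca_income_tax"])
g["ati_reform"] = g["household_agi"] + g["rebate_after_phaseout"]

# Group weighted means first, then the percent change between them
base = float(np.average(g["ati_base"], weights=g["weight"]))      # 41500.0
reform = float(np.average(g["ati_reform"], weights=g["weight"]))  # 45750.0
pct = 100.0 * (reform - base) / abs(base) if base != 0 else np.nan
print(round(pct, 2))  # 10.24
```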

### Factor incomes
- **Wage income**: `employment_income`
- **Capital income**:
  \[
  \text{capital} = \text{AGI} - \text{employment\_income}
  \]
  We also report **positive capital** and **capital losses** separately.
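A small sketch of the split on made-up rows. It writes losses as `(-cap).clip(lower=0)`, which equals the notebook's `-(cap.clip(upper=0))` without producing `-0.0`. Because AGI is net of above-the-line adjustments, AGI minus wages can be negative for reasons other than capital losses; under the definition above that residual is still counted as capital.

```python
import pandas as pd

# Made-up households: AGI and wages in dollars
hh = pd.DataFrame({"household_agi":     [50_000.0, 30_000.0, 80_000.0],
                   "employment_income": [45_000.0, 35_000.0, 0.0]})
cap = hh["household_agi"] - hh["employment_income"]  # 5000, -5000, 80000
cap_pos = cap.clip(lower=0)                          # positive part
cap_losses = (-cap).clip(lower=0)                    # negative part, stored as a positive amount

# capital = capital_pos - capital_losses holds row by row, so it also holds for weighted totals
assert (cap_pos - cap_losses).equals(cap)
print(cap_pos.tolist(), cap_losses.tolist())  # [5000.0, 0.0, 80000.0] [0.0, 5000.0, 0.0]
```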

### Baseline marginal tax rate (MTR) on AGI
- Method: **exact +$1** to **wages** for households with baseline wages > 0.
- Implementation: the +$1 household change is **allocated proportionally to persons** and fed to PolicyEngine with **person-level** `set_input`; then we recompute **federal + state** taxes and take the difference:
  \[
  \text{MTR} = \frac{\text{tax}_{+\$1} - \text{tax}_{\text{base}}}{\$1}
  \]
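- Reported as **household-weighted means** (column `mtr_population_weighted`) by **AGI decile** and for **All**. Households without wages get no bump and enter these means with an MTR of ≈0.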
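The +$1 finite difference and the proportional split to persons can be illustrated with a toy tax schedule. This is a sketch under stated assumptions: `toy_tax` is a made-up bracket schedule applied per person purely for illustration (PolicyEngine computes actual federal and CA liabilities at the tax-unit level), and `household_mtr` is a hypothetical helper, not the `baseline_taxes.py` code.

```python
import numpy as np


def toy_tax(income: float) -> float:
    """Hypothetical progressive schedule (NOT federal or CA law): 0% to $20k, 10% to $80k, 25% above."""
    brackets = [(0.0, 0.00), (20_000.0, 0.10), (80_000.0, 0.25)]
    tax = 0.0
    for i, (lo, rate) in enumerate(brackets):
        hi = brackets[i + 1][0] if i + 1 < len(brackets) else np.inf
        tax += rate * max(0.0, min(income, hi) - lo)
    return tax


def household_mtr(person_wages: np.ndarray, bump: float = 1.0) -> float:
    """(tax_plus - tax_base) / bump, with the bump split across persons in proportion to wages."""
    total = person_wages.sum()
    if total <= 0:
        return 0.0  # no wages: no bump, so no tax change
    bumped = person_wages + bump * person_wages / total
    tax_base = sum(toy_tax(x) for x in person_wages)
    tax_plus = sum(toy_tax(x) for x in bumped)
    return (tax_plus - tax_base) / bump


print(household_mtr(np.array([70_000.0, 30_000.0])))  # ~0.10: both earners in the 10% band
print(household_mtr(np.array([90_000.0, 10_000.0])))  # ~0.225: 0.9 of the $1 at 25%, 0.1 at 0%
print(household_mtr(np.array([0.0])))                 # 0.0
```

The second case shows why the allocation rule matters: putting the whole $1 on the higher earner would give 0.25 instead of 0.225.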

---

## Part D — Methods (step-by-step)

1. **Load & normalize panel**  
   Read Step 01 panel; normalize weight to `weight`. Ensure allowance/phaseout columns exist (recompute if missing).

2. **After-tax income by AGI decile (+ Top 5% / Top 1% / All)**  
   - Build **AGI deciles** with weighted cutoffs.
   - Compute **ATI baseline** and **ATI reform**; report **percent changes**.
   - Compute **Top 5% / Top 1%** using **equivalized** income thresholds for consistency with Step 04.

3. **VAT distribution by factor income (equivalized deciles)**  
   - Build **equivalized deciles**.
   - Aggregate **weighted totals** of **wages**, **capital**, **positive capital**, **capital losses**.
   - Include **Top 5% / Top 1% / All** rows.

4. **Baseline MTRs (+$1 wages; exact)**  
   - Copy the panel and add **+$1** to `employment_income` for households with **wages > 0**.
   - Call `baseline_taxes.run_baseline_taxes_2024(df_like)` (person-level mapping).
   - Compute **MTR = (tax_plus - tax_base)** and aggregate by **AGI decile** and **All**.

5. **Consistency checks (rigor aligned to 01–04)**  
   - **Population reconciliation:** decile population sums ≈ statewide population.
   - **Wage / capital reconciliation:** decile sums = statewide totals.
   - **Statewide change:** sum of decile changes = statewide change.
   - **Income-tax magnitude:** weighted CA income tax baseline ≈ **$130B** (tolerance **±10%**).
   - **Zero-wage HH sanity:** mean MTR among zero-wage households ≈ 0.
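The reconciliation checks above share one pattern: sum weighted values by group, then compare with the statewide weighted sum. A minimal sketch on made-up data; `reconcile` is an illustrative helper, not a function in the notebook. A household that loses its decile label (for example, a `NaN` from a failed cutoff) is dropped by `groupby`, which is exactly the kind of leak the check catches.

```python
import numpy as np
import pandas as pd


def reconcile(df: pd.DataFrame, group_col: str, value_col: str,
              weight_col: str = "weight", rtol: float = 1e-10) -> tuple:
    """Compare the sum of group-level weighted totals with the statewide weighted total."""
    weighted = df[value_col] * df[weight_col]
    by_group = weighted.groupby(df[group_col]).sum()  # rows with a missing group are dropped
    group_sum, state_total = float(by_group.sum()), float(weighted.sum())
    if not np.isclose(group_sum, state_total, rtol=rtol):
        raise AssertionError(f"{value_col}: groups {group_sum:,.2f} != statewide {state_total:,.2f}")
    return group_sum, state_total


# Toy panel (made-up): every household has a decile, so totals reconcile
toy = pd.DataFrame({"decile": [1, 1, 2, 2],
                    "wages": [10.0, 20.0, 30.0, 40.0],
                    "weight": [1.0, 2.0, 1.0, 1.0]})
print(reconcile(toy, "decile", "wages"))  # (120.0, 120.0)

# One household lost its decile label: the check raises
bad = toy.assign(decile=[1, 1, 2, np.nan])
try:
    reconcile(bad, "decile", "wages")
except AssertionError as err:
    print("caught:", err)
```

The population check is the same call on a column of ones, e.g. `reconcile(toy.assign(one=1.0), "decile", "one")`.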

---

## Part E — Outputs (machine-readable)

All files write to `../outputs/vat/`.

1. `after_tax_income_by_decile_2024.csv`  
   **Rows:** `decile_1..decile_10`, `top_5pct`, `top_1pct`, `all`  
   **Columns:**  
   - `year`  
   - `group`  
   - `after_tax_income_baseline`  
   - `after_tax_income_reform`  
   - `percent_change`  
   - `households_weighted`

2. `distribution_wage_capital_2024.csv`  
   **Rows:** `decile_1..decile_10`, `top_5pct`, `top_1pct`, `all`  
   **Columns:**  
   - `group`, `pop_weight`  
   - `total_wages`  
   - `total_capital`, `total_capital_pos`, `total_capital_losses`  
   - `total_change` *(reform tax – baseline tax; baseline tax = `fed_income_tax` + `ca_income_tax`, reform tax = –`rebate_after_phaseout`)*  
   - `share_of_total_change_signed_%` *(deciles only)*  
   - `share_of_relief_%` *(deciles only; each decile's relief, `max(-total_change, 0)`, as a share of total relief across deciles)*

3. `mtr_rebate_only_by_decile_2024.csv`  
   **Rows:** `decile_1..decile_10`, `all`  
   **Columns:**  
   - `year`, `group`  
   - `mtr_population_weighted` *(rebate clawback per +$1 AGI)*  
   - `note="rebate_only_exact"`

4. `mtr_baseline_income_taxes_by_decile_2024.csv`  
   **Rows:** `decile_1..decile_10`, `all`  
   **Columns:**  
   - `year`, `group`  
   - `mtr_population_weighted` *(exact +$1 person-level recompute)*  
   - `note="baseline_income_tax_exact"`

5. `010_summary_2024.md` and `010_summary_2024.html`  
   Concise text summary of the after-tax income, MTR, and factor-income tables (the HTML file is the Markdown text with `<br/>` line breaks, not rendered HTML).
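The two share columns in `distribution_wage_capital_2024.csv` behave differently when deciles move in opposite directions. A toy sketch with made-up decile totals, following the notebook's formulas: signed shares can exceed 100% or go negative, while relief shares stay between 0% and 100%.

```python
import pandas as pd

# Made-up decile totals; total_change < 0 means the decile pays less under the reform
dec = pd.DataFrame({"group": ["decile_1", "decile_2", "decile_3"],
                    "total_change": [-30.0, -10.0, 20.0]})

signed_total = dec["total_change"].sum()  # -20.0
dec["share_of_total_change_signed_%"] = 100.0 * dec["total_change"] / signed_total

relief = (-dec["total_change"]).clip(lower=0)  # 30, 10, 0
dec["share_of_relief_%"] = 100.0 * relief / relief.sum()

print(dec)
# signed shares: 150, 50, -100 (sum to 100); relief shares: 75, 25, 0
```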

---

## Part F — Acceptance checks (must pass)

- **Decile pop shares sum to statewide** (± rounding).
- **Sum of decile wages/capital** = **statewide totals**.
- **Sum of decile `total_change`** = **statewide tax change**.
- **CA income tax baseline ≈ $130B** within **±10%** tolerance.  
  *Note:* Your Step-04 diagnostic printed ~\$119B; that passes.
- **Zero-wage households** have **≈0** average baseline MTR.

If any of these fail, the notebook **raises** (or prints a clear diagnostic) so it’s easy to spot and fix.
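The ±10% revenue check uses `np.isclose`, whose tolerance is measured against its second argument (the benchmark): it passes when |total − target| ≤ atol + rtol·|target|. A short sketch with the values used in this notebook; `within_tolerance` is an illustrative wrapper.

```python
import numpy as np

TARGET = 130e9  # CA income tax benchmark used by the notebook
TOL_REL = 0.10  # ±10%


def within_tolerance(total: float, target: float = TARGET, rtol: float = TOL_REL) -> bool:
    # np.isclose(a, b) checks |a - b| <= atol + rtol * |b| (atol defaults to 1e-8),
    # so the accepted band is target ± 13e9, i.e. roughly $117B to $143B.
    return bool(np.isclose(total, target, rtol=rtol))


print(within_tolerance(119_102_461_409.0))  # True: the 2024 total printed by this notebook
print(within_tolerance(116e9))              # False: 14e9 below the benchmark
```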

---

## Part G — How to rerun

1. Ensure Steps **01–04** are up to date (especially Step 01 panel).  
2. Confirm `../policy/baseline_taxes.py` contains the **person-level** `run_baseline_taxes_2024` (the notebook checks this automatically).  
3. Run `010_distributional_enhancements.ipynb`.  
4. Inspect the printed diagnostics and the CSV/MD outputs in `../outputs/vat/`.

---

## Part H — Troubleshooting

- **All MTRs are exactly zero**  
  You’re likely using an old `baseline_taxes.py` that sets wages at the **household** level. Replace it with the **person-level** version and rerun. The notebook asserts that the person-level setter is present.

- **Row-alignment mismatch**  
  The recompute requires that `df_like` match **CA & AGI≥0** households in **Step-01 order**. Don’t sub-sample or reorder the panel prior to the recompute call.

- **CA income tax total is far from \$130B**  
  Check that you’re using **2024** and have the **Step-01 weight deflator**. Increase tolerance only if your benchmark year differs.

- **Negative capital income**  
  That’s expected (losses). We report both **positive capital** and **losses** so totals reconcile:
  \[
  \text{capital} = \text{capital\_pos} - \text{capital\_losses}
  \]
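The row-alignment problem is easy to reproduce in pandas. When per-row results come back with a fresh 0..n-1 index but the panel's index is some other permutation, a boolean mask built on the panel selects by label, not by position. A toy sketch with made-up values; re-attaching the panel's index restores the intended selection.

```python
import pandas as pd

# Panel rows carry labels 2, 0, 1 (e.g. left over from an earlier sort)
panel = pd.DataFrame({"employment_income": [0.0, 50_000.0, 20_000.0]}, index=[2, 0, 1])
# Per-row MTRs returned positionally (made-up values), with a fresh RangeIndex
mtr_positional = pd.Series([0.0, 0.30, 0.15])

zero_w = panel["employment_income"] <= 0  # True only for the first row, whose label is 2

wrong = mtr_positional[zero_w]  # mask aligned by LABEL: selects label 2 -> 0.15
right = pd.Series(mtr_positional.to_numpy(), index=panel.index)[zero_w]  # -> 0.0

print(wrong.tolist(), right.tolist())  # [0.15] [0.0]
```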

---

## Part I — Versioning & parameters

- **Year**: 2024  
- **Decile labels**: `decile_1..decile_10`  
- **Top groups**: `top_5pct`, `top_1pct` (based on **equivalized** thresholds)  
- **Weighting**: Household weights from Step 01 (post-deflator)  
- **Tolerance**: CA IIT revenue check **±10%**
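The parameters above could be gathered in one object so labels, row ordering, and tolerances stay in sync across the output tables. A hypothetical sketch: the notebook itself hard-codes `2024`, `TOL_REL`, and `order_groups`, and `Params010` is an illustrative name.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Params010:
    year: int = 2024
    n_deciles: int = 10
    top_groups: Tuple[str, ...] = ("top_5pct", "top_1pct")  # equivalized-income thresholds
    ca_iit_target: float = 130e9                           # benchmark for the revenue check
    tol_rel: float = 0.10                                  # ±10%

    @property
    def decile_labels(self) -> List[str]:
        return [f"decile_{i}" for i in range(1, self.n_deciles + 1)]

    @property
    def group_order(self) -> List[str]:
        # Row order used by the decile + top-group tables
        return self.decile_labels + list(self.top_groups) + ["all"]

    def csv_name(self, stem: str) -> str:
        return f"{stem}_{self.year}.csv"


P = Params010()
print(P.group_order[-3:])                        # ['top_5pct', 'top_1pct', 'all']
print(P.csv_name("after_tax_income_by_decile"))  # after_tax_income_by_decile_2024.csv
```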

---

## Part J — Summary of what changed vs 01–04

- Moved from average tax **levels** to **percent-change in after-tax income** by **AGI decile** (+ top groups).
- Added **exact** baseline MTRs via **+\$1 person-level** recompute (not just rebate clawback).
- Added **wage vs. capital** totals by **equivalized decile** to inform VAT incidence.
- Strengthened **consistency checks** so decile-level rollups reconcile to statewide totals and revenue magnitudes.



# ==== 010_distributional_enhancements — end-to-end (EXACT +$1 income-tax MTRs) ====
# Files written:
#   - outputs/vat/after_tax_income_by_decile_2024.csv                (deciles + top_5pct + top_1pct + all)
#   - outputs/vat/distribution_wage_capital_2024.csv                 (equivalized deciles + top_5pct + top_1pct + all)
#   - outputs/vat/mtr_rebate_only_by_decile_2024.csv                 (deciles + all)
#   - outputs/vat/mtr_baseline_income_taxes_by_decile_2024.csv       (deciles + all; exact +$1 via PolicyEngine)
#   - outputs/vat/010_summary_2024.(md|html)

import os, sys, numpy as np, pandas as pd, yaml, importlib.util, inspect, textwrap
from pathlib import Path

pd.options.display.float_format = "{:.6f}".format

# -----------------------------------------------------------------------------------
# Paths & helpers
# -----------------------------------------------------------------------------------
ROOT = Path.cwd()
INTERMEDIATE = Path("../intermediate")
OUTPUTS = Path("../outputs/vat"); OUTPUTS.mkdir(parents=True, exist_ok=True)
COLMAP_PATH = "../config/columns.yaml"

def import_from(path: str, modname: str):
    """Load a module from an explicit path and register it in sys.modules (so reload works)."""
    spec = importlib.util.spec_from_file_location(modname, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Could not load {modname} from {path}")
    mod = importlib.util.module_from_spec(spec)
    sys.modules[modname] = mod
    spec.loader.exec_module(mod)
    return mod

def wmean(x: pd.Series, w: pd.Series) -> float:
    x = pd.to_numeric(x, errors="coerce")
    w = pd.to_numeric(w, errors="coerce")
    T = float(w.sum())
    return float((x * w).sum() / T) if T > 0 else np.nan

def weighted_percentile_threshold(x: pd.Series, w: pd.Series, q: float) -> float:
    """Weighted quantile threshold (e.g., q=0.95 for 95th)."""
    s = pd.DataFrame({"x": pd.to_numeric(x, errors="coerce"), "w": pd.to_numeric(w, errors="coerce")}).sort_values("x")
    cw = s["w"].cumsum()
    tot = s["w"].sum()
    if tot <= 0:
        return np.nan
    idx = cw.searchsorted(q * tot, side="left")
    idx = int(min(max(idx, 0), len(s) - 1))
    return float(s["x"].iloc[idx])

# -----------------------------------------------------------------------------------
# Load helpers/modules
# -----------------------------------------------------------------------------------
if not os.path.exists(COLMAP_PATH):
    raise FileNotFoundError(f"Missing {COLMAP_PATH}; run 00/01 to generate columns.yaml.")
with open(COLMAP_PATH) as f:
    col_map = yaml.safe_load(f)

# vat_rebate helpers
vr = import_from(os.path.abspath("../policy/vat_rebate.py"), "vat_rebate")
print("[info] loaded:", vr.__file__)

# baseline_taxes (person-level set_input implementation)
bt = None
for p in [
    os.path.abspath("../policy/baseline_taxes.py"),
    r"C:\Users\Ali.Melad\Dropbox\Ali Work\Kyle\California VAT\policy_engile_cali_v2\policy\baseline_taxes.py",
]:
    if os.path.exists(p):
        bt = import_from(p, "baseline_taxes")
        print("[info] baseline_taxes module:", p)
        break
if bt is None:
    raise FileNotFoundError("baseline_taxes.py not found in ../policy/ or your Windows path.")

# verify person-level code is in use
src = inspect.getsource(bt.run_baseline_taxes_2024)
assert ('map_to="person"' in src) and ('set_input(' in src) and ("target_wages_hh" not in src), \
    "baseline_taxes.run_baseline_taxes_2024 must be the person-level version."

# -----------------------------------------------------------------------------------
# Load Step 01 panel (CA-only, AGI>=0, weights already deflated/scaled)
# -----------------------------------------------------------------------------------
parq = INTERMEDIATE / "ca_panel_2024.parquet"
csv  = INTERMEDIATE / "ca_panel_2024.csv"
panel_path = parq if parq.exists() else (csv if csv.exists() else None)
if panel_path is None:
    raise FileNotFoundError("Missing panel; run Step 01 to create ca_panel_2024.(parquet|csv).")

if str(panel_path).endswith(".parquet"):
    df = pd.read_parquet(panel_path)
else:
    # CSV may contain commas as thousands separators
    df = pd.read_csv(panel_path, thousands=",")
print(f"[info] panel: {panel_path} shape={df.shape}")

# Normalize weight (like 02/04)
if "weight" not in df.columns:
    w_alias = [c for c in df.columns if c.lower() in ("household_weight","weight","hh_weight")]
    if not w_alias:
        raise KeyError("No weight column found (household_weight/weight/hh_weight).")
    df["weight"] = pd.to_numeric(df[w_alias[0]], errors="coerce").fillna(0.0)
else:
    df["weight"] = pd.to_numeric(df["weight"], errors="coerce").fillna(0.0)
print(f"[diag] Weighted CA households (post Step-01 deflator): {df['weight'].sum():,.0f}")

# Ensure allowance & phaseout columns exist (compute if missing)
if "consumption_allowance" not in df.columns:
    need = {"size_bucket","is_married_couple"}
    missing = [m for m in need if m not in df.columns]
    if missing:
        raise KeyError(f"Missing {missing} required to compute allowance.")
    df = vr.compute_allowance(df)

if "rebate_after_phaseout" not in df.columns:
    df = vr.apply_phaseout(df)

# Ensure numeric types for key cols
for c in [
    "household_size","household_weight","household_agi","employment_income",
    "fed_income_tax","ca_income_tax","rebate_after_phaseout"
]:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0.0)

# -----------------------------------------------------------------------------------
# 1) Percent-change in after-tax income by AGI decile (+ Top 5% / Top 1% / All)
# -----------------------------------------------------------------------------------
# Per Kyle: use weighted AGI deciles
df = vr.add_weighted_deciles(df, income_col="household_agi", weight_col="weight", label="agi_decile")
df["agi_decile"] = df["agi_decile"].astype(int)

# After-tax income definitions (taxes are net of credits in panel)
df["after_tax_income_baseline"] = df["household_agi"] - (df["fed_income_tax"] + df["ca_income_tax"])
df["after_tax_income_reform"]   = df["household_agi"] + df["rebate_after_phaseout"]  # no income tax + rebate

# Top 5% / Top 1% defined on equivalized income (AGI / size), consistent with Step 04
df["equiv_income"] = df["household_agi"] / np.maximum(df["household_size"], 1.0)
p95 = weighted_percentile_threshold(df["equiv_income"], df["weight"], 0.95)
p99 = weighted_percentile_threshold(df["equiv_income"], df["weight"], 0.99)
df["top_5pct"] = (df["equiv_income"] >= p95).astype(int)
df["top_1pct"] = (df["equiv_income"] >= p99).astype(int)

rows_at = []
# Deciles 1..10
for d in range(1, 11):
    g = df["agi_decile"].eq(d)
    w = df.loc[g, "weight"]
    base = wmean(df.loc[g, "after_tax_income_baseline"], w)
    refm = wmean(df.loc[g, "after_tax_income_reform"],   w)
    pct  = 100.0 * (refm - base) / abs(base) if base != 0 else np.nan
    rows_at.append(dict(year=2024, group=f"decile_{d}",
                        after_tax_income_baseline=base,
                        after_tax_income_reform=refm,
                        percent_change=pct,
                        households_weighted=float(w.sum())))

# Top groups
for label, mask in [("top_5pct", df["top_5pct"].eq(1)), ("top_1pct", df["top_1pct"].eq(1)), ("all", pd.Series(True, index=df.index))]:
    g = mask
    w = df.loc[g, "weight"]
    base = wmean(df.loc[g, "after_tax_income_baseline"], w)
    refm = wmean(df.loc[g, "after_tax_income_reform"],   w)
    pct  = 100.0 * (refm - base) / abs(base) if base != 0 else np.nan
    rows_at.append(dict(year=2024, group=label,
                        after_tax_income_baseline=base,
                        after_tax_income_reform=refm,
                        percent_change=pct,
                        households_weighted=float(w.sum())))

after_tax_tbl = pd.DataFrame(rows_at)
order_groups = [f"decile_{i}" for i in range(1,11)] + ["top_5pct","top_1pct","all"]
after_tax_tbl["group"] = pd.Categorical(after_tax_tbl["group"], categories=order_groups, ordered=True)
after_tax_tbl = after_tax_tbl.sort_values("group")
after_tax_tbl.to_csv(OUTPUTS / "after_tax_income_by_decile_2024.csv", index=False)

# -----------------------------------------------------------------------------------
# 2) VAT distribution by factor income (equivalized deciles + Top 5% / Top 1% / All)
# -----------------------------------------------------------------------------------
# We'll use equivalized income deciles to mirror Step 04’s distributional lens.
df = vr.add_weighted_deciles(df, income_col="equiv_income", weight_col="weight", label="equiv_decile")
df["equiv_decile"] = df["equiv_decile"].astype(int)

baseline_tax = (df["fed_income_tax"] + df["ca_income_tax"])
reform_tax   = -df["rebate_after_phaseout"]  # rebate is negative tax burden

df["wages_total"]        = df["employment_income"]
df["capital_total"]      = df["household_agi"] - df["employment_income"]
df["capital_income_pos"] = df["capital_total"].clip(lower=0)
df["capital_losses"]     = (-df["capital_total"].clip(upper=0))

def agg_block(mask: pd.Series, label: str):
    g = mask
    w = df.loc[g, "weight"]
    return dict(
        group=label,
        pop_weight=float(w.sum()),
        total_wages=float((df.loc[g, "wages_total"]         * w).sum()),
        total_capital=float((df.loc[g, "capital_total"]      * w).sum()),
        total_capital_pos=float((df.loc[g, "capital_income_pos"] * w).sum()),
        total_capital_losses=float((df.loc[g, "capital_losses"]  * w).sum()),
        total_change=float(((reform_tax.loc[g] - baseline_tax.loc[g]) * w).sum()),
    )

rows_vk = []
# Deciles
for d in range(1, 11):
    rows_vk.append(agg_block(df["equiv_decile"].eq(d), f"decile_{d}"))
# Top groups
rows_vk.append(agg_block(df["top_5pct"].eq(1), "top_5pct"))
rows_vk.append(agg_block(df["top_1pct"].eq(1), "top_1pct"))
# All
rows_vk.append(agg_block(pd.Series(True, index=df.index), "all"))

vk = pd.DataFrame(rows_vk)
vk["group"] = pd.Categorical(vk["group"], categories=order_groups, ordered=True)
vk = vk.sort_values("group")

# Shares of signed change & of relief (optional, helpful diagnostics)
dec_mask = vk["group"].str.startswith("decile_")
signed_total = float(vk.loc[dec_mask, "total_change"].sum())
vk["share_of_total_change_signed_%"] = np.where(
    vk["group"].str.startswith("decile_"),
    100.0 * vk["total_change"] / signed_total if signed_total != 0 else np.nan,
    np.nan
)
relief_only  = (-vk.loc[dec_mask, "total_change"]).clip(lower=0)
relief_total = float(relief_only.sum())
vk["share_of_relief_%"] = np.where(
    vk["group"].str.startswith("decile_"),
    100.0 * (-vk["total_change"]).clip(lower=0) / relief_total if relief_total != 0 else np.nan,
    np.nan
)
vk.to_csv(OUTPUTS / "distribution_wage_capital_2024.csv", index=False)

# -----------------------------------------------------------------------------------
# 3) MTRs (+$1)
# -----------------------------------------------------------------------------------
# (A) Rebate-only MTR (exact; mirrors Step 03 but by AGI decile here)
base_rebate = df["rebate_after_phaseout"].astype(float).copy()
plus = df.copy()
has_wages = plus["employment_income"].fillna(0) > 0
plus.loc[has_wages, "household_agi"] = plus.loc[has_wages, "household_agi"] + 1.0  # +$1 in AGI via wages
plus["consumption_allowance"] = df["consumption_allowance"]  # allowance doesn't change
plus = vr.apply_phaseout(plus)
d_rebate = plus["rebate_after_phaseout"].astype(float) - base_rebate
mtr_rebate_only = -d_rebate  # clawback

rows_mtr_r = []
for d in range(1, 11):
    g = df["agi_decile"].eq(d)
    rows_mtr_r.append(dict(
        year=2024, group=f"decile_{d}",
        mtr_population_weighted=wmean(mtr_rebate_only[g], df.loc[g, "weight"]),
        note="rebate_only_exact",
    ))
rows_mtr_r.append(dict(
    year=2024, group="all",
    mtr_population_weighted=wmean(mtr_rebate_only, df["weight"]),
    note="rebate_only_exact",
))
mtr_rebate_tbl = pd.DataFrame(rows_mtr_r)
mtr_rebate_tbl["group"] = pd.Categorical(mtr_rebate_tbl["group"], categories=[f"decile_{i}" for i in range(1,11)] + ["all"], ordered=True)
mtr_rebate_tbl = mtr_rebate_tbl.sort_values("group")
mtr_rebate_tbl.to_csv(OUTPUTS / "mtr_rebate_only_by_decile_2024.csv", index=False)

# (B) Baseline income-tax MTR (EXACT +$1 via PolicyEngine with person-level mapping)
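# Baseline taxes from the panel, in the panel's current row order
tax_base_sum = (df["fed_income_tax"] + df["ca_income_tax"]).to_numpy(dtype=float)
df_bump = df.copy()
df_bump.loc[df_bump["employment_income"].fillna(0) > 0, "employment_income"] += 1.0

tax_plus_df = bt.run_baseline_taxes_2024(df_bump, columns_yaml_path=COLMAP_PATH)
if not {"fed_income_tax","ca_income_tax"}.issubset(set(tax_plus_df.columns)):
    raise KeyError("baseline_taxes.run_baseline_taxes_2024 must return fed_income_tax and ca_income_tax.")
if len(tax_plus_df) != len(df):
    raise ValueError(f"Recompute returned {len(tax_plus_df)} rows; expected {len(df)} (row-alignment mismatch).")

tax_plus_sum = (tax_plus_df["fed_income_tax"].astype(float) + tax_plus_df["ca_income_tax"].astype(float)).to_numpy()
# Rows are matched by position; re-attach df's index so masks built on df (deciles, zero-wage) line up
mtr_exact = pd.Series(tax_plus_sum - tax_base_sum, index=df.index, dtype=float)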

rows_mtr_b = []
for d in range(1, 11):
    g = df["agi_decile"].eq(d)
    rows_mtr_b.append(dict(
        year=2024, group=f"decile_{d}",
        mtr_population_weighted=wmean(mtr_exact[g], df.loc[g, "weight"]),
        note="baseline_income_tax_exact",
    ))
rows_mtr_b.append(dict(
    year=2024, group="all",
    mtr_population_weighted=wmean(mtr_exact, df["weight"]),
    note="baseline_income_tax_exact",
))
mtr_income_exact_tbl = pd.DataFrame(rows_mtr_b)
mtr_income_exact_tbl["group"] = pd.Categorical(mtr_income_exact_tbl["group"], categories=[f"decile_{i}" for i in range(1,11)] + ["all"], ordered=True)
mtr_income_exact_tbl = mtr_income_exact_tbl.sort_values("group")
mtr_income_exact_tbl.to_csv(OUTPUTS / "mtr_baseline_income_taxes_by_decile_2024.csv", index=False)

# -----------------------------------------------------------------------------------
# 4) Consistency checks (same rigor/style as 01–04)
# -----------------------------------------------------------------------------------
print("[checks] Reform burden = -rebate; Reform ATI = AGI + rebate.")

# Decile population reconciliation (equivalized deciles for VAT table)
assert np.isclose(float(vk.loc[vk["group"].str.startswith("decile_"), "pop_weight"].sum()),
                  float(df["weight"].sum()), rtol=1e-8), "Decile pop sum ≠ statewide pop"

# Wages/capital reconciliation (decile sums == statewide total)
w_dec = float(vk.loc[vk["group"].str.startswith("decile_"), "total_wages"].sum())
c_dec = float(vk.loc[vk["group"].str.startswith("decile_"), "total_capital"].sum())
w_all = float((df["employment_income"] * df["weight"]).sum())
c_all = float(((df["household_agi"] - df["employment_income"]) * df["weight"]).sum())
assert np.isclose(w_dec, w_all, rtol=1e-10), "Decile wages totals ≠ statewide wages total"
assert np.isclose(c_dec, c_all,  rtol=1e-10), "Decile capital totals ≠ statewide capital total"

# Decile total_change equals statewide change
state_change = float(((reform_tax - baseline_tax) * df["weight"]).sum())
decile_sum_change = float(vk.loc[vk["group"].str.startswith("decile_"), "total_change"].sum())
assert np.isclose(decile_sum_change, state_change, atol=1.0), "Decile total_change ≠ statewide change"

# CA income tax heuristic (~$130B). Your Step-04 prints were ~ $119B; keep 10% tolerance.
TOL_REL = 0.10
ca_total = float((df["ca_income_tax"] * df["weight"]).sum())
print(f"[diag] Weighted CA income tax baseline ≈ ${ca_total:,.0f}")
assert np.isclose(ca_total, 130e9, rtol=TOL_REL), "CA income tax total not ≈ $130B (adjust tolerance if your year differs)."

# Optional FED diagnostic
fed_total = float((df["fed_income_tax"] * df["weight"]).sum())
print(f"[diag] Weighted FED income tax baseline ≈ ${fed_total:,.0f}")

# Zero-wage sanity on MTRs: these households got no +$1 bump, so their recomputed taxes should be ~unchanged
zero_w = pd.to_numeric(df["employment_income"], errors="coerce").fillna(0) <= 0
if zero_w.any():
    assert abs(float(mtr_exact[zero_w].mean())) < 1e-3, "Zero-wage HHs should have ~0 baseline MTR"

print("✅ Consistency checks passed.")

# -----------------------------------------------------------------------------------
# 5) Summary (md/html)
# -----------------------------------------------------------------------------------
lines = []
lines.append("# 010 Distributional Enhancements — 2024\n")
lines.append("*Reform modeled as: **no income tax + VAT rebate (phase-out)**; taxes shown are net of credits.*\n")

lines.append("\n## After-tax income (baseline → reform)")
for _, r in after_tax_tbl.sort_values("group").iterrows():
    lines.append(f"- {r['group']}: ${r['after_tax_income_baseline']:,.0f} → ${r['after_tax_income_reform']:,.0f} ({r['percent_change']:.1f}%)")

lines.append("\n## Baseline MTRs on AGI (+$1 wages; exact)")
for _, r in mtr_income_exact_tbl.sort_values("group").iterrows():
    lines.append(f"- {r['group']}: {r['mtr_population_weighted']:.3f}")

lines.append("\n## Rebate-only MTRs (+$1 AGI clawback; exact)")
for _, r in mtr_rebate_tbl.sort_values("group").iterrows():
    lines.append(f"- {r['group']}: {r['mtr_population_weighted']:.3f}")

lines.append("\n## Factor incomes by equivalized decile (totals)")
for _, r in vk.sort_values("group").iterrows():
    if str(r["group"]).startswith("decile_"):
        lines.append(
            f"- {r['group']}: wages ${r['total_wages']:,.0f}, capital ${r['total_capital']:,.0f} "
            f"(pos ${r['total_capital_pos']:,.0f}, losses ${r['total_capital_losses']:,.0f})"
        )
lines.append("\n*Additional rows:* Top 5%, Top 1%, and All are included in the CSV for reference.")

summary_md = "\n".join(lines)
(OUTPUTS / "010_summary_2024.md").write_text(summary_md, encoding="utf-8")
(OUTPUTS / "010_summary_2024.html").write_text(summary_md.replace("\n", "<br/>\n"), encoding="utf-8")

print("Wrote:")
for fn in [
    "after_tax_income_by_decile_2024.csv",
    "distribution_wage_capital_2024.csv",
    "mtr_rebate_only_by_decile_2024.csv",
    "mtr_baseline_income_taxes_by_decile_2024.csv",
    "010_summary_2024.md",
    "010_summary_2024.html",
]:
    print(" -", OUTPUTS / fn)


[info] loaded: c:\Users\Ali.Melad\Dropbox\Ali Work\Kyle\California VAT\policy_engile_cali_v2\policy\vat_rebate.py
[info] baseline_taxes module: c:\Users\Ali.Melad\Dropbox\Ali Work\Kyle\California VAT\policy_engile_cali_v2\policy\baseline_taxes.py
[info] panel: ..\intermediate\ca_panel_2024.csv shape=(1747, 15)
[diag] Weighted CA households (post Step-01 deflator): 14,431,591
[checks] Reform burden = -rebate; Reform ATI = AGI + rebate.
[diag] Weighted CA income tax baseline ≈ $119,102,461,409
[diag] Weighted FED income tax baseline ≈ $326,501,444,146
✅ Consistency checks passed.
Wrote:
 - ..\outputs\vat\after_tax_income_by_decile_2024.csv
 - ..\outputs\vat\distribution_wage_capital_2024.csv
 - ..\outputs\vat\mtr_rebate_only_by_decile_2024.csv
 - ..\outputs\vat\mtr_baseline_income_taxes_by_decile_2024.csv
 - ..\outputs\vat\010_summary_2024.md
 - ..\outputs\vat\010_summary_2024.html
