# The Calibration Matrix

The calibration pipeline has three stages: (1) compute uprated target values ([`hierarchical_uprating.ipynb`](hierarchical_uprating.ipynb)), (2) assemble the sparse constraint matrix (this notebook), and (3) optimize weights ([`fit_calibration_weights.py`](../policyengine_us_data/datasets/cps/local_area_calibration/fit_calibration_weights.py)). This notebook is the diagnostic checkpoint between stages 2 and 3: understand your matrix before you optimize.

We build the full calibration matrix using `SparseMatrixBuilder`, then use `MatrixTracer` to inspect its structure: what rows and columns represent, how target groups partition the loss function, and where sparsity patterns emerge.

**Requirements:** `policy_data.db` in `STORAGE_FOLDER / "calibration"` and the stratified CPS h5 file (`stratified_extended_cps_2024.h5`) in `STORAGE_FOLDER`.

## 1. Setup

In [None]:
import numpy as np
import pandas as pd
from policyengine_us import Microsimulation
from policyengine_us_data.storage import STORAGE_FOLDER
from policyengine_us_data.datasets.cps.local_area_calibration.sparse_matrix_builder import (
    SparseMatrixBuilder,
)
from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import (
    get_all_cds_from_database,
    create_target_groups,
    STATE_CODES,
)
from policyengine_us_data.datasets.cps.local_area_calibration.matrix_tracer import (
    MatrixTracer,
)

db_path = STORAGE_FOLDER / "calibration" / "policy_data.db"
db_uri = f"sqlite:///{db_path}"
dataset_path = STORAGE_FOLDER / "stratified_extended_cps_2024.h5"

In [None]:
sim = Microsimulation(dataset=str(dataset_path))
cds_to_calibrate = get_all_cds_from_database(db_uri)

builder = SparseMatrixBuilder(
    db_uri=db_uri,
    time_period=2024,
    cds_to_calibrate=cds_to_calibrate,
    dataset_path=str(dataset_path),
)

targets_df, X_sparse, household_id_mapping = builder.build_matrix(
    sim,
    target_filter={"domain_variables": ["aca_ptc", "snap"]},
    hierarchical_domains=["aca_ptc", "snap"],
)

tracer = MatrixTracer(
    targets_df, X_sparse, household_id_mapping, cds_to_calibrate, sim
)

print(f"Matrix shape: {X_sparse.shape}")
print(f"Non-zero entries: {X_sparse.nnz:,}")

## 2. Matrix overview

In [None]:
tracer.print_matrix_structure()

## 3. Anatomy of a row

Each row is one calibration target — a known aggregate (dollar total, household count, person count) that the optimizer tries to match. The row vector's non-zero entries identify which (household, CD) pairs can contribute to that target.
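
To make the row picture concrete, here is a minimal sketch on a toy dense matrix with 3 households and 2 CDs. The sizes, values, and the names `toy_X`, `n_hh`, and `n_cds` are made up for illustration; the real matrix is sparse and far larger, and the block layout of columns is the one described in section 4.

```python
import numpy as np

n_hh, n_cds = 3, 2  # hypothetical sizes, for illustration only

# Rows are targets; columns are (household, CD) pairs laid out CD block by CD block.
toy_X = np.array([
    [120.0, 0.0, 80.0, 120.0, 0.0, 80.0],  # national dollar total: spans both CD blocks
    [1.0, 0.0, 1.0, 0.0, 0.0, 0.0],        # household count confined to the first CD block
])

for row_idx, row in enumerate(toy_X):
    nz_cols = np.flatnonzero(row)
    # Integer division by the block size recovers the CD block of each column.
    cd_blocks = sorted(set((nz_cols // n_hh).tolist()))
    print(f"row {row_idx}: non-zero columns {nz_cols.tolist()}, CD blocks {cd_blocks}")
```

The first row touches columns in both blocks, while the second only touches columns in block 0, which is the pattern the cells below look for in the real matrix.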

In [None]:
mid_row = X_sparse.shape[0] // 2
row_info = tracer.get_row_info(mid_row)
print(f"Row {mid_row}:")
for k, v in row_info.items():
    print(f"  {k}: {v}")

In [None]:
row_vec = X_sparse[mid_row, :]
nz_cols = row_vec.nonzero()[1]
print(f"Row {mid_row} has {len(nz_cols):,} non-zero columns")

if len(nz_cols) > 0:
    first_col = tracer.get_column_info(nz_cols[0])
    last_col = tracer.get_column_info(nz_cols[-1])
    print(f"\nFirst non-zero column ({nz_cols[0]}):")
    for k, v in first_col.items():
        print(f"  {k}: {v}")
    print(f"\nLast non-zero column ({nz_cols[-1]}):")
    for k, v in last_col.items():
        print(f"  {k}: {v}")

    unique_cds = set(
        tracer.get_column_info(c)["cd_geoid"] for c in nz_cols
    )
    print(f"\nSpans {len(unique_cds)} CD(s)")

## 4. Anatomy of a column

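Each column represents one (household, CD) pair. The columns are organized in blocks: the first `n_households` columns belong to the first CD (`cd_block = 0`), the next `n_households` to the second CD (`cd_block = 1`), and so on. With zero-based `cd_block` and `hh_index`, the block formula is: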

$$\text{column\_idx} = \text{cd\_block} \times n_{\text{households}} + \text{hh\_index}$$
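
As a worked example of the formula, the sketch below encodes and decodes a column index with plain integer arithmetic. The helper names `column_index` and `decode_column` and the household count of 1,000 are hypothetical, chosen only for illustration; they are not part of `MatrixTracer`.

```python
def column_index(cd_block: int, hh_index: int, n_households: int) -> int:
    """Map a zero-based (CD block, household index) pair to a column index."""
    if not 0 <= hh_index < n_households:
        raise ValueError("hh_index must lie in [0, n_households)")
    return cd_block * n_households + hh_index


def decode_column(column_idx: int, n_households: int) -> tuple[int, int]:
    """Invert column_index: return (cd_block, hh_index)."""
    return divmod(column_idx, n_households)


n_households = 1_000  # hypothetical household count, for illustration only
col = column_index(cd_block=5, hh_index=42, n_households=n_households)
print(col)  # 5042
print(decode_column(col, n_households))  # (5, 42)
```

Because `hh_index` is always smaller than `n_households`, `divmod` recovers the pair uniquely, so the mapping is a bijection between column indices and (CD block, household) pairs.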

In [None]:
col_idx = tracer.n_households * 5 + 42
col_info = tracer.get_column_info(col_idx)
print(f"Column {col_idx}:")
for k, v in col_info.items():
    print(f"  {k}: {v}")

col_vec = X_sparse[:, col_idx]
nz_rows = col_vec.nonzero()[0]
print(f"\nThis column has non-zero values in {len(nz_rows)} target rows")
if len(nz_rows) > 0:
    print("First 5 target rows:")
    for r in nz_rows[:5]:
        ri = tracer.get_row_info(r)
        print(
            f"  row {r}: {ri['variable']} "
            f"(geo={ri['geographic_id']}, "
            f"val={X_sparse[r, col_idx]:.2f})"
        )

In [None]:
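# Invert the block formula instead of recomputing it, so the check is not tautological.
cd_block, hh_idx = divmod(col_idx, tracer.n_households)
assert (cd_block, hh_idx) == (5, 42), f"decoded ({cd_block}, {hh_idx}), expected (5, 42)"
print(
    f"Block formula verified: "
    f"cd_block={cd_block} * n_hh={tracer.n_households} + hh_idx={hh_idx} = {col_idx}"
)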

## 5. Target groups and loss weighting

Target groups partition the rows by (domain, variable, geographic level). Each group contributes equally to the loss function, so 436 district-level rows don't drown out 1 national row. The group IDs depend on the current database contents and may change if targets are added or removed.
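
One way to picture equal group contribution is to average the per-target loss within each group first, and only then average across groups. The sketch below assumes a squared relative error per target, which is an illustrative choice and not necessarily the loss used in `fit_calibration_weights.py`; all arrays are made up.

```python
import numpy as np

# Hypothetical data: group 0 has one national row, group 1 has four district rows.
group_ids = np.array([0, 1, 1, 1, 1])
targets = np.array([1000.0, 250.0, 250.0, 250.0, 250.0])
estimates = np.array([900.0, 250.0, 250.0, 250.0, 250.0])  # only the national row misses

# Per-target squared relative error (an assumed loss form, for illustration).
rel_sq_err = ((estimates - targets) / targets) ** 2

# Unbalanced: every row counts once, so the large group dilutes the national miss.
unbalanced = rel_sq_err.mean()

# Balanced: average within each group, then across groups.
group_means = np.array(
    [rel_sq_err[group_ids == g].mean() for g in np.unique(group_ids)]
)
balanced = group_means.mean()

print(f"unbalanced loss: {unbalanced:.4f}")
print(f"balanced loss:   {balanced:.4f}")
```

In this toy case the national miss contributes 0.002 to the unbalanced loss but 0.005 to the balanced one, and the gap widens as the number of district rows grows.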

In [None]:
target_groups, group_info = create_target_groups(targets_df)

records = []
for gid, info in enumerate(group_info):
    mask = target_groups == gid
    vals = targets_df.loc[mask, "value"]
    records.append({
        "group_id": gid,
        "description": info,
        "n_targets": mask.sum(),
        "min_value": vals.min(),
        "median_value": vals.median(),
        "max_value": vals.max(),
    })

group_df = pd.DataFrame(records)
print(group_df.to_string(index=False))

In [None]:
for gid in [0, 2, 4]:
    if gid >= len(group_info):
        continue
    rows = tracer.get_group_rows(gid)
    print(f"\n--- {group_info[gid]} ---")
    print(rows.head(8).to_string(index=False))

## 6. Tracing a household

Each CPS household appears once in every CD block, giving it 436 column positions. Most of those columns are all zero, because the household only contributes where its characteristics match the target constraints.
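
The sketch below shows the same idea on a toy layout: one household, four CD blocks, and a matrix in which the household only contributes in two of them. The sizes and values are hypothetical; on the real matrix the positions come from `tracer.get_household_column_positions`.

```python
import numpy as np

n_hh, n_cds = 3, 4  # hypothetical sizes, for illustration only
hh_index = 1        # position of the household within each CD block

# One column per CD block, following the block layout.
positions = [cd * n_hh + hh_index for cd in range(n_cds)]

# Toy matrix with 2 target rows; the household only contributes in CD blocks 0 and 2.
toy_X = np.zeros((2, n_hh * n_cds))
toy_X[0, positions[0]] = 150.0
toy_X[1, positions[2]] = 1.0

# A column is active if at least one target row has a non-zero entry in it.
active = [pos for pos in positions if np.count_nonzero(toy_X[:, pos]) > 0]
print(f"column positions: {positions}")
print(f"active columns:   {active}")
```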

In [None]:
snap_values = sim.calculate("snap", map_to="household").values
hh_ids = sim.calculate("household_id", map_to="household").values
positive_snap = hh_ids[snap_values > 0]
example_hh = int(positive_snap[0])
print(f"Example SNAP-receiving household: {example_hh}")
print(f"SNAP value: ${snap_values[hh_ids == example_hh][0]:,.0f}")

positions = tracer.get_household_column_positions(example_hh)
print(f"Column positions across CDs: {len(positions)}")

In [None]:
cd_activity = []
for cd_geoid, col_pos in positions.items():
    col_vec = X_sparse[:, col_pos]
    nnz = col_vec.nnz
    cd_activity.append({"cd_geoid": cd_geoid, "col_pos": col_pos, "nnz": nnz})

cd_df = pd.DataFrame(cd_activity)
n_active = (cd_df["nnz"] > 0).sum()
n_zero = (cd_df["nnz"] == 0).sum()
print(f"CDs with non-zero entries: {n_active}")
print(f"CDs with all-zero columns: {n_zero}")

top10 = cd_df.nlargest(10, "nnz")
print(f"\nTop 10 CDs by activity for household {example_hh}:")
for _, r in top10.iterrows():
    state_fips = int(r["cd_geoid"]) // 100
    abbr = STATE_CODES.get(state_fips, "??")
    print(f"  CD {r['cd_geoid']} ({abbr}): {r['nnz']} non-zero rows")

## 7. Sparsity analysis

In [None]:
total_cells = X_sparse.shape[0] * X_sparse.shape[1]
density = X_sparse.nnz / total_cells
print(f"Total cells: {total_cells:,}")
print(f"Non-zero entries: {X_sparse.nnz:,}")
print(f"Density: {density:.6f}")
print(f"Sparsity: {1 - density:.4%}")

In [None]:
# Differences of indptr give the stored entries per row (valid because X_sparse is CSR).
nnz_per_row = np.diff(X_sparse.indptr)
print("Non-zeros per row:")
print(f"  min:    {nnz_per_row.min():,}")
print(f"  median: {int(np.median(nnz_per_row)):,}")
print(f"  mean:   {nnz_per_row.mean():,.0f}")
print(f"  max:    {nnz_per_row.max():,}")

from policyengine_us_data.datasets.cps.local_area_calibration.calibration_utils import (
    _get_geo_level,
)

geo_levels = targets_df["geographic_id"].apply(_get_geo_level)
level_names = {0: "National", 1: "State", 2: "District"}
print("\nBy geographic level:")
for level in [0, 1, 2]:
    mask = (geo_levels == level).values
    if mask.any():
        vals = nnz_per_row[mask]
        print(
            f"  {level_names[level]:10s}: "
            f"n={mask.sum():>4d}, "
            f"median nnz={int(np.median(vals)):>7,}, "
            f"range=[{vals.min():,}, {vals.max():,}]"
        )

In [None]:
n_hh = tracer.n_households
n_cds = tracer.n_geographies
cd_nnz = []
for cd_idx in range(n_cds):
    block = X_sparse[:, cd_idx * n_hh : (cd_idx + 1) * n_hh]
    cd_nnz.append({
        "cd_geoid": cds_to_calibrate[cd_idx],
        "nnz": block.nnz,
    })

cd_nnz_df = pd.DataFrame(cd_nnz)
print("Non-zeros per CD block:")
print(f"  min:    {cd_nnz_df['nnz'].min():,} (CD {cd_nnz_df.loc[cd_nnz_df['nnz'].idxmin(), 'cd_geoid']})")
print(f"  median: {int(cd_nnz_df['nnz'].median()):,}")
print(f"  max:    {cd_nnz_df['nnz'].max():,} (CD {cd_nnz_df.loc[cd_nnz_df['nnz'].idxmax(), 'cd_geoid']})")

## 8. Group exclusion

`GROUPS_TO_EXCLUDE` removes redundant or harmful constraints before training. For example, state-level SNAP household counts (Group 1) are redundant with reconciled district rows (Group 4) and can confuse the optimizer. Group IDs depend on database contents, so always check `print_matrix_structure()` output first.

In [None]:
GROUPS_TO_EXCLUDE = [1]

print(f"Before exclusion: {X_sparse.shape[0]} rows")

keep_mask = ~np.isin(tracer.target_groups, GROUPS_TO_EXCLUDE)
n_dropped = (~keep_mask).sum()
print(f"Excluding groups {GROUPS_TO_EXCLUDE}: dropping {n_dropped} rows")

X_filtered = X_sparse[keep_mask, :]
targets_filtered = targets_df[keep_mask].reset_index(drop=True)
print(f"After exclusion: {X_filtered.shape[0]} rows")

In [None]:
remaining_groups, remaining_info = create_target_groups(targets_filtered)
print(f"\nRemaining groups ({len(remaining_info)}):")
for info in remaining_info:
    print(f"  {info}")

## 9. Achievable targets

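A target is achievable if at least one household can contribute to it (row sum > 0). Assuming the matrix entries are non-negative, a row that sums to zero has no contributing columns, so its weighted estimate is zero for any choice of weights and the optimizer can never reach a non-zero target.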
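
A small numerical check illustrates why an all-zero row cannot be satisfied: its estimate `X @ w` stays at zero no matter which weights are tried. The matrix, targets, and weight range below are made up for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

# Toy matrix: the second row has no contributing columns.
toy_X = np.array([
    [1.0, 0.0, 2.0],
    [0.0, 0.0, 0.0],
])
toy_targets = np.array([10.0, 5.0])

row_sums = toy_X.sum(axis=1)
achievable = row_sums > 0

# Whatever non-negative weights we try, the all-zero row's estimate stays at 0.
for _ in range(3):
    w = rng.uniform(0.0, 100.0, size=toy_X.shape[1])
    estimates = toy_X @ w
    assert estimates[1] == 0.0

print(f"achievable mask: {achievable.tolist()}")
print(f"target {toy_targets[1]:.0f} on row 1 can never be reached")
```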

In [None]:
row_sums = np.array(X_filtered.sum(axis=1)).flatten()
achievable_mask = row_sums > 0
n_achievable = achievable_mask.sum()
n_impossible = (~achievable_mask).sum()

print(f"Achievable targets: {n_achievable}")
print(f"Impossible targets: {n_impossible}")

if n_impossible > 0:
    impossible = targets_filtered[~achievable_mask]
    print("\nImpossible targets:")
    for _, r in impossible.iterrows():
        print(
            f"  {r.get('domain_variable', '?')}/{r['variable']} "
            f"(geo={r['geographic_id']})"
        )

In [None]:
ratios = row_sums[achievable_mask] / targets_filtered.loc[achievable_mask, "value"].values
ratio_df = targets_filtered[achievable_mask].copy()
ratio_df["row_sum"] = row_sums[achievable_mask]
ratio_df["ratio"] = ratios

hardest = ratio_df.nsmallest(5, "ratio")
print("Hardest targets (lowest row_sum / target_value ratio):")
for _, r in hardest.iterrows():
    print(
        f"  {r.get('domain_variable', '?')}/{r['variable']} "
        f"(geo={r['geographic_id']}): "
        f"ratio={r['ratio']:.4f}, "
        f"row_sum={r['row_sum']:,.0f}, "
        f"target={r['value']:,.0f}"
    )

In [None]:
X_final = X_filtered[achievable_mask, :]
print(f"Final matrix shape: {X_final.shape}")
print(f"Final non-zero entries: {X_final.nnz:,}")
print(f"Final density: {X_final.nnz / (X_final.shape[0] * X_final.shape[1]):.6f}")
print("\nThis is what the optimizer receives.")

## Summary

The calibration matrix pipeline has five steps:

1. **Build** — `SparseMatrixBuilder.build_matrix()` queries targets, applies hierarchical uprating, evaluates constraints, and assembles the sparse CSR matrix.
2. **Read** — `MatrixTracer` decodes rows (targets) and columns (household-CD pairs) so you can verify the matrix makes sense.
3. **Groups** — `create_target_groups()` partitions rows for balanced loss weighting. `GROUPS_TO_EXCLUDE` drops redundant constraints.
4. **Sparsity** — Most of the matrix is zero. District-level targets confine non-zeros to single CD blocks; national targets span all blocks.
5. **Filter** — Remove impossible targets (row sum = 0) before handing to the optimizer.

When adding new domains or variables to the calibration, re-run this notebook to verify the new targets appear correctly and don't introduce impossible constraints.