## Import Needed Libraries

* `numpy`: used for numerical arrays, linear algebra, and vectorized operations
* `pandas`: used for tabular data manipulation
* `arviz`: used for posterior analysis and diagnostics. Provides the InferenceData container that PyMC returns.

In [None]:
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple

import pymc as pm
import arviz as az

In [None]:
def fit_bayesian_ecological_model(
    vtd_units: pd.DataFrame,
    cd_col: str,
    dem_share_2p_col: str,
    pop_col: str,
    race_cols: List[str],
    draws: int,
    tune: int,
    chains: int,
    target_accept: float,
    random_seed: int,
) -> Tuple[pm.Model, az.InferenceData, List[str], pd.Index]:
    """Fit a hierarchical ecological-inference model of Democratic vote share.

    Each row of ``vtd_units`` is one observed unit. Its two-party Democratic
    share is modelled as the composition-weighted sum of district-by-group
    rates ``theta[d, r]``, with observation noise that shrinks with the square
    root of the unit's population.

    Args:
        vtd_units: Unit-level table containing every column named below.
        cd_col: Column holding the district label of each unit.
        dem_share_2p_col: Column holding the observed Democratic share.
        pop_col: Column holding the unit population used to scale the noise.
        race_cols: Columns holding the per-group composition of each unit
            (presumably shares that sum to about 1 per row -- TODO confirm).
        draws: Posterior draws per chain, forwarded to ``pm.sample``.
        tune: Tuning steps per chain, forwarded to ``pm.sample``.
        chains: Number of chains, forwarded to ``pm.sample``.
        target_accept: Target acceptance rate, forwarded to ``pm.sample``.
        random_seed: Seed for both posterior and posterior-predictive sampling.

    Returns:
        A tuple ``(model, idata, race_cols, cd_index)`` where ``idata`` holds
        the posterior plus posterior-predictive draws of ``y_obs``,
        ``race_cols`` is returned as passed in, and ``cd_index`` lists the
        district labels in the same order as the first axis of ``theta``.
    """
    # Rows missing any model input are dropped, not just rows missing the
    # outcome: a NaN district label would be factorized to code -1 (making
    # theta one row longer than cd_index), and a NaN composition or population
    # would put NaN straight into the likelihood.
    required_cols = [cd_col, dem_share_2p_col, pop_col, *race_cols]
    df = vtd_units.dropna(subset=required_cols)

    cd_codes, cd_index = pd.factorize(df[cd_col])
    X = df[race_cols].values.astype(float)          # (N, R)
    y = df[dem_share_2p_col].values.astype(float)   # (N,)
    w = df[pop_col].values.astype(float)            # (N,)

    # One theta row per label in cd_index, so downstream code can align on it.
    D = len(cd_index)
    R = X.shape[1]

    with pm.Model() as model:
        # Hyperpriors on logit scale (BDA3-style regularization)
        mu = pm.Normal("mu", mu=0.0, sigma=1.5, shape=R)
        tau = pm.HalfNormal("tau", sigma=1.0, shape=R)

        # District-by-group effects on the logit scale, mapped into (0, 1).
        eta = pm.Normal("eta", mu=mu, sigma=tau, shape=(D, R))
        theta = pm.Deterministic("theta", pm.math.sigmoid(eta))

        # Expected share of a unit: its district's rates weighted by its composition.
        mu_y = (theta[cd_codes] * X).sum(axis=1)

        # Noise scale shrinks with sqrt(population); populations below 1 are
        # floored at 1 so the divisor never reaches zero.
        sigma = pm.HalfNormal("sigma", sigma=0.08)
        sigma_i = sigma / pm.math.sqrt(pm.math.maximum(w, 1.0))

        pm.Normal("y_obs", mu=mu_y, sigma=sigma_i, observed=y)

        idata = pm.sample(
            draws=draws,
            tune=tune,
            chains=chains,
            target_accept=target_accept,
            random_seed=random_seed,
        )

        ppc = pm.sample_posterior_predictive(idata, var_names=["y_obs"], random_seed=random_seed)
        idata.extend(ppc)

    return model, idata, race_cols, cd_index

This function takes posterior draws of $\theta$ and turns them into:
<ol>
<li>per-district Democratic win probabilities, computed under each enacted congressional district's racial composition</li>
<li>plan-wide summary probabilities for Gingles prong 3-style racial polarization metrics</li>
</ol>

In [None]:
def compute_gingles_posteriors(
    idata: az.InferenceData,
    race_cols: List[str],
    cd_index: pd.Index,
    cd_enacted_df: pd.DataFrame,
    minority: str = "black",
    coalition: bool = False,
    delta: float = 0.15,
    cohesion_thresh: float = 0.6,
) -> Tuple[pd.DataFrame, Dict[str, float]]:
    """Summarize posterior ``theta`` draws as win probabilities and Gingles #3 metrics.

    Args:
        idata: Inference data whose posterior contains ``theta`` with
            dimensions (chain, draw, district, group).
        race_cols: Group column names in the order of the last ``theta`` axis.
            Must include ``"p_white"`` and either ``f"p_{minority}"`` or, when
            ``coalition`` is true, both ``"p_black"`` and ``"p_latino"``.
        cd_index: District labels in the order of the district axis of ``theta``.
        cd_enacted_df: One row per enacted district with a ``"cd"`` column,
            every column in ``race_cols`` and a ``"minority_share"`` column.
        minority: Group suffix used when ``coalition`` is false.
        coalition: If true, the minority rate is the unweighted average of the
            ``p_black`` and ``p_latino`` rates and ``minority`` is ignored.
        delta: Threshold on the minority-minus-white gap.
        cohesion_thresh: Threshold above which the minority rate counts as cohesive.

    Returns:
        ``(cd_out, gingles3)``. ``cd_out`` has columns ``cd``, ``win_prob_dem``
        and ``minority_share``, one row per district in ``cd_index`` order.
        ``gingles3`` maps each metric label to the fraction of
        (sample, district) pairs satisfying it.

    Raises:
        KeyError: If a required group or district label is missing.
        ValueError: If the district compositions do not match the shape of
            ``theta`` (for example duplicated ``cd`` rows).
    """
    # stack() appends the new "sample" dimension last, so move it to the front
    # explicitly; everything below indexes the array as (S, D, R).
    theta_s = (
        idata.posterior["theta"]
        .stack(sample=("chain", "draw"))
        .transpose("sample", ...)
        .values
    )

    race_to_j = {r: j for j, r in enumerate(race_cols)}
    j_white = race_to_j["p_white"]
    if coalition:
        j_min1 = race_to_j["p_black"]
        j_min2 = race_to_j["p_latino"]
    else:
        j_min = race_to_j[f"p_{minority}"]

    # Align CD compositions to the same order as factorization
    cd = cd_enacted_df.set_index("cd").loc[cd_index]
    M = cd[race_cols].values.astype(float)  # (D, R)

    if theta_s.shape[1:] != M.shape:
        raise ValueError(
            f"theta has district/group shape {theta_s.shape[1:]} but the "
            f"enacted district compositions have shape {M.shape}"
        )

    # Predicted district Dem share for each posterior sample: each district's
    # rates are paired with that same district's composition and summed over
    # groups. einsum labels are case-sensitive, so both operands must use "d".
    Yhat = np.einsum("sdr,dr->sd", theta_s, M)  # (S, D)

    win_prob = (Yhat > 0.5).mean(axis=0)  # (D,)

    # Built from the aligned index directly so the result does not depend on
    # whether the index kept the name "cd" through the .loc lookup.
    cd_out = pd.DataFrame(
        {
            "cd": cd.index,
            "win_prob_dem": win_prob,
            "minority_share": cd["minority_share"].values,
        }
    )

    # Plan-wide Gingles #3 summaries
    if coalition:
        theta_min = 0.5 * (theta_s[:, :, j_min1] + theta_s[:, :, j_min2])
    else:
        theta_min = theta_s[:, :, j_min]
    theta_white = theta_s[:, :, j_white]

    gingles3 = {
        "P(minority cohesion)": float((theta_min > cohesion_thresh).mean()),
        "P(white bloc voting)": float((theta_white < 0.5).mean()),
        f"P(polarization gap > {delta})": float(((theta_min - theta_white) > delta).mean()),
    }
    return cd_out, gingles3


## Summarizing Convergence/Efficiency Diagnostics

In [None]:
def diagnostics_summary(idata: az.InferenceData) -> Dict[str, float]:
    """Return the mean R-hat and mean effective sample size for ``idata``.

    Each diagnostic is computed with ArviZ, collapsed into a single array via
    ``to_array()``, and averaged with NaNs ignored.

    Args:
        idata: Inference data to diagnose.

    Returns:
        A dict with keys ``"rhat_mean"`` and ``"ess_mean"``.

    NOTE(review): an average can hide a single poorly mixing parameter;
    consider also reporting the maximum R-hat and minimum ESS.
    """
    estimators = (("rhat_mean", az.rhat), ("ess_mean", az.ess))
    summary: Dict[str, float] = {}
    for key, estimator in estimators:
        pooled = estimator(idata).to_array().values
        summary[key] = float(np.nanmean(pooled))
    return summary
