# Legislative dashboard precomputations

This notebook consolidates the preprocessing required for the interactive dashboard. It reads the
canonical tables produced by the ETL pipeline, derives descriptive features, and serialises the
results in a `precomp_outputs` dictionary consumed by the front-end.

In [None]:
import json
import math
import warnings
from collections import Counter, defaultdict
from pathlib import Path

import numpy as np
import pandas as pd

try:
    import geopandas as gpd
except Exception:
    gpd = None

# Render floats with thousands separators and three decimals in notebook output.
pd.options.display.float_format = "{:,.3f}".format
# Silence pandas' SettingWithCopyWarning for the whole notebook.
warnings.filterwarnings("ignore", category=pd.errors.SettingWithCopyWarning)

In [None]:
# Directory holding the legislation CSV tables read below.
DATA_DIR = Path("ca_leg/legislation_data")
# Directory holding the lobbying and expenditure CSV extracts read below.
CALACCESS_DIR = Path("calaccess")
# Dashboard backend data directory (not referenced in the cells visible here).
MAP_DIR = Path("dashboard/backend/data")
# JSON file loaded into `bill_topics`; looked up by stringified bill id.
LABELS_PATH = Path("bill_labels_updated.json")


def read_csv_safe(path: Path, **kwargs) -> pd.DataFrame:
    """Read a CSV into a DataFrame, tolerating a missing file.

    Args:
        path: Location of the CSV file.
        **kwargs: Forwarded unchanged to ``pandas.read_csv``.

    Returns:
        The parsed table, or an empty DataFrame (after emitting a warning)
        when ``path`` does not exist.
    """
    if path.exists():
        return pd.read_csv(path, **kwargs)
    warnings.warn(f"Missing data file: {path}")
    return pd.DataFrame()


# Legislation tables: each one is read and then has its column names lower-cased in place
# so later cells can address columns without worrying about the export's casing.
bill_history = read_csv_safe(
    DATA_DIR / "bill_history_tbl.csv",
    dtype={
        "action_status": str,
        "primary_location": str,
        "secondary_location": str,
        "end_status": str,
    },
)
bill_history.rename(columns=str.lower, inplace=True)

history = read_csv_safe(DATA_DIR / "history.csv")
history.rename(columns=str.lower, inplace=True)

bill_versions = read_csv_safe(DATA_DIR / "bill_versions.csv")
bill_versions.rename(columns=str.lower, inplace=True)

bill_summary_votes = read_csv_safe(DATA_DIR / "bill_summary_vote_tbl.csv")
bill_summary_votes.rename(columns=str.lower, inplace=True)

bill_votes = read_csv_safe(
    DATA_DIR / "bill_detail_vote_tbl.csv",
    parse_dates=["session_date", "vote_date_time"],
)
bill_votes.rename(columns=str.lower, inplace=True)

bill_motions = read_csv_safe(DATA_DIR / "bill_motion_tbl.csv")
bill_motions.rename(columns=str.lower, inplace=True)

committee_codes = read_csv_safe(DATA_DIR / "committee_codes.csv")
committee_codes.rename(columns=str.lower, inplace=True)

committee_hearings = read_csv_safe(DATA_DIR / "committee_hearing_tbl.csv")
committee_hearings.rename(columns=str.lower, inplace=True)

authors = read_csv_safe(DATA_DIR / "authors.csv")
authors.rename(columns=str.lower, inplace=True)

digests = read_csv_safe(DATA_DIR / "digest.csv")
digests.rename(columns=str.lower, inplace=True)

politicians = read_csv_safe(DATA_DIR / "politicians.csv")
politicians.rename(columns=str.lower, inplace=True)

# Lobbying and expenditure extracts keep their original column casing.
lobbying = read_csv_safe(
    CALACCESS_DIR / "lobbying_clean2.csv",
    dtype={"PAYEE_NAMS": str, "BAKREF_TID": str},
)
expend_assembly = read_csv_safe(
    CALACCESS_DIR / "expend_assembly_matched.csv",
    dtype={"TargetPropositionName": str},
)
expend_senate = read_csv_safe(
    CALACCESS_DIR / "expend_senate_matched.csv",
    dtype={"TargetPropositionName": str},
)

# Topic labels, looked up by stringified bill id in later cells. Like the CSV inputs, a
# missing file degrades to an empty mapping (with a warning) instead of aborting the
# notebook. The file is decoded explicitly as UTF-8 rather than with the platform's
# locale encoding, which is not UTF-8 everywhere.
if LABELS_PATH.exists():
    with LABELS_PATH.open(encoding="utf-8") as fp:
        bill_topics = json.load(fp)
else:
    warnings.warn(f"Missing data file: {LABELS_PATH}")
    bill_topics = {}

In [None]:
# --- Canonical identifiers ---------------------------------------------------------------------

# Build the bill-id mapping only when bill_versions actually carries a bill_id column;
# an empty or missing table would otherwise raise KeyError inside groupby.
# NOTE(review): as written, bill_id_canonical is the first bill_id within each bill_id
# group, i.e. every id maps to itself — confirm whether a version-id -> bill-id mapping
# was intended here.
if "bill_id" in bill_versions.columns:
    bill_versions = bill_versions.assign(
        version_id=bill_versions.get("id", bill_versions.get("ID")),
        bill_id_canonical=bill_versions.groupby("bill_id")["bill_id"].transform("first"),
    )
    version_to_bill = bill_versions.set_index("bill_id")["bill_id_canonical"].to_dict()
else:
    # Always keep a plain dict here: Series.map below cannot consume a DataFrame.
    version_to_bill = {}

# Rewrite bill ids in place on every table that has them; ids without a mapping entry
# keep their original value.
for frame in [history, bill_history, bill_votes, bill_summary_votes, committee_hearings, digests]:
    if frame.empty or "bill_id" not in frame.columns:
        continue
    frame["bill_id"] = frame["bill_id"].map(version_to_bill).fillna(frame["bill_id"])

In [None]:
# --- Date harmonisation ------------------------------------------------------------------------

# Coerce every candidate date column that is present to datetime, in place; values that
# cannot be parsed become NaT. Empty tables are skipped entirely.
for df, cols in (
    (history, ["date", "action_date", "actiondate"]),
    (bill_history, ["action_date", "date"]),
    (bill_votes, ["vote_date_time", "vote_date", "session_date"]),
    (bill_summary_votes, ["vote_date", "session_date"]),
    (committee_hearings, ["hearing_date"]),
):
    if not df.empty:
        for col in cols:
            if col not in df.columns:
                continue
            df[col] = pd.to_datetime(df[col], errors="coerce")

def _term_for_vote(ts):
    """Return the two-year term label ("YYYY-YYYY") for a vote timestamp.

    Odd years start a term. Even years belong to the term that began the year before,
    except from November onwards, which is assigned to the term starting the next year.
    Missing timestamps (NaT) yield ``None`` rather than the string "nan-nan".
    """
    if pd.isna(ts):
        return None
    if ts.year % 2:
        return f"{ts.year}-{ts.year + 1}"
    if ts.month < 11:
        return f"{ts.year - 1}-{ts.year}"
    return f"{ts.year + 1}-{ts.year + 2}"


if not bill_votes.empty:
    bill_votes["term"] = bill_votes["vote_date_time"].apply(_term_for_vote)

In [None]:
# --- Helpers -----------------------------------------------------------------------------------

def canonical_topic(bill_id):
    """Look up the topic label for a bill, or ``None`` when it has no label.

    The bill id is stringified before the lookup in ``bill_topics``.
    """
    key = str(bill_id)
    return bill_topics.get(key)


def infer_origin_chamber(bill_id):
    """Map a bill id prefix to its chamber name.

    Returns "Assembly" for ids starting with "AB", "Senate" for ids starting with "SB",
    and ``None`` for anything else (including falsy ids).
    """
    text = str(bill_id or "")
    for prefix, chamber in (("AB", "Assembly"), ("SB", "Senate")):
        if text.startswith(prefix):
            return chamber
    return None


def weeks_between(start, end):
    """Return the number of weeks from ``start`` to ``end``.

    Both arguments are parsed with ``pd.to_datetime``; only whole days of the difference
    are counted before dividing by seven. Returns NaN when either argument is missing.
    """
    either_missing = pd.isna(start) or pd.isna(end)
    if either_missing:
        return np.nan
    elapsed = pd.to_datetime(end) - pd.to_datetime(start)
    return elapsed.days / 7.0


def infer_term_from_date(ts):
    """Return the two-year term label ("YYYY-YYYY") containing a date.

    Odd years start a term. Even years belong to the term that began the previous year,
    except from November onwards, which is assigned to the term starting the next year.
    Returns ``None`` when ``ts`` is missing or cannot be parsed.
    """
    stamp = pd.to_datetime(ts, errors="coerce")
    if pd.isna(stamp):
        return None
    yr = stamp.year
    if yr % 2 == 1:
        first_year = yr
    elif stamp.month < 11:
        first_year = yr - 1
    else:
        first_year = yr + 1
    return f"{first_year}-{first_year + 1}"


# Numeric encoding of upper-cased vote codes: +1 for aye/yes, -1 for no/noe. Codes absent
# from this map are filled with 0 where the map is applied.
VOTE_MAP = {"AYE": 1, "YES": 1, "NO": -1, "NOE": -1}


In [None]:
# --- Roll call aggregation ---------------------------------------------------------------------

# Per-legislator votes with a signed vote value and floor/committee flags. Anything that
# is not an AFLOOR/SFLOOR location is treated as a committee vote.
votes = bill_votes.copy()
if votes.empty:
    votes = pd.DataFrame(columns=["bill_id", "vote_value", "date", "is_floor", "is_committee", "motion_id", "term", "location_code"])
else:
    votes["vote_value"] = votes["vote_code"].str.upper().map(VOTE_MAP).fillna(0).astype(int)
    votes["date"] = votes["vote_date_time"].dt.date
    votes["is_floor"] = votes["location_code"].isin(["AFLOOR", "SFLOOR"])
    votes["is_committee"] = ~votes["is_floor"]

# One row per roll call: yes/no tallies plus the number of recorded votes.
roll_group_cols = ["bill_id", "date", "motion_id", "location_code", "is_floor", "is_committee", "term"]
if votes.empty:
    roll = pd.DataFrame(columns=roll_group_cols + ["yes", "no", "total"])
else:
    roll = (
        votes.groupby(roll_group_cols, dropna=False)
        .agg(
            yes=("vote_value", lambda vals: int((np.array(vals) > 0).sum())),
            no=("vote_value", lambda vals: int((np.array(vals) < 0).sum())),
            total=("vote_value", "count"),
        )
        .reset_index()
    )
# A roll call "passes" when yes votes strictly outnumber no votes.
roll["pass"] = roll.get("yes", 0) > roll.get("no", 0)

if bill_motions.empty:
    motion_lookup = {}
else:
    motion_lookup = bill_motions.set_index("motion_id").get("motion_text", pd.Series(dtype=str)).to_dict()
if "motion_id" in roll.columns:
    roll["motion_text"] = roll["motion_id"].map(motion_lookup)
else:
    roll["motion_text"] = None

In [None]:
# --- Stage timing -------------------------------------------------------------------------------

def _bill_date_extreme(frame, column, how):
    """Per-bill "min" or "max" of a date column, or an empty Series when unavailable.

    Guards against an empty table or missing columns so that one absent source does not
    raise KeyError or discard the other source.
    """
    if frame.empty or "bill_id" not in frame.columns or column not in frame.columns:
        return pd.Series(dtype="datetime64[ns]")
    return frame.groupby("bill_id")[column].agg(how)


# Earliest and latest known action per bill: history is preferred, bill_history fills in
# bills (or missing dates) that history does not cover.
intro_lookup = (_bill_date_extreme(history, "date", "min")
                .combine_first(_bill_date_extreme(bill_history, "action_date", "min")))
last_action_lookup = (_bill_date_extreme(history, "date", "max")
                      .combine_first(_bill_date_extreme(bill_history, "action_date", "max")))

def _stage_dates(group: pd.DataFrame):
    """Derive stage milestone dates for one bill from its roll-call rows.

    Intended for ``groupby("bill_id").apply``: ``group.name`` is used as the key into
    ``intro_lookup``.

    Returns:
        A Series with ``intro``, ``first_committee``, ``first_floor``,
        ``second_committee`` (first committee vote after the first floor vote),
        ``second_floor`` (first floor vote after that), and the earliest passing
        AFLOOR / SFLOOR vote dates. Milestones that never occur are NaT.
    """
    g = group.sort_values("date")

    def _earliest(mask):
        # Earliest date among the selected rows; NaT when the mask selects nothing.
        return g.loc[mask, "date"].min() if mask.any() else pd.NaT

    first_committee = _earliest(g.get("is_committee", pd.Series()))
    first_floor = _earliest(g.get("is_floor", pd.Series()))

    second_committee = pd.NaT
    if pd.notna(first_floor):
        second_committee = _earliest((g["date"] > first_floor) & g["is_committee"])

    second_floor = pd.NaT
    if pd.notna(second_committee):
        second_floor = _earliest((g["date"] > second_committee) & g["is_floor"])

    asm_pass = sen_pass = pd.NaT
    if "location_code" in g.columns:
        asm_pass = _earliest((g["location_code"] == "AFLOOR") & g["pass"])
        sen_pass = _earliest((g["location_code"] == "SFLOOR") & g["pass"])

    milestones = {
        "intro": intro_lookup.get(group.name, pd.NaT),
        "first_committee": first_committee,
        "first_floor": first_floor,
        "second_committee": second_committee,
        "second_floor": second_floor,
        "asm_floor_pass": asm_pass,
        "sen_floor_pass": sen_pass,
    }
    return pd.Series(milestones)

# One row of milestone dates per bill, annotated with topic and origin chamber; bills
# without a topic label are dropped.
if roll.empty:
    stages_df = pd.DataFrame(columns=["bill_id", "intro"])
else:
    stages_df = roll.groupby("bill_id").apply(_stage_dates).reset_index()
stages_df["topic"] = stages_df["bill_id"].map(canonical_topic)
stages_df["origin_chamber"] = stages_df["bill_id"].map(infer_origin_chamber)
if not stages_df.empty:
    stages_df = stages_df.dropna(subset=["topic"])

In [None]:
# --- Outcomes -----------------------------------------------------------------------------------

# Keywords searched for in the (upper-cased) action text: +1 marks a successful outcome,
# -1 an unsuccessful one. Insertion order is the order in which keywords are tried.
OUTCOME_TERMS = {
    **dict.fromkeys(("CHAPTERED", "ENROLLED", "APPROVED", "SIGNED"), 1),
    **dict.fromkeys(("VETOED", "FAILED"), -1),
}

history_outcomes = history.copy()
if history_outcomes.empty or "action" not in history_outcomes.columns:
    history_outcomes = pd.DataFrame(columns=["bill_id", "action", "date"])
else:
    history_outcomes["action"] = history_outcomes["action"].astype(str).str.upper()

# Most recent action text per bill (rows sorted newest first, first row per bill kept).
if history_outcomes.empty:
    outcome_lookup = pd.Series(dtype=str)
else:
    outcome_lookup = (
        history_outcomes.sort_values("date", ascending=False)
        .groupby("bill_id")["action"]
        .first()
    )

def classify_outcome(action):
    """Score an action string by the first ``OUTCOME_TERMS`` keyword it contains.

    Returns the keyword's value (+1 or -1), or 0 when ``action`` is not a string or
    contains none of the keywords.
    """
    if isinstance(action, str):
        for term, score in OUTCOME_TERMS.items():
            if term in action:
                return score
    return 0

# Outcome score per bill, joined onto the labelled bills together with their first and
# last known action dates.
if outcome_lookup.empty:
    outcome_series = pd.Series(dtype=int)
else:
    outcome_series = outcome_lookup.map(classify_outcome)

if stages_df.empty:
    y_df = pd.DataFrame(columns=["bill_id", "topic"])
else:
    y_df = stages_df[["bill_id", "topic"]].copy()
y_df["outcome"] = y_df["bill_id"].map(outcome_series)
y_df["last_action"] = y_df["bill_id"].map(last_action_lookup)
y_df["first_action"] = y_df["bill_id"].map(intro_lookup)

In [None]:
# --- Pipeline metrics ---------------------------------------------------------------------------

# Stages in the order they are chained for the funnel; only those present in stages_df.
stage_order = [
    name
    for name in (
        "intro",
        "first_committee",
        "first_floor",
        "second_committee",
        "second_floor",
        "asm_floor_pass",
        "sen_floor_pass",
    )
    if name in stages_df.columns
]

# One record per bill and consecutive stage pair: whether the bill reached the first
# stage, whether it also reached the next one, and the days between the two.
pipe_records = []
for _, row in stages_df.iterrows():
    for current_stage, next_stage in zip(stage_order, stage_order[1:]):
        entered = pd.notna(row[current_stage])
        advanced = entered and pd.notna(row[next_stage])
        if advanced:
            days_in_stage = weeks_between(row[current_stage], row[next_stage]) * 7
        else:
            days_in_stage = np.nan
        pipe_records.append({
            "bill_id": row["bill_id"],
            "topic": row["topic"],
            "from_stage": current_stage,
            "to_stage": next_stage,
            "entered": int(entered),
            "advanced": int(advanced),
            "days_in_stage": days_in_stage,
        })

# Funnel: per stage transition and topic, how many bills entered, how many advanced, and
# the median days taken by those that advanced.
if pipe_records:
    pipeline_stage_funnel = (
        pd.DataFrame(pipe_records)
        .groupby(["from_stage", "to_stage", "topic"])
        .agg(
            entered=("entered", "sum"),
            advanced=("advanced", "sum"),
            median_days=("days_in_stage", "median"),
        )
        .reset_index()
    )
else:
    pipeline_stage_funnel = pd.DataFrame(columns=["from_stage", "to_stage", "topic", "entered", "advanced", "median_days"])

if not pipeline_stage_funnel.empty:
    pipeline_stage_funnel["pass_rate"] = np.where(
        pipeline_stage_funnel["entered"] > 0,
        pipeline_stage_funnel["advanced"] / pipeline_stage_funnel["entered"],
        np.nan,
    )
    pipeline_stage_funnel = pipeline_stage_funnel.rename(columns={"from_stage": "from", "to_stage": "to"})

# Durations: median and 90th-percentile days per starting stage and topic, computed over
# the transitions that actually completed.
if pipe_records:
    pipeline_stage_durations = (
        pd.DataFrame(pipe_records)
        .dropna(subset=["days_in_stage"])
        .groupby(["from_stage", "topic"])
        .agg(
            median_days=("days_in_stage", "median"),
            p90_days=("days_in_stage", lambda x: float(np.nanpercentile(x, 90))),
        )
        .reset_index()
        .rename(columns={"from_stage": "stage"})
    )
else:
    pipeline_stage_durations = pd.DataFrame(columns=["stage", "topic", "median_days", "p90_days"])

# Long-format list of (bill, topic, date, stage) events, one frame per stage that has
# at least one dated bill.
stage_events = []
for stage in stage_order:
    valid = stages_df[["bill_id", "topic", stage]].dropna()
    if not valid.empty:
        tmp = valid.rename(columns={stage: "date"})
        tmp["stage"] = stage
        stage_events.append(tmp)

# Distinct bills reaching each stage per calendar week (keyed by the week's start).
if stage_events:
    stage_calendar = (
        pd.concat(stage_events)
        .assign(week=lambda d: pd.to_datetime(d["date"]).dt.to_period("W").dt.start_time)
        .groupby(["stage", "week"])
        .agg(bills=("bill_id", "nunique"))
        .reset_index()
    )
else:
    stage_calendar = pd.DataFrame(columns=["stage", "week", "bills"])


In [None]:
# --- Route archetypes ---------------------------------------------------------------------------

# Hearings joined to readable committee names via the hearing's location code.
if committee_hearings.empty:
    hearings_clean = pd.DataFrame(columns=["bill_id", "committee_clean", "hearing_date"])
else:
    hearings_clean = committee_hearings.merge(
        committee_codes[["committee_code", "committee_clean"]],
        left_on="location_code",
        right_on="committee_code",
        how="left",
    )

if not hearings_clean.empty:
    hearings_clean = hearings_clean.dropna(subset=["bill_id"])
if not hearings_clean.empty and "hearing_date" in hearings_clean.columns:
    hearings_clean["hearing_date"] = pd.to_datetime(hearings_clean["hearing_date"], errors="coerce")

# Route = the distinct committees a bill visited, in hearing-date order (first visit kept).
if hearings_clean.empty:
    route_sequences = pd.DataFrame(columns=["bill_id", "route"])
else:
    route_sequences = (
        hearings_clean.sort_values(["bill_id", "hearing_date"])
        .groupby("bill_id")["committee_clean"]
        .apply(lambda seq: tuple(dict.fromkeys(c for c in seq if isinstance(c, str))))
        .reset_index(name="route")
    )

# The route key is the first five committees joined with " > ".
route_sequences["route_key"] = route_sequences["route"].apply(
    lambda r: " > ".join(r[:5]) if isinstance(r, tuple) else None
)
route_sequences["topic"] = route_sequences["bill_id"].map(canonical_topic)
if not route_sequences.empty:
    route_sequences = route_sequences.dropna(subset=["topic", "route_key"])

# Per topic and route: number of distinct bills and the share whose outcome equals 1.
if route_sequences.empty:
    route_archetypes = pd.DataFrame(columns=["topic", "route_key", "n", "pass_rate"])
else:
    route_archetypes = (
        route_sequences.merge(y_df[["bill_id", "outcome"]], on="bill_id", how="left")
        .groupby(["topic", "route_key"], as_index=False)
        .agg(
            n=("bill_id", "nunique"),
            pass_rate=("outcome", lambda x: float((np.array(x) == 1).mean()) if len(x) else np.nan),
        )
        .sort_values(["topic", "n"], ascending=[True, False])
    )

In [None]:
# --- Amendment churn ---------------------------------------------------------------------------

import re

def tokenise(text: str) -> set[str]:
    """Return the set of lower-cased alphanumeric tokens in ``text``.

    ``text`` is stringified first; every character outside ``a-z`` / ``0-9`` acts as a
    token separator.
    """
    return set(re.findall(r"[a-z0-9]+", str(text).lower()))

# Version rows paired with digest text. Both inputs must be present: selecting columns
# from an empty (column-less) digests table would raise KeyError.
# NOTE(review): the join key is bill_id only, so every version row of a bill is paired
# with every digest row of that bill — confirm whether digests should also be matched on
# version.
if bill_versions.empty or digests.empty:
    bill_version_tokens = pd.DataFrame(columns=["bill_id", "versionnum", "digesttext"])
else:
    bill_version_tokens = (bill_versions[["bill_id", "versionnum"]]
                           .merge(digests[["bill_id", "digesttext"]], on="bill_id", how="left")
                           .dropna(subset=["digesttext"]))

# Per bill: Jaccard similarity between the token sets of consecutive texts (ordered by
# versionnum). The first text has nothing to compare against and is scored 1.0.
amendment_rows = []
for bill_id, group in bill_version_tokens.groupby("bill_id"):
    ordered = group.sort_values("versionnum")
    toks = [tokenise(text) for text in ordered["digesttext"]]
    if not toks:
        continue
    sims = [1.0]
    for prev, curr in zip(toks, toks[1:]):
        sims.append(len(prev & curr) / len(prev | curr) if prev | curr else 1.0)
    amendment_rows.append({
        "bill_id": bill_id,
        "n_versions": len(toks),
        "median_similarity": float(np.median(sims)),
        "final_similarity": sims[-1],
    })

# Explicit columns keep the frame well-formed when no bill produced a row; otherwise the
# bill_id lookup below (and later per-bill lookups) would raise KeyError.
amendment_churn = pd.DataFrame(
    amendment_rows,
    columns=["bill_id", "n_versions", "median_similarity", "final_similarity"],
)
amendment_churn["topic"] = amendment_churn["bill_id"].map(canonical_topic)
amendment_churn = amendment_churn.dropna(subset=["topic"]) if not amendment_churn.empty else amendment_churn

In [None]:
# --- Procedural risk heuristics -----------------------------------------------------------------

# 80th percentile of the days between each stage and the one that follows it, over the
# bills that have both dates. Stages with no completed transition get no entry.
stage_percentiles = {}
for stage, following in zip(stage_order, stage_order[1:]):
    durations = [
        (pd.to_datetime(row.get(following)) - pd.to_datetime(row.get(stage))).days
        for _, row in stages_df.iterrows()
        if pd.notna(row.get(stage)) and pd.notna(row.get(following))
    ]
    if durations:
        stage_percentiles[stage] = np.nanpercentile(durations, 80)

# (topic, route_key) -> historical pass rate of that route.
if route_archetypes.empty:
    route_pass_lookup = {}
else:
    route_pass_lookup = route_archetypes.set_index(["topic", "route_key"])["pass_rate"].to_dict()

# Author rows for labelled bills, joined to each bill's outcome.
if authors.empty:
    sponsor_outcomes = pd.DataFrame(columns=["bill_id", "name", "topic", "outcome"])
else:
    sponsor_outcomes = (
        authors.dropna(subset=["bill_id", "name"])
        .assign(topic=lambda d: d["bill_id"].map(canonical_topic))
        .dropna(subset=["topic"])
        .merge(y_df[["bill_id", "outcome"]], on="bill_id", how="left")
    )

# Share of each author's bills, per topic, whose outcome equals 1.
if sponsor_outcomes.empty:
    sponsor_success = pd.DataFrame(columns=["name", "topic", "success_rate"])
else:
    sponsor_success = (
        sponsor_outcomes.groupby(["name", "topic"])["outcome"]
        .apply(lambda s: float((np.array(s) == 1).mean()) if len(s) else np.nan)
        .reset_index(name="success_rate")
    )

risk_rows = []
now = pd.Timestamp.today().normalize()

# Per-bill lookups built once, so the loop below does not rescan whole frames per bill.
route_key_by_bill = (route_sequences.drop_duplicates("bill_id").set_index("bill_id")["route_key"].to_dict()
                     if not route_sequences.empty else {})
churn_by_bill = (amendment_churn.groupby("bill_id")["n_versions"].max().to_dict()
                 if not amendment_churn.empty else {})
# First listed author per bill.
sponsor_by_bill = (sponsor_outcomes.drop_duplicates("bill_id").set_index("bill_id")["name"].to_dict()
                   if not sponsor_outcomes.empty else {})
sponsor_rate_lookup = (sponsor_success.set_index(["name", "topic"])["success_rate"].to_dict()
                       if not sponsor_success.empty else {})

for _, row in stages_df.iterrows():
    bill_id = row["bill_id"]

    # Most recent milestone date. Only the stage columns are considered: the row also
    # carries bill_id / topic / origin_chamber, which are not dates and must not be
    # parsed as such.
    stage_dates = pd.to_datetime(row[stage_order], errors="coerce")
    last_date = stage_dates.max()
    days_since_last = (now - last_date).days if pd.notna(last_date) else np.nan

    route_key = route_key_by_bill.get(bill_id)
    route_pass = route_pass_lookup.get((row["topic"], route_key), np.nan)

    churn = churn_by_bill.get(bill_id, np.nan)
    churn_flag = bool(pd.notna(churn) and churn >= 5)
    # Explicit NaN check: a pass rate of exactly 0.0 is the weakest route and must flag.
    route_flag = bool(pd.notna(route_pass) and route_pass < 0.3)

    # NOTE(review): this flags a bill when ANY milestone is older than that stage's
    # 80th-percentile duration, even if the bill has since advanced — confirm whether
    # only the bill's current (latest) stage should be checked.
    stage_flag = any(
        pd.notna(stage_dates.get(stg)) and (now - stage_dates[stg]).days > pctl
        for stg, pctl in stage_percentiles.items()
    )

    sponsor = sponsor_by_bill.get(bill_id)
    sponsor_rate = sponsor_rate_lookup.get((sponsor, row["topic"]), np.nan)
    sponsor_flag = bool(pd.notna(sponsor_rate) and sponsor_rate < 0.4)

    flagged_reasons = [
        (stage_flag, "behind schedule"),
        (churn_flag, "high churn"),
        (route_flag, "weak route history"),
        (sponsor_flag, "sponsor below norm"),
    ]
    reasons = [label for flag, label in flagged_reasons if flag]
    # The score is simply the number of heuristics that fired.
    risk_score = len(reasons)

    risk_rows.append({
        "bill_id": bill_id,
        "topic": row["topic"],
        "route_key": route_key,
        "risk_score": risk_score,
        "days_since_last": days_since_last,
        "reasons": ", ".join(reasons)
    })

# Explicit columns keep the register well-formed even when there are no bills.
risk_register = pd.DataFrame(
    risk_rows,
    columns=["bill_id", "topic", "route_key", "risk_score", "days_since_last", "reasons"],
)

In [None]:
# --- Committee metrics -------------------------------------------------------------------------

# Committee name lookup; falls back to an empty frame when the codes table is missing so
# the merge below still works and committees are labelled by their raw location code.
_code_cols = ["committee_code", "committee_clean"]
committee_names = (committee_codes[_code_cols]
                   if set(_code_cols) <= set(committee_codes.columns)
                   else pd.DataFrame(columns=_code_cols))

# Roll calls joined to committee names. The key lives in `location_code` on the roll side
# and in `committee_code` on the codes side, so the join must name both columns.
if roll.empty:
    committee_votes = pd.DataFrame(columns=["committee", "bill_id"])
else:
    committee_votes = roll.merge(committee_names, left_on="location_code",
                                 right_on="committee_code", how="left")
    committee_votes["committee"] = committee_votes["committee_clean"].fillna(committee_votes["location_code"])

# Per committee: distinct bills voted on and number of passing roll calls.
# NOTE(review): pass_through counts passing roll calls while bills_heard counts distinct
# bills, so the ratio can exceed 1 when a bill has several roll calls — confirm intent.
if committee_votes.empty:
    committee_gatekeeping = pd.DataFrame(columns=["committee", "bills_heard", "pass_through"])
else:
    committee_gatekeeping = (committee_votes
                             .loc[committee_votes["is_committee"]]
                             .groupby("committee", as_index=False)
                             .agg(bills_heard=("bill_id", "nunique"),
                                  pass_through=("pass", "sum")))
if not committee_gatekeeping.empty:
    committee_gatekeeping["gatekeeping"] = 1 - (committee_gatekeeping["pass_through"] /
                                                 committee_gatekeeping["bills_heard"].replace(0, np.nan))

committee_workload = (committee_gatekeeping.merge(
    committee_votes.groupby("committee")["bill_id"].nunique().reset_index(name="unique_bills"),
    on="committee", how="left") if not committee_gatekeeping.empty else committee_gatekeeping)


In [None]:
# --- Cross-chamber friction --------------------------------------------------------------------

# Bills that passed both floors, split by direction of travel. Direction is taken from
# the order of the two floor-pass dates: Assembly first (or same day) counts as
# Assembly -> Senate, Senate strictly first counts as Senate -> Assembly. Without the
# date comparison both columns would be identical.
if stages_df.empty:
    cross_chamber = pd.DataFrame(columns=["bill_id", "topic"])
else:
    cross_chamber = stages_df[["bill_id", "topic", "asm_floor_pass", "sen_floor_pass"]].copy()
    asm_date = pd.to_datetime(cross_chamber["asm_floor_pass"], errors="coerce")
    sen_date = pd.to_datetime(cross_chamber["sen_floor_pass"], errors="coerce")
    passed_both = asm_date.notna() & sen_date.notna()
    cross_chamber["asm_to_sen"] = passed_both & (asm_date <= sen_date)
    cross_chamber["sen_to_asm"] = passed_both & (sen_date < asm_date)

cross_chamber_friction = (cross_chamber.groupby("topic", as_index=False)
                          .agg(pass_asm_to_sen=("asm_to_sen", "sum"),
                               pass_sen_to_asm=("sen_to_asm", "sum"))) if not cross_chamber.empty else pd.DataFrame(columns=["topic", "pass_asm_to_sen", "pass_sen_to_asm"])

In [None]:
# --- Survival curves ----------------------------------------------------------------------------

# Per topic, sample every 14 days from the earliest first action to the latest last
# action (or today when no last action is known) and record the share of bills whose
# last action is missing or still lies after the sample date.
survival_records = []
for topic, group in y_df.groupby("topic"):
    starts = pd.to_datetime(group["first_action"])
    ends = pd.to_datetime(group["last_action"])
    if not starts.notna().any():
        continue
    if ends.notna().any():
        horizon = ends.dropna().max()
    else:
        horizon = pd.Timestamp.today()
    timeline = pd.date_range(starts.min(), horizon, freq="14D")
    total = len(group)
    for dt in timeline:
        alive = (ends.isna() | (ends > dt)).sum()
        share = alive / total if total else np.nan
        survival_records.append({"topic": topic, "date": dt, "survival": share})

survival_curves = pd.DataFrame(survival_records)

In [None]:
# --- Voting blocs & controversy ----------------------------------------------------------------

# Individual votes restricted to labelled bills, tagged with a chamber derived from the
# first letter of the location code ("A" -> Assembly, "S" -> Senate, otherwise "Both").
votes_long = votes.copy()
if votes_long.empty:
    votes_long = pd.DataFrame(columns=["bill_id", "topic", "vote_value", "chamber"])
else:
    votes_long["topic"] = votes_long["bill_id"].map(canonical_topic)
    votes_long = votes_long.dropna(subset=["topic"])
    votes_long["chamber"] = votes_long["location_code"].apply(
        lambda code: "Both" if not isinstance(code, str) else (
            "Assembly" if code.startswith("A") else
            ("Senate" if code.startswith("S") else "Both")
        )
    )

# Legislator x bill matrix of vote values; combinations with no vote are filled with 0.
if votes_long.empty:
    vote_matrix = pd.DataFrame()
else:
    vote_matrix = votes_long.pivot_table(
        index="legislator_name", columns="bill_id", values="vote_value", fill_value=0
    )

# Edge list of legislator pairs whose vote vectors correlate at 0.6 or above. Legislators
# with an all-zero vector are skipped, as are pairs whose correlation is undefined.
similarity_rows = []
legislators = vote_matrix.index.tolist()
for i, a in enumerate(legislators):
    vec_a = vote_matrix.loc[a].values
    if not np.any(vec_a):
        continue
    for b in legislators[i + 1:]:
        vec_b = vote_matrix.loc[b].values
        if not np.any(vec_b):
            continue
        sim = float(np.corrcoef(vec_a, vec_b)[0, 1])
        if not np.isnan(sim) and sim >= 0.6:
            similarity_rows.append({"source": a, "target": b, "weight": sim})

vote_similarity_edges = pd.DataFrame(similarity_rows)
if not vote_similarity_edges.empty:
    vote_similarity_edges = vote_similarity_edges.rename(
        columns={"weight": "sim", "source": "u", "target": "v"}
    )

# Votes joined to each legislator's party for the matching term. The join is skipped when
# either side is unavailable, instead of raising KeyError on an empty politicians table.
_party_cols = ["full_name", "party", "term"]
if votes_long.empty or not set(_party_cols) <= set(politicians.columns):
    party_votes = pd.DataFrame(columns=["bill_id", "topic", "party", "vote_value"])
else:
    party_votes = votes_long.merge(politicians[_party_cols],
                                   left_on=["legislator_name", "term"],
                                   right_on=["full_name", "term"], how="left")
party_votes = party_votes.dropna(subset=["party"]) if not party_votes.empty else party_votes

# Share of yes votes per bill and party.
roll_party = (party_votes.groupby(["bill_id", "topic", "party"], as_index=False)
              .agg(yes_rate=("vote_value", lambda x: float((np.array(x) > 0).mean()) if len(x) else np.nan))) if not party_votes.empty else pd.DataFrame(columns=["bill_id", "topic", "party", "yes_rate"])

party_pivot = roll_party.pivot_table(index=["bill_id", "topic"], columns="party", values="yes_rate") if not roll_party.empty else pd.DataFrame()
if not party_pivot.empty:
    party_pivot = party_pivot.reset_index().rename(columns={"D": "dem_yes", "R": "rep_yes"})
    # A party absent from the data has no pivot column; add it as NaN so the polarization
    # computation below yields NaN for those bills instead of raising KeyError.
    for col in ("dem_yes", "rep_yes"):
        if col not in party_pivot.columns:
            party_pivot[col] = np.nan
    party_pivot["polarization"] = (party_pivot["dem_yes"] - party_pivot["rep_yes"]).abs()
else:
    party_pivot = pd.DataFrame(columns=["bill_id", "topic", "dem_yes", "rep_yes", "polarization"])

# Per topic: mean polarization and the share of bills whose polarization exceeds 0.5.
topic_controversy = (party_pivot.groupby("topic", as_index=False)
                     .agg(mean_polarization=("polarization", "mean"),
                          party_split_rate=("polarization", lambda x: float((np.array(x) > 0.5).mean())))) if not party_pivot.empty else pd.DataFrame(columns=["topic", "mean_polarization", "party_split_rate"])

rollcall_party_splits = party_pivot

In [None]:
# --- Committee vs floor drift ------------------------------------------------------------------

# Split individual votes into committee and floor subsets.
if votes_long.empty:
    committee_votes_long = pd.DataFrame(columns=["legislator_name", "bill_id"])
    floor_votes_long = pd.DataFrame(columns=["legislator_name", "bill_id"])
else:
    committee_votes_long = votes_long.loc[votes_long.get("is_committee", False)]
    floor_votes_long = votes_long.loc[votes_long.get("is_floor", False)]

# Mean vote value per legislator and bill, separately for committee and floor votes.
if committee_votes_long.empty:
    committee_summary = pd.DataFrame(columns=["legislator_name", "bill_id", "committee_score"])
else:
    committee_summary = (
        committee_votes_long.groupby(["legislator_name", "bill_id"], as_index=False)["vote_value"]
        .mean()
        .rename(columns={"vote_value": "committee_score"})
    )

if floor_votes_long.empty:
    floor_summary = pd.DataFrame(columns=["legislator_name", "bill_id", "floor_score"])
else:
    floor_summary = (
        floor_votes_long.groupby(["legislator_name", "bill_id"], as_index=False)["vote_value"]
        .mean()
        .rename(columns={"vote_value": "floor_score"})
    )

# Drift = committee score minus floor score, for legislators who voted in both settings.
if committee_summary.empty:
    committee_floor_drift = pd.DataFrame(columns=["legislator_name", "bill_id", "committee_score", "floor_score", "drift"])
else:
    committee_floor_drift = (
        committee_summary.merge(floor_summary, on=["legislator_name", "bill_id"], how="inner")
        .assign(drift=lambda d: d["committee_score"] - d["floor_score"])
    )

In [None]:
# --- Text lift (log odds) ----------------------------------------------------------------------

# bill_id -> outcome (first row per bill), built once instead of scanning y_df per digest.
outcome_by_bill = y_df.drop_duplicates("bill_id").set_index("bill_id")["outcome"].to_dict()

# One row per (digest, token) for labelled bills with a known outcome.
text_rows = []
for _, row in digests.iterrows():
    bill_id = row.get("bill_id")
    topic = canonical_topic(bill_id)
    if not topic:
        continue
    outcome = outcome_by_bill.get(bill_id)
    # Bills absent from y_df, or present without a classified outcome (NaN), are skipped;
    # int(NaN) would otherwise raise.
    if outcome is None or pd.isna(outcome):
        continue
    text = row.get("digesttext", "")
    # A missing digest would otherwise be stringified into the bogus token "nan".
    if pd.isna(text):
        continue
    outcome = int(outcome)
    text_rows.extend(
        {"topic": topic, "token": token, "outcome": outcome}
        for token in tokenise(text)
    )

# Explicit columns so the later groupby works even when no rows were produced.
text_df = pd.DataFrame(text_rows, columns=["topic", "token", "outcome"])

def log_odds(good, bad, alpha=0.01):
    """Return the natural log of the smoothed ratio ``(good + alpha) / (bad + alpha)``.

    *alpha* is added to both counts so that a zero count does not produce a division by zero
    or a logarithm of zero.
    """
    smoothed_good = good + alpha
    smoothed_bad = bad + alpha
    return math.log(smoothed_good / smoothed_bad)

# Per (topic, token): how often the token appears in bills with outcome == 1 ("pos") versus any
# other outcome ("neg"), plus the smoothed log-odds of the two counts.
text_stats = []
# A frame built from zero rows may carry no columns at all; grouping it would raise KeyError.
if not text_df.empty:
    for (topic, token), group in text_df.groupby(["topic", "token"]):
        is_pass = group["outcome"] == 1
        passed = int(is_pass.sum())
        failed = int((~is_pass).sum())
        text_stats.append({"topic": topic,
                           "token": token,
                           "log_lift_pass_vs_other": log_odds(passed, failed),
                           "pos": passed,
                           "neg": failed})

text_lift_top_tokens = pd.DataFrame(
    text_stats, columns=["topic", "token", "log_lift_pass_vs_other", "pos", "neg"])
# Always add `count` so the empty and non-empty results share one schema.
text_lift_top_tokens["count"] = text_lift_top_tokens["pos"] + text_lift_top_tokens["neg"]



In [None]:
# --- Funding allocation ------------------------------------------------------------------------


def _term_for_expenditure(when):
    """Map an expenditure timestamp to a two-year term label, or NaN when the date is missing.

    Odd years map to "<year>-<year+1>". Even years map to "<year-1>-<year>" before November
    and to "<year+1>-<year+2>" from November onwards.
    """
    if pd.isna(when):
        return np.nan
    if when.year % 2 != 0:
        return f"{when.year}-{when.year+1}"
    if when.month < 11:
        return f"{when.year-1}-{when.year}"
    return f"{when.year+1}-{when.year+2}"


if lobbying.empty:
    lobby_by_leg = pd.DataFrame(columns=["beneficiary_clean", "term", "lobbying"])
else:
    # Either column spelling is accepted; unparseable dates become NaT and get no term.
    lobbying["expn_date"] = pd.to_datetime(
        lobbying.get("EXPN_DATE", lobbying.get("expn_date")), errors="coerce")
    lobbying["term"] = lobbying["expn_date"].apply(_term_for_expenditure)
    lobbying["beneficiary_clean"] = lobbying.get("clean_beneficiary", lobbying.get("BENEFICIARY"))
    # Rows without a beneficiary or a term cannot be attributed and are left out of the totals.
    lobby_by_leg = (
        lobbying.dropna(subset=["beneficiary_clean", "term"])
        .groupby(["beneficiary_clean", "term"], as_index=False)["BENE_AMT"]
        .sum()
        .rename(columns={"BENE_AMT": "lobbying"}))

def _donations_by_target(expenditures, label):
    """Sum ``Amount`` per (target_clean, term) for one chamber's expenditure table.

    When ``matched_target_name`` and ``year`` are present, ``target_clean`` (lower-cased,
    commas removed) and ``term`` (a copy of ``year``) are added to *expenditures* in place.
    An empty table, or one lacking a column the aggregation needs, yields an empty frame with
    the expected columns; *label* only identifies the table in the warning.
    """
    empty_totals = pd.DataFrame(columns=["target_clean", "term", "Amount"])
    if expenditures.empty:
        return empty_totals
    if {"matched_target_name", "year"}.issubset(expenditures.columns):
        expenditures["target_clean"] = (expenditures["matched_target_name"]
                                        .str.lower()
                                        .str.replace(",", "", regex=False))
        # NOTE(review): lobbying terms are "YYYY-YYYY" strings; confirm `year` uses the same
        # labels, otherwise the merge with lobby_by_leg below will not line up.
        expenditures["term"] = expenditures["year"]
    missing = {"target_clean", "term", "Amount"} - set(expenditures.columns)
    if missing:
        warnings.warn(f"Skipping {label} donations; missing columns: {sorted(missing)}")
        return empty_totals
    return expenditures.groupby(["target_clean", "term"], as_index=False)["Amount"].sum()


don_as = _donations_by_target(expend_assembly, "assembly")
don_sen = _donations_by_target(expend_senate, "senate")

if don_as.empty and don_sen.empty:
    donations = pd.DataFrame(columns=["beneficiary_clean", "term", "donations"])
else:
    # Re-aggregate after stacking the chambers: a beneficiary present in both tables for the
    # same term would otherwise occupy two rows, and the outer merge below would copy the
    # lobbying amount onto each of them (double counting it in every later sum).
    donations = (
        pd.concat([frame for frame in (don_as, don_sen) if not frame.empty], ignore_index=True)
        .rename(columns={"target_clean": "beneficiary_clean", "Amount": "donations"})
        .groupby(["beneficiary_clean", "term"], as_index=False, sort=False)["donations"]
        .sum())

if donations.empty and lobby_by_leg.empty:
    funding = pd.DataFrame(columns=["beneficiary_clean", "term", "donations", "lobbying"])
else:
    # Outer join keeps beneficiaries that have only donations or only lobbying; the missing
    # side is treated as zero.
    funding = (donations.merge(lobby_by_leg, on=["beneficiary_clean", "term"], how="outer")
               .fillna({"donations": 0.0, "lobbying": 0.0}))
funding["total_received"] = funding["donations"] + funding["lobbying"]

def _positive_vote_share(values):
    """Return the fraction of entries in *values* that are strictly greater than zero."""
    positives = (np.array(values) > 0).sum()
    return float(positives) / max(len(values), 1)


# One weight per (legislator, term, topic): the share of that group's vote_value entries that are
# positive. Weights are computed per group and are not normalised across a legislator's topics.
if votes_long.empty:
    votes_topic_weights = pd.DataFrame(columns=["legislator_name", "term", "topic", "topic_weight"])
else:
    votes_topic_weights = (
        votes_long.groupby(["legislator_name", "term", "topic"])["vote_value"]
        .apply(_positive_vote_share)
        .reset_index(name="topic_weight"))

# Allocate each beneficiary's funding across topics in proportion to `topic_weight`, then roll
# the allocations up per topic/term and per legislator/term/topic.
if not funding.empty:
    # Lower-case the join key, as its name promises: donation targets are lower-cased when
    # they are cleaned, but a raw beneficiary name taken from the lobbying table may not be.
    funding["beneficiary_lower"] = funding["beneficiary_clean"].astype(str).str.lower()
    ca_legislator_funding = (funding
                             .groupby(["beneficiary_lower", "term"], as_index=False)
                             .agg(total_received=("total_received", "sum"),
                                  donations=("donations", "sum"),
                                  lobbying=("lobbying", "sum")))
    # Left join: beneficiaries without any topic weights keep a row with topic NaN and weight
    # 0, so they add nothing to the per-topic sums (groupby drops the NaN topic).
    funding_alloc = (funding
                     .merge(votes_topic_weights, left_on=["beneficiary_lower", "term"],
                            right_on=["legislator_name", "term"], how="left")
                     .fillna({"topic_weight": 0.0}))
    funding_alloc["donations_topic"] = funding_alloc["donations"] * funding_alloc["topic_weight"]
    funding_alloc["lobbying_topic"] = funding_alloc["lobbying"] * funding_alloc["topic_weight"]
    funding_alloc["total_topic"] = funding_alloc["total_received"] * funding_alloc["topic_weight"]
else:
    ca_legislator_funding = pd.DataFrame(
        columns=["beneficiary_lower", "term", "total_received", "donations", "lobbying"])
    # Same columns as the populated branch so later code sees one schema either way.
    funding_alloc = pd.DataFrame(
        columns=["beneficiary_clean", "term", "donations", "lobbying", "total_received",
                 "beneficiary_lower", "legislator_name", "topic", "topic_weight",
                 "donations_topic", "lobbying_topic", "total_topic"])

if funding_alloc.empty:
    topic_funding_by_term = pd.DataFrame(
        columns=["topic", "term", "total_donations", "total_lobbying", "total_received"])
    legislator_topic_funding = pd.DataFrame(
        columns=["beneficiary_lower", "term", "topic", "donations", "lobbying", "total"])
else:
    topic_funding_by_term = (funding_alloc.groupby(["topic", "term"], as_index=False)
                             .agg(total_donations=("donations_topic", "sum"),
                                  total_lobbying=("lobbying_topic", "sum"),
                                  total_received=("total_topic", "sum")))
    legislator_topic_funding = (
        funding_alloc.groupby(["beneficiary_lower", "term", "topic"], as_index=False)
        .agg(donations=("donations_topic", "sum"),
             lobbying=("lobbying_topic", "sum"),
             total=("total_topic", "sum")))



In [None]:
# --- Funding geography -------------------------------------------------------------------------
# Export district geometries as GeoJSON (EPSG:4326). This is best effort: any failure leaves
# `geo_records` as an empty list and emits a warning instead of stopping the notebook.

geo_records = []
if gpd is not None and MAP_DIR.exists():
    # Directory listing order is arbitrary, and a shapefile comes with sidecar files (.dbf,
    # .shx, .prj, ...) that match the same pattern. Sort for determinism and prefer formats
    # that carry geometry; fall back to the first match for anything else.
    geo_candidates = sorted(MAP_DIR.glob("ca_legislator_funding_geo.*"))
    geo_file = next(
        (path
         for suffix in (".geojson", ".json", ".gpkg", ".shp")
         for path in geo_candidates
         if path.suffix.lower() == suffix),
        geo_candidates[0] if geo_candidates else None)
    if geo_file is None:
        warnings.warn(f"No ca_legislator_funding_geo.* file in {MAP_DIR}; skipping geography export.")
    else:
        try:
            geo_df = gpd.read_file(geo_file)
            keep_cols = [c for c in ["district", "chamber", "geometry"] if c in geo_df.columns]
            geo_df = geo_df[keep_cols]
            geo_records = json.loads(geo_df.to_crs(epsg=4326).to_json())
        except Exception as exc:
            # Deliberately broad: a geography failure should not abort the other exports.
            warnings.warn(f"Unable to read legislator geography: {exc}")
else:
    warnings.warn("Geopandas not available or map directory missing; skipping geography export.")

In [None]:
# --- Bill browser table ------------------------------------------------------------------------
# One row per y_df record, extended with its longevity in days and, where available, version
# churn, route key and risk score (all left-joined on bill_id).

if y_df.empty:
    bill_longevity = pd.DataFrame(columns=["bill_id", "topic", "longevity_days"])
else:
    # Whole days between the first and last recorded action.
    action_span = pd.to_datetime(y_df["last_action"]) - pd.to_datetime(y_df["first_action"])
    bill_longevity = y_df.assign(longevity_days=action_span.dt.days)

if bill_longevity.empty:
    bills_table = pd.DataFrame(columns=["bill_id", "topic"])
else:
    bills_table = bill_longevity.merge(
        amendment_churn[["bill_id", "n_versions", "median_similarity"]], on="bill_id", how="left")
    bills_table = bills_table.merge(
        route_sequences[["bill_id", "route_key"]], on="bill_id", how="left")
    bills_table = bills_table.merge(
        risk_register[["bill_id", "risk_score"]], on="bill_id", how="left")

# Rename the action columns to the *_date names used in the exported table.
bills_table = bills_table.rename(
    columns={"first_action": "first_action_date", "last_action": "last_action_date"})

In [None]:
# --- Precomputed payload -----------------------------------------------------------------------
# Collect every precomputed artefact under its payload key. The grouping below is only for
# readability; merging the groups in sequence keeps the original key order.

_flow_outputs = {
    "pipeline_stage_funnel": pipeline_stage_funnel,
    "pipeline_stage_durations": pipeline_stage_durations,
    "pipeline_stage_calendar": stage_calendar,
    "route_archetypes": route_archetypes,
    "amendment_churn": amendment_churn,
    "risk_register": risk_register,
}

_institution_outputs = {
    "committee_gatekeeping": committee_gatekeeping,
    "committee_workload": committee_workload,
    "cross_chamber_friction": cross_chamber_friction,
    "survival_curves": survival_curves,
}

_vote_outputs = {
    "topic_controversy": topic_controversy,
    "rollcall_party_splits": rollcall_party_splits,
    "vote_similarity_edges": vote_similarity_edges,
    "committee_floor_drift": committee_floor_drift,
}

_text_and_funding_outputs = {
    "text_lift_top_tokens": text_lift_top_tokens,
    "topic_funding_by_term": topic_funding_by_term,
    "topic_funding_by_leg": legislator_topic_funding,
    "ca_legislator_funding_geo": geo_records,
    "ca_legislator_funding": ca_legislator_funding,
}

precomp_outputs = {
    **_flow_outputs,
    **_institution_outputs,
    **_vote_outputs,
    **_text_and_funding_outputs,
    "bills_table": bills_table,
}

# Bare expression so the notebook cell displays the payload.
precomp_outputs
