[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/abigailhaddad/fedscope_new/blob/main/reconciliation_analysis.ipynb)

# Federal Workforce Data Reconciliation Analysis

Compares the change in snapshot employment counts against net flow data (accessions − separations) over rolling multi-month windows, per agency subelement.

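**Accessions and separations are filtered by `personnel_action_effective_date_yyyymm`, so each action counts toward the month it actually took effect; employment snapshots are taken from each month's file.**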

In [None]:
import duckdb
import ipywidgets as widgets
from IPython.display import display

HF_USERNAME = "abigailhaddad"

def parquet_url(data_type: str, month: str) -> str:
    """Return the ``hf://`` URL of the parquet file for one dataset month.

    The dataset repo is ``opm-federal-<data_type>-<month>`` under the
    ``HF_USERNAME`` account; the file inside it is ``data.parquet``.
    """
    repo_id = f"{HF_USERNAME}/opm-federal-{data_type}-{month}"
    return f"hf://datasets/{repo_id}/data.parquet"

## Config

In [None]:
# Monthly files to load: January through September 2025, as YYYYMM strings.
MONTHS = [f"2025{month_num:02d}" for month_num in range(1, 10)]

# Interactive controls; the "Run Analysis" cell reads their current values.
window_size_widget = widgets.IntSlider(
    value=4, min=2, max=6, step=1, description='Window (months):'
)
threshold_pct_widget = widgets.FloatSlider(
    value=20.0, min=5.0, max=50.0, step=5.0, description='Threshold %:'
)
min_employees_widget = widgets.IntSlider(
    value=0, min=0, max=1000, step=50, description='Min employees:'
)
mismatch_only_widget = widgets.Checkbox(
    value=False, description='Only show opposite stories'
)

display(
    window_size_widget,
    threshold_pct_widget,
    min_employees_widget,
    mismatch_only_widget,
)

## Load Data

In [None]:
def union_all_months(data_type: str, months: list[str]) -> str:
    """Build SQL that UNION ALLs one parquet file per month.

    Each SELECT reads ``parquet_url(data_type, m)`` and adds a ``file_month``
    column holding the month string, so every row records which monthly
    file it came from.

    Args:
        data_type: Dataset kind used in the repo name (e.g. ``"employment"``).
        months: Month strings (``YYYYMM``) to include, in order.

    Returns:
        SQL text suitable for ``CREATE VIEW ... AS <sql>``.

    Raises:
        ValueError: If ``months`` is empty. Otherwise an empty string would be
            returned and surface later as an opaque SQL syntax error.
    """
    if not months:
        raise ValueError("months must contain at least one month")
    selects = [
        f"SELECT *, '{m}' as file_month FROM read_parquet('{parquet_url(data_type, m)}')"
        for m in months
    ]
    return " UNION ALL ".join(selects)


con = duckdb.connect()

# One view per dataset; each unions every month's parquet file (tagged with file_month).
for view_name in ("employment", "accessions", "separations"):
    con.execute(f"CREATE VIEW {view_name} AS {union_all_months(view_name, MONTHS)}")

print(f"Loaded {len(MONTHS)} months of data")

## Compute Window Discrepancies

In [None]:
def get_windows(months: list[str], window_size: int) -> list[tuple[str, str]]:
    """Return ``(start_month, end_month)`` pairs for every rolling window.

    Windows advance one entry at a time and span ``window_size`` consecutive
    entries of ``months``, inclusive at both ends. There are
    ``len(months) - window_size + 1`` of them, and none when ``window_size``
    exceeds ``len(months)``.

    Raises:
        ValueError: If ``window_size`` is less than 1. Such sizes previously
            produced wrapped/reversed pairs via negative indexing and then
            an IndexError.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")
    span = window_size - 1  # offset from a window's first month to its last
    return [(months[i], months[i + span]) for i in range(len(months) - span)]


def window_discrepancy_sql(start_month: str, end_month: str, months: list[str]) -> str:
    """Return SQL comparing snapshot change with net flows for one window, per subagency.

    Rows are keyed by (agency, agency_code, agency_subelement,
    agency_subelement_code):

    * ``start_emp`` / ``end_emp``: summed ``count`` from the employment files
      whose ``file_month`` is ``start_month`` / ``end_month``.
    * ``accessions`` / ``separations``: summed ``count`` for actions whose
      ``personnel_action_effective_date_yyyymm`` falls in the window, i.e. every
      entry of ``months`` from ``start_month`` through ``end_month`` inclusive.
    * ``snapshot_change = end_emp - start_emp``,
      ``flow_expected = accessions - separations``,
      ``discrepancy = snapshot_change - flow_expected``, and
      ``discrepancy_pct = |discrepancy| * 100 / start_emp`` (NULL when start_emp is 0).

    Start and end snapshots are FULL OUTER JOINed; flows are LEFT JOINed, so a
    subagency present only in the flow data is not reported.

    NOTE(review): the flow window includes ``start_month`` itself. If the
    employment snapshot reflects end-of-month headcount, start-month actions
    are already in ``start_emp``. Confirm the snapshot timing.
    NOTE(review): ``USING`` joins never match NULL keys. Confirm the code/name
    columns are always populated.

    Raises:
        ValueError: If ``start_month`` or ``end_month`` is not in ``months``.
    """
    first = months.index(start_month)
    last = months.index(end_month)
    quoted_months = [f"'{month}'" for month in months[first:last + 1]]
    month_list = ", ".join(quoted_months)

    return f"""
    WITH start_emp AS (
        SELECT agency, agency_code, agency_subelement, agency_subelement_code,
               SUM(CAST(count AS INTEGER)) as emp
        FROM employment WHERE file_month = '{start_month}'
        GROUP BY ALL
    ),
    end_emp AS (
        SELECT agency, agency_code, agency_subelement, agency_subelement_code,
               SUM(CAST(count AS INTEGER)) as emp
        FROM employment WHERE file_month = '{end_month}'
        GROUP BY ALL
    ),
    acc AS (
        SELECT agency, agency_code, agency_subelement, agency_subelement_code,
               SUM(CAST(count AS INTEGER)) as total
        FROM accessions 
        WHERE personnel_action_effective_date_yyyymm IN ({month_list})
        GROUP BY ALL
    ),
    sep AS (
        SELECT agency, agency_code, agency_subelement, agency_subelement_code,
               SUM(CAST(count AS INTEGER)) as total
        FROM separations
        WHERE personnel_action_effective_date_yyyymm IN ({month_list})
        GROUP BY ALL
    )
    SELECT 
        COALESCE(s.agency, e.agency) as agency,
        COALESCE(s.agency_code, e.agency_code) as agency_code,
        COALESCE(s.agency_subelement, e.agency_subelement) as agency_subelement,
        COALESCE(s.agency_subelement_code, e.agency_subelement_code) as agency_subelement_code,
        '{start_month}' as window_start,
        '{end_month}' as window_end,
        COALESCE(s.emp, 0) as start_emp,
        COALESCE(e.emp, 0) as end_emp,
        COALESCE(a.total, 0) as accessions,
        COALESCE(sp.total, 0) as separations,
        (COALESCE(e.emp, 0) - COALESCE(s.emp, 0)) as snapshot_change,
        (COALESCE(a.total, 0) - COALESCE(sp.total, 0)) as flow_expected,
        (COALESCE(e.emp, 0) - COALESCE(s.emp, 0)) - (COALESCE(a.total, 0) - COALESCE(sp.total, 0)) as discrepancy,
        CASE WHEN COALESCE(s.emp, 0) > 0 
             THEN ABS((COALESCE(e.emp, 0) - COALESCE(s.emp, 0)) - (COALESCE(a.total, 0) - COALESCE(sp.total, 0))) * 100.0 / s.emp
             ELSE NULL END as discrepancy_pct
    FROM start_emp s
    FULL OUTER JOIN end_emp e USING (agency, agency_code, agency_subelement, agency_subelement_code)
    LEFT JOIN acc a USING (agency, agency_code, agency_subelement, agency_subelement_code)
    LEFT JOIN sep sp USING (agency, agency_code, agency_subelement, agency_subelement_code)
    """

In [None]:
def run_analysis(window_size: int, threshold_pct: float, min_employees: int = 0):
    """Flag subagencies whose snapshot and flow figures disagree, merged into periods.

    Rebuilds the ``all_discrepancies`` view on ``con`` with one row per
    subagency per rolling window of ``window_size`` months over ``MONTHS``.
    It keeps windows whose ``discrepancy_pct`` is strictly greater than
    ``threshold_pct`` and whose ``start_emp`` is at least ``min_employees``.
    Overlapping flagged windows are merged into periods ("islands"), and the
    result is summarized per subagency.

    Args:
        window_size: Number of months per rolling window.
        threshold_pct: Discrepancy, as a percent of start-of-window
            employment, that a window must exceed to be flagged.
        min_employees: Minimum ``start_emp`` for a window to be considered.

    Returns:
        A DataFrame with one row per flagged subagency. Columns are
        ``flagged_periods``, ``n_windows_flagged``, ``avg_employees``,
        ``snapshot_change``, ``flow_expected``, ``snapshot_says``,
        ``flow_says`` and ``story_mismatch``. Rows with opposite stories
        sort first.

    Raises:
        ValueError: If ``window_size`` yields no windows over ``MONTHS``.
            This used to produce an empty ``CREATE VIEW`` and an opaque SQL
            error.

    Side effects:
        Drops and recreates the ``all_discrepancies`` view (read by
        ``drill_down``) and prints a short summary.
    """
    windows = get_windows(MONTHS, window_size)
    if not windows:
        raise ValueError(
            f"window_size={window_size} leaves no windows over {len(MONTHS)} months"
        )
    print(f"Parameters: window_size={window_size}, threshold={threshold_pct}%, min_employees={min_employees}")

    con.execute("DROP VIEW IF EXISTS all_discrepancies")
    all_windows_sql = " UNION ALL ".join(
        f"({window_discrepancy_sql(start, end, MONTHS)})"
        for start, end in windows
    )
    con.execute(f"CREATE VIEW all_discrepancies AS {all_windows_sql}")

    # Thresholds are bound as query parameters instead of being formatted
    # into the SQL text, so arbitrary caller values (e.g. NaN) cannot
    # produce malformed SQL.
    # Islands: a flagged window starts a new island when it begins after the
    # previous flagged window (same codes) has ended; overlapping windows merge.
    # NOTE(review): flagged windows overlap, so the per-island sums below count
    # some months more than once. Only their sign drives snapshot_says /
    # flow_says; confirm that is the intended metric.
    result = con.execute("""
        WITH flagged AS (
            SELECT * FROM all_discrepancies
            WHERE discrepancy_pct > ? AND start_emp >= ?
        ),
        ordered AS (
            SELECT *, LAG(window_end) OVER (PARTITION BY agency_code, agency_subelement_code ORDER BY window_start) as prev_end
            FROM flagged
        ),
        islands AS (
            SELECT *,
                   SUM(CASE WHEN prev_end IS NULL OR window_start > prev_end THEN 1 ELSE 0 END)
                       OVER (PARTITION BY agency_code, agency_subelement_code ORDER BY window_start) as island_id
            FROM ordered
        ),
        merged_islands AS (
            SELECT agency, agency_code, agency_subelement, agency_subelement_code, island_id,
                   MIN(window_start) as range_start, MAX(window_end) as range_end,
                   COUNT(*) as windows_in_island,
                   SUM(snapshot_change) as total_snapshot_change,
                   SUM(flow_expected) as total_flow_expected,
                   AVG(start_emp) as avg_start_emp
            FROM islands
            GROUP BY agency, agency_code, agency_subelement, agency_subelement_code, island_id
        )
        SELECT agency, agency_code, agency_subelement, agency_subelement_code,
               STRING_AGG(range_start || '-' || range_end, ', ' ORDER BY range_start) as flagged_periods,
               SUM(windows_in_island)::INT as n_windows_flagged,
               ROUND(AVG(avg_start_emp))::INT as avg_employees,
               SUM(total_snapshot_change)::INT as snapshot_change,
               SUM(total_flow_expected)::INT as flow_expected,
               CASE WHEN SUM(total_snapshot_change) > 0 THEN 'growing' WHEN SUM(total_snapshot_change) < 0 THEN 'shrinking' ELSE 'flat' END as snapshot_says,
               CASE WHEN SUM(total_flow_expected) > 0 THEN 'growing' WHEN SUM(total_flow_expected) < 0 THEN 'shrinking' ELSE 'flat' END as flow_says,
               CASE WHEN (SUM(total_snapshot_change) > 0 AND SUM(total_flow_expected) < 0) OR (SUM(total_snapshot_change) < 0 AND SUM(total_flow_expected) > 0) THEN true ELSE false END as story_mismatch
        FROM merged_islands
        GROUP BY agency, agency_code, agency_subelement, agency_subelement_code
        ORDER BY story_mismatch DESC, n_windows_flagged DESC, agency, agency_subelement
    """, [threshold_pct, min_employees]).fetchdf()

    n_mismatch = int(result['story_mismatch'].sum())
    print(f"Found {len(result)} subagencies with >{threshold_pct}% discrepancy")
    # The arrow was previously mojibake ("â†’"), i.e. UTF-8 "→" decoded as cp1252.
    print(f"  → {n_mismatch} have OPPOSITE stories\n")
    return result


def export_to_csv(df, filename="reconciliation_results.csv"):
    """Write ``df`` to ``filename`` as CSV without the index, then report the row count."""
    row_count = len(df)
    df.to_csv(filename, index=False)
    print(f"Exported {row_count} rows to {filename}")


def drill_down(agency_code: str, subelement_code: str):
    """Show the window-by-window breakdown for one subagency.

    Reads the ``all_discrepancies`` view that ``run_analysis`` creates, so run
    that first. Prints the subagency's names and codes. If the codes map to
    several distinct name pairs, only the first one fetched is printed.

    Args:
        agency_code: Matched against the ``agency_code`` column.
        subelement_code: Matched against the ``agency_subelement_code`` column.

    Returns:
        A DataFrame with one row per rolling window, ordered by
        ``window_start``. It holds employment, flow and discrepancy columns,
        with ``discrepancy_pct`` rounded to one decimal. Returns ``None``
        (after printing a message) when no rows match.
    """
    # Bind the codes as query parameters: values containing quotes would
    # otherwise break (or inject into) the SQL.
    params = [agency_code, subelement_code]
    result = con.execute("""
        SELECT window_start, window_end, start_emp, end_emp,
               accessions, separations, snapshot_change, flow_expected,
               discrepancy, ROUND(discrepancy_pct, 1) as discrepancy_pct
        FROM all_discrepancies
        WHERE agency_code = ?
          AND agency_subelement_code = ?
        ORDER BY window_start
    """, params).fetchdf()

    if len(result) == 0:
        print(f"No data found for {agency_code}/{subelement_code}")
        return None

    agency_name = con.execute("""
        SELECT DISTINCT agency, agency_subelement
        FROM all_discrepancies
        WHERE agency_code = ? AND agency_subelement_code = ?
    """, params).fetchone()

    print(f"Drill-down: {agency_name[0]} / {agency_name[1]}")
    print(f"Codes: {agency_code} / {subelement_code}\n")
    return result

## Run Analysis

Adjust the controls in the Config section above (sliders and the "opposite stories" checkbox), then re-run this cell to see results with different parameters.

In [None]:
# Run with the current widget settings (re-run this cell after changing the controls)
result = run_analysis(
    window_size=window_size_widget.value,
    threshold_pct=threshold_pct_widget.value,
    min_employees=min_employees_widget.value,
)

# Optionally keep only subagencies whose snapshot and flow data point in opposite directions
if mismatch_only_widget.value:
    result = result.loc[result['story_mismatch'] == True]  # noqa: E712
    print(f"Filtered to {len(result)} with opposite stories only\n")

result

## Export Results

In [None]:
# Save the results shown above (after any opposite-stories filtering) to CSV
export_to_csv(result, filename="reconciliation_results.csv")

## Drill-Down

Use `drill_down(agency_code, subelement_code)` to see a window-by-window breakdown for a specific subagency. It reads the `all_discrepancies` view built by the Run Analysis cell, so run that cell first.

In [None]:
# Example: Interior's Office of the Secretary (codes IN / IN01), a big mismatch
# (+15997 snapshot vs -470 flow); exact figures depend on the analysis settings.
drill_down(agency_code="IN", subelement_code="IN01")