# Household Net Income Breakdown Analysis

This notebook analyzes household net income components from a CSV file, focusing on stacked bar charts and waterfall visualizations.

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from policyengine_us import Simulation
import os
from typing import Dict, Any, List, Optional
import json
import plotly.express as px

  from .autonotebook import tqdm as notebook_tqdm


## Loading and Preparing Data

Load data from a CSV file and prepare it for analysis.

In [2]:
def load_household_data(file_path: str) -> pd.DataFrame:
    """
    Load household data from CSV file.
    
    Args:
        file_path: Path to the CSV file containing household data
        
    Returns:
        DataFrame with household data
    """
    try:
        # Load the CSV file
        df = pd.read_csv(file_path)
        print(f"Successfully loaded {len(df)} records from {file_path}")
        print(f"Columns: {df.columns.tolist()}")
        return df
    except Exception as e:
        print(f"Error loading data: {str(e)}")
        return None

In [13]:
def create_household_situation(row: pd.Series) -> Dict[str, Any]:
    """
    Create a household situation dictionary for PolicyEngine from a CSV row.
    Includes axes parameter for parameter sweep.
    
    Args:
        row: A row from the households dataframe
        
    Returns:
        A dictionary representing the household situation for PolicyEngine
    """
    # State FIPS to state code mapping
    STATE_MAPPING = {
        1: "AL", 2: "AK", 3: "AZ", 4: "AR", 5: "CA", 6: "CO", 7: "CT", 8: "DE", 9: "DC", 10: "FL",
        11: "GA", 12: "HI", 13: "ID", 14: "IL", 15: "IN", 16: "IA", 17: "KS", 18: "KY", 19: "LA", 20: "ME",
        21: "MD", 22: "MA", 23: "MI", 24: "MN", 25: "MS", 26: "MO", 27: "MT", 28: "NE", 29: "NV", 30: "NH",
        31: "NJ", 32: "NM", 33: "NY", 34: "NC", 35: "ND", 36: "OH", 37: "OK", 38: "OR", 39: "PA", 40: "RI",
        41: "SC", 42: "SD", 43: "TN", 44: "TX", 45: "UT", 46: "VT", 47: "VA", 48: "WA", 49: "WV", 50: "WI",
        51: "WY",
    }
    
    # Default state list for randomly assigning state codes to zero values
    DEFAULT_STATES = list(STATE_MAPPING.values())
    
    # Extract basic household information
    state_fips = row['state']
    
    # Convert state FIPS to state code or assign random state if zero
    import random
    if state_fips == 0 or state_fips not in STATE_MAPPING:
        state = random.choice(DEFAULT_STATES)
    else:
        state = STATE_MAPPING[state_fips]
    
    mstat = row['mstat']
    page = row['page']
    sage = row['sage'] if mstat == 2 else 0
    depx = row['depx']
    
    # Extract income information
    swages = row.get('swages', 0)
    psemp = row.get('psemp', 0)
    ssemp = row.get('ssemp', 0)
    dividends = row.get('dividends', 0)
    intrec = row.get('intrec', 0)
    stcg = row.get('stcg', 0)
    ltcg = row.get('ltcg', 0)
    pensions = row.get('pensions', 0)
    gssi = row.get('gssi', 0)
    pui = row.get('pui', 0)  # Primary unemployment insurance
    
    # Create the base situation
    situation = {
        "people": {
            "you": {
                "age": {
                    "2025": int(page)
                },
                "self_employment_income": {
                    "2025": float(psemp)
                },
                "dividend_income": {
                    "2025": float(dividends)  # Assign all to head
                },
                "interest_income": {
                    "2025": float(intrec)  # Assign all to head
                },
                "short_term_capital_gains": {
                    "2025": float(stcg)  # Assign all to head
                },
                "long_term_capital_gains": {
                    "2025": float(ltcg)  # Assign all to head
                },
                "pension_income": {
                    "2025": float(pensions)  # Assign all to head
                },
                "social_security": {
                    "2025": float(gssi)  # Assign all to head
                },
                "unemployment_compensation": {
                    "2025": float(pui)  # Primary unemployment insurance
                }
            }
        },
        "families": {
            "your family": {
                "members": [
                    "you"
                ]
            }
        },
        "marital_units": {
            "your marital unit": {
                "members": [
                    "you"
                ]
            }
        },
        "tax_units": {
            "your tax unit": {
                "members": [
                    "you"
                ]
            }
        },
        "spm_units": {
            "your household": {
                "members": [
                    "you"
                ]
            }
        },
        "households": {
            "your household": {
                "members": [
                    "you"
                ],
                "state_code": {
                    "2025": state
                }
            }
        },
        "axes":[
            [
                {
                    "name": "employment_income",
                    "count": 200, 
                    "min": 0,
                    "max": 200_000
                }
            ]
        ]
    }
    
    # Add spouse if married
    if mstat == 2 and sage > 0:
        situation["people"]["your spouse"] = {
            "age": {
                "2025": int(sage)
            },
            "self_employment_income": {
                "2025": float(ssemp)
            }
        }
        
        # Add spouse to all units
        for unit in ["families", "marital_units", "tax_units", "spm_units", "households"]:
            first_unit = list(situation[unit].keys())[0]
            situation[unit][first_unit]["members"].append("your spouse")
    
    # Add dependents (children)
    for i in range(int(depx)):
        child_age_col = f'age{i+1}'
        child_age = row.get(child_age_col, 0)
        
        if child_age > 0:
            child_id = f"child_{i+1}"
            situation["people"][child_id] = {
                "age": {
                    "2025": int(child_age)
                }
            }
            
            # Add child to all units
            for unit in ["families", "tax_units", "spm_units", "households"]:
                first_unit = list(situation[unit].keys())[0]
                situation[unit][first_unit]["members"].append(child_id)
    
    return situation


In [14]:
def process_households(csv_path: str, sample_size: Optional[int] = None) -> pd.DataFrame:
    """
    Process a CSV file of households, run simulations, and return results for all axes values.
    This function processes each household individually but returns a combined DataFrame.
    
    Args:
        csv_path: Path to the CSV file
        sample_size: Optional number of households to process (for testing)
        
    Returns:
        DataFrame with simulation results for each household across all axes values
    """
    # Load data
    households_df = load_household_data(csv_path)
    
    if households_df is None:
        return None
    
    # Take a sample if specified
    if sample_size is not None:
        households_df = households_df.sample(n=sample_size, random_state=42)
    
    # Initialize an empty DataFrame to store all results
    all_results_df = pd.DataFrame()
    
    # Process each household
    for idx, row in households_df.iterrows():
        print(f"Processing household {idx+1}/{len(households_df)}...")
        
        # Create situation with axes parameter
        situation = create_household_situation(row)
        
        # Calculate results
        simulation = Simulation(situation=situation)
        
        # Define variables to calculate
        variables = [
            "household_net_income",
            "household_market_income",
            "income_tax_before_refundable_credits",
            "state_income_tax_before_refundable_credits",
            "eitc",
            "refundable_ctc",
            "state_refundable_credits",
            "snap",
            "ssi",
            "free_school_meals",
            "tanf",
            "social_security",
            "household_state_benefits",
            "medicaid",
            "premium_tax_credit"
        ]
        
        # Get the employment income values from the axes
        axes_count = situation["axes"][0][0]["count"] 
        employment_income_values = simulation.calculate("employment_income", map_to = "household", period = 2025)

        state_code = situation["households"]["your household"]["state_code"]["2025"]
        
        # Extract all the inputs from the situation to include in results
        # Create a results dictionary with the basic demographic info
        results_dict = {
            "household_id": [row.get('taxsimid', idx)] * axes_count,
            "state_code": [state_code] * axes_count,
            "marital_status": [row['mstat']] * axes_count,
            "head_age": [row['page']] * axes_count,
            "spouse_age": [row['sage']] * axes_count,
            "dependents": [row['depx']] * axes_count,
            "employment_income": employment_income_values.tolist()  # The swept parameter
        }
        
        # Add all other income components from the primary person
        primary_person = situation["people"]["you"]
        for income_type, values in primary_person.items():
            if income_type != "age" and "2025" in values:
                results_dict[f"{income_type}"] = [float(values["2025"])] * axes_count
        
        # Add spouse income components if present
        if "your spouse" in situation["people"]:
            spouse_person = situation["people"]["your spouse"]
            for income_type, values in spouse_person.items():
                if income_type != "age" and "2025" in values:
                    results_dict[f"spouse_{income_type}"] = [float(values["2025"])] * axes_count
        
        # Add dependent ages if present
        for i in range(int(row['depx'])):
            child_id = f"child_{i+1}"
            if child_id in situation["people"]:
                child_age = situation["people"][child_id]["age"]["2025"]
                results_dict[f"dependent_{i+1}_age"] = [child_age] * axes_count
        
        
        # Calculate all the specified variables
        for var in variables:
            try:
                value_array = simulation.calculate(var, map_to = "household", period = 2025)
                # Store all values in the array (one for each employment income value)
                results_dict[var] = value_array.tolist()
            except Exception as e:
                print(f"Error calculating {var} for household {idx}: {str(e)}")
                results_dict[var] = [None] * axes_count
        
        # Convert to DataFrame for this household
        household_df = pd.DataFrame(results_dict)
        
        # Append to the combined results
        all_results_df = pd.concat([all_results_df, household_df], ignore_index=True)
    
    print(f"Processed {len(households_df)} households, generated {len(all_results_df)} rows")
    return all_results_df

In [15]:
# Run process_households with your CSV file
# results_df = process_households('cps_households.csv')

# If you want to test with a smaller sample first
results_df = process_households('cps_households.csv', sample_size=100)

# To see the first few rows of the results
results_df.head()

Successfully loaded 112502 records from cps_households.csv
Columns: ['taxsimid', 'year', 'state', 'mstat', 'page', 'sage', 'depx', 'pwages', 'psemp', 'swages', 'ssemp', 'dividends', 'intrec', 'stcg', 'ltcg', 'otherprop', 'nonprop', 'pensions', 'gssi', 'pui', 'sui', 'transfers', 'rentpaid', 'proptax', 'otheritem', 'childcare', 'mortgage', 'scorp', 'idtl', 'age1', 'age2', 'age3', 'age4', 'age5', 'age6', 'age7', 'age8', 'age9', 'age10', 'age11']
Processing household 51200/100...
Processing household 87402/100...
Processing household 78145/100...
Processing household 37064/100...
Processing household 79525/100...
Processing household 13665/100...
Processing household 97725/100...
Processing household 11119/100...
Processing household 86782/100...
Processing household 62563/100...
Processing household 72454/100...
Processing household 33732/100...
Processing household 56905/100...
Processing household 108642/100...
Processing household 6371/100...
Processing household 41440/100...
Processin

Unnamed: 0,household_id,state_code,marital_status,head_age,spouse_age,dependents,employment_income,self_employment_income,dividend_income,interest_income,...,snap,ssi,free_school_meals,tanf,household_state_benefits,medicaid,premium_tax_credit,dependent_1_age,dependent_2_age,dependent_3_age
0,51200.0,CA,2.0,70.0,76.0,0.0,0.0,0.0,0.0,0.728779,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,,
1,51200.0,CA,2.0,70.0,76.0,0.0,1054.116211,0.0,0.0,0.728779,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,,
2,51200.0,CA,2.0,70.0,76.0,0.0,2108.232422,0.0,0.0,0.728779,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,,
3,51200.0,CA,2.0,70.0,76.0,0.0,3162.348877,0.0,0.0,0.728779,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,,
4,51200.0,CA,2.0,70.0,76.0,0.0,4216.464844,0.0,0.0,0.728779,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,,


In [16]:
results_df.to_csv('sampled_households_results.csv', index=False)

In [None]:
def run_single_household(csv_path: str, row_index: int = 0, show_results: bool = True):
    """
    Run a simulation for a single household from the CSV file with axes parameter.
    Include all inputs from the situation in the output CSV.
    
    Args:
        csv_path: Path to the CSV file
        row_index: Index of the household row to process (0-based)
        show_results: Whether to print the results
        
    Returns:
        DataFrame with simulation results across all axes values
    """
    # Load data
    households_df = load_household_data(csv_path)
    
    if households_df is None:
        print("Failed to load household data.")
        return None
    
    if row_index >= len(households_df):
        print(f"Error: Row index {row_index} is out of range. CSV has {len(households_df)} rows.")
        return None
    
    # Get the selected row
    row = households_df.iloc[row_index]
    print(f"Processing household at index {row_index}:")
    print(f"ID: {row.get('taxsimid', row_index)}, State: {row['state']}, Marital Status: {row['mstat']}")
    
    # Create situation (keep the axes parameter)
    situation = create_household_situation(row)
    
    # Calculate results
    simulation = Simulation(situation=situation)
    
    # Define variables to calculate
    variables = [
        "household_net_income",
        "household_market_income",
        "income_tax_before_refundable_credits",
        "state_income_tax_before_refundable_credits",
        "eitc",
        "refundable_ctc",
        "state_refundable_credits",
        "snap",
        "ssi",
        "free_school_meals",
        "tanf",
        "social_security",
        "household_state_benefits",
        "medicaid",
        "premium_tax_credit"
    ]
    
        
    # Get the employment income values from the axes
    axes_count = situation["axes"][0][0]["count"]
    employment_income_values = simulation.calculate("employment_income", map_to = "household", period = 2025)

    # Extract all the inputs from the situation to include in results
    # First, create a results dictionary with the basic demographic info
    results_dict = {
        "household_id": [row.get('taxsimid', row_index)] * axes_count,
        "state": [row['state']] * axes_count,
        "marital_status": [row['mstat']] * axes_count,
        "head_age": [row['page']] * axes_count,
        "spouse_age": [row['sage']] * axes_count,
        "dependents": [row['depx']] * axes_count,
        "employment_income": employment_income_values.tolist()  # The swept parameter
    }
    
    # Add all other income components from the primary person
    primary_person = situation["people"]["you"]
    for income_type, values in primary_person.items():
        if income_type != "age" and "2025" in values:
            results_dict[f"{income_type}"] = [float(values["2025"])] * axes_count
    
    # Add spouse income components if present
    if "your spouse" in situation["people"]:
        spouse_person = situation["people"]["your spouse"]
        for income_type, values in spouse_person.items():
            if income_type != "age" and "2025" in values:
                results_dict[f"spouse_{income_type}"] = [float(values["2025"])] * axes_count
    
    # Add dependent ages if present
    for i in range(int(row['depx'])):
        child_id = f"child_{i+1}"
        if child_id in situation["people"]:
            child_age = situation["people"][child_id]["age"]["2025"]
            results_dict[f"dependent_{i+1}_age"] = [child_age] * axes_count
    
    # Add state name
    state_code = situation["households"]["your household"]["state_code"]["2025"]
    results_dict["state_code"] = [state_code] * axes_count
    
    # Calculate all the specified variables
    for var in variables:
        try:
            value_array = simulation.calculate(var, 2025)
            # Store all values in the array (one for each employment income value)
            results_dict[var] = value_array.tolist()
        except Exception as e:
            print(f"Error calculating {var}: {str(e)}")
            results_dict[var] = [None] * axes_count
    
    # Convert to DataFrame
    results_df = pd.DataFrame(results_dict)
    
    # Print sample results if requested
    if show_results:
        print("\nSimulation Results (first 5 rows):")
        print("-" * 80)
        print(results_df.head(5))
        print("-" * 80)
        print(f"Total rows generated: {len(results_df)}")
        print(f"Total columns: {len(results_df.columns)}")
        print(f"Columns: {results_df.columns.tolist()}")
    
    return results_df

In [None]:
def visualize_household_components(household_results_df, household_id=None):
    """
    Create and display visualizations for household results with axes.
    
    Args:
        household_results_df: DataFrame returned from run_single_household function
        household_id: Optional ID to filter for a specific household
        
    Returns:
        Dictionary of plotly figures
    """
    # Filter for specific household if provided
    if household_id is not None:
        df = household_results_df[household_results_df['household_id'] == household_id]
    else:
        df = household_results_df
    
    # Get household details for the title
    state = df['state'].iloc[0]
    mstat = df['marital_status'].iloc[0]
    mstat_label = 'Married' if mstat == 2 else 'Single'
    
    # Create line chart showing how variables change with employment income
    fig1 = px.line(
        df, 
        x='employment_income', 
        y=['income_tax', 'state_income_tax', 'snap', 'household_net_income'],
        title=f"Policy Components by Employment Income<br><sup>State: {state}, Status: {mstat_label}</sup>",
        labels={'employment_income': 'Employment Income ($)', 'value': 'Amount ($)'},
        height=600,
        width=900
    )
    
    # Create effective tax rate chart
    df['effective_tax_rate'] = (df['income_tax'] + df['state_income_tax']) / df['employment_income'] * 100
    df['effective_tax_rate'] = df['effective_tax_rate'].replace([np.inf, -np.inf, np.nan], 0)
    
    fig2 = px.line(
        df, 
        x='employment_income', 
        y='effective_tax_rate',
        title=f"Effective Tax Rate by Employment Income<br><sup>State: {state}, Status: {mstat_label}</sup>",
        labels={'employment_income': 'Employment Income ($)', 'effective_tax_rate': 'Effective Tax Rate (%)'},
        height=500,
        width=800
    )
    
    # Create net income chart
    fig3 = px.line(
        df, 
        x='employment_income', 
        y='household_net_income',
        title=f"Net Household Income by Employment Income<br><sup>State: {state}, Status: {mstat_label}</sup>",
        labels={'employment_income': 'Employment Income ($)', 'household_net_income': 'Net Income ($)'},
        height=500,
        width=800
    )
    
    # Display the figures
    fig1.show()
    fig2.show()
    fig3.show()
    
    return {
        "policy_components": fig1,
        "effective_tax_rate": fig2,
        "net_income": fig3
    }


In [None]:
household_results = run_single_household('cps_households.csv', row_index=11119)
household_results.to_csv('single_household_results.csv', index=False)  # Save to CSV
visualize_household_components(household_results)