# Blank Slate UBI data generation

This notebook generates the data in the report, by:

* Solving for optimal UBI levels for each flat tax rate (locally)
* Using the PolicyEngine API to extract the economic impact data for each policy

In [22]:
from policyengine_core.model_api import *
from policyengine_us import *
from scipy.optimize import differential_evolution

INCOME_ELASTICITY = -0.05


def create_funding_reform(flat_tax_rate: float, include_lsrs: bool = True):
    return Reform.from_dict({
        "gov.contrib.ubi_center.flat_tax.abolish_federal_income_tax": {
            "year:2023:10": True
        },
        "gov.contrib.ubi_center.flat_tax.abolish_payroll_tax": {
            "year:2023:10": True
        },
        "gov.contrib.ubi_center.flat_tax.abolish_self_emp_tax": {
            "year:2023:10": True
        },
        "gov.ssa.ssi.abolish_ssi": {
            "year:2023:10": True
        },
        "gov.usda.snap.abolish_snap": {
            "year:2023:10": True
        },
        "gov.usda.wic.abolish_wic": {
            "year:2023:10": True
        },
        "gov.contrib.ubi_center.flat_tax.rate.agi": {
            "year:2023:10": flat_tax_rate
        },
        "gov.contrib.ubi_center.flat_tax.deduct_ptc": {
            "year:2023:10": True
        },
        "gov.simulation.labor_supply_responses.elasticities.income": {
            "year:2023:10": -0.05
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.primary.1": {
            "year:2023:10": 0.31
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.primary.10": {
            "year:2023:10": 0.22
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.primary.2": {
            "year:2023:10": 0.28
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.primary.3": {
            "year:2023:10": 0.27
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.primary.4": {
            "year:2023:10": 0.27
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.primary.5": {
            "year:2023:10": 0.25
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.primary.6": {
            "year:2023:10": 0.25
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.primary.7": {
            "year:2023:10": 0.22
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.primary.8": {
            "year:2023:10": 0.22
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.primary.9": {
            "year:2023:10": 0.22
        } if include_lsrs else {},
        "gov.simulation.labor_supply_responses.elasticities.substitution.by_position_and_decile.secondary": {
            "year:2023:10": 0.27
        } if include_lsrs else {},
    },
    "us",
    )


baseline = Microsimulation(dataset="enhanced_cps_2022")

class BlankSlatePolicy:
    young_child: float = 0
    older_child: float = 0
    young_adult: float = 0
    adult: float = 0
    senior: float = 0
    flat_tax_rate: float = 0.40

    def __init__(self, flat_tax_rate: float = 0.40, year: int = 2024, include_lsrs: bool = True):
        self.baseline = baseline
        self.blank_slate_funded = Microsimulation(reform=create_funding_reform(flat_tax_rate, include_lsrs), dataset="enhanced_cps_2022")
        self.baseline.default_calculation_period = year
        self.blank_slate_funded.default_calculation_period = year
        self.df = self.create_dataframe()
        self.ubi_funding = self.get_ubi_funding()
        self.flat_tax_rate = flat_tax_rate
        self.include_lsrs = include_lsrs

    def create_dataframe(self) -> pd.DataFrame:
        age = self.baseline.calc("age").values
        df = pd.DataFrame(
            dict(
                baseline_net_income=self.baseline.calc(
                    "household_net_income"
                ).values,
                baseline_tax=self.baseline.calculate(
                    "household_tax"
                ).values,
                baseline_benefits=self.baseline.calculate(
                    "household_benefits"
                ).values,
                count_young_child=self.baseline.map_result(
                    age < 6, "person", "household"
                ),
                count_older_child=self.baseline.map_result(
                    (age >= 6) & (age < 18), "person", "household"
                ),
                count_adult=self.baseline.map_result(
                    (age >= 18) & (age < 65), "person", "household"
                ),
                count_senior=self.baseline.map_result(
                    age >= 65, "person", "household"
                ),
                count_person=self.baseline.map_result(
                    age >= 0, "person", "household"
                ),
                funded_net_income=self.blank_slate_funded.calculate(
                    "household_net_income"
                ).values,
                funded_tax=self.blank_slate_funded.calculate(
                    "household_tax"
                ).values,
                funded_benefits=self.blank_slate_funded.calculate(
                    "household_benefits"
                ).values,
                count_disabled=self.baseline.calculate("is_ssi_disabled", map_to="household").values,
                weight=self.baseline.calculate("household_weight").values,
            )
        )

        df["total_employment_income"] = self.baseline.calculate("employment_income", map_to="household").values
        df["total_self_employment_income"] = self.baseline.calculate("self_employment_income", map_to="household").values
        return df

    def get_ubi_funding(self) -> float:
        return (
            (self.df.funded_tax - self.df.funded_benefits - self.df.baseline_tax + self.df.baseline_benefits)
            * self.df.weight
        ).sum()

    def get_senior_amount(
        self,
        young_child: float,
        older_child: float,
        adult: float,
        disabled: float,
        funding_offset: float = 0,
    ) -> float:
        return (
            self.ubi_funding + funding_offset
            - young_child * (self.df.count_young_child * self.df.weight).sum()
            - older_child * (self.df.count_older_child * self.df.weight).sum()
            - adult * (self.df.count_adult * self.df.weight).sum()
            - disabled * (self.df.count_disabled * self.df.weight).sum()
        ) / (self.df.count_senior * self.df.weight).sum()

    def mean_percentage_loss(
        self,
        young_child: float,
        older_child: float,
        adult: float,
        disabled: float,
        return_extras: bool = False
    ) -> float:
        senior_amount = self.get_senior_amount(
            young_child, older_child, adult, disabled,
        )
        final_net_income = (
            self.df.funded_net_income
            + self.df.count_young_child * young_child
            + self.df.count_older_child * older_child
            + self.df.count_adult * adult
            + self.df.count_senior * senior_amount
            + self.df.count_disabled * disabled
        )
        gain = final_net_income - self.df.baseline_net_income
        income_rel_change = final_net_income / self.df.funded_net_income - 1
        income_rel_change_c = np.clip(income_rel_change, -1, 1)
        total_income = self.df.total_employment_income + self.df.total_self_employment_income
        income_effect = income_rel_change_c * total_income * INCOME_ELASTICITY
        employment_income_change = income_effect
        MTR_GUESS = self.flat_tax_rate
        rough_net_income_change_from_lsr = employment_income_change * (1 - MTR_GUESS)
        rough_tax_change_from_lsr = employment_income_change * MTR_GUESS
        total_ubi_funding_change = (rough_tax_change_from_lsr * self.df.weight).sum()
        if self.include_lsrs:
            senior_amount = self.get_senior_amount(
                young_child, older_child, adult, disabled, total_ubi_funding_change
            )
        final_net_income = (
            self.df.funded_net_income
            + self.df.count_young_child * young_child
            + self.df.count_older_child * older_child
            + self.df.count_adult * adult
            + self.df.count_senior * senior_amount
            + self.df.count_disabled * disabled
        )
        gain = final_net_income - self.df.baseline_net_income + rough_net_income_change_from_lsr
        if not self.include_lsrs:
            gain = gain - rough_net_income_change_from_lsr
        absolute_loss = np.maximum(0, -gain)
        pct_loss = absolute_loss / np.maximum(100, self.df.baseline_net_income)
        average = np.average(
            pct_loss, weights=self.df.weight * self.df.count_person
        )
        if return_extras:
            return average, senior_amount, total_ubi_funding_change
        return average

    def solve(self, return_amounts: bool = False, return_loss: bool = False) -> dict:
        (
            self.young_child,
            self.older_child,
            self.adult,
            self.disabled,
        ) = differential_evolution(
            lambda x: self.mean_percentage_loss(*x),
            bounds=[(0, 30e3 * self.flat_tax_rate)] * 4,
            maxiter=int(1e2),
            seed=0,
        ).x
        _, self.senior, _ = self.mean_percentage_loss(
            self.young_child, self.older_child, self.adult, self.disabled, return_extras=True
        )
        self.reform = Reform.from_dict(
            {
                **create_funding_reform(self.flat_tax_rate).parameter_values,
                "gov.contrib.ubi_center.basic_income.amount.person.by_age[0].amount": {
                    "year:2023:10": self.young_child
                },
                "gov.contrib.ubi_center.basic_income.amount.person.by_age[1].amount": {
                    "year:2023:10": self.older_child
                },
                "gov.contrib.ubi_center.basic_income.amount.person.by_age[2].amount": {
                    "year:2023:10": self.adult
                },
                "gov.contrib.ubi_center.basic_income.amount.person.by_age[3].amount": {
                    "year:2023:10": self.adult
                },
                "gov.contrib.ubi_center.basic_income.amount.person.by_age[4].amount": {
                    "year:2023:10": self.senior
                },
                "gov.contrib.ubi_center.basic_income.amount.person.disability": {
                    "year:2023:10": self.disabled
                },
            },
            "us",
        )
        if not return_amounts and not return_loss:
            return self.reform
        
        data = dict(reform=self.reform)

        if return_amounts:
            data["amounts"] = dict(
                young_child=self.young_child,
                older_child=self.older_child,
                adult=self.adult,
                senior=self.senior,
                disabled=self.disabled,
            )
        
        if return_loss:
            data["loss"] = self.mean_percentage_loss(
                self.young_child, self.older_child, self.adult, self.disabled
            )
        
        public_reform = Reform.from_dict(
            {
                path: value for path, value in self.reform.parameter_values.items()
                if path.startswith("gov")
            },
            "us",
        )
        data["id"] = public_reform.api_id
        data["reform"] = public_reform
        
        return data

In [23]:
import plotly.express as px
from policyengine_core.charts import *
from tqdm import tqdm
import numpy as np
import pandas as pd
from policyengine_core.reforms import Reform
import gc

def get_reform_api_id_with_rounded_values(api_id):
    parameter_values = Reform.from_api(api_id, "us").parameter_values
    for parameter_name in [
        "gov.contrib.ubi_center.basic_income.amount.person.by_age[0].amount",
        "gov.contrib.ubi_center.basic_income.amount.person.by_age[1].amount",
        "gov.contrib.ubi_center.basic_income.amount.person.by_age[2].amount",
        "gov.contrib.ubi_center.basic_income.amount.person.by_age[3].amount",
        "gov.contrib.ubi_center.basic_income.amount.person.by_age[4].amount",
        "gov.contrib.ubi_center.basic_income.amount.person.disability",
    ]:
        for time_period in parameter_values[parameter_name]:
            parameter_values[parameter_name][time_period] = round(
                parameter_values[parameter_name][time_period] / (10 * 12)
            ) * (10 * 12)

    new_reform = Reform.from_dict(parameter_values, "us")
    return new_reform.api_id

def process_flat_tax_rate(flat_tax_rate):
    try:
        policy = BlankSlatePolicy(flat_tax_rate, year=2024)
        data = policy.solve(return_amounts=True, return_loss=True)
        return {
            'flat_tax_rate': flat_tax_rate,
            'loss': data["loss"],
            'api_id': data["id"]
        }
    except Exception as e:
        print(f"Error processing flat_tax_rate {flat_tax_rate}: {str(e)}")
        return None

def main():
    flat_tax_rates = np.linspace(0.25, 0.45, 21)
    results = []

    for flat_tax_rate in tqdm(flat_tax_rates):
        result = process_flat_tax_rate(flat_tax_rate)
        if result:
            results.append(result)
        
        # Force garbage collection
        gc.collect()

    df = pd.DataFrame(results)
    df["rounded_ubi_api_id"] = df["api_id"].apply(get_reform_api_id_with_rounded_values)
    df.to_csv("optimisation_results.csv", index=False)

if __name__ == "__main__":
    main()

100%|██████████| 21/21 [2:31:55<00:00, 434.09s/it]  


In [24]:
fig = px.line(
    df.sort_values("flat_tax_rate"),
    x="flat_tax_rate",
    y="loss",
)

format_fig(fig).update_traces(
    # Set color to BLUE
    line=dict(color=BLUE_PRIMARY),
).update_layout(
    title="Mean percentage loss by flat tax rate",
    xaxis_title="Flat tax rate",
    yaxis_title="Loss",
    xaxis_tickformat=".0%",
    yaxis_tickformat=".1%",
)

In [25]:
import requests
import pandas as pd
import time

API = "https://api.policyengine.org"

def get_basic_income_amounts(policy_id):
    # API endpoint URL
    api_url = f"{API}/us/policy/{policy_id}"

    # Making a GET request to the API
    response = requests.get(api_url)

    if response.status_code == 200:
        # Parsing the JSON response
        policy_data = response.json()

        # Extracting and renaming the basic income amounts
        basic_income_amounts = {}
        age_group_mapping = {
            "gov.contrib.ubi_center.basic_income.amount.person.by_age[0].amount": "ubi_0_5",
            "gov.contrib.ubi_center.basic_income.amount.person.by_age[1].amount": "ubi_6_17",
            "gov.contrib.ubi_center.basic_income.amount.person.by_age[2].amount": "ubi_18_64",
            "gov.contrib.ubi_center.basic_income.amount.person.by_age[4].amount": "ubi_65_plus",
            "gov.contrib.ubi_center.basic_income.amount.person.disability": "ubi_disability"
        }

        for key, value in policy_data["result"]["policy_json"].items():
            if key in age_group_mapping:
                # Assuming the amount is the first value in the dictionary
                amount = next(iter(value.values()))
                basic_income_amounts[age_group_mapping[key]] = amount
        return basic_income_amounts
    else:
        return None

# Read the CSV file into a DataFrame
df = pd.read_csv('optimisation_results.csv')

def get_json_from_id(id):
    res = requests.get(f"{API}/us/economy/{id}/over/2?time_period=2024&region=enhanced_us").json()

    while res["status"] == "computing":
        time.sleep(5)
        res = requests.get(f"{API}/us/economy/{id}/over/2?time_period=2024&region=enhanced_us").json()
    
    return res["result"]

# Fetch and add basic income amounts for each reform ID
json_data = {}

from tqdm import tqdm

for i, row in tqdm(df.iterrows(), total=len(df)):
    reform_id = int(row['rounded_ubi_api_id'])
    amounts = get_basic_income_amounts(reform_id)
    if amounts:
        for column_name, amount in amounts.items():
            df.loc[i, column_name] = amount
    json_data[reform_id] = get_json_from_id(reform_id)

df.to_csv('ubi_amounts.csv', index=False)

with open("json_data.json", "w") as f:
    json.dump(json_data, f)

100%|██████████| 21/21 [4:08:48<00:00, 710.89s/it]  
