In [3]:
import os, shutil
os.chdir('\\\\Data1\\RES1\\WRK\\CHT\\w2025weo\\Ch3-Demographics\\Code_Review\\EH') # change the directory to your own

import numpy as np, pandas as pd, matplotlib.pyplot as plt
import copy, pickle, time
from Code import ss as steady_state, td as transition, jacobians as jac
from Code.calibration import load_initial_ss
from Code import production, government, demographics, household, equilibrium, calibration
from Code.gen_h_mult import gen_h_mult

# from importlib import reload
# from model import td as transition
# reload(transition)

path_data_inputs = "@Import/Data/input_data/"
path_intermediate_data = "@Import/Data/intermediate_data/"
path_calibration_targets = "@Import/Data/calibration_targets/"
path_calibrated_parameters = "@Import/Data/calibrated_parameters/"
path_cached_results = "@Import/Data/cached_results/"
path_model_outputs = "@Export/output_data/"

In [4]:
#pip install numba

In [5]:
#pip install openpyxl

Set up parameters indicating the scenario we want to run

In [6]:
migration = False                             # whether there's additional migration from LIC (True or False)
closed_economy = None                         # which country to simulate in financial autarky (None or 'LIC')
h_mult_scenario = 'Baseline'                  # the h_mult scenario ('Baseline' or 'Innovation' or 'Healthy aging' or 'Baseline_morehealth' or 'FLFP' or 'All' (closing FLFP gap & more health) or 'Demog_only' or 'No_innovation')
LIC_wedge = 0.03                              # the wedge applied to LIC's interest rate (percent / 100)
fertility_scenario = 'medium'                 # UNPP fertility scenario ('low', 'medium', or 'high')
alt_debt = False                              # whether debt level returns to the 16-19 average (True or False)
alt_ret = False                               # whether to run the retirement policy scenario (True or False)
FLFP_gap = None                               # by how much FLFP gap closes by 2040 (in percent): None, 50, 75, or 100
vintage_UNPP = False                          # whether to use vintage UNPP (True or False)
sigma = 2                                     # 1 / IES; default is 2

LIC_integration = 'baseline'                  # the speed of LIC financial integration: 'faster', 'slower', or 'baseline'

Construct calibration targets

In [7]:
from Code import isocode
cs_full = sorted(isocode.isocode_list_full)
cs_full.append('LIC')

In [8]:
df_w = pd.read_csv(path_calibration_targets + "targets_global.csv")

if df_w.loc[0,'sigma'] != sigma:

  df_w.loc[0,'sigma'] = sigma
  df_w.to_csv(path_calibration_targets + "targets_global.csv", index=False)

Calibrate the model

In [None]:
recalibrate = True

suffix = '_mig' if migration is True else ''
suffix += '_autarky' if closed_economy is not None else ''
suffix += '_' + h_mult_scenario.replace(" ", "_") if h_mult_scenario != 'Baseline' else ''
suffix += '_FLFP' + str(FLFP_gap) if FLFP_gap is not None else ''
suffix += f"_{fertility_scenario}" if fertility_scenario != "medium" else ""
suffix += '_altdebt' if alt_debt is True else ''
suffix += '_altret' if alt_ret is True else ''
suffix += '_vintage_UNPP' if vintage_UNPP is True else ''
suffix += f'_IES{sigma}' if sigma != 2 else ''

if recalibrate:
  if not os.path.exists(path_calibrated_parameters + f"params_calib{suffix}.csv"):    #"params_calib_LIC_w_gamma_varBY3.csv"
    df_calib = calibration.calib_ss(case='homothetic', LIC_wedge=LIC_wedge)
    df_calib.to_csv(path_calibrated_parameters + f"params_calib{suffix}.csv")
  else:
    shutil.copy2(path_calibrated_parameters + f"params_calib{suffix}.csv",
                path_calibrated_parameters + "params_calib_homothetic_varyxi.csv")

Load objects from the calibration

In [10]:
# Initial steady state
r_init, gamma, hh_init, prod_init, pop_init, gov_init, ss_init = load_initial_ss(case='homothetic', LIC_wedge=LIC_wedge)
J = len(hh_init['USA'].h_adj)                                         # number of age bins

# Keep only one country if need to run the autarky scenario
if closed_economy is not None:
  for var_name in ['hh_init', 'prod_init', 'pop_init', 'gov_init', 'ss_init']:
    globals()[var_name] = {k: v for k, v in globals()[var_name].items() if k in [closed_economy]}
    r_init += LIC_wedge

cs = list(hh_init)                                                    # ISO code list for the countries in the model

Our standard exogenous shifters: government and population (need both terminal and transition).

In [11]:
# Population shifter based on targets on the transition path ("targets_trans_countries***.csv")
pop = demographics.load_poptrans(migration=migration, closed_economy=closed_economy,
                                 fertility_scenario=fertility_scenario,vintage_UNPP=vintage_UNPP)

T = len(pop['LIC'].N)                                                 # number of years in the transition
pop_term = {c: pop[c].get_ss_terminal() for c in cs}                  # terminal population

In [12]:
# Government shifter

# Import B/Y paths
B_Y_path = pd.read_csv("@Import/Data/intermediate_data/debt_gdp_w_LIC.csv")
year_max, year_min = B_Y_path['year'].max(), B_Y_path['year'].min()
num_years = year_max - year_min + 1
B_Y_end = B_Y_path.query('year == @year_max').set_index('isocode')['debt_gdp']                          # B/Y in the last year of the data provided (2029)
B_Y_16_19 = B_Y_path.query('@year_min <= year <= @year_min+3').groupby('isocode')['debt_gdp'].mean()    # average B/Y between 2016-19
years_to_extend = range(year_max+1, year_max+1+T+1-num_years)
slope = (B_Y_16_19 - B_Y_end) / 10                                                                      # the slope of an alternative B/Y path returning to the 16-19 avg in 10 years

# Fill the B/Y path between 2030 and 2400
new_rows = []
for country in B_Y_path['isocode'].unique():
    for year in years_to_extend:
        # Stay at the 2029 B/Y level if using the baseline path, or if the 2029 level <= the 2016-19 avg
        if not alt_debt or B_Y_end[country] <= B_Y_16_19[country]:
            new_rows.append([country, year, B_Y_end[country]])

        # Otherwise, decrease linearly to the 2016-19 avg
        else:
            B_Y_curr = max(B_Y_end[country] + slope[country] * (year - year_max), B_Y_16_19[country])
            new_rows.append([country, year, B_Y_curr])

B_Y_extended = pd.DataFrame(new_rows, columns = B_Y_path.columns.to_list())
B_Y_final = pd.concat([B_Y_path, B_Y_extended], ignore_index=True).sort_values(by=['isocode', 'year']).set_index('isocode')

# Import retirement age paths
ret_path = pd.read_excel("@Import/Data/intermediate_data/_Tr_Scenario1.xlsx")[['isocode','year','Tr_1']]
year_max, year_min = ret_path['year'].max(), ret_path['year'].min()
ret_max = ret_path.query('year == @year_max').set_index('isocode')['Tr_1']
ret_min = ret_path.query('year == @year_min').set_index('isocode')['Tr_1']
ret_increase = ret_max - ret_min
years_to_extend = range(year_max+1, 2401)

# Fill the retirement age path between 2101 and 2400
for country in ret_path['isocode'].unique():
  for year in years_to_extend:
    ret_path = pd.concat([ret_path,
      pd.DataFrame([[country, year, ret_max[country]]], columns=ret_path.columns.to_list())])

ret_path = ret_path.sort_values(by=['isocode', 'year']).set_index('isocode')

# Construct the government shifter
gov = {}
gov_term = copy.deepcopy(gov_init)
for c in cs:
    B_Y_curr = B_Y_final.loc[c,'debt_gdp'].to_list()
    if alt_ret:
      ret_curr = ret_path.loc[c,'Tr_1'].to_list()
      gov[c] = gov_init[c].ss_to_td(T, B_Y=B_Y_curr, ret_path=ret_curr)
    elif c in ('LIC', 'IND'):
      gov[c] = gov_init[c].ss_to_td(T, B_Y=B_Y_curr)
    else:
      gov[c] = gov_init[c].ss_to_td(T, age_increase=5, years_increase=60, B_Y=B_Y_curr)

    gov[c].adjust_rule = 'all'

    gov_term[c].adjust_rule = 'all'
    years_adj = (5 if not alt_ret and c not in ('LIC', 'IND')
                else 0 if not alt_ret and c in ('LIC', 'IND')
                else ret_increase[c])
    gov_term[c].rho = gov_term[c].adjust_rho(gov_term[c].rho, years_adj)

# Update B_Y for the gov object
gov_term = {c: gov_term[c].update_B_Y(B_Y_end[c]) for c in gov_term}

Now we have a new shifter to the household labor supply, which leads to a new terminal steady state:

In [13]:
FE_dta = pd.read_excel(path_intermediate_data + "_Input_For_MODEL.xlsx").set_index('iso3code')

# Average out the FEs for CHN and IND
avg_CHN_IND = (FE_dta.loc['CHN']['_cons_fe'] + FE_dta.loc['IND']['_cons_fe']) / 2
FE_dta.loc['CHN','_cons_fe'] = avg_CHN_IND
FE_dta.loc['IND','_cons_fe'] = avg_CHN_IND

# Fixed effects
FE_convergence = {c: FE_dta.loc[c]['_cons_fe'] if c != 'USA' else 0 for c in cs_full}
beta_conv = FE_dta.loc['AUS']['_beta_distance']
beta_pop = FE_dta.loc['AUS']['_beta_demo']
beta_pop_lag = FE_dta.loc['AUS']['_beta_demolag']
beta_inter = FE_dta.loc['AUS']['_beta_interac']

h_mult_mtx = gen_h_mult(case=h_mult_scenario,beta_conv=beta_conv,beta_pop=beta_pop,
                        beta_pop_lag=beta_pop_lag,beta_inter=beta_inter,FE_conv=FE_convergence,
                        FLFP_gap=FLFP_gap,LIC_wedge=LIC_wedge,migration=migration,
                        vintage_UNPP=vintage_UNPP,fertility_scenario=fertility_scenario)

h_mult = {h_mult_scenario: h_mult_mtx}

# Update the terminal hosuehold object with the terminal h_mult matrix
hh_term = {c: hh_init[c].update_h(h_mult_mtx[c][-1]) for c in cs}

Calculate terminal interest rate and other objects

In [None]:
r_term = steady_state.solve_world_ss(hh_term, pop_term, gov_term, prod_init, gamma, rmin=0, rmax=0.036)

# Store the terminal interest rate in the specified CSV file, which will be used by calculate_terminal_fakenews()
cached_r_term = {'Autarky': r_term} if closed_economy is not None else {'Baseline': r_term}
pd.DataFrame(list(cached_r_term.items())).to_csv("@Import/Data/cached_results/cached_r_terms.csv", index=False, header=False)

ss_term, prod_term = {}, {}
for c in cs:
    ss_term[c], _, prod_term[c] = steady_state.calculate_ss(
        hh_term[c], pop_term[c], gov_term[c], prod_init[c], r_term, gamma)

Calculate the terminal fake news matrix for transition simulation

In [15]:
fixed_debt = True if np.isscalar(gov['LIC'].B_Y) else False

suffix = '_mig' if migration is True else ''
suffix += '_autarky' if closed_economy is not None else ''
suffix += '_' + h_mult_scenario.replace(" ", "_") if h_mult_scenario != 'Baseline' else ''
suffix += '_FLFP' + str(FLFP_gap) if FLFP_gap is not None else ''
suffix += f"_{fertility_scenario}" if fertility_scenario != "medium" else ""
suffix += '_altdebt' if alt_debt is True else ''
suffix += '_altret' if alt_ret is True else ''
suffix += '_vintage_UNPP' if vintage_UNPP is True else ''
suffix += f"_{LIC_integration}_integration" if LIC_integration != "baseline" else ''
suffix += '_baseline' if suffix == '' else ''
suffix += f'_IES{sigma}' if sigma != 2 else ''

common_args = {
    'h_mult': h_mult,
    'calibration_options': {'case': 'homothetic'},
    'fixed_debt': fixed_debt,
    'migration': migration,
    'closed_economy': closed_economy,
    'fertility_scenario': fertility_scenario,
    'alt_debt': alt_debt,
    'alt_ret': alt_ret,
    'FLFP_gap': FLFP_gap,
    'vintage_UNPP': vintage_UNPP,
    'LIC_wedge': LIC_wedge,
    'sigma': sigma,
    'LIC_integration': LIC_integration
}

Fs = jac.calculate_terminal_fakenews(**common_args)

Jhh = ({c: jac.make_household_jacobian(Fs[c], r_term, ss_term[c]['W'], hh_term[c],
      gov_term[c], pop_term[c], prod_term[c], T) for c in prod_term})

Compute the entire transition path

In [None]:
#cs_small = ['USA', 'CHN', 'JPN', 'LIC', 'DEU'] #, 'IND'] # smaller set of countries to speed up
tol = 1e-5 if closed_economy is not None else 1e-4

r, W_Y, NFA_Y, K_Y, Y, PB_Y, C_Y, Inc_Y, G_Y, tau, d_bar = transition.solve_world_td_full(
    pop, gov, r_term, ss_term, cs=cs, tol=tol, **common_args)

In [None]:
save_soe = False

if save_soe or suffix == '_baseline' or (h_mult_scenario == 'All' and sigma == 2):
  r_soe = {c: np.full(T, r_init) if c not in ('CHN','IND','LIC') else np.full(T, r_init) + LIC_wedge for c in cs}
  W_Y_soe, NFA_Y_soe, K_Y_soe, Y_soe, PB_Y_soe, C_Y_soe, Inc_Y_soe, G_Y_soe, tau_soe, d_bar_soe = {}, {}, {}, {}, {}, {}, {}, {}, {}, {}

  hh_td = {c: hh_init[c].update_h(h_mult_mtx[c]) for c in cs}
  for c in cs:
    (W_Y_soe[c], NFA_Y_soe[c], K_Y_soe[c], Y_soe[c], PB_Y_soe[c], C_Y_soe[c],
     Inc_Y_soe[c], G_Y_soe[c], tau_soe[c], d_bar_soe[c]) = (
        transition.calculate_country_soe_td(
        r_soe[c], pop[c], gov[c], ss_init[c], ss_term[c], hh_td[c], prod_init[c],
        gamma, Jhh[c][T-1:, T-1:], tol)
        )

Export the transition path

In [None]:
population = {c: pop[c].N for c in pop}
pop_growth = {c: pop[c].n for c in pop}
pop_working = {c: pop[c].N - pop[c].get_Nret(gov[c].rho) for c in pop}

paths = ({'r': r, 'W_Y': W_Y, 'NFA_Y': NFA_Y, 'K_Y': K_Y, 'Y': Y, 'PB_Y': PB_Y,
          'C_Y': C_Y, 'Inc_Y': Inc_Y,'G_Y': G_Y, 'tau': tau, 'd_bar': d_bar,
          'pop': population, 'pop_growth': pop_growth, 'pop_working': pop_working})
if save_soe  or suffix == '_baseline' or (h_mult_scenario == 'All' and sigma == 2):
  paths.update({'r_soe': r_soe, 'W_Y_soe': W_Y_soe, 'NFA_Y_soe': NFA_Y_soe,
                'K_Y_soe': K_Y_soe, 'Y_soe': Y_soe, 'PB_Y_soe': PB_Y_soe,
                'C_Y_soe': C_Y_soe, 'Inc_Y_soe': Inc_Y_soe,'G_Y_soe': G_Y_soe,
                'tau_soe': tau_soe, 'd_bar_soe': d_bar_soe})

suffix = '_mig' if migration is True else ''
suffix += '_autarky' if closed_economy is not None else ''
suffix += '_' + h_mult_scenario.replace(" ", "_") if h_mult_scenario != 'Baseline' else ''
suffix += '_FLFP' + str(FLFP_gap) if FLFP_gap is not None else ''
suffix += f"_{fertility_scenario}" if fertility_scenario != "medium" else ""
suffix += '_altdebt' if alt_debt is True else ''
suffix += '_altret' if alt_ret is True else ''
suffix += '_vintage_UNPP' if vintage_UNPP is True else ''
suffix += f"_{LIC_integration}_integration" if LIC_integration != "baseline" else ''
suffix += '_baseline' if suffix == '' else ''
suffix += f'_IES{sigma}' if sigma != 2 else ''

pickle.dump(paths, open(f'@Import/Data/cached_results/paths{suffix}.pkl', 'wb'))

# Generate Model outputs

In [None]:
# Load model output data and append the historical data sets
paths_here = pickle.load(open(f'@Import/Data/cached_results/paths{suffix}.pkl', 'rb'))
paths = copy.deepcopy(paths_here)
cs = list(paths_here['Y'].keys())

if type(paths['r']) != dict:
  # Add the exogenous wedge path for LIC back to the LIC interest rate
  # Define the range of years
  start_year = 2016
  end_year = 2075
  final_year = 2400

  # Create an array of target years
  target_years = np.arange(start_year, final_year + 1)

  # A wedge that declines linearly to 0
  interpolated_rates = np.where(
      (target_years >= start_year) & (target_years <= end_year),
      LIC_wedge - LIC_wedge * (target_years - start_year) / (end_year - start_year),
      0,
  )

  # Add the wedge if running an integrated economy scenario
  if closed_economy is None:
    paths['r'] = ({c: paths['r'] if c not in ('LIC','CHN','IND') else
                            paths['r'] + interpolated_rates for c in cs})
  else:
    paths['r'] = {c: paths['r'] for c in cs}

# Create DataFrame to be exported
df = pd.DataFrame()
var_list = ['r','Y','NFA_Y','PB_Y','G_Y','C_Y','Inc_Y','tau','d_bar','pop','pop_growth','pop_working']
if "r_soe" in paths:
  var_list = np.append(var_list,['r_soe','Y_soe','PB_Y_soe'])

for var in var_list:
    df_var = pd.DataFrame()  # Initialize DataFrame for the current variable
    for c in cs:  # Iterate through countries
        # Convert data to Series, create DataFrame, and add country code
        df_c = pd.DataFrame({'isocode': c,
                            'year': range(2016, 2016 + len(paths[var][c])),
                            var: paths[var][c]
                            })
        df_var = pd.concat([df_var, df_c], ignore_index=True)  # Append to variable's DataFrame

    # if var == 'NFA_Y':
    #     df_var = pd.concat([df_var, NFA_Y_hist], ignore_index=True)  # Add historical data for NFA_Y

    df = df.merge(df_var, on=['isocode', 'year'], how='outer') if not df.empty else df_var  # Merge into main DataFrame

# Save in the @Export folder
df_all = df.sort_values(by=['isocode', 'year'])
#df_all = pd.merge(df_all,pop_hist,on=['isocode','year'],how='outer')
if closed_economy is not None:
  df_all = df_all[df_all['isocode'] == closed_economy]
df_all.to_csv("Data/" + f"{suffix}.csv", index=False)