## Introduction

This is the notebook file to replicate our macroeconometrics approach. This notebook does not contain the `blackmarblepy` application. Source data is from `blackmarblepy` for nightlight, and [BPS](https://www.bps.go.id/id/statistics-table/2/NTQwIzI=/-seri-2010--4--laju-pertumbuhan--y-on-y--pdrb-atas-dasar-harga-konstan-menurut-pengeluaran--2010-100---persen-.html). If you happens to find any issues, you can find Tim via timothy.ginting@dewanekonomi.go.id

You will see the following sections in this notebook:

1. Real GDP and quarterly night light index (NTL) graph;
2. OLS and residuals;
3. ADF test and Johansen Cointegration test;
4. VECM graph;
5. VAR graph; and
6. ARDL graph.

We do those steps for both quarterly dataset and growth dataset.

We are still working on the regional regression.

In [None]:
import pandas as pd
from pandas.tseries.offsets import QuarterEnd
import numpy as np
import statsmodels.api as sm
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
import statsmodels.api as sm
from statsmodels.tsa.vector_ar.vecm import VECM, select_order, select_coint_rank
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.ardl import ARDL
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import Ridge
import statsmodels.formula.api as smf
from statsmodels.tsa.ardl import ardl_select_order
from datetime import datetime
import re
import io
import os
pd.options.display.max_seq_items = 4000 ## This is only for cosmetics.

## Quarterly Real GDP vs Quarterly NTL.

### Dataset

turn on the last line to see the dataframe.

In [None]:
## Data prep
### Creating data
ntl=pd.read_excel('data/ntl_monthly_avg_2012-2025.xlsx')
gdp=pd.read_excel('data/GDP_YoY_Quarterly_12_25.xlsx')

### Make time index
ntl.Date=pd.to_datetime(ntl['Date'])
ntl['qtr']=ntl['Date'].dt.quarter
ntl['year']=ntl['Date'].dt.year

### Averaging the radiance into quarterly, make it yoy quarterly growth
ntl=ntl.groupby(['year','qtr'])['NTL_Radiance'].mean().reset_index()
ntl['Date']=pd.date_range(start='2012-01-01', periods=len(ntl), freq='QE')
ntl=ntl[['Date','NTL_Radiance']]
ntl['g']=np.log(gdp['GDP'])
ntl['ntlg']=np.log(ntl['NTL_Radiance'])

### Creating dummy quarterly and dummy covid
ntl['q1']=np.where(ntl['Date'].dt.quarter==1,1,0)
ntl['q2']=np.where(ntl['Date'].dt.quarter==2,1,0)
ntl['q3']=np.where(ntl['Date'].dt.quarter==3,1,0)
ntl['q4']=np.where(ntl['Date'].dt.quarter==4,1,0)
ntl['covid']=np.where((ntl['Date'].dt.year>=2020) & (ntl['Date'].dt.year<=2022),1,0)
ntl['scar']=np.where((ntl['Date'].dt.year>=2020) ,1,0)

### Back to making time index
ntl=ntl.dropna().reset_index(drop=True)
ntl=ntl.set_index('Date')
ntl=ntl.asfreq('QE-DEC')
ntlm = ntl.copy()

### OLS for National Data

We run an ols for the national data

In [None]:
## OLS-ing

mod=sm.OLS(ntl['g'], sm.add_constant(ntl[['ntlg']])).fit()
ntl['resid']=mod.resid
ntl['ols']=mod.predict()

# Export OLS results to CSV and Markdown
from tabulate import tabulate

def ols_to_dataframe(model):
    """Create a DataFrame with OLS regression results"""
    data = []
    
    # Coefficients with standard errors
    for var in model.params.index:
        coef = model.params[var]
        se = model.bse[var]
        pval = model.pvalues[var]
        
        # Add significance stars
        stars = ''
        if pval < 0.01: stars = '***'
        elif pval < 0.05: stars = '**'
        elif pval < 0.1: stars = '*'
        
        data.append({'Variable': var, 'Coefficient': f"{coef:.4f}{stars}"})
        data.append({'Variable': '', 'Coefficient': f"({se:.4f})"})
    
    # Add statistics
    data.append({'Variable': 'Observations', 'Coefficient': str(int(model.nobs))})
    data.append({'Variable': 'R-squared', 'Coefficient': f"{model.rsquared:.4f}"})
    data.append({'Variable': 'Adj. R-squared', 'Coefficient': f"{model.rsquared_adj:.4f}"})
    data.append({'Variable': 'F-statistic', 'Coefficient': f"{model.fvalue:.2f}"})
    
    return pd.DataFrame(data)

# Create DataFrame and export
ols_results_df = ols_to_dataframe(mod)

# Save to CSV
ols_results_df.to_csv("reg/ols_results.csv", index=False)

# Save to Markdown
markdown_table = tabulate(ols_results_df, headers='keys', tablefmt='pipe', showindex=False)
with open("reg/ols_results.md", "w") as f:
    f.write(markdown_table)
    f.write("\n\n: OLS Regression Results for log real quarterly GDP {#tbl-ols}")

In [None]:
# Plotting GDP Growth and Night light growth side by side

fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(10, 4))
ntlm=ntl[4:]
ax1.plot(ntlm['g'],color='#f0a22e',marker='o', linestyle='-')
ax1.set_title('(a) Quarterly real GDP')

ax2.plot(ntlm['ntlg'], linestyle='-', color='#f0a22e',marker='o')
ax2.set_title('(b) Quarterly night light')

plt.tight_layout()
plt.savefig("fig/figQ.png") # Turn off to not save, or change file name to save in your preferred location
plt.show()

### OLS and residuals

In [1]:
# OLS results and plotting residuals
ntl=ntlm
print(mod.summary()) ## Checking agian the OLS results
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(10, 4))

ax1.plot(ntlm['g'],color='#f0a22e',linestyle="-",label="observed GDP Growth")
ax1.plot(ntlm['ols'],color='#a5644e',linestyle="--",label="OLS-fitted GDP Growth")
ax1.set_title('(a) Quarterly Real GDP')
ax1.legend()

ax2.plot(ntlm['resid'], linestyle='-', color='#f0a22e')
ax2.set_title('(b) OLS Residuals')

plt.tight_layout()
plt.savefig("fig/Qols.png") # Turn off to not save, or change file name to save in your preferred location
plt.show()

NameError: name 'ntlm' is not defined

## ARDL with quarterly dataset

This is for ARD with quarterly dataset. We first subset the data from the original `ntl` object, then loop the 6 specifications. The first code generate the 6 panel graphs. We then use the next cell to save regression tables and lastly we try splitting the observation into training and testing.

In [None]:
en=ntl[['g']]
ex=ntl[['ntlg']]
exc=ntl[['ntlg','covid']]
exs=ntl[['ntlg','scar']]
exq=ntl[['ntlg','q1','q2','q3']]
exqc=ntl[['ntlg','q1','q2','q3','covid']]
exqs=ntl[['ntlg','q1','q2','q3','scar']]

lags = ardl_select_order(endog=en, exog=ex, maxlag=4,maxorder=4, ic='aic',seasonal=False)
ve = ARDL(endog=en,lags=lags.ar_lags,exog=ex,order=lags.dl_lags,trend='ct').fit()
lags = ardl_select_order(endog=en, exog=exc, maxlag=4,maxorder=4, ic='aic',seasonal=False)
vec= ARDL(endog=en,lags=lags.ar_lags,exog=exc,order=lags.dl_lags,trend='ct').fit()
lags = ardl_select_order(endog=en, exog=exs, maxlag=4,maxorder=4, ic='aic',seasonal=False)
ves= ARDL(endog=en,lags=lags.ar_lags,exog=exs,order=lags.dl_lags,trend='ct').fit()
lags = ardl_select_order(endog=en, exog=exq, maxlag=4,maxorder=4, ic='aic',seasonal=False)
veq= ARDL(endog=en,lags=lags.ar_lags,exog=exq,order=lags.dl_lags,trend='ct').fit()
lags = ardl_select_order(endog=en, exog=exqc, maxlag=4,maxorder=4, ic='aic',seasonal=False)
veqc= ARDL(endog=en,lags=lags.ar_lags,exog=exqc,order=lags.dl_lags,trend='ct').fit()
lags = ardl_select_order(endog=en, exog=exqs, maxlag=4,maxorder=4, ic='aic',seasonal=False)
veqs= ARDL(endog=en,lags=lags.ar_lags,exog=exqs,order=lags.dl_lags,trend='ct').fit() # This looks too good to be true

models = {'fve': ve, 'fvec': vec,'fves': ves, 'fveq': veq,'fveqc': veqc, 'fveqs': veqs}
results = {}

for name, model in models.items():
    fitted = pd.DataFrame(model.fittedvalues, columns=['g_fitted'])
    merged = pd.merge(en, fitted, left_index=True, right_index=True, how='left')
    results[name] = merged

fig, ax = plt.subplots(3,2,figsize=(12, 12))

ax[0,0].plot(results['fve']['g'],color='#f0a22e',linestyle='-',label="Actual log real GDP")
ax[0,0].plot(results['fve']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted log real GDP")
ax[0,0].set_title('(a) ARDL')
ax[0,0].legend()

ax[0,1].plot(results['fvec']['g'],color='#f0a22e',linestyle='-',label="Actual log real GDP")
ax[0,1].plot(results['fvec']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted log real GDP")
ax[0,1].set_title('(b) ARDL+Covid')
ax[0,1].legend()

ax[1,0].plot(results['fves']['g'],color='#f0a22e',linestyle='-',label="Actual log real GDP")
ax[1,0].plot(results['fves']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted log real GDP")
ax[1,0].set_title('(c) ARDL+Scarring')
ax[1,0].legend()

ax[1,1].plot(results['fveq']['g'],color='#f0a22e',linestyle='-',label="Actual log real GDP")
ax[1,1].plot(results['fveq']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted log real GDP")
ax[1,1].set_title('(d) ARDL+Quarterly')
ax[1,1].legend()

ax[2,0].plot(results['fveqc']['g'],color='#f0a22e',linestyle='-',label="Actual log real GDP")
ax[2,0].plot(results['fveqc']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted log real GDP")
ax[2,0].set_title('(e) ARDL+Q+C')
ax[2,0].legend()

ax[2,1].plot(results['fveqs']['g'],color='#f0a22e',linestyle='-',label="Actual log real GDP")
ax[2,1].plot(results['fveqs']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted log real GDP")
ax[2,1].set_title('(f) ARDL+Q+S')
ax[2,1].legend()
plt.tight_layout()
plt.savefig("fig/ARDLQ.png") # Turn off to not save, or change file name to save in your preferred location
plt.show()

In [None]:

def ardl_to_dataframe(models_dict, model_names=None):
    """
    Create a DataFrame with regression results for Quarto/export
    """
    if model_names is None:
        model_names = list(models_dict.keys())
    
    # Collect all unique variable names across models
    all_vars = set()
    for name in model_names:
        model = models_dict[name]
        all_vars.update(model.params.index)
    
    # Sort variables: const, trend, then AR lags, then exog lags
    def sort_key(v):
        if v == 'const': return (0, v)
        if v == 'trend': return (1, v)
        if v.startswith('g.L'): return (2, int(v.split('L')[1]))
        return (3, v)
    all_vars = sorted(all_vars, key=sort_key)
    
    # Nice column names for display
    col_labels = {
        'fve': 'Baseline', 'fvec': '+Covid', 'fves': '+Scar',
        'fveq': '+Quarterly', 'fveqc': '+Q+C', 'fveqs': '+Q+S'
    }
    
    # Build coefficient rows (with stars) and SE rows
    data = []
    for var in all_vars:
        coef_row = {'Variable': var}
        se_row = {'Variable': ''}
        for name in model_names:
            col_name = col_labels.get(name, name)
            model = models_dict[name]
            if var in model.params.index:
                coef = model.params[var]
                se = model.bse[var]
                pval = model.pvalues[var]
                stars = '***' if pval < 0.01 else ('**' if pval < 0.05 else ('*' if pval < 0.1 else ''))
                coef_row[col_name] = f"{coef:.4f}{stars}"
                se_row[col_name] = f"({se:.4f})"
            else:
                coef_row[col_name] = ''
                se_row[col_name] = ''
        data.append(coef_row)
        data.append(se_row)
    
    # Add statistics rows
    for stat_name, stat_func in [('Observations', lambda m: str(int(m.nobs))),
                                   ('AIC', lambda m: f"{m.aic:.2f}"),
                                   ('BIC', lambda m: f"{m.bic:.2f}")]:
        stat_row = {'Variable': stat_name}
        for name in model_names:
            col_name = col_labels.get(name, name)
            stat_row[col_name] = stat_func(models_dict[name])
        data.append(stat_row)
    
    df = pd.DataFrame(data)
    return df

# Create DataFrame
ardl_results_df = ardl_to_dataframe(models, ['fve', 'fvec', 'fves', 'fveq', 'fveqc', 'fveqs'])

# === OPTION 1: Save to CSV for Quarto ===
ardl_results_df.to_csv("reg/ardl_results.csv", index=False)
print("Saved to reg/ardl_results.csv")

# === OPTION 2: Markdown table (for Quarto pipe tables) ===
from tabulate import tabulate
markdown_table = tabulate(ardl_results_df, headers='keys', tablefmt='pipe', showindex=False)
print("\n--- Markdown Table (copy to .qmd) ---")
print(markdown_table)
print("\n: ARDL Regression Results {#tbl-ardl}")

# === OPTION 3: LaTeX table ===
latex_table = ardl_results_df.to_latex(index=False, escape=False, column_format='l' + 'c'*6)
print("\n--- LaTeX Table ---")
print(latex_table)

# === OPTION 4: Display in notebook ===
ardl_results_df

In [None]:
# === ARDL with Train/Test Split ===
# Training: before 2024Q1, Testing: 2024Q1 onwards

train_end = pd.Timestamp("2023-12-31")

# Split endogenous variable
en_full = ntl[['g']]
en_train = en_full.loc[:train_end]
en_test = en_full.loc[train_end:].iloc[1:]  # exclude train_end itself

# Split exogenous variables
ex_full = ntl[['ntlg']]
exc_full = ntl[['ntlg','covid']]
exs_full = ntl[['ntlg','scar']]
exq_full = ntl[['ntlg','q1','q2','q3']]
exqc_full = ntl[['ntlg','q1','q2','q3','covid']]
exqs_full = ntl[['ntlg','q1','q2','q3','scar']]

# Training exogenous
ex_train = ex_full.loc[:train_end]
exc_train = exc_full.loc[:train_end]
exs_train = exs_full.loc[:train_end]
exq_train = exq_full.loc[:train_end]
exqc_train = exqc_full.loc[:train_end]
exqs_train = exqs_full.loc[:train_end]

# Testing exogenous (for out-of-sample forecast)
ex_test = ex_full.loc[train_end:].iloc[1:]
exc_test = exc_full.loc[train_end:].iloc[1:]
exs_test = exs_full.loc[train_end:].iloc[1:]
exq_test = exq_full.loc[train_end:].iloc[1:]
exqc_test = exqc_full.loc[train_end:].iloc[1:]
exqs_test = exqs_full.loc[train_end:].iloc[1:]

# Dictionary to store all specs
specs = {
    'baseline': (ex_train, ex_test, ex_full),
    'covid': (exc_train, exc_test, exc_full),
    'scar': (exs_train, exs_test, exs_full),
    'q': (exq_train, exq_test, exq_full),
    'q_covid': (exqc_train, exqc_test, exqc_full),
    'q_scar': (exqs_train, exqs_test, exqs_full),
}

results_oos = {}
n_train = len(en_train)
n_test = len(en_test)

for spec_name, (exog_train, exog_test, exog_full) in specs.items():
    try:
        # Select optimal lags on training data
        lags = ardl_select_order(endog=en_train, exog=exog_train, maxlag=4, maxorder=4, ic='aic', seasonal=False)
        
        # Fit ARDL on training data only
        model = ARDL(endog=en_train, lags=lags.ar_lags, exog=exog_train, order=lags.dl_lags, trend='ct').fit()
        
        # Get fitted values (in-sample)
        fitted_vals = model.fittedvalues
        
        # Forecast out-of-sample
        forecast_vals = model.predict(
            start=n_train,
            end=n_train + n_test - 1,
            exog_oos=exog_test
        )
        
        # Store results
        results_oos[spec_name] = {
            'actual_train': en_train['g'],
            'actual_test': en_test['g'],
            'fitted': fitted_vals,
            'forecast': forecast_vals,
            'model': model
        }
        
        # Calculate MAE and RMSE for out-of-sample
        errors = forecast_vals.values - en_test['g'].values
        mae = np.abs(errors).mean()
        rmse = np.sqrt((errors**2).mean())
        results_oos[spec_name]['mae'] = mae
        results_oos[spec_name]['rmse'] = rmse
        
        print(f"{spec_name}: MAE={mae:.4f}, RMSE={rmse:.4f}")
        
    except Exception as e:
        print(f"Error fitting {spec_name}: {e}")
        continue

# === Plotting ===
fig, ax = plt.subplots(3, 2, figsize=(12, 12))
spec_list = ['baseline', 'covid', 'scar', 'q', 'q_covid', 'q_scar']
titles = ['(a) ARDL', '(b) ARDL+Covid', '(c) ARDL+Scarring', 
          '(d) ARDL+Quarterly', '(e) ARDL+Q+C', '(f) ARDL+Q+S']
positions = [(0,0), (0,1), (1,0), (1,1), (2,0), (2,1)]

for spec_name, title, pos in zip(spec_list, titles, positions):
    if spec_name not in results_oos:
        continue
    
    res = results_oos[spec_name]
    i, j = pos
    
    # Plot actual (full series)
    ax[i,j].plot(en_full.index, en_full['g'], color='#f0a22e', linestyle='-', 
                 label="Actual log(Real GDP)", linewidth=1.5)
    
    # Plot fitted (in-sample only)
    ax[i,j].plot(res['fitted'].index, res['fitted'], linestyle='--', color='#a5644e', 
                 label="Fitted (Training)", linewidth=1.5)
    
    # Plot forecast (out-of-sample)
    ax[i,j].plot(res['forecast'].index, res['forecast'], linestyle='-', color='#a19574', 
                 marker='o', markersize=4, label="Forecast (Test)", linewidth=1.5)
    
    # Add vertical line at train/test split
    ax[i,j].axvline(x=train_end, color='gray', linestyle=':', alpha=0.7, label='Train/Test Split')
    
    # Shade the forecast region
    ax[i,j].axvspan(en_test.index[0], en_test.index[-1], color='gray', alpha=0.1)
    
    ax[i,j].set_title(f"{title}\nMAE={res['mae']:.4f}, RMSE={res['rmse']:.4f}")
    ax[i,j].legend(loc='upper left', fontsize=8)
    ax[i,j].set_ylabel("log(Real GDP)")
    
    # Format x-axis
    ax[i,j].xaxis.set_major_locator(mdates.YearLocator(2))
    ax[i,j].xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
    ax[i,j].tick_params(axis='x', rotation=45)

plt.suptitle("ARDL Models: Training vs Out-of-Sample Forecast (Test: 2024Q1 onwards)", fontsize=12, y=1.02)
plt.tight_layout()
plt.savefig("fig/ARDL_train_test_forecast.png", dpi=200, bbox_inches='tight')
plt.show()

print(f"\nTraining period: {en_train.index[0].strftime('%Y-%m')} to {en_train.index[-1].strftime('%Y-%m')}")
print(f"Testing period: {en_test.index[0].strftime('%Y-%m')} to {en_test.index[-1].strftime('%Y-%m')}")

In [None]:
## To see the regression table
### Available models are baseline, covid, scar, q, q_covid, and q_scar
results_oos['q_scar']['model'].summary()



### ADF test of the series and residuals.

This was used to test VAR. However, VAR shows no interesting NTL impact. THat is, the model for gdp essentially become AR.

In [None]:
## ADF Test for g, ntlg and OLS residuals
from tabulate import tabulate

def adf_test(series, name=""):
    """
    Perform ADF test and return results as dict
    """
    result = adfuller(series.dropna(), maxlag=0)
    return {
        'Series': name,
        'Test Statistic': f"{result[0]:.4f}",
        'p-value': f"{result[1]:.4f}",
        'CV 1%': f"{result[4]['1%']:.4f}",
        'CV 5%': f"{result[4]['5%']:.4f}",
        'CV 10%': f"{result[4]['10%']:.4f}"
    }

# Run ADF tests and collect results
adf_results = [
    adf_test(ntlm["g"], "Real GDP (Level)"),
    adf_test(ntlm["g"].diff(), "Real GDP (1st Diff)"),
    adf_test(ntlm["ntlg"], "Nighttime light index (Level)"),
    adf_test(ntlm["ntlg"].diff(), "Nighttime light index (1st Diff)"),
    adf_test(ntlm["resid"], "OLS Residuals")
]

# Create DataFrame
adf_df = pd.DataFrame(adf_results)

# Save to CSV
adf_df.to_csv("reg/adf_results.csv", index=False)

# Save to Markdown
markdown_table = tabulate(adf_df, headers='keys', tablefmt='pipe', showindex=False)
with open("reg/adf_results.md", "w") as f:
    f.write(markdown_table)
    f.write("\n\n: ADF Unit Root Test Results {#tbl-adf}")
print(adf_df.to_string(index=False))

### Johansen cointegration test

In [None]:
# Select optimal lag order
ntlm=ntl[['g','ntlg']]
lag_order = select_order(ntlm, maxlags=12, deterministic="ci")

# Convert lag order results to DataFrame
lag_df = pd.DataFrame({
    'AIC': lag_order.ics['aic'],
    'BIC': lag_order.ics['bic'],
    'FPE': lag_order.ics['fpe'],
    'HQIC': lag_order.ics['hqic']
})
lag_df.index.name = 'Lag'

# Convert to string first, then mark minimum values with asterisk
lag_df_formatted = lag_df.astype(float).copy()
for col in lag_df_formatted.columns:
    min_idx = lag_df_formatted[col].idxmin()
    lag_df_formatted[col] = lag_df_formatted[col].apply(lambda x: f"{x:.4f}")
    lag_df_formatted.loc[min_idx, col] = lag_df_formatted.loc[min_idx, col] + "*"

# Reset index to make Lag a column
lag_df_export = lag_df_formatted.reset_index()

# Save lag order to CSV and Markdown
from tabulate import tabulate

lag_df_export.to_csv("reg/lag_order.csv", index=False)
markdown_table = tabulate(lag_df_export, headers='keys', tablefmt='pipe', showindex=False)
with open("reg/lag_order.md", "w") as f:
    f.write(markdown_table)
    f.write("\n\n: VECM Lag Order Selection {#tbl-lag}")

print(lag_df_export.to_string(index=False))

# Select cointegration rank
coint_rank = select_coint_rank(ntlm, det_order=1, k_ar_diff=lag_order.bic)

# Convert cointegration rank results to DataFrame
n_stats = len(coint_rank.test_stats)
coint_df = pd.DataFrame({
    'r0': list(range(n_stats)),
    'r1': [ntlm.shape[1]] * n_stats,
    'Test Statistic': [f"{ts:.2f}" for ts in coint_rank.test_stats],
    'Critical Value (5%)': [f"{cv:.2f}" for cv in coint_rank.crit_vals]
})

# Save cointegration rank to CSV and Markdown
coint_df.to_csv("reg/coint_rank.csv", index=False)
markdown_table = tabulate(coint_df, headers='keys', tablefmt='pipe', showindex=False)
with open("reg/coint_rank.md", "w") as f:
    f.write(markdown_table)
    f.write("\n\n: Johansen Cointegration Test {#tbl-coint}")

print(coint_df.to_string(index=False))

### VECM with quarterly dataset

In [None]:
en=ntl[['g','ntlg']]
exc=ntl[['covid']]
exs=ntl[['scar']]
exq=ntl[['q1','q2','q3']]
exqc=ntl[['q1','q2','q3','covid']]
exqs=ntl[['q1','q2','q3','scar']]

lag=lag_order.aic

ve = VECM(en,k_ar_diff=lag, coint_rank=2, deterministic="cili").fit()
vec = VECM(en,k_ar_diff=lag, coint_rank=2, exog=exc,deterministic="cili").fit()
ves = VECM(en,k_ar_diff=lag, coint_rank=2, exog=exs,deterministic="cili").fit()
veq = VECM(en,k_ar_diff=lag, coint_rank=2, exog=exq,deterministic="cili").fit()
veqc = VECM(en,k_ar_diff=lag, coint_rank=2, exog=exqc,deterministic="cili").fit()
veqs = VECM(en,k_ar_diff=lag, coint_rank=2, exog=exqs,deterministic="cili").fit()

models = {'fve': ve, 'fvec': vec,'fves': ves, 'fveq': veq,'fveqc': veqc, 'fveqs': veqs}
results = {}

for name, model in models.items():
    fitted = pd.DataFrame(model.fittedvalues, columns=en.columns)
    fitted.index = en.index[-len(fitted):]  # Align to last N dates
    merged = pd.merge(en, fitted, left_index=True, right_index=True, suffixes=('', '_fitted'))
    results[name] = merged


fig, ax = plt.subplots(3,2,figsize=(12, 12))

ax[0,0].plot(results['fve']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[0,0].plot(results['fve']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[0,0].set_title('(a) VECM')
ax[0,0].legend()

ax[0,1].plot(results['fvec']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[0,1].plot(results['fvec']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[0,1].set_title('(b) VECM+Covid')
ax[0,1].legend()

ax[1,0].plot(results['fves']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[1,0].plot(results['fves']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[1,0].set_title('(c) VECM+Scarring')
ax[1,0].legend()

ax[1,1].plot(results['fveq']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[1,1].plot(results['fveq']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[1,1].set_title('(d) VECM+Q')
ax[1,1].legend()

ax[2,0].plot(results['fveqc']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[2,0].plot(results['fveqc']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[2,0].set_title('(e) VECM+Q+C')
ax[2,0].legend()

ax[2,1].plot(results['fveqs']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[2,1].plot(results['fveqs']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[2,1].set_title('(f) VECM+Q+S')
ax[2,1].legend()
plt.tight_layout()
plt.savefig("fig/VECMQ.png")
plt.show()

## Growth Regression

### The GDP and night light growth dataset
Turn on the last line of the first codeblock to see the dataframe

In [None]:
## Data prep
### Creating data
ntl=pd.read_excel('data/ntl_monthly_avg_2012-2025.xlsx')
gdp=pd.read_excel('data/GDP_YoY_Quarterly_12_25.xlsx')

### Make time index
ntl.Date=pd.to_datetime(ntl['Date'])
ntl['qtr']=ntl['Date'].dt.quarter
ntl['year']=ntl['Date'].dt.year
### Averaging the radiance into quarterly, make it yoy quarterly growth
ntl=ntl.groupby(['year','qtr'])['NTL_Radiance'].mean().reset_index()
ntl['Date']=pd.date_range(start='2012-01-01', periods=len(ntl), freq='QE')
ntl=ntl[['Date','NTL_Radiance']]
ntl['g']=(gdp['Real GDP YoY Growth Indonesia'])
ntl['ntlg']=(ntl['NTL_Radiance'])
ntl['NTL_Radiancelag'] = ntl['NTL_Radiance'].shift(4)
ntl['ntlg'] = ((ntl['NTL_Radiance'] - ntl['NTL_Radiancelag']) / ntl['NTL_Radiancelag']) * 100
### Creating dummy quarterly and dummy covid
ntl['q1']=np.where(ntl['Date'].dt.quarter==1,1,0)
ntl['q2']=np.where(ntl['Date'].dt.quarter==2,1,0)
ntl['q3']=np.where(ntl['Date'].dt.quarter==3,1,0)
ntl['q4']=np.where(ntl['Date'].dt.quarter==4,1,0)
ntl['covid']=np.where((ntl['Date'].dt.year>=2020) & (ntl['Date'].dt.year<=2022),1,0)
ntl['scar']=np.where((ntl['Date'].dt.year>=2020) ,1,0)
### Back to making time index
ntl=ntl.dropna().reset_index(drop=True)
ntl=ntl.set_index('Date')
ntl=ntl.asfreq('QE-DEC')
#ntlm=ntlm[['g','ntlg']]

### Creating dummy quarterly and dummy covid

## OLS-ing
mod=sm.OLS(ntl['g'], sm.add_constant(ntl['ntlg'])).fit()
ntl['resid']=mod.resid
ntl['ols']=mod.predict()
#ntl

In [None]:
# Plotting GDP Growth and Night light growth side by side
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(10, 4))
ntlm=ntl[4:]
ax1.plot(ntlm['g'],color='#f0a22e',marker='o', linestyle='-')
ax1.set_title('(a) Quarterly GDP growth')

ax2.plot(ntlm['ntlg'], linestyle='-', color='#f0a22e',marker='o')
ax2.set_title('(b) Quarterly night light growth')

plt.tight_layout()
plt.savefig("fig/fig.png")
plt.show()

### OLS Regression

In [None]:
## OLS results and plotting residuals
ntl=ntlm
print(mod.summary())
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(10, 4))

ax1.plot(ntlm['g'],color='#f0a22e',linestyle="-",label="observed GDP Growth")
ax1.plot(ntlm['ols'],color='#a5644e',linestyle="--",label="OLS-fitted GDP Growth")
ax1.set_title('(a) GDP Growth')
ax1.legend()

ax2.plot(ntlm['resid'], linestyle='-', color='#f0a22e')
ax2.set_title('(b) OLS Residuals')

plt.tight_layout()
plt.savefig("fig/ols.png") # Turn off to not save, or change file name to save in your preferred location
plt.show()

### ADF Test

In [None]:
## ADF Test for g, ntlg and OLS residuals
def adf_test(series, name=""):
    """
    Perform ADF test and print results
    """
    result = adfuller(series.dropna(), autolag="BIC")
    print(f"ADF Test for {name}")
    print(f"  Test Statistic : {result[0]:.4f}")
    print(f"  p-value        : {result[1]:.4f}")
    print(f"  #Lags Used     : {result[2]}")
    print(f"  #Observations  : {result[3]}")
    for key, value in result[4].items():
        print(f"   Critical Value {key} : {value:.4f}")
    if result[1] <= 0.05:
        print(f"  ==> {name} is stationary\n (reject H0 of unit root)\n")
    else:
        print(f"  ==> {name} is non-stationary\n (fail to reject H0)\n")

# Run ADF tests for both series
adf_test(ntlm["g"], "GDP YoY Growth")
adf_test(ntlm["ntlg"], "NTL YoY Growth")
adf_test(ntlm["resid"], "OLS Residuals")

### Johansen Cointegration test

In [None]:
# Select optimal lag order
ntlm=ntl[['g','ntlg']]
lag_order = select_order(ntlm.asfreq('QE-DEC'), maxlags=12, deterministic="ci")
print(lag_order.summary())

# Select cointegration rank
coint_rank = select_coint_rank(ntlm.asfreq('QE-DEC'), det_order=0, k_ar_diff=lag_order.bic)
print(coint_rank.summary())

### VECM with growth 

In [None]:
en=ntl[['g','ntlg']]
exc=ntl[['covid']]
exs=ntl[['scar']]
exq=ntl[['q1','q2','q3']]
exqc=ntl[['q1','q2','q3','covid']]
exqs=ntl[['q1','q2','q3','scar']]

lag=lag_order.aic

ve = VECM(en,k_ar_diff=lag, coint_rank=1, deterministic="ci").fit()
vec = VECM(en,k_ar_diff=lag, coint_rank=1, exog=exc,deterministic="ci").fit()
ves = VECM(en,k_ar_diff=lag, coint_rank=1, exog=exs,deterministic="ci").fit()
veq = VECM(en,k_ar_diff=lag, coint_rank=1, exog=exq,deterministic="ci").fit()
veqc = VECM(en,k_ar_diff=lag, coint_rank=1, exog=exqc,deterministic="ci").fit()
veqs = VECM(en,k_ar_diff=lag, coint_rank=1, exog=exqs,deterministic="ci").fit()

models = {'fve': ve, 'fvec': vec,'fves': ves, 'fveq': veq,'fveqc': veqc, 'fveqs': veqs}
results = {}

for name, model in models.items():
    fitted = pd.DataFrame(model.fittedvalues, columns=en.columns)
    fitted.index = en.index[-len(fitted):]  # Align to last N dates
    merged = pd.merge(en, fitted, left_index=True, right_index=True, suffixes=('', '_fitted'))
    results[name] = merged


fig, ax = plt.subplots(3,2,figsize=(12, 12))

ax[0,0].plot(results['fve']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[0,0].plot(results['fve']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[0,0].set_title('(a) VECM')
ax[0,0].legend()

ax[0,1].plot(results['fvec']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[0,1].plot(results['fvec']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[0,1].set_title('(b) VECM+Covid')
ax[0,1].legend()

ax[1,0].plot(results['fves']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[1,0].plot(results['fves']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[1,0].set_title('(c) VECM+Scarring')
ax[1,0].legend()

ax[1,1].plot(results['fveq']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[1,1].plot(results['fveq']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[1,1].set_title('(d) VECM+Q')
ax[1,1].legend()

ax[2,0].plot(results['fveqc']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[2,0].plot(results['fveqc']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[2,0].set_title('(e) VECM+Q+C')
ax[2,0].legend()

ax[2,1].plot(results['fveqs']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[2,1].plot(results['fveqs']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[2,1].set_title('(f) VECM+Q+S')
ax[2,1].legend()
plt.tight_layout()
plt.savefig("fig/VECM.png")
plt.show()

### VAR with growth

In [None]:

en=ntl[['g','ntlg']]
exc=ntl[['covid']]
exs=ntl[['scar']]
exq=ntl[['q1','q2','q3']]
exqc=ntl[['q1','q2','q3','covid']]
exqs=ntl[['q1','q2','q3','scar']]

#lag=lag_order.aic

ve = sm.tsa.VARMAX(en, order=(4,0), trend='n').fit(maxiter=1000, disp=False)
vec = sm.tsa.VARMAX(en, order=(4,0), trend='n',exog=exc).fit(maxiter=1000, disp=False)
ves = sm.tsa.VARMAX(en, order=(4,0), trend='n',exog=exs).fit(maxiter=1000, disp=False)
veq = sm.tsa.VARMAX(en, order=(4,0), trend='n',exog=exq).fit(maxiter=1000, disp=False)
veqc = sm.tsa.VARMAX(en, order=(4,0), trend='n',exog=exqc).fit(maxiter=1000, disp=False)
veqs = sm.tsa.VARMAX(en, order=(4,0), trend='n',exog=exqs).fit(maxiter=1000, disp=False)


models = {'fve': ve, 'fvec': vec,'fves': ves, 'fveq': veq,'fveqc': veqc, 'fveqs': veqs}
results = {}

for name, model in models.items():
    fitted = pd.DataFrame(model.fittedvalues, columns=en.columns)
    fitted.index = en.index[-len(fitted):]  # Align to last N dates
    merged = pd.merge(en, fitted, left_index=True, right_index=True, suffixes=('', '_fitted'))
    results[name] = merged

fig, ax = plt.subplots(3,2,figsize=(12, 12))

ax[0,0].plot(results['fve']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[0,0].plot(results['fve']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[0,0].set_title('(a) VAR')
ax[0,0].legend()

ax[0,1].plot(results['fvec']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[0,1].plot(results['fvec']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[0,1].set_title('(b) VAR+Covid')
ax[0,1].legend()

ax[1,0].plot(results['fves']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[1,0].plot(results['fves']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[1,0].set_title('(c) VAR+Scarring')
ax[1,0].legend()

ax[1,1].plot(results['fveq']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[1,1].plot(results['fveq']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[1,1].set_title('(d) VAR+Q')
ax[1,1].legend()

ax[2,0].plot(results['fveqc']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[2,0].plot(results['fveqc']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[2,0].set_title('(e) VAR+Q+C')
ax[2,0].legend()

ax[2,1].plot(results['fveqs']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[2,1].plot(results['fveqs']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[2,1].set_title('(f) VAR+Q+S')
ax[2,1].legend()
plt.tight_layout()
plt.savefig("fig/VAR.png")
plt.show()

### ARDL with growth

In [None]:
en=ntl[['g']]
ex=ntl[['ntlg']]
exc=ntl[['ntlg','covid']]
exs=ntl[['ntlg','scar']]
exq=ntl[['ntlg','q1','q2','q3']]
exqc=ntl[['ntlg','q1','q2','q3','covid']]
exqs=ntl[['ntlg','q1','q2','q3','scar']]

lags = ardl_select_order(endog=en, exog=ex, maxlag=8,maxorder=8, ic='aic',seasonal=False)
ve = ARDL(endog=en,lags=lags.ar_lags,exog=ex,order=lags.dl_lags,trend='ct').fit()
lags = ardl_select_order(endog=en, exog=exc, maxlag=8,maxorder=8, ic='aic',seasonal=False)
vec= ARDL(endog=en,lags=lags.ar_lags,exog=exc,order=lags.dl_lags,trend='ct').fit()
lags = ardl_select_order(endog=en, exog=exs, maxlag=8,maxorder=8, ic='aic',seasonal=False)
ves= ARDL(endog=en,lags=lags.ar_lags,exog=exs,order=lags.dl_lags,trend='ct').fit()
lags = ardl_select_order(endog=en, exog=exq, maxlag=4,maxorder=4, ic='aic',seasonal=False)
veq= ARDL(endog=en,lags=lags.ar_lags,exog=exq,order=lags.dl_lags,trend='ct').fit()
lags = ardl_select_order(endog=en, exog=exqc, maxlag=4,maxorder=4, ic='aic',seasonal=False)
veqc= ARDL(endog=en,lags=lags.ar_lags,exog=exqc,order=lags.dl_lags,trend='ct').fit()
lags = ardl_select_order(endog=en, exog=exqs, maxlag=4,maxorder=4, ic='aic',seasonal=False)
veqs= ARDL(endog=en,lags=lags.ar_lags,exog=exqs,order=lags.dl_lags,trend='ct').fit()

models = {'fve': ve, 'fvec': vec,'fves': ves}
results = {}

for name, model in models.items():
    fitted = pd.DataFrame(model.predict(), columns=en.columns)
    fitted.index = en.index[-len(fitted):]  # Align to last N dates
    merged = pd.merge(en, fitted, left_index=True, right_index=True, suffixes=('', '_fitted'))
    results[name] = merged

models = {'fveq': veq,'fveqc': veqc, 'fveqs': veqs}
result = {}

for name, model in models.items():
    fitted = pd.DataFrame(model.predict(), columns=en.columns)
    fitted.index = en.index[-len(fitted):]  # Align to last N dates
    merged = pd.merge(en, fitted, left_index=True, right_index=True, suffixes=('', '_fitted'))
    result[name] = merged



fig, ax = plt.subplots(3,2,figsize=(12, 12))

ax[0,0].plot(results['fve']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[0,0].plot(results['fve']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[0,0].set_title('(a) ARDL')
ax[0,0].legend()

ax[0,1].plot(results['fvec']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[0,1].plot(results['fvec']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[0,1].set_title('(b) ARDL+Covid')
ax[0,1].legend()

ax[1,0].plot(results['fves']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[1,0].plot(results['fves']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[1,0].set_title('(c) ARDL+Scarring')
ax[1,0].legend()

ax[1,1].plot(result['fveq']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[1,1].plot(result['fveq']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[1,1].set_title('(d) ARDL+Quarterly')
ax[1,1].legend()

ax[2,0].plot(result['fveqc']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[2,0].plot(result['fveqc']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[2,0].set_title('(e) ARDL+Q+C')
ax[2,0].legend()

ax[2,1].plot(result['fveqs']['g'],color='#f0a22e',linestyle='-',label="Actual GDP Growth")
ax[2,1].plot(result['fveqs']['g_fitted'], linestyle='--', color='#a5644e', label="Fitted GDP Growth")
ax[2,1].set_title('(f) ARDL+Q+S')
ax[2,1].legend()
plt.tight_layout()
plt.savefig("fig/ARDL.png")
plt.show()

In [None]:
# Model Functions

def run_vecm(en_train, exog_train, rank=1, det="ci", maxlags=8):
    """VECM with exogenous terms and valid det_type ('ci' or 'co')."""
    if det not in ["ci", "co"]:
        raise ValueError("det_type must be 'ci' or 'co' for VECM.")
    try:
        sel = select_order(en_train, maxlags=maxlags, deterministic=det)
        lag = sel.aic if (sel.aic is not None and sel.aic >= 1) else 1
    except Exception as e:
        print(f"select_order failed ({e}); fallback lag=1")
        lag = 1
    print(f"VECM | lag={lag}, rank={rank}, det={det}")
    model = VECM(en_train, k_ar_diff=lag, coint_rank=rank,
                 deterministic=det, exog=exog_train).fit()
    fitted = pd.DataFrame(model.fittedvalues, columns=en_train.columns,
                          index=en_train.index[-len(model.fittedvalues):])
    return model, fitted

def run_var(en_train, exog_train=None, maxlags=8):
    df = en_train.copy()
    if exog_train is not None:
        df = pd.concat([df, exog_train], axis=1)
    var_model = VAR(df)
    sel = var_model.select_order(maxlags=maxlags)
    p = max(sel.aic, 1) if sel.aic else 1
    print(f"VAR | lag={p}")
    res = var_model.fit(p)
    fitted = res.fittedvalues
    return res, fitted, p

def run_bvar_ridge(en_train, exog_train=None, p=2, lam=0.1):
    Y, X = [], []
    base = en_train.copy()
    if exog_train is not None:
        base = pd.concat([base, exog_train], axis=1)
    for t in range(p, len(base)):
        Y.append(base.values[t, :2])  # first 2 columns (g, ntlg)
        X.append(base.values[t-p:t].flatten())
    Y, X = np.array(Y), np.array(X)
    coefs = []
    for j in range(Y.shape[1]):
        ridge = Ridge(alpha=lam, fit_intercept=True)
        ridge.fit(X, Y[:, j])
        coefs.append(ridge)
    fitted_vals = np.array([r.predict(X) for r in coefs]).T
    fitted = pd.DataFrame(fitted_vals, index=base.index[p:], columns=['g','ntlg'])
    return coefs, fitted, p

def run_ardl(en_ardl, ex_ardl, maxlags=8):
    try:
        lag_sel = ardl_select_order(endog=en_ardl, exog=ex_ardl,
                                    maxlag=maxlags, maxorder=maxlags,
                                    ic="aic", seasonal=False)
        print(f"ARDL | AR lags={lag_sel.ar_lags}, exog lags={lag_sel.dl_lags}")
        model = ARDL(endog=en_ardl, lags=lag_sel.ar_lags,
                     exog=ex_ardl, order=lag_sel.dl_lags, trend="ct").fit()
    except Exception as e:
        print(f"ARDL selection failed ({e}); fallback (1,0)")
        model = ARDL(endog=en_ardl, lags=1, exog=ex_ardl, order=0, trend="ct").fit()
    fitted = model.fittedvalues.to_frame(name="g")
    return model, fitted

In [None]:
def parse_ardl_summary(model_res):
    """Cleanly extract ARDL coefficients as a proper DataFrame."""
    try:
        summary_table = model_res.summary().tables[1]
        df = pd.read_html(summary_table.as_html(), header=0, index_col=0)[0]
        df.reset_index(inplace=True)
        df.rename(columns={"index": "variable"}, inplace=True)
        return df
    except Exception:
        if hasattr(model_res, "params"):
            return pd.DataFrame({
                "variable": model_res.params.index,
                "coef": model_res.params.values,
                "std_err": getattr(model_res, "bse", np.nan),
                "z": getattr(model_res, "tvalues", np.nan),
                "P>|z|": getattr(model_res, "pvalues", np.nan)
            })
        else:
            return pd.DataFrame({"info": ["Could not parse ARDL coefficients."]})


def parse_text_summary(summary_text, start_pattern=None, end_pattern=None):
    """Generic parser for fixed-width text tables in statsmodels summaries."""
    lines = summary_text.splitlines()
    start, end = None, None
    if start_pattern:
        for i, line in enumerate(lines):
            if re.search(start_pattern, line):
                start = i + 2
            elif end_pattern and start is not None and re.search(end_pattern, line):
                end = i - 1
                break
    if start is None:
        start, end = 0, len(lines)
    block_lines = lines[start:end]
    data_lines = [ln for ln in block_lines if re.match(r"^[A-Za-zL0-9]", ln.strip())]
    if not data_lines:
        return None
    table_str = "\n".join(data_lines)
    df = pd.read_fwf(
        io.StringIO(table_str),
        names=["variable", "coef", "std_err", "z", "P>|z|", "CI_low", "CI_high"],
        infer_nrows=100
    )
    for col in ["coef", "std_err", "z", "P>|z|", "CI_low", "CI_high"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
    return df.dropna(subset=["coef"], how="all")


def parse_var_summary_tables(model_res):
    """Extract both merged and detailed coefficient tables from VAR summary."""
    summary_text = str(model_res.summary())
    lines = summary_text.splitlines()
    results = []
    eq_name = None
    capture = False
    block_lines = []

    for line in lines:
        match_eq = re.match(r"Results for equation (.+)", line)
        if match_eq:
            if eq_name and block_lines:
                results.append((eq_name, "\n".join(block_lines)))
                block_lines = []
            eq_name = match_eq.group(1).strip()
            capture = True
            continue
        if capture and re.match(r"Correlation matrix", line):
            if eq_name and block_lines:
                results.append((eq_name, "\n".join(block_lines)))
            break
        elif capture and re.match(r"^[A-Za-zL0-9]", line.strip()):
            block_lines.append(line.strip())

    all_tables = []
    for eq_name, block in results:
        df = pd.read_fwf(
            io.StringIO(block),
            names=["variable", "coef", "std_err", "t_stat", "p_value"],
            infer_nrows=100
        )
        for c in ["coef", "std_err", "t_stat", "p_value"]:
            df[c] = pd.to_numeric(df[c], errors="coerce")
        df["equation"] = eq_name
        all_tables.append(df)

    if not all_tables:
        return (pd.DataFrame({"info": ["No coefficients parsed from VAR summary."]}), None)

    # Compact merged table
    merged = None
    for df in all_tables:
        eq_name = df["equation"].iloc[0]
        sub = df[["variable", "coef"]].copy()
        sub.rename(columns={"coef": eq_name}, inplace=True)
        merged = sub if merged is None else pd.merge(merged, sub, on="variable", how="outer")

    detailed = pd.concat(all_tables, ignore_index=True)
    return merged, detailed

In [None]:
ntlm = ntl.copy()
# Settings 
model_type   = "ARDL"        # "VECM", "VAR", "VAR_diff", "BVAR", "ARDL"
spec_type    = "covid"    # "baseline", "covid", "scar", "q", "q_covid", "q_scar"
chosen_rank  = 1             # for VECM
det_type     = "ci"          # for VECM
steps_ahead  = 4             # 4=2024Q3–2025Q2, 8=2023Q3–2025Q2
ridge_lambda = 0.1           # for BVAR
max_lags     = 8

# Forecast horizon 
if steps_ahead == 4:
    future_idx = pd.date_range("2024-09-30", periods=steps_ahead, freq="QE")
    train_end  = pd.Timestamp("2024-06-30")
elif steps_ahead == 8:
    future_idx = pd.date_range("2023-09-30", periods=steps_ahead, freq="QE")
    train_end  = pd.Timestamp("2023-06-30")
else:
    raise ValueError("steps_ahead must be 4 or 8.")

# Base data prep 
en   = ntlm[['g','ntlg']]
exc  = ntlm[['covid']]
exs  = ntlm[['scar']]
exq  = ntlm[['q1','q2','q3']]
exqc = ntlm[['q1','q2','q3','covid']]
exqs = ntlm[['q1','q2','q3','scar']]

ntlm_train = ntlm.loc[:train_end].copy()
en_train   = en.loc[:train_end]

# unified exog map for all models 
exog_map = {
    "baseline": None,
    "covid":   exc.loc[:train_end],
    "scar":    exs.loc[:train_end],
    "q":       exq.loc[:train_end],
    "q_covid": exqc.loc[:train_end],
    "q_scar":  exqs.loc[:train_end],
}
exog_train = exog_map.get(spec_type, None)

# Define endog/exog for ARDL specifically 
en_ardl   = ntlm[['g']].loc[:train_end]
ex_ardl   = {
    "baseline": ntlm[['ntlg']],
    "covid":   ntlm[['ntlg','covid']],
    "scar":    ntlm[['ntlg','scar']],
    "q":       ntlm[['ntlg','q1','q2','q3']],
    "q_covid": ntlm[['ntlg','q1','q2','q3','covid']],
    "q_scar":  ntlm[['ntlg','q1','q2','q3','scar']],
}[spec_type].loc[:train_end]


# Execution
if model_type == "VECM":
    model_res, fitted_df = run_vecm(en_train, exog_train, rank=chosen_rank, det=det_type)
    last_exog = exog_train.iloc[-1].values if exog_train is not None else None
    exog_future = np.tile(last_exog, (steps_ahead, 1)) if last_exog is not None else None
    fcst_array = model_res.predict(steps=steps_ahead, exog_fc=exog_future)

elif model_type == "VAR":
    model_res, fitted_df, p = run_var(en_train, exog_train)
    # VAR was fitted on concatenated data (en_train + exog_train)
    # So we need to pass the full data for forecasting
    if exog_train is not None:
        full_train = pd.concat([en_train, exog_train], axis=1)
    else:
        full_train = en_train
    fcst_array = model_res.forecast(y=full_train.values[-p:], steps=steps_ahead)
    # Extract only the first 2 columns (g, ntlg) from forecast
    fcst_array = fcst_array[:, :2]

elif model_type == "BVAR":
    coefs, fitted_df, p = run_bvar_ridge(en_train, exog_train, lam=ridge_lambda)
    history = en_train.values.copy()
    for _ in range(steps_ahead):
        x_new = history[-p:].flatten().reshape(1, -1)
        y_new = [ridge.predict(x_new)[0] for ridge in coefs]
        history = np.vstack([history, y_new])
    fcst_array = history[-steps_ahead:]

elif model_type == "ARDL":

    # ==========================
    # 1. Fit ARDL model
    # ==========================
    model_res, fitted_df = run_ardl(en_ardl, ex_ardl)

    # ==========================
    # 2. Forecast target quarter (2025Q3)
    # ==========================
    train_end   = pd.Timestamp("2024-12-31")        # last known GDP/NTL
    steps_ahead = 3                                 # only 1-quarter ahead
    future_idx  = pd.date_range("2025-09-30", periods=1, freq="QE")   # 2025Q3

    # ==========================
    # 3. Exogenous values for 2025Q3
    #    Use last available (2025Q2)
    # ==========================
    x_future = pd.DataFrame(
        np.tile(ex_ardl.iloc[-1].values, (steps_ahead, 1)),
        columns=ex_ardl.columns,
        index=future_idx
    )

    # ==========================
    # 4. Proper ARDL prediction
    #    MUST use integer index, NOT timestamp
    # ==========================
    fcst_series = model_res.predict(
        start=len(en_ardl),
        end=len(en_ardl),
        exog_oos=x_future
    )

    # ==========================
    # 5. Match expected output shape
    # ==========================
    fcst_array = np.column_stack([
        fcst_series.values,
        np.tile(en_ardl["g"].iloc[-1], steps_ahead)
    ])

# Create forecast DataFrame
fcst_df = pd.DataFrame(fcst_array, index=future_idx, columns=["g", "ntlg"])

# Actuals - Use GDP growth rate from ntl, NOT log(GDP)
# The 'g' column in ntl contains "Real GDP YoY Growth Indonesia"
actual_gdp_growth = ntlm['g'].reindex(future_idx).rename("g_actual")

# Compare
compare = pd.DataFrame({
    "g_forecast": fcst_df["g"],
    "g_actual": actual_gdp_growth
}, index=future_idx)
compare["error"] = compare["g_forecast"] - compare["g_actual"]

mae = compare["error"].abs().mean()
rmse = np.sqrt((compare["error"]**2).mean())

# Print for all models/specs
print(f"{model_type}-{spec_type} → Forecast {future_idx.min().date()}–{future_idx.max().date()}")
display(compare.round(3))
print(f"MAE: {mae:.3f}, RMSE: {rmse:.3f}")

# Plot
plt.figure(figsize=(12,6))
plt.plot(en.index, en["g"], label="Actual GDP Growth in-sample", color="#7f6000")
plt.plot(fitted_df.index, fitted_df["g"], "--", label=f"Fitted {model_type}-{spec_type}", color="#FFBF00")
plt.plot(fcst_df.index, fcst_df["g"], "o--", label=f"Forecast GDP Growth ({model_type}-{spec_type})",
         color="#FFBF00", markerfacecolor="none")
plt.plot(compare.index, compare["g_actual"], "x-", label="Actual GDP Growth OOS", color="black", linewidth=2)

plt.axvspan(fcst_df.index[0], fcst_df.index[-1], color="gray", alpha=0.15)
plt.axhline(0, color="gray", linestyle=":")
plt.legend()

# Title with model + spec type
plt.title(f"{model_type}-{spec_type} — In-sample fit & Out-of-sample forecast")

plt.ylabel("GDP YoY Growth (%)")
plt.xlabel("Date")

# X-axis ticks: show every year 
import matplotlib.dates as mdates
plt.gca().xaxis.set_major_locator(mdates.YearLocator(1))
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

# === Prepare Excel output path ===
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
output_file = f"model_results_{model_type}_{spec_type}_{timestamp}.xlsx"

# === 1. Forecast Comparison Sheet ===
compare_out = compare.copy()
compare_out.index.name = "Date"

# === 2. Fit Statistics Sheet ===
fit_stats = pd.DataFrame({
    "Model": [model_type],
    "Specification": [spec_type],
    "Steps_ahead": [steps_ahead],
    "MAE": [mae],
    "RMSE": [rmse],
    "Rank (VECM only)": [chosen_rank if model_type == "VECM" else None],
    "Deterministic type": [det_type if model_type == "VECM" else None],
    "Train_end": [train_end.strftime("%Y-%m-%d")]
})


# === 3. Coefficient Table Extraction (unified for all models) ===
coef_table = None
coef_table_detailed = None  # for VAR detailed output


# === Apply model-specific parsing ===
try:
    if model_type == "VECM":
        coef_table = parse_text_summary(
            model_res.summary().as_text(),
            start_pattern=r"equation g",
            end_pattern=r"equation ntlg"
        )

    elif model_type in ["VAR", "VAR_diff"]:
        coef_table, coef_table_detailed = parse_var_summary_tables(model_res)

    elif model_type == "ARDL":
        coef_table = parse_ardl_summary(model_res)

    elif model_type == "BVAR":
        coef_data = []
        for i, ridge in enumerate(coefs):
            row = {"equation": f"eq{i+1}"}
            if hasattr(ridge, "coef_"):
                row.update({f"beta_{j}": b for j, b in enumerate(ridge.coef_.flatten(), 1)})
            coef_data.append(row)
        coef_table = pd.DataFrame(coef_data)

    else:
        coef_table = pd.DataFrame({"info": ["No coefficient table extracted."]})

except Exception as e:
    coef_table = pd.DataFrame({"error": [f"Could not extract coefficients: {e}"]})
    coef_table_detailed = None


# === Write all results to Excel ===
with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
    compare_out.to_excel(writer, sheet_name="Forecast_vs_Actual")
    fit_stats.to_excel(writer, sheet_name="Fit_Stats", index=False)

    if model_type in ["VAR", "VAR_diff"]:
        if coef_table is not None:
            coef_table.to_excel(writer, sheet_name="VAR_Merged", index=False)
        if coef_table_detailed is not None:
            coef_table_detailed.to_excel(writer, sheet_name="VAR_Detailed", index=False)
    elif coef_table is not None:
        coef_table.to_excel(writer, sheet_name="Model_Coefficients", index=False)

print(f"Results saved to {output_file}")

In [None]:
# === SETTINGS ===
model_types = ["VECM", "VAR", "VAR_diff", "BVAR", "ARDL"]
spec_types  = ["baseline", "covid", "scar", "q", "q_covid", "q_scar"]
steps_ahead = 4
ridge_lambda = 0.1
chosen_rank = 1
det_type = "ci"
max_lags = 8

# === OUTPUT FOLDER ===
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
base_dir = f"forecast_results_{timestamp}"
os.makedirs(base_dir, exist_ok=True)

# === FORECAST HORIZON ===
if steps_ahead == 4:
    future_idx = pd.date_range("2024-09-30", periods=steps_ahead, freq="QE")
    train_end  = pd.Timestamp("2024-06-30")
elif steps_ahead == 8:
    future_idx = pd.date_range("2023-09-30", periods=steps_ahead, freq="QE")
    train_end  = pd.Timestamp("2023-06-30")
else:
    raise ValueError("steps_ahead must be 4 or 8.")

# === DATA PREPARATION ===
# assumes `ntlm` and `gdp` already defined in memory
en   = ntlm[['g','ntlg']]
exc  = ntlm[['covid']]
exs  = ntlm[['scar']]
exq  = ntlm[['q1','q2','q3']]
exqc = ntlm[['q1','q2','q3','covid']]
exqs = ntlm[['q1','q2','q3','scar']]
ntlm_train = ntlm.loc[:train_end].copy()
en_train   = en.loc[:train_end]

# align gdp
if 'Date' in gdp.columns:
    gdp['Date'] = pd.to_datetime(gdp['Date']) + QuarterEnd(0)
    gdp = gdp.set_index('Date')
else:
    gdp.index = pd.to_datetime(gdp.index) + QuarterEnd(0)
gdp = gdp.asfreq('QE-DEC')
gdp['log_gdp'] = np.log(gdp['GDP'])

# === HELPER FUNCTIONS ===
def parse_var_summary_tables(model_res):
    summary_text = str(model_res.summary())
    lines = summary_text.splitlines()
    results = []
    eq_name, capture, block_lines = None, False, []

    for line in lines:
        match_eq = re.match(r"Results for equation (.+)", line)
        if match_eq:
            if eq_name and block_lines:
                results.append((eq_name, "\n".join(block_lines)))
                block_lines = []
            eq_name = match_eq.group(1).strip()
            capture = True
            continue
        if capture and re.match(r"Correlation matrix", line):
            if eq_name and block_lines:
                results.append((eq_name, "\n".join(block_lines)))
            break
        elif capture and re.match(r"^[A-Za-zL0-9]", line.strip()):
            block_lines.append(line.strip())

    all_tables = []
    for eq_name, block in results:
        df = pd.read_fwf(
            io.StringIO(block),
            names=["variable", "coef", "std_err", "t_stat", "p_value"],
            infer_nrows=100
        )
        for c in ["coef", "std_err", "t_stat", "p_value"]:
            df[c] = pd.to_numeric(df[c], errors="coerce")
        df["equation"] = eq_name
        all_tables.append(df)

    if not all_tables:
        return pd.DataFrame({"info": ["No coefficients parsed from VAR summary."]}), None

    merged = None
    for df in all_tables:
        eq = df["equation"].iloc[0]
        sub = df[["variable", "coef"]].copy()
        sub.rename(columns={"coef": eq}, inplace=True)
        merged = sub if merged is None else pd.merge(merged, sub, on="variable", how="outer")

    detailed = pd.concat(all_tables, ignore_index=True)
    return merged, detailed


def parse_ardl_summary(model_res):
    try:
        summary_table = model_res.summary().tables[1]
        df = pd.read_html(summary_table.as_html(), header=0, index_col=0)[0]
        df.reset_index(inplace=True)
        df.rename(columns={"index": "variable"}, inplace=True)
        return df
    except Exception:
        if hasattr(model_res, "params"):
            return pd.DataFrame({
                "variable": model_res.params.index,
                "coef": model_res.params.values,
                "std_err": getattr(model_res, "bse", np.nan),
                "z": getattr(model_res, "tvalues", np.nan),
                "P>|z|": getattr(model_res, "pvalues", np.nan)
            })
        else:
            return pd.DataFrame({"info": ["Could not parse ARDL coefficients."]})


# === STORAGE FOR SUMMARY RESULTS ===
summary_rows = []

# === MAIN LOOP ===
for model_type in model_types:
    for spec_type in spec_types:

        # Select exog
        exog_map = {
            "baseline": None,
            "covid":   exc.loc[:train_end],
            "scar":    exs.loc[:train_end],
            "q":       exq.loc[:train_end],
            "q_covid": exqc.loc[:train_end],
            "q_scar":  exqs.loc[:train_end],
        }
        exog_train = exog_map.get(spec_type, None)

        # Prepare ARDL vars
        en_ardl   = ntlm[['g']].loc[:train_end]
        ex_ardl   = {
            "baseline": ntlm[['ntlg']],
            "covid":   ntlm[['ntlg','covid']],
            "scar":    ntlm[['ntlg','scar']],
            "q":       ntlm[['ntlg','q1','q2','q3']],
            "q_covid": ntlm[['ntlg','q1','q2','q3','covid']],
            "q_scar":  ntlm[['ntlg','q1','q2','q3','scar']],
        }[spec_type].loc[:train_end]

        # === Run model ===
        if model_type == "VECM":
            model_res, fitted_df = run_vecm(en_train, exog_train, rank=chosen_rank, det=det_type)
            last_exog = exog_train.iloc[-1].values if exog_train is not None else None
            exog_future = np.tile(last_exog, (steps_ahead, 1)) if last_exog is not None else None
            fcst_array = model_res.predict(steps=steps_ahead, exog_fc=exog_future)

        # elif model_type == "VAR":
        #     model_res, fitted_df, p = run_var(en_train, exog_train)
        
        #     try:
        #         # Try forecast with or without exog
        #         if getattr(model_res, "k_exog", 0) > 0 and exog_train is not None:
        #             last_exog = exog_train.iloc[-1].values
        #             exog_future = np.tile(last_exog, (steps_ahead, 1))
        #             fcst_array = model_res.forecast(
        #                 y=en_train.values[-p:], 
        #                 steps=steps_ahead, 
        #                 exog_future=exog_future
        #             )
        #         else:
        #             fcst_array = model_res.forecast(
        #                 y=en_train.values[-p:], 
        #                 steps=steps_ahead
        #             )

        #         except ValueError as e:
        #             # Detect the specific exog alignment error and skip gracefully
        #             if "No exog in model" in str(e) and "exog_future" in str(e):
        #                 print(f"⚠️ Skipping {model_type}-{spec_type}: {e}")
        #                 continue  # move to next spec/model combo
        #             else:
        #                 raise  # re-raise any other unexpected error




        # elif model_type == "VAR_diff":
        #     model_res, fitted_df, p = run_var(en_train.diff().dropna(), exog_train)
        #     y_input = en_train.diff().dropna().values[-p:]
        
        #     if getattr(model_res, "k_exog", 0) > 0 and exog_train is not None:
        #         last_exog = exog_train.iloc[-1].values
        #         exog_future = np.tile(last_exog, (steps_ahead, 1))
        #         fcst_array = model_res.forecast(y=y_input, steps=steps_ahead, exog_future=exog_future)
        #     else:
        #         fcst_array = model_res.forecast(y=y_input, steps=steps_ahead)



        # elif model_type == "BVAR":
        #     coefs, fitted_df, p = run_bvar_ridge(en_train, exog_train, lam=ridge_lambda)
        #     history = en_train.values.copy()
        #     for _ in range(steps_ahead):
        #         x_new = history[-p:].flatten().reshape(1, -1)
        #         y_new = [ridge.predict(x_new)[0] for ridge in coefs]
        #         history = np.vstack([history, y_new])
        #     fcst_array = history[-steps_ahead:]

        elif model_type == "ARDL":
            model_res, fitted_df = run_ardl(en_ardl, ex_ardl)
            x_future = pd.DataFrame(
                np.tile(ex_ardl.iloc[-1].values, (steps_ahead, 1)),
                columns=ex_ardl.columns,
                index=future_idx
            )
            fcst_series = model_res.predict(
                start=len(en_ardl),
                end=len(en_ardl) + steps_ahead - 1,
                exog_oos=x_future
            )
            fcst_array = np.column_stack([
                fcst_series.values,
                np.tile(en_ardl["g"].iloc[-1], steps_ahead)
            ])
        else:
            continue

        # === Forecast results ===
        fcst_df = pd.DataFrame(fcst_array, columns=en_train.columns, index=future_idx)
        actual_gdp = gdp['log_gdp'].reindex(future_idx).rename("g_actual")
        compare = pd.DataFrame({
            "g_forecast": fcst_df["g"],
            "g_actual": actual_gdp
        }, index=future_idx)
        compare["error"] = compare["g_forecast"] - compare["g_actual"]
        mae = compare["error"].abs().mean()
        rmse = np.sqrt((compare["error"]**2).mean())

        # === Plot ===
        plt.figure(figsize=(12,6))
        plt.plot(en.index, en["g"], label="Actual log(GDP) in-sample", color="#7f6000")
        plt.plot(fitted_df.index, fitted_df["g"], "--", label=f"Fitted {model_type}-{spec_type}", color="#FFBF00")
        plt.plot(fcst_df.index, fcst_df["g"], "o--", label=f"Forecast log(GDP) ({model_type}-{spec_type})",
                 color="#FFBF00", markerfacecolor="none")
        plt.plot(compare.index, compare["g_actual"], "x-", label="Actual log(GDP) OOS", color="black", linewidth=2)
        plt.axvspan(fcst_df.index[0], fcst_df.index[-1], color="gray", alpha=0.15)
        plt.axhline(0, color="gray", linestyle=":")
        plt.legend()
        plt.title(f"{model_type}-{spec_type} — In-sample fit & Out-of-sample forecast")
        plt.ylabel("log(GDP)")
        plt.xlabel("Date")
        plt.ylim(14.25, 15.25)
        plt.gca().xaxis.set_major_locator(mdates.YearLocator(1))
        plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
        plt.xticks(rotation=45)

        plot_path = os.path.join(base_dir, f"{model_type}_{spec_type}_forecast.png")
        plt.tight_layout()
        plt.savefig(plot_path, dpi=200)
        plt.close()

        # === Coefficients extraction ===
        coef_table = None
        coef_table_detailed = None
        if model_type == "VAR":
            coef_table, coef_table_detailed = parse_var_summary_tables(model_res)
        elif model_type == "ARDL":
            coef_table = parse_ardl_summary(model_res)
        elif model_type == "BVAR":
            coef_data = []
            for i, ridge in enumerate(coefs):
                row = {"equation": f"eq{i+1}"}
                if hasattr(ridge, "coef_"):
                    row.update({f"beta_{j}": b for j, b in enumerate(ridge.coef_.flatten(), 1)})
                coef_data.append(row)
            coef_table = pd.DataFrame(coef_data)

        # === Save to Excel ===
        excel_path = os.path.join(base_dir, f"{model_type}_{spec_type}.xlsx")
        with pd.ExcelWriter(excel_path, engine="openpyxl") as writer:
            compare.to_excel(writer, sheet_name="Forecast_vs_Actual")
            pd.DataFrame({
                "Model": [model_type],
                "Specification": [spec_type],
                "Steps_ahead": [steps_ahead],
                "MAE": [mae],
                "RMSE": [rmse],
                "Rank (VECM only)": [chosen_rank if model_type == "VECM" else None],
                "Deterministic type": [det_type if model_type == "VECM" else None],
                "Train_end": [train_end.strftime("%Y-%m-%d")]
            }).to_excel(writer, sheet_name="Fit_Stats", index=False)
            if model_type == "VAR":
                if coef_table is not None:
                    coef_table.to_excel(writer, sheet_name="VAR_Merged", index=False)
                if coef_table_detailed is not None:
                    coef_table_detailed.to_excel(writer, sheet_name="VAR_Detailed", index=False)
            elif coef_table is not None:
                coef_table.to_excel(writer, sheet_name="Model_Coefficients", index=False)

        # record to summary
        summary_rows.append({
            "Model": model_type,
            "Specification": spec_type,
            "MAE": mae,
            "RMSE": rmse
        })

# === Final Summary Table ===
summary_df = pd.DataFrame(summary_rows)
summary_df.to_excel(os.path.join(base_dir, "model_summary.xlsx"), index=False)

print(f"\n✅ All models completed. Results saved in: {base_dir}")


In [None]:
# === SETTINGS ===
model_types = ["VECM", "VAR", "VAR_diff", "BVAR", "ARDL"]
spec_types  = ["baseline", "covid", "scar", "q", "q_covid", "q_scar"]
ridge_lambda = 0.1
chosen_rank = 1
det_type = "ci"
max_lags = 8

# === OUTPUT FOLDER ===
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
base_dir = f"forecast_results_{timestamp}"
os.makedirs(base_dir, exist_ok=True)

# === DATA PREPARATION ===
en   = ntlm[['g','ntlg']]
exc  = ntlm[['covid']]
exs  = ntlm[['scar']]
exq  = ntlm[['q1','q2','q3']]
exqc = ntlm[['q1','q2','q3','covid']]
exqs = ntlm[['q1','q2','q3','scar']]

# --- Align GDP ---
if 'Date' in gdp.columns:
    gdp['Date'] = pd.to_datetime(gdp['Date']) + QuarterEnd(0)
    gdp = gdp.set_index('Date')
else:
    gdp.index = pd.to_datetime(gdp.index) + QuarterEnd(0)
gdp = gdp.asfreq('QE-DEC')
gdp['log_gdp'] = np.log(gdp['GDP'])

# === HELPERS ===
def parse_vecm_summary(model_res):
    try:
        summary_text = str(model_res.summary())
        lines = summary_text.splitlines()
        data_lines = [ln for ln in lines if re.match(r"^[A-Za-z]", ln.strip())]
        df = pd.read_fwf(io.StringIO("\n".join(data_lines)),
                         names=["variable", "coef", "std_err", "t_stat", "p_value"],
                         infer_nrows=100)
        for c in ["coef", "std_err", "t_stat", "p_value"]:
            df[c] = pd.to_numeric(df[c], errors="coerce")
        return df.dropna(subset=["coef"], how="all")
    except Exception as e:
        return pd.DataFrame({"error":[f"Could not parse VECM coefficients: {e}"]})

def parse_text_summary(summary_text, start_pattern=None, end_pattern=None):
    lines = summary_text.splitlines()
    start, end = None, None

    if start_pattern:
        for i, line in enumerate(lines):
            if re.search(start_pattern, line):
                start = i + 2
            elif end_pattern and start is not None and re.search(end_pattern,line):
                end = i - 1
                break
    if start is None:
        start, end = 0, len(lines)

    block_lines = lines[start:end]
    data_lines  = [ln for ln in block_lines if re.match(r"^[A-Za-zL0-9]", ln.strip())]

    if not data_lines:
        return None

    df = pd.read_fwf(
        io.StringIO("\n".join(data_lines)),
        names=["variable","coef","std_err","z","P>|z|","CI_low","CI_high"],
        infer_nrows=100
    )
    for col in ["coef","std_err","z","P>|z|","CI_low","CI_high"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
    return df.dropna(subset=["coef"], how="all")


summary_rows = []

# === MAIN LOOP ===
for model_type in model_types:
    for spec_type in spec_types:

        print(f"\n=== Running {model_type}-{spec_type} ===")

        # ============================================================
        # 1. TRAINING WINDOW & FORECAST HORIZON
        # ============================================================
        if model_type == "VECM":
            train_end   = pd.Timestamp("2023-12-31")
            steps_ahead = 8  # 2024Q1–2025Q3 (for example)

        elif model_type == "ARDL":
            # Train up to 2025Q2 and forecast 1 step = 2025Q3
            train_end   = pd.Timestamp("2025-06-30")   # last observed quarter
            steps_ahead = 1                            # only predict 2025Q3

        else:
            train_end   = pd.Timestamp("2024-09-30")
            steps_ahead = 4

        # === Common forecast index (for whatever we forecast) ===
        future_idx = pd.date_range(train_end + QuarterEnd(0),
                                   periods=steps_ahead,
                                   freq="QE")

        # ============================================================
        # 2. PREPARE DATA
        # ============================================================
        ntlm_train = ntlm.loc[:train_end].copy()
        en_train   = en.loc[:train_end]

        exog_map = {
            "baseline": None,
            "covid":   exc.loc[:train_end],
            "scar":    exs.loc[:train_end],
            "q":       exq.loc[:train_end],
            "q_covid": exqc.loc[:train_end],
            "q_scar":  exqs.loc[:train_end],
        }
        exog_train = exog_map.get(spec_type, None)

        # ARDL-specific endog/exog
        en_ardl = ntlm[['g']].loc[:train_end]
        ex_ardl = {
            "baseline": ntlm[['ntlg']],
            "covid":   ntlm[['ntlg','covid']],
            "scar":    ntlm[['ntlg','scar']],
            "q":       ntlm[['ntlg','q1','q2','q3']],
            "q_covid": ntlm[['ntlg','q1','q2','q3','covid']],
            "q_scar":  ntlm[['ntlg','q1','q2','q3','scar']],
        }[spec_type].loc[:train_end]

        # ============================================================
        # 3. RUN MODEL
        # ============================================================
        if model_type == "VECM":
            model_res, fitted_df = run_vecm(en_train, exog_train,
                                            rank=chosen_rank, det=det_type)
            last_exog = exog_train.iloc[-1].values if exog_train is not None else None
            exog_future = np.tile(last_exog, (steps_ahead, 1)) if last_exog is not None else None
            fcst_array = model_res.predict(steps=steps_ahead, exog_fc=exog_future)

        elif model_type == "ARDL":
            # ---- Fit ARDL on data up to 2025Q2 ----
            model_res, fitted_df = run_ardl(en_ardl, ex_ardl)

            # Some quick diagnostics so we see what the model "thinks"
            print("ARDL | last training index:", en_ardl.index[-1])
            print("ARDL | len(en_ardl):", len(en_ardl))

            # ---- Forecast 1 step ahead (index = len(en_ardl)) ----
            # Target quarter is 2025Q3 = 2025-09-30
            future_idx = pd.DatetimeIndex([pd.Timestamp("2025-09-30")])

            # Exogenous for 2025Q3: use latest available (2025Q2) values
            exog_oos = ex_ardl.iloc[[-1]].copy()
            exog_oos.index = future_idx

            # Important: use integer positions for ARDL .predict
            # We have observations indexed 0..len(en_ardl)-1 internally,
            # so next forecast is at position len(en_ardl)
            fcst_series = model_res.predict(
                start=len(en_ardl),      # first out-of-sample obs
                end=len(en_ardl),        # same, since 1-step ahead
                exog_oos=exog_oos
            )

            print("ARDL | forecast index (future_idx):", future_idx)
            print("ARDL | fcst_series:", fcst_series)

            # Match 2-column convention (g + placeholder ntlg)
            fcst_array = np.column_stack([
                fcst_series.values,
                np.tile(en_ardl["g"].iloc[-1], 1)
            ])

        else:
            # Skip other models for now if you want, or keep your VAR/BVAR logic
            continue

        # ============================================================
        # 4. FORECAST RESULTS & COMPARISON
        # ============================================================
        fcst_df = pd.DataFrame(fcst_array, columns=en_train.columns, index=future_idx)
        actual_gdp = gdp['log_gdp'].reindex(future_idx).rename("g_actual")

        compare = pd.DataFrame({
            "g_forecast": fcst_df["g"],
            "g_actual": actual_gdp
        }, index=future_idx)

        compare["error"] = compare["g_forecast"] - compare["g_actual"]
        mae  = compare["error"].abs().mean()
        rmse = np.sqrt((compare["error"]**2).mean())

        # ============================================================
        # 5. EXPORT EXCEL + COEFFICIENTS
        # ============================================================
        coef_table = None
        if model_type == "ARDL":
            coef_table = parse_ardl_summary(model_res)
        elif model_type == "VECM":
            coef_table = parse_text_summary(
                model_res.summary().as_text(),
                start_pattern=r"equation g",
                end_pattern=r"equation ntlg"
            )

        excel_path = os.path.join(base_dir, f"{model_type}_{spec_type}.xlsx")
        with pd.ExcelWriter(excel_path, engine="openpyxl") as writer:
            compare.to_excel(writer, sheet_name="Forecast_vs_Actual")
            pd.DataFrame({
                "Model": [model_type],
                "Specification": [spec_type],
                "Steps_ahead": [steps_ahead],
                "MAE": [mae],
                "RMSE": [rmse],
                "Train_end": [train_end.strftime("%Y-%m-%d")]
            }).to_excel(writer, sheet_name="Fit_Stats", index=False)
            if coef_table is not None:
                coef_table.to_excel(writer, sheet_name="Model_Coefficients", index=False)

        summary_rows.append({
            "Model": model_type,
            "Specification": spec_type,
            "MAE": mae,
            "RMSE": rmse
        })

# === FINAL SUMMARY ===
summary_df = pd.DataFrame(summary_rows)
summary_df.to_excel(os.path.join(base_dir, "model_summary.xlsx"), index=False)
print("\n✅ All models completed.")
