<a href="https://colab.research.google.com/github/Konstantinosil/Chortareas-Linardatos-Central-Bank-Credibility-Under-the-Shadow-Rate-Stance/blob/main/US_EA_UK_Sacrifice_Ratio%2C_Chortareas_%26_Linardatos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
%pip -q install statsmodels linearmodels openpyxl

import warnings, logging, pathlib
import numpy as np, pandas as pd
import statsmodels.api as sm
from linearmodels.panel import PanelOLS

warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')

PATH_CRED = pathlib.Path('/content/credibility_planB.xlsx')
PATH_DATA = pathlib.Path('/content/chapter 4 august 2025 1st block.xlsx')

assert PATH_CRED.exists(), "Λείπει credibility_planB.xlsx στο /content"
assert PATH_DATA.exists(), "Λείπει chapter 4 ... 1st block.xlsx στο /content"

ECON        = ['US','EU','UK']
START, END  = '2008-01-01','2022-01-01'
HP_LAMBDA_M = 129_600
H_LIST      = (6, 12)
LAGS        = (3, 6, 12)

MIN_ABS_DPI = 0.02
WINSOR_SR   = True
W_PCTS      = (0.01, 0.99)

USE_TREND_PI = True
TREND_K      = 12

FORWARD_WINDOW = False

USE_CONTROLS = True

MIN_ENTITIES_FOR_FE = 2
MIN_T_FOR_FE        = 3

def parse_sheet(name):
    key = {'shadow rates':'SR','policy rates':'POLICY','inflation':'INF',
           'volatility':'VOL','composite leader indicator':'CLI'}
    parts = name.split()
    econ = parts[-1].upper()
    head = ' '.join(parts[:-1]).lower()
    for k,v in key.items():
        if k in head: return v, econ
    if head.startswith('rgdp'): return 'GDP', econ
    return None, None

def to_monthly(s: pd.Series) -> pd.Series:
    s = s.copy()
    if not isinstance(s.index, pd.DatetimeIndex):
        s.index = pd.to_datetime(s.index)
    f = (pd.infer_freq(s.index[:5]) or '').upper()
    if f.startswith(('D','B')):
        s = s.resample('MS').last()
    elif f.startswith('Q'):
        s = s.resample('QS').last().interpolate('linear').resample('MS').ffill()
    else:
        if not s.index.is_month_start.all():
            s.index = s.index.to_period('M').to_timestamp('start')
        s = s.asfreq('MS')
    return s

def ensure_ms(s: pd.Series) -> pd.Series:
    s = pd.Series(s)
    if not isinstance(s.index, pd.DatetimeIndex):
        s.index = pd.to_datetime(s.index)
    return s.asfreq('MS')

def winsorize(s: pd.Series, p=(0.01,0.99)) -> pd.Series:
    if s.dropna().empty: return s
    lo, hi = s.quantile(p[0]), s.quantile(p[1])
    return s.clip(lower=lo, upper=hi)

def load_inf_gdp_panel(path: pathlib.Path, econ_list, start, end):
    xls = pd.read_excel(path, sheet_name=None)
    raw = {}
    for sheet, df in xls.items():
        var, econ = parse_sheet(sheet)
        if var and econ in econ_list:
            df = df.iloc[:, :2].copy()
            df.columns = ['date', var]
            df['date'] = pd.to_datetime(df['date'])
            s = to_monthly(df.set_index('date')[var])
            raw[(econ,var)] = s.loc[str(start):str(end)]
    panel = (pd.concat(raw, axis=1)
               .swaplevel(0,1, axis=1).sort_index(axis=1)
               .stack(level=1).rename_axis(['date','econ']).sort_index())
    return panel

def load_credibility(path: pathlib.Path, econ_list):
    xl = pd.ExcelFile(path)
    try_order = ['cred_scaled_main', 'cred_scaled_GJR', 'cred_scaled_decommon']
    sheet = next((s for s in try_order if s in xl.sheet_names), xl.sheet_names[0])
    df = pd.read_excel(path, sheet_name=sheet)
    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date']); df = df.set_index('date')
    else:
        df.index = pd.to_datetime(df.iloc[:,0], errors='coerce'); df = df.set_index(df.columns[0])
    out=[]
    for e in econ_list:
        cand = [c for c in df.columns if str(c).lower().startswith('cred_sc_') and e in str(c)]
        pref = [c for c in cand if 'egarch' in c.lower()]
        col  = pref[0] if pref else (cand[0] if cand else None)
        if col is None:
            cand2 = [c for c in df.columns if c == e]
            col = cand2[0] if cand2 else None
        if col is None:
            logging.warning(f"[{e}] δεν βρέθηκε στήλη credibility στο {sheet}")
            continue
        s = df[col].rename('cred_sc_OWN').to_frame(); s['econ']=e; out.append(s)
    cred = (pd.concat(out).reset_index().rename(columns={'index':'date'})
              .set_index(['date','econ']).sort_index())
    return cred[['cred_sc_OWN']]

def hp_gap_from_gdp(gdp: pd.Series, lam=HP_LAMBDA_M) -> pd.Series:
    s = ensure_ms(pd.Series(gdp).astype(float)).dropna()
    lvl = s.replace([np.inf,-np.inf], np.nan).dropna().clip(lower=1e-8)
    loglvl = np.log(lvl).interpolate('time')
    try:
        cycle, trend = sm.tsa.filters.hpfilter(loglvl, lamb=lam)
        gap = cycle * 100.0
    except Exception:
        tr = loglvl.rolling(24, min_periods=12, center=True).mean()
        gap = (loglvl - tr) * 100.0
    return gap.reindex(s.index)

def inflation_from_index(cpi: pd.Series, kind='yoy') -> pd.Series:
    """Return inflation in percentage points (YoY or MoM annualized)."""
    cpi = ensure_ms(pd.Series(cpi).astype(float)).dropna()
    l = np.log(cpi)
    if kind == 'yoy':
        pi = 100*(l - l.shift(12))
    elif kind == 'mom_ann':
        pi = 1200*(l - l.shift(1))
    else:
        raise ValueError("kind must be 'yoy' or 'mom_ann'")
    return pi

def trend_ma(pi, k=12):
    return pi.rolling(k, min_periods=k).mean()

def rolling_SR_from_pi_gap(pi: pd.Series, gap: pd.Series, H: int,
                           min_abs_dpi=MIN_ABS_DPI,
                           forward: bool=False) -> pd.Series:
    """
    Sacrifice Ratio (SR).
    Backward [t-H+1, t] (default) ή forward [t+1, t+H] αν forward=True.
    SR = (sum of negative gaps, %-years) / |drop in inflation|,
    κρατάμε μόνο disinflations (Δπ<0), αγνοούμε πολύ μικρούς παρονομαστές με min_abs_dpi.
    """
    pi  = ensure_ms(pd.Series(pi).astype(float)).sort_index()
    gap = ensure_ms(pd.Series(gap).astype(float)).sort_index()

    if not forward:
        cost_pm = gap.clip(upper=0).abs().rolling(H, min_periods=H).sum()
        cost_py = cost_pm / 12.0
        dpi = pi - pi.shift(H)
    else:
        cost_py = (gap.clip(upper=0).abs().shift(1)
                        .rolling(H, min_periods=H).sum()) / 12.0
        dpi = pi.shift(-H) - pi

    mask = (dpi < -min_abs_dpi)
    sr = (cost_py / dpi.abs()).where(mask)

    if WINSOR_SR:
        sr = winsorize(sr, W_PCTS)
    return sr

def standardize_panel_index(df: pd.DataFrame) -> pd.DataFrame:
    if not isinstance(df.index, pd.MultiIndex):
        raise ValueError("Expect MultiIndex index.")
    names = list(df.index.names)
    econ_lv = 'econ' if 'econ' in names else names[0]
    date_lv = 'date' if 'date' in names else names[1]
    econ = df.index.get_level_values(econ_lv)
    date = pd.to_datetime(df.index.get_level_values(date_lv))
    new_index = pd.MultiIndex.from_arrays([econ, date], names=['econ','date'])
    out = df.copy()
    out.index = new_index
    return out.sort_index()

def pooled_ols_HC3(y: pd.Series, X: pd.DataFrame):
    Xc = sm.add_constant(X, has_constant='add')
    return sm.OLS(y, Xc, missing='drop').fit(cov_type='HC3')

def pooled_ols_HAC(y: pd.Series, X: pd.DataFrame, maxlags: int):
    Xc = sm.add_constant(X, has_constant='add')
    return sm.OLS(y, Xc, missing='drop').fit(cov_type='HAC', cov_kwds={'maxlags': maxlags})

def fe_safe(y: pd.Series, X: pd.DataFrame, bandwidth: int):
    df = pd.concat([y, X], axis=1).dropna()
    if df.empty or df.shape[0] <= df.shape[1] + 1:
        return None, None

    df = standardize_panel_index(df)
    yv = df.iloc[:,0]; Xv = df.iloc[:,1:]

    ent = df.index.get_level_values('econ').nunique()
    T   = df.index.get_level_values('date').nunique()

    if ent < MIN_ENTITIES_FOR_FE or T < MIN_T_FOR_FE:
        logging.info(f"FE skipped (entities={ent}, T={T})")
        return None, None

    candidates = []
    if ent >= MIN_ENTITIES_FOR_FE and T >= MIN_T_FOR_FE:
        candidates.append(('FE_DK_tw', dict(entity_effects=True,  time_effects=True)))
        candidates.append(('FE_DK_ew', dict(entity_effects=True,  time_effects=False)))

        candidates.append(('FE_DK_twonly', dict(entity_effects=False, time_effects=True)))

    for label, eff in candidates:
        try:
            mod = PanelOLS(yv, Xv, **eff)
            res = mod.fit(cov_type='kernel', kernel='bartlett',
                          bandwidth=bandwidth, drop_absorbed=True)
            return res, label
        except Exception as e:
            logging.warning(f"{label} DK failed: {e}; trying robust")
            try:
                mod = PanelOLS(yv, Xv, **eff)
                res = mod.fit(cov_type='robust', drop_absorbed=True)
                return res, label.replace('DK','robust')
            except Exception as e2:
                logging.warning(f"{label} robust failed: {e2}")
                continue
    return None, None

panel = load_inf_gdp_panel(PATH_DATA, ECON, START, END)
cred  = load_credibility(PATH_CRED, ECON)

needed = set(['INF','GDP'])
have   = set(panel.columns.get_level_values(0))
missing = needed - have
if missing:
    raise KeyError(f"Χρειάζονται στήλες {missing} στο Chapter 4 αρχείο.")

sr_dict = {}
pi_panel = []

for econ in ECON:
    sub   = panel.xs(econ, level='econ').sort_index()
    cpi_e = sub.get('INF', pd.Series(dtype=float))
    gdp_e = sub.get('GDP', pd.Series(dtype=float))
    if cpi_e.dropna().empty or gdp_e.dropna().empty:
        continue

    gap_e = hp_gap_from_gdp(gdp_e)
    pi_e  = inflation_from_index(cpi_e, kind='yoy')
    if USE_TREND_PI:
        pi_e = trend_ma(pi_e, k=TREND_K)

    tmp_pi = pi_e.to_frame('PI').copy()
    tmp_pi['econ'] = econ
    tmp_pi = tmp_pi.set_index('econ', append=True).reorder_levels(['date','econ']).sort_index()
    pi_panel.append(tmp_pi)

    for H in H_LIST:
        sr = rolling_SR_from_pi_gap(pi_e, gap_e, H, min_abs_dpi=MIN_ABS_DPI, forward=FORWARD_WINDOW)
        sr.name = f'SR_H{H}'
        df_sr = sr.to_frame()
        df_sr['econ'] = econ
        df_sr = (df_sr.set_index('econ', append=True)
                        .reorder_levels(['date','econ'])
                        .sort_index())
        sr_dict[(econ, H)] = df_sr

sr_all = pd.concat(sr_dict.values(), axis=1) if sr_dict else pd.DataFrame()
if not sr_all.empty:
    sr_all = sr_all.loc[:, ~sr_all.columns.duplicated()]

pi_all = pd.concat(pi_panel).sort_index() if pi_panel else pd.DataFrame()

df = cred.copy()
for H in H_LIST:
    col = f'SR_H{H}'
    if (not sr_all.empty) and (col in sr_all.columns):
        df = df.join(sr_all[[col]], how='left')

for L in LAGS:
    df[f'cred_lag{L}'] = df.groupby(level='econ')['cred_sc_OWN'].shift(L)

if USE_CONTROLS and not pi_all.empty:
    df = df.join(pi_all[['PI']].rename(columns={'PI':'pi_level'}), how='left')

for H in H_LIST:
    col = f"SR_H{H}"
    n_nonnull = int(df[col].notna().sum()) if col in df.columns else 0
    logging.info(f"[Coverage] Non-null {col}: {n_nonnull}")
    if col in df.columns:
        tmp = df[[col]].copy()
        g = tmp.groupby(level='econ')[col].apply(lambda s: s.notna().sum())
        logging.info(f"[Coverage per entity] {col}: " + ", ".join([f"{k}={int(v)}" for k,v in g.items()]))

summ_text = {}
rows = []

def safe_add_result(tag, res, varname, model_label=None):
    if res is None:
        summ_text[tag] = 'n/a'
        return
    try:
        b  = float(res.params.get(varname, np.nan))
        se = float((res.bse if hasattr(res,'bse') else res.std_errors).get(varname, np.nan))
        t  = float((res.tvalues if hasattr(res,'tvalues') else res.tstats).get(varname, np.nan))
        p  = float(res.pvalues.get(varname, np.nan))
    except Exception:
        b=se=t=p=np.nan
    N  = int(res.nobs) if hasattr(res, 'nobs') else np.nan
    R2 = float(getattr(res, 'rsquared', np.nan))
    label = f" [{model_label}]" if model_label else ""
    summ_text[tag] = f"N={N} | R2={R2:.3f} | b={b:.4f}, SE={se:.4f}, t={t:.3f}, p={p:.4f}{label}"

for H in H_LIST:
    ycol = f'SR_H{H}'
    if ycol not in df.columns:
        continue

    for L in LAGS:
        xname = f'cred_lag{L}'
        cols = [ycol, xname]
        if USE_CONTROLS and 'pi_level' in df.columns:
            cols.append('pi_level')

        use = df[cols].dropna()
        if use.empty:
            continue

        ent = use.index.get_level_values('econ').nunique()
        T   = use.index.get_level_values('date').nunique()
        logging.info(f"[H={H}, L={L}] after dropna: entities={ent}, T={T}, N={use.shape[0]}")

        X = use[[xname]].copy()
        if USE_CONTROLS and 'pi_level' in use.columns:
            X['pi_level'] = use['pi_level']

        res_hc3 = pooled_ols_HC3(use[ycol], X)
        rows.append(['pooled_HC3', H, L,
                     res_hc3.params.get(xname, np.nan),
                     res_hc3.bse.get(xname, np.nan),
                     res_hc3.tvalues.get(xname, np.nan),
                     res_hc3.pvalues.get(xname, np.nan),
                     int(res_hc3.nobs), getattr(res_hc3,'rsquared',np.nan)])
        safe_add_result(f'pooled_HC3_H{H}_L{L}', res_hc3, xname, 'HC3')

        res_hac = pooled_ols_HAC(use[ycol], X, maxlags=H)
        rows.append(['pooled_HAC', H, L,
                     res_hac.params.get(xname, np.nan),
                     res_hac.bse.get(xname, np.nan),
                     res_hac.tvalues.get(xname, np.nan),
                     res_hac.pvalues.get(xname, np.nan),
                     int(res_hac.nobs), getattr(res_hac,'rsquared',np.nan)])
        safe_add_result(f'pooled_HAC_H{H}_L{L}', res_hac, xname, f'HAC maxlags={H}')

        res_fe, fe_label = fe_safe(use[ycol], X, bandwidth=max(6, H))
        if res_fe is not None:
            rows.append([fe_label, H, L,
                         res_fe.params.get(xname, np.nan),
                         res_fe.std_errors.get(xname, np.nan),
                         res_fe.tstats.get(xname, np.nan),
                         res_fe.pvalues.get(xname, np.nan),
                         int(res_fe.nobs), getattr(res_fe,'rsquared',np.nan)])
            safe_add_result(f'{fe_label}_H{H}_L{L}', res_fe, xname, fe_label)

reg_table = pd.DataFrame(rows, columns=['model','H','L','coef','se','t','pval','N','R2'])

print("\n=== [Credibility Data] ===")
print(cred.reset_index().head(10))

print("\n=== [INF & GDP Panel Data] ===")
print(panel.reset_index().head(10))

if not sr_all.empty:
    print("\n=== [Sacrifice Ratio (SR) Rolling Window] ===")
    print(sr_all.reset_index().head(10))
else:
    print("\n=== [Sacrifice Ratio (SR) Rolling Window] ===\n(κενό)")

print("\n=== [Merged Data] ===")
print(df.reset_index().head(10))

if not reg_table.empty:
    print("\n=== [Regression Results Table] ===")
    print(reg_table)
else:
    print("\n=== [Regression Results Table] ===\n(κενό)")

if summ_text:
    print("\n=== [Regression Summaries] ===")
    for key, val in summ_text.items():
        print(f"{key}: {val}")
else:
    print("\n=== [Regression Summaries] ===\n(δεν υπάρχουν)")



=== [Credibility Data] ===
        date econ  cred_sc_OWN
0 2008-01-01   EU    35.432621
1 2008-01-01   UK    23.602223
2 2008-01-01   US    45.712574
3 2008-02-01   EU    24.052893
4 2008-02-01   UK     0.000000
5 2008-02-01   US     0.000000
6 2008-03-01   EU    26.468545
7 2008-03-01   UK     2.638283
8 2008-03-01   US     0.000000
9 2008-04-01   EU    26.211599

=== [INF & GDP Panel Data] ===
        date econ         CLI          GDP      INF  POLICY        SR  \
0 2008-01-01   EU  101.418597  2480679.800   89.650    3.00  3.876700   
1 2008-01-01   UK  100.907600   489783.000   84.100    5.25  5.285628   
2 2008-01-01   US  101.325800    16843.003  211.080    3.94  2.655856   
3 2008-02-01   EU  101.251102  2480679.800   89.970    3.00  4.053112   
4 2008-02-01   UK  100.603000   489783.000   84.600    5.25  5.288706   
5 2008-02-01   US  101.193100    16843.003  211.693    2.98  2.137866   
6 2008-03-01   EU  101.044951  2480679.800   90.850    3.00  4.012206   
7 2008-03-01   