In [1]:
import pandas as pd
from pathlib import Path
from statsmodels.tsa.stattools import adfuller, kpss
import numpy as np

## Helper: run ADF + KPSS and summarize

In [2]:
def unit_root_tests(series, name, adf_reg="c", kpss_reg="c", alpha=0.05):
    """
    Run ADF (H0: unit root) and KPSS (H0: stationary) on a 1D series.

    Parameters
    ----------
    series : array-like
        Observations in time order. Missing values and non-finite values
        (+/-inf) are removed before testing.
    name : str
        Label stored under the "series" key of the result.
    adf_reg, kpss_reg : str
        Forwarded as ``regression`` to ``adfuller`` and ``kpss`` respectively.
    alpha : float, default 0.05
        Significance level of the combined decision rule. A test "rejects"
        when its p-value is strictly below ``alpha``.
        NOTE(review): statsmodels takes KPSS p-values from a look-up table and
        warns when the true value lies outside it (the values seen in this
        notebook are bounded by 0.01 and 0.10) - keep ``alpha`` inside that
        range.

    Returns
    -------
    dict
        Keys: "series", "nobs", "adf_stat", "adf_pvalue", "kpss_stat",
        "kpss_pvalue", "decision". When the series cannot be tested (nothing
        left after cleaning, or all values identical) the statistics are NaN
        and "decision" explains why.
    """
    s = pd.Series(series).dropna().astype(float)
    # dropna() keeps +/-inf, which the regressions behind both tests cannot use
    s = s[np.isfinite(s)]

    row = {
        "series": name,
        "nobs": int(len(s)),
        "adf_stat": np.nan,
        "adf_pvalue": np.nan,
        "kpss_stat": np.nan,
        "kpss_pvalue": np.nan,
        "decision": "no data"
    }
    if s.empty:
        return row

    # Neither test is defined for a series without variation; report it in the
    # table instead of letting one bad column abort the whole run.
    if s.nunique() < 2:
        row["decision"] = "constant series (not tested)"
        return row

    # ADF: H0 = unit root (lag length chosen by AIC)
    adf_res = adfuller(s, regression=adf_reg, autolag="AIC")
    adf_stat, adf_pvalue = adf_res[0], adf_res[1]

    # KPSS: H0 = stationary
    # nlags="auto" (Bartlett kernel + automatic bandwidth)
    kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
    kpss_stat, kpss_pvalue = kpss_res[0], kpss_res[1]

    # Combine both tests: the verdict is clear-cut only when they agree.
    adf_rejects = adf_pvalue < alpha      # evidence against a unit root
    kpss_rejects = kpss_pvalue < alpha    # evidence against stationarity
    if adf_rejects and not kpss_rejects:
        decision = "stationary (no unit root)"
    elif kpss_rejects and not adf_rejects:
        decision = "unit root / non-stationary"
    else:
        decision = "inconclusive / mixed"

    row.update(
        adf_stat=adf_stat,
        adf_pvalue=adf_pvalue,
        kpss_stat=kpss_stat,
        kpss_pvalue=kpss_pvalue,
        decision=decision,
    )
    return row


def unit_root_table(df, cols, adf_reg="c", kpss_reg="c"):
    """
    Apply unit_root_tests to multiple columns of a DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the series to test.
    cols : iterable of str
        Column names to test, one result row per column (in this order).
    adf_reg, kpss_reg : str
        Forwarded unchanged to unit_root_tests.

    Returns
    -------
    pandas.DataFrame
        Columns: series, nobs, adf_stat, adf_pvalue, kpss_stat, kpss_pvalue,
        decision. An empty ``cols`` yields an empty frame with these columns.
    """
    columns = [
        "series",
        "nobs",
        "adf_stat",
        "adf_pvalue",
        "kpss_stat",
        "kpss_pvalue",
        "decision"
    ]
    results = [
        unit_root_tests(df[col], col, adf_reg=adf_reg, kpss_reg=kpss_reg)
        for col in cols
    ]
    # Passing `columns` fixes the layout even when there are no rows;
    # selecting the columns afterwards would raise KeyError on an empty frame.
    return pd.DataFrame(results, columns=columns)


## Apply to endogenous variables (rolling TSI)

In [3]:
import pandas as pd
from pathlib import Path

# Folder with the rolling-TSI level series, and the folder that receives
# their first differences.
BASE_TSI_DIR = Path("../data/endogenous/prices")
OUT_DIR = Path("../data/endogenous/prices/differenced")

# series label -> parquet file holding that rolling-TSI variant
tsi_files = dict(
    tsi_mhar_recov_neg=BASE_TSI_DIR.joinpath("rolling_tsi_mhar_recov_neg_recalculated.parquet"),
    tsi_mhar_recov_pos=BASE_TSI_DIR.joinpath("rolling_tsi_mhar_recov_pos_recalculated.parquet"),
    tsi_mhar_recov=BASE_TSI_DIR.joinpath("rolling_tsi_mhar_recov_recalculated.parquet"),
    tsi_mhar_revar=BASE_TSI_DIR.joinpath("rolling_tsi_mhar_revar_recalculated.parquet"),
)

def load_tsi_df(path: Path, series_name: str) -> pd.DataFrame:
    """
    Load a rolling TSI parquet file and return it as ['date', series_name].

    The date is taken from the 'date' column when there is one, otherwise
    from the index. The value is the 'TSI_recalculated' column when present,
    otherwise the first column that is not the date.

    Parameters
    ----------
    path : Path
        Parquet file to read.
    series_name : str
        Name given to the value column in the result.

    Returns
    -------
    pandas.DataFrame
        Exactly two columns, 'date' (datetime64) and ``series_name``, sorted
        by date with a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If the file holds no column besides the date.
    """
    df = pd.read_parquet(path)

    if "date" in df.columns:
        # Identify the date by name: it is not guaranteed to be the first column
        date_col = "date"
    else:
        # Date lives in the index; after reset_index it is the first column
        df = df.reset_index()
        date_col = df.columns[0]

    value_candidates = [c for c in df.columns if c != date_col]
    if not value_candidates:
        raise ValueError(f"{path}: no value column found next to the date column")
    value_col = (
        "TSI_recalculated"
        if "TSI_recalculated" in value_candidates
        else value_candidates[0]
    )

    # Keep only the two documented columns so later merges on 'date' cannot
    # collide on stray extra columns.
    out = df[[date_col, value_col]].rename(
        columns={date_col: "date", value_col: series_name}
    )
    out = out.assign(date=pd.to_datetime(out["date"]))
    return out.sort_values("date").reset_index(drop=True)


In [6]:
# Test each rolling TSI series in levels; every pass yields a one-row table.
all_tsi_results = []
combined = None  # all series side by side, outer-joined on date (optional)

for series_name, fpath in tsi_files.items():
    df_tsi = load_tsi_df(fpath, series_name)

    tsi_unit_root = unit_root_table(df_tsi, [series_name], adf_reg="c", kpss_reg="c")
    tsi_unit_root.insert(1, "tsi_type", series_name)
    all_tsi_results.append(tsi_unit_root)

    # The first series seeds the combined frame, later ones are merged onto it
    combined = (
        df_tsi
        if combined is None
        else combined.merge(df_tsi, on="date", how="outer")
    )

# Stack the per-series rows into one summary table
tsi_unit_root_all = pd.concat(all_tsi_results, ignore_index=True)
print("\n=== Unit-root tests for ALL rolling TSI series ===")
print(tsi_unit_root_all.round(4))

# `combined` is kept for ad-hoc inspection, e.g. combined.head()



=== Unit-root tests for ALL rolling TSI series ===
               series            tsi_type  nobs  adf_stat  adf_pvalue  \
0  tsi_mhar_recov_neg  tsi_mhar_recov_neg  1077   -0.7278      0.8394   
1  tsi_mhar_recov_pos  tsi_mhar_recov_pos  1077   -0.6529      0.8586   
2      tsi_mhar_recov      tsi_mhar_recov  1077   -1.3489      0.6065   
3      tsi_mhar_revar      tsi_mhar_revar  1077   -2.1480      0.2256   

   kpss_stat  kpss_pvalue                    decision  
0     3.7854         0.01  unit root / non-stationary  
1     4.1123         0.01  unit root / non-stationary  
2     3.9541         0.01  unit root / non-stationary  
3     3.4167         0.01  unit root / non-stationary  


look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")


In [7]:
# 1) one frame with all four TSI level series, outer-joined on date
dfs = [load_tsi_df(path, name) for name, path in tsi_files.items()]

tsi_all = dfs[0]
for df in dfs[1:]:
    tsi_all = tsi_all.merge(df, on="date", how="outer")
tsi_all = tsi_all.sort_values("date").reset_index(drop=True)

tsi_cols = [c for c in tsi_all.columns if c != "date"]

# 2) first differences, appended next to the levels as d_<name>
diff_cols = [f"d_{c}" for c in tsi_cols]
tsi_diff = tsi_all.assign(
    **{d_name: tsi_all[level].diff() for d_name, level in zip(diff_cols, tsi_cols)}
)

# 3) same ADF/KPSS set-up as for the levels, now on the differences
tsi_diff_unit_root = unit_root_table(tsi_diff, diff_cols, adf_reg="c", kpss_reg="c")

print("\n=== Unit-root tests for FIRST DIFFERENCES of TSI ===")
print(tsi_diff_unit_root.round(4))


=== Unit-root tests for FIRST DIFFERENCES of TSI ===
                 series  nobs  adf_stat  adf_pvalue  kpss_stat  kpss_pvalue  \
0  d_tsi_mhar_recov_neg  1076  -13.9681         0.0     0.4728       0.0478   
1  d_tsi_mhar_recov_pos  1076  -11.4653         0.0     0.6835       0.0150   
2      d_tsi_mhar_recov  1076   -5.0242         0.0     0.5882       0.0237   
3      d_tsi_mhar_revar  1076  -11.8078         0.0     0.1067       0.1000   

                    decision  
0       inconclusive / mixed  
1       inconclusive / mixed  
2       inconclusive / mixed  
3  stationary (no unit root)  


look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")


In [8]:
# --- first differences of the combined endogenous frame ---
# `tsi_all` (date + the TSI level series, sorted by date) is built in the
# previous cell; it is reused here instead of re-reading and re-merging the
# same parquet files a third time.
tsi_cols = [c for c in tsi_all.columns if c != "date"]
tsi_diff = tsi_all[["date"]].copy()

for col in tsi_cols:
    tsi_diff[f"d_{col}"] = tsi_all[col].diff()

# drop rows where every difference is NaN (the first row, at least)
d_cols = [c for c in tsi_diff.columns if c != "date"]
tsi_diff = tsi_diff.dropna(how="all", subset=d_cols).reset_index(drop=True)

# save - create the target folder first: to_parquet does not create it and
# would fail on a fresh checkout
OUT_DIR.mkdir(parents=True, exist_ok=True)
endog_path = OUT_DIR / "endogenous_tsi_diff.parquet"
tsi_diff.to_parquet(endog_path, index=False)
print(f"Saved differenced endogenous TSI to: {endog_path}")
print(tsi_diff.head())

Saved differenced endogenous TSI to: ..\data\endogenous\prices\differenced\endogenous_tsi_diff.parquet
        date  d_tsi_mhar_recov_neg  d_tsi_mhar_recov_pos  d_tsi_mhar_recov  \
0 2022-05-21              0.565159              0.353851          0.318845   
1 2022-05-22              0.028779              0.053486          0.018242   
2 2022-05-23              0.017773              0.001561          0.009926   
3 2022-05-24             -0.010183             -0.026224         -0.020802   
4 2022-05-25              0.021521             -0.015650         -0.001137   

   d_tsi_mhar_revar  
0          0.464115  
1          0.090025  
2         -0.035204  
3         -0.033416  
4         -0.004788  


## Apply to exogenous variables

In [9]:
# Raw exogenous regressors as read by this notebook
EXOG_IN_PATH = Path("../data/exogenous/all_exog.parquet")
# Regression-ready exogenous frame written at the end of this notebook
EXOG_OUT_PATH = Path("../data/exogenous/all_exog_ready.parquet")

In [10]:
exog = pd.read_parquet(EXOG_IN_PATH)

# Calendar helper columns are not regressors. errors="ignore" keeps the cell
# working when a file does not carry them, and avoids in-place mutation so the
# cell can be re-run safely.
exog = exog.drop(columns=["year", "month", "day"], errors="ignore")

# Normalize date column: accept the known alternative names
if "date" not in exog.columns:
    alt = next((a for a in ["date_local", "Date"] if a in exog.columns), None)
    if alt is None:
        raise KeyError(
            f"No date column in {EXOG_IN_PATH}: expected 'date', 'date_local' "
            f"or 'Date', found {list(exog.columns)}"
        )
    exog = exog.rename(columns={alt: "date"})

# Midnight-normalized timestamps, rows in chronological order
exog["date"] = pd.to_datetime(exog["date"]).dt.normalize()
exog = exog.sort_values("date").reset_index(drop=True)

print("Exog date range:", exog["date"].min().date(), "→", exog["date"].max().date())
print("Exog columns:", list(exog.columns))

Exog date range: 2021-05-21 → 2025-04-30
Exog columns: ['date', 'iberian_exception', 'TTF', 'co2', 'coal', 'load_energy_mwh_es', 'load_energy_mwh_fr', 'load_energy_mwh_pt', 'flow_net_mw_ES_FR', 'flow_net_mw_ES_PT', 'cac_eur_pts', 'ibex_eur_pts', 'psi_eur_pts']


In [11]:
# ---------------------------------------------------------------------
# Select the exogenous columns that go through the stationarity screen
# ---------------------------------------------------------------------
# Dummy regressors: only those actually present in the frame
EXOG_DUMMY_COLS = [c for c in ["iberian_exception"] if c in exog.columns]

# Date/calendar fields are never regressors
drop_non_reg = {"date", "year", "month", "day"}
numeric_cols = [
    c
    for c in exog.columns
    if pd.api.types.is_numeric_dtype(exog[c]) and c not in drop_non_reg
]

# Dummies skip the unit-root screen and stay in levels
test_cols = [c for c in numeric_cols if not (c in EXOG_DUMMY_COLS)]

print("\nTesting these exogenous columns (levels):", test_cols)
print("Keeping these as dummies/levels:", EXOG_DUMMY_COLS)


Testing these exogenous columns (levels): ['TTF', 'co2', 'coal', 'load_energy_mwh_es', 'load_energy_mwh_fr', 'load_energy_mwh_pt', 'flow_net_mw_ES_FR', 'flow_net_mw_ES_PT', 'cac_eur_pts', 'ibex_eur_pts', 'psi_eur_pts']
Keeping these as dummies/levels: ['iberian_exception']


In [12]:
# ---------------------------------------------------------------------
# Unit-root tests on levels and choose which to difference
# ---------------------------------------------------------------------
alpha = 0.05
# NOTE(review): ADF is run with a constant only ("c") while KPSS is run around
# a trend ("ct"), so the two tests use different deterministic terms here
# (the other cells use "c"/"c") - confirm this is intended.
exog_unit_root = unit_root_table(exog, test_cols, adf_reg="c", kpss_reg="ct")
print("\n=== Unit-root tests for exogenous variables (levels) ===")
print(exog_unit_root.round(4))

adf_p = exog_unit_root["adf_pvalue"]
kpss_p = exog_unit_root["kpss_pvalue"]

# Rows with NaN p-values were not tested. NaN compares False in both
# directions, so without this mask they would silently land in the
# "stationary" list and be carried into the regression frame.
not_tested = adf_p.isna() | kpss_p.isna()

# Non-stationary if ADF fails to reject OR KPSS rejects stationarity
is_nonstationary = (adf_p > alpha) | (kpss_p < alpha)

nonstat_cols = exog_unit_root.loc[is_nonstationary & ~not_tested, "series"].tolist()
stat_cols    = exog_unit_root.loc[~is_nonstationary & ~not_tested, "series"].tolist()
untested_cols = exog_unit_root.loc[not_tested, "series"].tolist()

if untested_cols:
    print("\nWARNING - no test result, excluded from both lists:", untested_cols)
print("\nNon-stationary (difference these):", nonstat_cols)
print("Stationary (keep in levels):", stat_cols)

look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")



=== Unit-root tests for exogenous variables (levels) ===
                series  nobs  adf_stat  adf_pvalue  kpss_stat  kpss_pvalue  \
0                  TTF  1441   -2.2543      0.1871     0.4505       0.0100   
1                  co2  1441   -3.1521      0.0229     0.7558       0.0100   
2                 coal  1441   -1.5538      0.5068     0.5172       0.0100   
3   load_energy_mwh_es  1441   -2.0083      0.2829     0.8738       0.0100   
4   load_energy_mwh_fr  1441   -1.7030      0.4296     0.5145       0.0100   
5   load_energy_mwh_pt  1441   -3.6906      0.0042     0.1167       0.1000   
6    flow_net_mw_ES_FR  1441   -5.3089      0.0000     0.4534       0.0100   
7    flow_net_mw_ES_PT  1441   -3.8600      0.0023     0.0980       0.1000   
8          cac_eur_pts  1441   -2.0117      0.2815     0.3310       0.0100   
9         ibex_eur_pts  1441    0.2913      0.9769     1.1100       0.0100   
10         psi_eur_pts  1441   -1.8111      0.3750     0.1425       0.0565   

     

look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is smaller than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")


In [13]:
# ---------------------------------------------------------------------
# Assemble the regression-ready exogenous frame
# ---------------------------------------------------------------------
# Column order: date, stationary levels, d_<name> differences, dummies.
new_cols = {}
new_cols.update({c: exog[c] for c in stat_cols})
new_cols.update({f"d_{c}": exog[c].diff() for c in nonstat_cols})
# Dummies stay in levels; missing values count as 0 and the dtype is int
new_cols.update({c: exog[c].fillna(0).astype(int) for c in EXOG_DUMMY_COLS})

X = exog[["date"]].assign(**new_cols)


In [14]:
# Re-run the tests on the differenced regressors only
diff_cols = ["d_" + c for c in nonstat_cols]
if len(diff_cols) > 0:
    exog_diff_unit_root = unit_root_table(X, diff_cols, adf_reg="c", kpss_reg="c")
    print("\n=== Unit-root tests for differenced exogenous variables (only those differenced) ===")
    print(exog_diff_unit_root.round(4))

# Keep only rows where every regressor is observed; this removes the NaN row
# created by differencing as well as any other gaps.
reg_cols = X.columns.drop("date").tolist()
X_out = X.dropna(subset=reg_cols).reset_index(drop=True)

print("\nRegression-ready exog shape:", X_out.shape)
print(X_out.head())

look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")



=== Unit-root tests for differenced exogenous variables (only those differenced) ===
                 series  nobs  adf_stat  adf_pvalue  kpss_stat  kpss_pvalue  \
0                 d_TTF  1440  -13.3411         0.0     0.0708          0.1   
1                 d_co2  1440  -34.5145         0.0     0.0833          0.1   
2                d_coal  1440   -7.9576         0.0     0.1348          0.1   
3  d_load_energy_mwh_es  1440   -7.9721         0.0     0.0876          0.1   
4  d_load_energy_mwh_fr  1440   -8.4171         0.0     0.0898          0.1   
5   d_flow_net_mw_ES_FR  1440  -11.1158         0.0     0.1207          0.1   
6         d_cac_eur_pts  1440  -40.3742         0.0     0.0345          0.1   
7        d_ibex_eur_pts  1440  -26.0407         0.0     0.2903          0.1   
8         d_psi_eur_pts  1440  -38.0719         0.0     0.0165          0.1   

                    decision  
0  stationary (no unit root)  
1  stationary (no unit root)  
2  stationary (no unit root)  

look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")
look-up table. The actual p-value is greater than the p-value returned.

  kpss_res = kpss(s, regression=kpss_reg, nlags="auto")


In [15]:
# Rich preview of the first rows of the regression-ready exogenous frame
X_out.head()

Unnamed: 0,date,load_energy_mwh_pt,flow_net_mw_ES_PT,d_TTF,d_co2,d_coal,d_load_energy_mwh_es,d_load_energy_mwh_fr,d_flow_net_mw_ES_FR,d_cac_eur_pts,d_ibex_eur_pts,d_psi_eur_pts,iberian_exception
0,2021-05-22,119301.0,1609.2125,0.0,0.0,0.0,-70887.0,-98050.0,71.2875,0.0,0.0,0.0,0
1,2021-05-23,110127.0,1017.291667,0.0,0.0,0.0,-46933.0,-38600.0,20.745833,0.0,0.0,0.0,0
2,2021-05-24,129947.0,1429.829167,-0.030001,1.39,-0.042563,89538.0,7100.0,-358.241667,22.080078,1.5,-49.849609,0
3,2021-05-25,134131.0,400.925,1.43,-1.44,0.00421,22154.0,175500.0,432.154167,-18.220215,3.200195,0.0,0
4,2021-05-26,134276.0,666.6875,0.24,1.57,0.508413,4884.0,-8950.0,-331.120833,1.330078,-11.799805,-1.380371,0


In [16]:
# ---------------------------------------------------------------------
# Write the regression-ready exogenous frame to disk
# ---------------------------------------------------------------------
# The destination folder is created on demand (no error if it already exists)
exog_out_dir = EXOG_OUT_PATH.parent
exog_out_dir.mkdir(parents=True, exist_ok=True)

X_out.to_parquet(EXOG_OUT_PATH, index=False)
print(f"\nSaved regression-ready exogenous variables to: {EXOG_OUT_PATH}")


Saved regression-ready exogenous variables to: ..\data\exogenous\all_exog_ready.parquet
