MAIN MODEL – Forecasting Sri Lankan Yield Curve Factors

Import Libraries

In [None]:
import numpy as np
import pandas as pd

from sklearn.ensemble import RandomForestRegressor
from statsmodels.tsa.ar_model import AutoReg
from scipy.stats import t as student_t


Settings and inputs

In [None]:
FILE_PATH = "DATA.csv"
DATE_COL = "Date"

YIELD_COLS = [
    "3M", "6M", "1Y", "2Y", "3Y",
    "4Y", "5Y", "6Y", "10Y", "15Y", "20Y"
]

MATURITIES_YEARS = np.array([
    0.25,   # 3M
    0.5,    # 6M
    1.0,    # 1Y
    2.0,    # 2Y
    3.0,    # 3Y
    4.0,    # 4Y
    5.0,    # 5Y
    6.0,    # 6Y
    10.0,   # 10Y
    15.0,   # 15Y
    20.0    # 20Y
], dtype=float)

NS_TAU = 1.5  # fixed NS decay parameter

TRAIN_START = "2014-01-01"
TRAIN_END   = "2021-12-31"
TEST_START  = "2022-01-01"
TEST_END    = "2025-11-30"

FORECAST_HORIZONS = [20, 120]

# rolling Window
MAX_TRAIN_LENGTH = 1000
RF_LAGS = 5
RANDOM_STATE = 42


Load yield data

In [None]:
df = pd.read_csv(FILE_PATH)

df[DATE_COL] = pd.to_datetime(df[DATE_COL], dayfirst=True)
df = df.sort_values(DATE_COL)
df.set_index(DATE_COL, inplace=True)

print(df.head())
print(df.columns)
print(df.index.min(), df.index.max())


              3M    6M    1Y    2Y    3Y    4Y     5Y     6Y    10Y    15Y  \
Date                                                                         
2014-01-01  7.52  7.83  8.27  8.86  9.05  9.54  10.29  10.37  10.89  11.54   
2014-01-02  7.28  7.48  7.80  8.23  8.83  9.10   9.68  10.04  10.86  11.60   
2014-01-03  7.32  7.54  7.91  8.44  8.66  8.86   9.91  10.05  10.36  11.07   
2014-01-06  7.23  7.47  7.88  8.39  8.63  8.96   9.63   9.99  10.38  11.08   
2014-01-07  7.14  7.33  7.67  8.46  8.51  9.16   9.72   9.95  10.18  11.11   

              20Y  
Date               
2014-01-01  11.61  
2014-01-02  11.56  
2014-01-03  11.29  
2014-01-06  11.22  
2014-01-07  11.50  
Index(['3M', '6M', '1Y', '2Y', '3Y', '4Y', '5Y', '6Y', '10Y', '15Y', '20Y'], dtype='object')
2014-01-01 00:00:00 2025-10-30 00:00:00


In [None]:
df_yields = df[YIELD_COLS].astype(float)
df_yields = df_yields.interpolate(method="time")


Define NS loadings and estimate level, slope, and curvature factors for each date.

In [None]:
def ns_loadings(maturities, tau):
    x = maturities / tau
    x = np.where(x == 0, 1e-6, x)
    L1 = (1 - np.exp(-x)) / x
    L2 = L1 - np.exp(-x)
    return L1, L2

def estimate_ns_betas_row(yields_row, maturities, tau):
    y = np.array(yields_row, dtype=float)
    mask = ~np.isnan(y)
    y = y[mask]
    m = maturities[mask]
    if len(y) < 3:
        return np.array([np.nan, np.nan, np.nan])
    L1, L2 = ns_loadings(m, tau)
    L1, L2 = L1[mask], L2[mask]
    X = np.column_stack([np.ones_like(m), L1, L2])
    beta = np.linalg.lstsq(X, y, rcond=None)[0]
    return beta  # beta0, beta1, beta2

def estimate_ns_betas(df_y, maturities, tau):
    betas = []
    for _, row in df_y.iterrows():
        betas.append(estimate_ns_betas_row(row.values, maturities, tau))
    betas = np.array(betas)
    df_b = pd.DataFrame(
        betas, index=df_y.index,
        columns=["beta0", "beta1", "beta2"]
    )
    return df_b


In [None]:
df_betas = estimate_ns_betas(df_yields, MATURITIES_YEARS, NS_TAU)
df_betas = df_betas.dropna()


common_idx = df_yields.index.intersection(df_betas.index)
df_yields = df_yields.loc[common_idx]
df_betas  = df_betas.loc[common_idx]


df_betas.to_csv("ns_betas_all.csv")

df_betas.head()


Unnamed: 0_level_0,beta0,beta1,beta2
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2014-01-01,12.188816,-4.693285,-2.995884
2014-01-02,12.386834,-5.035773,-4.848685
2014-01-03,11.822269,-4.481531,-3.627143
2014-01-06,11.785633,-4.517516,-3.616667
2014-01-07,11.879325,-4.77728,-3.588571


Define training and testing windows

In [None]:
train_mask = (df_betas.index >= TRAIN_START) & (df_betas.index <= TRAIN_END)
test_mask  = (df_betas.index >= TEST_START)  & (df_betas.index <= TEST_END)

train_dates = df_betas.index[train_mask]
test_dates  = df_betas.index[test_mask]

print("Train:", train_dates.min(), "→", train_dates.max())
print("Test :", test_dates.min(),  "→", test_dates.max())
print("Number of test dates:", len(test_dates))


Train: 2014-01-01 00:00:00 → 2021-12-31 00:00:00
Test : 2022-01-03 00:00:00 → 2025-10-30 00:00:00
Number of test dates: 915


In [None]:
def reconstruct_ns_curve(betas, maturities, tau):
    beta0, beta1, beta2 = betas
    L1, L2 = ns_loadings(maturities, tau)
    return beta0 + beta1 * L1 + beta2 * L2


In [None]:
def make_lagged_matrix(series, n_lags):
    series = np.array(series, dtype=float)
    X, y = [], []
    for t in range(n_lags, len(series)):
        X.append(series[t-n_lags:t])
        y.append(series[t])
    return np.array(X), np.array(y)

def rf_multi_step_forecast(series, horizon, n_lags=5, random_state=42):
    series = np.array(series, dtype=float)

    if len(series) <= n_lags + 5:

        return np.repeat(series[-1], horizon)

    X, y = make_lagged_matrix(series, n_lags)

    rf = RandomForestRegressor(
        n_estimators=200,
        max_depth=6,
        min_samples_leaf=5,
        n_jobs=-1,
        random_state=random_state
    )
    rf.fit(X, y)

    history = list(series[-n_lags:])
    preds = []
    for _ in range(horizon):
        x_input = np.array(history[-n_lags:]).reshape(1, -1)
        y_pred = rf.predict(x_input)[0]
        preds.append(y_pred)
        history.append(y_pred)

    return np.array(preds)


Generate factor forecasts at the chosen horizon

In [None]:
def run_forecast_single_model(model_name, horizon):
    """
    model_name: 'RW', 'AR1', or 'RF'
    horizon   : forecast horizon in days (e.g. 20 or 120)

    Returns dict of DataFrames:
      - betas_hat
      - yields_hat
      - yields_actual
      - squared_errors
    All also saved to CSV.
    """
    forecast_betas = []   # [beta0, beta1, beta2]
    forecast_yields = []  # NS curve yields
    actual_yields = []    # actual yields on target date
    target_dates = []     # target dates (t+h)
    squared_errors = []   # mean squared error across maturities

    total_origins = len(test_dates)
    print(f"\nRunning model={model_name}, horizon={horizon} days")
    print("Total forecast origins:", total_origins)

    for i, origin_date in enumerate(test_dates, start=1):
        idx_origin = df_betas.index.get_loc(origin_date)
        idx_target = idx_origin + horizon
        if idx_target >= len(df_betas):
            break

        target_date = df_betas.index[idx_target]

        # training ends the day before origin
        train_end_idx = idx_origin - 1
        if train_end_idx < 0:
            continue

        # rolling window length
        if MAX_TRAIN_LENGTH is None:
            train_start_idx = 0
        else:
            train_start_idx = max(0, train_end_idx - MAX_TRAIN_LENGTH + 1)

        train_betas  = df_betas.iloc[train_start_idx:train_end_idx+1]
        train_yields = df_yields.iloc[train_start_idx:train_end_idx+1]


        if train_betas.index[-1] < pd.to_datetime(TRAIN_END):
            continue

        beta_hat = []

        # forecast each beta
        for col in ["beta0", "beta1", "beta2"]:
            series = train_betas[col].values

            if model_name == "RW":
                # random walk: last value carried forward
                forecast_series = np.repeat(series[-1], horizon)
                beta_hat.append(forecast_series[-1])

            elif model_name == "AR1":
                if len(series) > 5:
                    ar_model = AutoReg(series, lags=1, old_names=False).fit()
                    ar_pred = ar_model.predict(start=len(series),
                                               end=len(series)+horizon-1)
                    beta_hat.append(ar_pred[-1])
                else:
                    beta_hat.append(series[-1])

            elif model_name == "RF":
                if len(series) > RF_LAGS + 10:
                    try:
                        rf_pred = rf_multi_step_forecast(
                            series,
                            horizon=horizon,
                            n_lags=RF_LAGS,
                            random_state=RANDOM_STATE
                        )
                        beta_hat.append(rf_pred[-1])
                    except ValueError:
                        beta_hat.append(series[-1])
                else:
                    beta_hat.append(series[-1])

            else:
                raise ValueError("Unknown model_name. Use 'RW', 'AR1', or 'RF'.")

        beta_hat = np.array(beta_hat)
        y_hat = reconstruct_ns_curve(beta_hat, MATURITIES_YEARS, NS_TAU)
        y_actual = df_yields.loc[target_date, YIELD_COLS].values.astype(float)

        mse = np.mean((y_hat - y_actual) ** 2)

        forecast_betas.append(beta_hat)
        forecast_yields.append(y_hat)
        actual_yields.append(y_actual)
        target_dates.append(target_date)
        squared_errors.append(mse)

        if i % 50 == 0:
            print(f"Processed {i} / {total_origins} origins")

    # convert to DataFrames
    df_betas_hat = pd.DataFrame(
        forecast_betas,
        index=pd.to_datetime(target_dates),
        columns=["beta0_hat", "beta1_hat", "beta2_hat"]
    )

    df_yields_hat = pd.DataFrame(
        forecast_yields,
        index=pd.to_datetime(target_dates),
        columns=YIELD_COLS
    )

    df_yields_actual = pd.DataFrame(
        actual_yields,
        index=pd.to_datetime(target_dates),
        columns=YIELD_COLS
    )

    df_se = pd.DataFrame(
        {"MSE": squared_errors},
        index=pd.to_datetime(target_dates)
    )

    # file name pattern
    tag = f"{model_name}_h{horizon}"

    df_betas_hat.to_csv(f"betas_{tag}.csv")
    df_yields_hat.to_csv(f"yields_{tag}.csv")
    df_yields_actual.to_csv(f"actual_{tag}.csv")
    df_se.to_csv(f"squared_errors_{tag}.csv")

    print(f"Saved: betas_{tag}.csv, yields_{tag}.csv, actual_{tag}.csv, squared_errors_{tag}.csv")

    return {
        "betas_hat": df_betas_hat,
        "yields_hat": df_yields_hat,
        "yields_actual": df_yields_actual,
        "squared_errors": df_se
    }


Run forecasts for each horizon





In [None]:
results_RW_20  = run_forecast_single_model("RW",  horizon=20)
results_RW_120 = run_forecast_single_model("RW",  horizon=120)



Running model=RW, horizon=20 days
Total forecast origins: 915
Processed 50 / 915 origins
Processed 100 / 915 origins
Processed 150 / 915 origins
Processed 200 / 915 origins
Processed 250 / 915 origins
Processed 300 / 915 origins
Processed 350 / 915 origins
Processed 400 / 915 origins
Processed 450 / 915 origins
Processed 500 / 915 origins
Processed 550 / 915 origins
Processed 600 / 915 origins
Processed 650 / 915 origins
Processed 700 / 915 origins
Processed 750 / 915 origins
Processed 800 / 915 origins
Processed 850 / 915 origins
Saved: betas_RW_h20.csv, yields_RW_h20.csv, actual_RW_h20.csv, squared_errors_RW_h20.csv

Running model=RW, horizon=120 days
Total forecast origins: 915
Processed 50 / 915 origins
Processed 100 / 915 origins
Processed 150 / 915 origins
Processed 200 / 915 origins
Processed 250 / 915 origins
Processed 300 / 915 origins
Processed 350 / 915 origins
Processed 400 / 915 origins
Processed 450 / 915 origins
Processed 500 / 915 origins
Processed 550 / 915 origins
Pr

In [None]:
results_AR1_20  = run_forecast_single_model("AR1", horizon=20)
results_AR1_120 = run_forecast_single_model("AR1", horizon=120)



Running model=AR1, horizon=20 days
Total forecast origins: 915
Processed 50 / 915 origins
Processed 100 / 915 origins
Processed 150 / 915 origins
Processed 200 / 915 origins
Processed 250 / 915 origins
Processed 300 / 915 origins
Processed 350 / 915 origins
Processed 400 / 915 origins
Processed 450 / 915 origins
Processed 500 / 915 origins
Processed 550 / 915 origins
Processed 600 / 915 origins
Processed 650 / 915 origins
Processed 700 / 915 origins
Processed 750 / 915 origins
Processed 800 / 915 origins
Processed 850 / 915 origins
Saved: betas_AR1_h20.csv, yields_AR1_h20.csv, actual_AR1_h20.csv, squared_errors_AR1_h20.csv

Running model=AR1, horizon=120 days
Total forecast origins: 915
Processed 50 / 915 origins
Processed 100 / 915 origins
Processed 150 / 915 origins
Processed 200 / 915 origins
Processed 250 / 915 origins
Processed 300 / 915 origins
Processed 350 / 915 origins
Processed 400 / 915 origins
Processed 450 / 915 origins
Processed 500 / 915 origins
Processed 550 / 915 orig

In [None]:
results_RF_20  = run_forecast_single_model("RF",  horizon=20)
results_RF_120 = run_forecast_single_model("RF",  horizon=120)


In [None]:
import numpy as np
import pandas as pd
from scipy.stats import t as student_t

# Load true NS betas
df_betas_all = pd.read_csv("ns_betas_all.csv", index_col=0, parse_dates=True)
df_betas_all.head()


Unnamed: 0_level_0,beta0,beta1,beta2
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2014-01-01,12.188816,-4.693285,-2.995884
2014-01-02,12.386834,-5.035773,-4.848685
2014-01-03,11.822269,-4.481531,-3.627143
2014-01-06,11.785633,-4.517516,-3.616667
2014-01-07,11.879325,-4.77728,-3.588571


Forecast evaluation (RMSE and Diebold–Mariano)

In [None]:
def newey_west_variance(d, h):
    """
    Newey–West HAC variance estimator for DM test.
    d: loss differential series
    h: forecast horizon (used as truncation lag)
    """
    d = np.array(d, dtype=float)
    T = len(d)
    d_c = d - d.mean()
    gamma0 = np.dot(d_c, d_c) / T
    var = gamma0
    for lag in range(1, h):
        cov = np.dot(d_c[lag:], d_c[:-lag]) / T
        weight = 1 - lag / h
        var += 2 * weight * cov
    return var

def dm_test(loss1, loss2, h):
    """
    Diebold–Mariano test for equal predictive accuracy.
    loss1, loss2: loss series (e.g. squared errors) from two models
    h: forecast horizon
    """
    loss1 = np.array(loss1, dtype=float)
    loss2 = np.array(loss2, dtype=float)
    if len(loss1) != len(loss2):
        raise ValueError("Loss series must have same length.")
    d = loss1 - loss2
    T = len(d)
    d_bar = d.mean()
    var_d = newey_west_variance(d, h)
    dm_stat = d_bar / np.sqrt(var_d / T)
    p_value = 2 * (1 - student_t.cdf(abs(dm_stat), df=T-1))
    return dm_stat, p_value


In [None]:
def compute_factor_metrics_for_horizon(h):
    models = ["RW", "AR1", "RF"]
    factors = ["Level", "Slope", "Curvature"]

    # store squared errors per model & factor
    se_factor = {}   # dict: model -> DataFrame with columns Level/Slope/Curvature

    #build squared errors for each model
    for m in models:
        fname = f"betas_{m}_h{h}.csv"
        df_hat = pd.read_csv(fname, index_col=0, parse_dates=True)

        # true betas on the same forecast target dates
        df_true = df_betas_all.loc[df_hat.index][["beta0", "beta1", "beta2"]]

        # compute squared errors factor wise
        se_df = pd.DataFrame({
            "Level":      (df_hat["beta0_hat"] - df_true["beta0"])**2,
            "Slope":      (df_hat["beta1_hat"] - df_true["beta1"])**2,
            "Curvature":  (df_hat["beta2_hat"] - df_true["beta2"])**2,
        }, index=df_hat.index)

        se_factor[m] = se_df

        #  save squared errors
        se_df.to_csv(f"squared_errors_factors_{m}_h{h}.csv")

    #RMSE per model & factor
    rmse_table = pd.DataFrame(index=models, columns=factors, dtype=float)

    for m in models:
        rmse_table.loc[m, "Level"]     = np.sqrt(se_factor[m]["Level"].mean())
        rmse_table.loc[m, "Slope"]     = np.sqrt(se_factor[m]["Slope"].mean())
        rmse_table.loc[m, "Curvature"] = np.sqrt(se_factor[m]["Curvature"].mean())

    print(f"\nRMSE for factors (h={h}):")
    print(rmse_table)

    rmse_table.to_csv(f"rmse_factors_h{h}.csv")

    # DM tests per factor
    pairs = [("RW", "AR1"), ("RW", "RF"), ("AR1", "RF")]

    rows = []
    for (m1, m2) in pairs:
        for f in factors:
            se1 = se_factor[m1][f]
            se2 = se_factor[m2][f]


            idx_common = se1.index.intersection(se2.index)
            se1_aligned = se1.loc[idx_common]
            se2_aligned = se2.loc[idx_common]

            dm_stat, p_val = dm_test(se1_aligned, se2_aligned, h=h)

            rows.append({
                "model1": m1,
                "model2": m2,
                "factor": f,
                "horizon": h,
                "DM_stat": dm_stat,
                "p_value": p_val
            })

    df_dm = pd.DataFrame(rows)
    print(f"\nDM test results for factors (h={h}):")
    print(df_dm)

    df_dm.to_csv(f"dm_factors_h{h}.csv", index=False)

    return rmse_table, df_dm


Compute RMSE and DM statistics for 20 and 120 days.

In [None]:
rmse_20, dm_20   = compute_factor_metrics_for_horizon(20)
rmse_120, dm_120 = compute_factor_metrics_for_horizon(120)



RMSE for factors (h=20):
        Level     Slope  Curvature
RW   1.695509  1.543512   4.378590
AR1  1.961886  1.675774   4.633240
RF   1.790409  1.586260   4.253188

DM test results for factors (h=20):
  model1 model2     factor  horizon   DM_stat   p_value
0     RW    AR1      Level       20 -1.262417  0.207128
1     RW    AR1      Slope       20 -1.969450  0.049210
2     RW    AR1  Curvature       20 -0.855706  0.392390
3     RW     RF      Level       20 -1.002729  0.316263
4     RW     RF      Slope       20 -1.395204  0.163301
5     RW     RF  Curvature       20  1.368963  0.171355
6    AR1     RF      Level       20  0.715720  0.474351
7    AR1     RF      Slope       20  1.338384  0.181111
8    AR1     RF  Curvature       20  1.470735  0.141715

RMSE for factors (h=120):
         Level     Slope  Curvature
RW    4.793116  4.641371   8.267354
AR1  13.067448  4.922405   6.804007
RF    4.913760  4.661253   8.052094

DM test results for factors (h=120):
  model1 model2     factor  