<a href="https://colab.research.google.com/github/Billy-Drunkenstein/Jupiter/blob/main/Feature_Validation/test_OLS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pathlib import Path
import os
from typing import Sequence, Union
import numpy as np
import pandas as pd
from typing import List, Tuple
from sklearn.linear_model import LinearRegression
from tqdm import tqdm

In [None]:
# Base directory; every path below is joined onto it.
base_dir = "/gpfs/hddfs/shared/tzheng_ryin"

# Locations relative to base_dir.
features_dir = "cnfut/cnfut_snap_pool_feather"
targets_dir = "cnfut/cnfut_snap_y_feather"
ref_headers_dir = "cnfut/FeatureHeaders.00000000"
output_dir = "cnfut_meta_matrices"

# Joined locations. These are plain strings ...
features_path, targets_path, reference_headers_path = (
    os.path.join(base_dir, rel)
    for rel in (features_dir, targets_dir, ref_headers_dir)
)
calendar_path = os.path.join(base_dir, "calendar/cn_calendar")
# ... while output_path is a Path, so later cells can use the `/` operator.
output_path = Path(base_dir) / output_dir

In [None]:
# Target column name(s). In the visible cells only the number of entries is
# used: target k is addressed through the column prefix "Y{k}_".
target_cols = ["y60r05"]

# Read

In [None]:
# Calendar file: one integer date per line. Blank lines are skipped so that a
# stray empty line does not crash int(); int() itself ignores the trailing
# newline and surrounding whitespace.
with open(calendar_path, "r", encoding="utf-8") as f:
    calendar = [int(line) for line in f if line.strip()]

# Reference feature headers. They are parsed exactly like the per-date header
# files in the assembly cell (whitespace stripped, blank lines dropped);
# otherwise a trailing blank line or trailing space in the reference file
# would trigger a false "FeatureHeaders mismatch" and inflate the feature
# count derived from len(ref_headers).
try:
    with open(reference_headers_path, "r", encoding="utf-8") as f:
        ref_headers = [line.strip() for line in f if line.strip()]
except FileNotFoundError as e:
    # Re-raise with a hint about which file is expected, keeping the cause.
    raise FileNotFoundError(
        "Reference Headers Not Found: make sure FeatureHeaders.00000000 exists in cnfut directory"
    ) from e


In [None]:
def read_meta_matrices(output_path, date):
    """Load the three per-date meta files stored under ``output_path``.

    Args:
        output_path: Directory holding the files (anything ``Path`` accepts).
        date: Date tag interpolated into the three file names.

    Returns:
        A ``(headers, XX, corr)`` tuple where

        * ``headers`` is the list of lines of ``FeatureHeaders.<date>`` with
          the trailing newline removed (other whitespace is kept),
        * ``XX`` is the array ``np.loadtxt`` parses from the comma-delimited
          ``XX.<date>.csv`` (expected to be (p+1, p+1) — TODO confirm
          against the writer),
        * ``corr`` is the DataFrame read from ``FeatureCorr.<date>.csv``.
    """
    root = Path(output_path)

    # Headers first, then XX, then FeatureCorr — same read order as before.
    with (root / f"FeatureHeaders.{date}").open("r", encoding="utf-8") as fh:
        headers = [raw.rstrip("\n") for raw in fh]

    XX = np.loadtxt(root / f"XX.{date}.csv", delimiter=",")
    corr = pd.read_csv(root / f"FeatureCorr.{date}.csv")

    return headers, XX, corr


def assemble_dates(
    calendar: List[int], yyyymmdd: int, back_horizon: int
) -> List[int]:
    """Return the ``back_horizon`` calendar dates ending at ``yyyymmdd``.

    Args:
        calendar: Ordered list of calendar dates; the window is a contiguous
            slice of this list.
        yyyymmdd: Last date of the window; must be an element of ``calendar``
            (compared with ``==``, so it must have the same type as the
            calendar entries).
        back_horizon: Number of dates in the window, including ``yyyymmdd``.
            Must be at least 1.

    Returns:
        ``calendar[i - back_horizon + 1 : i + 1]`` where ``i`` is the
        position of ``yyyymmdd`` in ``calendar``.

    Raises:
        ValueError: If ``back_horizon`` is not positive, if ``yyyymmdd`` is
            not in ``calendar``, or if the calendar does not reach back far
            enough.
    """
    # A horizon of 0 used to slip through and silently yield an empty window,
    # contradicting the error message; reject it together with negatives.
    if back_horizon <= 0:
        raise ValueError("Backtest horizon must be positive")

    # Single scan: let index() report absence instead of `in` + index().
    try:
        i = calendar.index(yyyymmdd)
    except ValueError:
        raise ValueError("Date currently restricted to cn_calendar dates") from None

    start_idx = i - (back_horizon - 1)
    if start_idx < 0:
        raise ValueError(
            f"Insufficient calendar history for {yyyymmdd} and backtest horizon {back_horizon}"
        )

    return calendar[start_idx : i + 1]


In [None]:
# As-of date and the window of 30 calendar dates that ends on it.
asof = 20251030
in_sample_dates = assemble_dates(calendar, asof, 30)

# Sort each date by how many of its three meta files are on disk:
# none -> missing, some but not all -> broken, all three -> fine.
missing = []
broken = []

for d in in_sample_dates:
    p_headers = output_path / f"FeatureHeaders.{d}"
    p_corr = output_path / f"FeatureCorr.{d}.csv"
    p_xx = output_path / f"XX.{d}.csv"

    e_headers, e_corr, e_xx = (
        pth.exists() for pth in (p_headers, p_corr, p_xx)
    )
    n_exist = sum((e_headers, e_corr, e_xx))

    if n_exist == 3:
        continue
    (missing if n_exist == 0 else broken).append(d)

# Partially written dates are reported first, then fully absent ones.
if broken:
    raise FileNotFoundError(f"Broken dates: {broken}")
if missing:
    raise FileNotFoundError(f"Missing dates: {missing}")


# Assemble Meta Data

In [None]:
# Number of features, the feature ids expected in every FeatureCorr file
# ("F0".."F{p-1}"), and one column prefix ("Y0_", "Y1_", ...) per target.
p = len(ref_headers)
fid_ref = [f"F{i}" for i in range(p)]
prefixes = [f"Y{k}_" for k in range(len(target_cols))]

# All running sums are kept in float64. Summing raw and second moments over
# many dates in float32 discards digits that the float64 cast in the OLS cell
# cannot bring back, and the normal equations / SSE computed from these sums
# are differences of large numbers that are sensitive to that loss.
xx = np.zeros((p, p), dtype=np.float64)

# Per-target accumulators: observation count, sum of x, sum of y,
# sum of y^2 and sum of x*y (names follow the FeatureCorr column suffixes).
N = {pref: np.int64(0) for pref in prefixes}
x_sum = {pref: np.zeros((p,), dtype=np.float64) for pref in prefixes}
y_sum = {pref: np.float64(0.0) for pref in prefixes}
yy = {pref: np.float64(0.0) for pref in prefixes}
xy = {pref: np.zeros((p,), dtype=np.float64) for pref in prefixes}

for d in in_sample_dates:
    headers_path = output_path / f"FeatureHeaders.{d}"
    corr_path = output_path / f"FeatureCorr.{d}.csv"
    xx_path = output_path / f"XX.{d}.csv"

    # Every date must carry exactly the reference feature list, in order.
    with headers_path.open("r", encoding="utf-8") as f:
        headers = [line.strip() for line in f if line.strip()]
    if headers != ref_headers:
        raise ValueError(f"FeatureHeaders mismatch at {d}")

    XX_d = pd.read_csv(xx_path, header=None).to_numpy(dtype=np.float64, copy=False)
    if XX_d.shape != (p + 1, p + 1):
        raise ValueError(f"XX shape mismatch: expected {(p+1, p+1)}, got {XX_d.shape}")

    # The p x p block used for the regression is rows 1..p, columns 0..p-1.
    # NOTE(review): the meaning of row 0 and column p of the stored matrix is
    # not visible here — confirm this slice against the code that writes XX.
    XTX_d = XX_d[1:, :p]
    if XTX_d.shape != (p, p):
        raise ValueError(f"Bad XTX block at {d}: expected {(p, p)}, got {XTX_d.shape}")

    # Column p of rows 1..p is required to be all zeros.
    if not np.all(XX_d[1:, p] == 0):
        raise ValueError("Unexpected nonzero entries in XX last column")

    xx += XTX_d


    df = pd.read_csv(corr_path)

    # One row per feature, in reference order, identified by the "fid" column.
    if "fid" not in df.columns:
        raise ValueError(f"Missing fid {d}")
    if df.shape[0] != p:
        raise ValueError("Feature Column Count Mismatch")
    if df["fid"].tolist() != fid_ref:
        raise ValueError(f"fid mismatch {d}")

    for pref in prefixes:
        # "{pref}XX" is required to be present but is not accumulated here.
        need = [f"{pref}N", f"{pref}X", f"{pref}XX", f"{pref}Y", f"{pref}YY", f"{pref}XY"]
        miss = [c for c in need if c not in df.columns]
        if miss:
            raise ValueError(f"Missing columns {d}: {miss}")

        # N, Y and YY are read from the first row only; X and XY are
        # per-feature columns of length p.
        N_d = np.int64(df[f"{pref}N"].iloc[0])
        X_d = df[f"{pref}X"].to_numpy(dtype=np.float64, copy=False)
        Y_d = np.float64(df[f"{pref}Y"].iloc[0])
        YY_d = np.float64(df[f"{pref}YY"].iloc[0])
        XY_d = df[f"{pref}XY"].to_numpy(dtype=np.float64, copy=False)

        N[pref] += N_d
        x_sum[pref] += X_d
        y_sum[pref] += Y_d
        yy[pref] += YY_d
        xy[pref] += XY_d

# OLS

In [None]:
# Per-target OLS with intercept, solved from the accumulated sufficient
# statistics (no raw data needed).
betas = {}
metrics = {}

xx64 = xx.astype(np.float64, copy=False)

for pref in prefixes:
    n = int(N[pref])
    # Explicit check instead of `assert`: asserts are stripped under -O, and
    # n == 0 would otherwise surface later as a division by zero.
    if n <= 0:
        raise ValueError(f"No observations accumulated for {pref}")

    xs = x_sum[pref].astype(np.float64, copy=False)
    ys = float(y_sum[pref])
    yty = float(yy[pref])
    xty = xy[pref].astype(np.float64, copy=False)

    # Gram matrix of the design [1, x]:
    #   [ n    sum(x)^T ]
    #   [ sum(x)   x^T x ]
    G = np.empty((p + 1, p + 1), dtype=np.float64)
    G[0, 0] = n
    G[0, 1:] = xs
    G[1:, 0] = xs
    G[1:, 1:] = xx64

    # Right-hand side [sum(y), x^T y].
    g = np.empty((p + 1,), dtype=np.float64)
    g[0] = ys
    g[1:] = xty

    # beta[0] is the intercept, beta[1:] the feature coefficients.
    # np.linalg.solve raises LinAlgError if G is singular.
    beta = np.linalg.solve(G, g)

    # At the least-squares solution, SSE = y^T y - beta^T g.
    betaTg = float(beta @ g)
    sse = yty - betaTg

    # Clamp tiny negative values caused by round-off; a larger negative SSE
    # is left as is (RMSE then evaluates to NaN).
    if sse < 0 and sse > -1e-6 * max(1.0, yty):
        sse = 0.0

    mse = sse / n
    rmse = np.sqrt(mse)

    # Total sum of squares about the mean; R2 is undefined when it is <= 0.
    ybar = ys / n
    tss = yty - n * (ybar ** 2)
    if tss <= 0:
        r2 = np.nan
    else:
        r2 = 1.0 - (sse / tss)

    betas[pref] = beta.astype(np.float64, copy=False)
    metrics[pref] = {
        "N": n,
        "SSE": sse,
        "MSE": mse,
        "RMSE": rmse,
        "R2": r2,
    }