In [1]:
# ========== Basic Libraries ==========
import numpy as np
import pandas as pd
import os
import joblib

# ========== Models and Preprocessing ==========
from sklearn.linear_model import (
    LinearRegression, Ridge, Lasso, ElasticNet, TweedieRegressor
)
from sklearn.cross_decomposition import PLSRegression
from sklearn.decomposition import PCA
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import confusion_matrix

# ========== Evaluation Metrics ==========
from sklearn.metrics import (
    mean_squared_error, mean_absolute_error, r2_score, explained_variance_score
)

# ========== Visualization and Hyperparameter Tuning ==========
import matplotlib
import matplotlib.pyplot as plt
! install optuna
import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)

# ========== Global Configuration ==========
np.random.seed(42)
plt.rcdefaults()
print(matplotlib.rcParams['axes.prop_cycle'])


usage: install [-bCcpSsUv] [-f flags] [-g group] [-m mode] [-o owner]
               [-M log] [-D dest] [-h hash] [-T tags]
               [-B suffix] [-l linkflags] [-N dbdir]
               file1 file2
       install [-bCcpSsUv] [-f flags] [-g group] [-m mode] [-o owner]
               [-M log] [-D dest] [-h hash] [-T tags]
               [-B suffix] [-l linkflags] [-N dbdir]
               file1 ... fileN directory
       install -dU [-vU] [-g group] [-m mode] [-N dbdir] [-o owner]
               [-M log] [-D dest] [-h hash] [-T tags]
               directory ...
cycler('color', ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf'])


In [None]:

def load_datasets(npz_path="/Users/june/Documents/University of Manchester/Data Science/ERP/Project code/1_Data_Preprocessing/all_window_datasets.npz"):
    """
    Load every array stored in an ``.npz`` archive into a plain dict.

    Parameters
    ----------
    npz_path : str
        Path of the ``.npz`` archive to read. The default is a machine-specific
        absolute path; pass an explicit path when running elsewhere.

    Returns
    -------
    dict
        Maps each key of the archive to the array stored under that key.
    """
    # SECURITY: allow_pickle=True unpickles object arrays, which can execute
    # arbitrary code. Only load archives that come from a trusted source.
    #
    # np.load returns a lazy NpzFile that keeps the file open; the context
    # manager closes the handle once every member has been read into memory.
    with np.load(npz_path, allow_pickle=True) as data:
        return {key: data[key] for key in data.files}
    

In [3]:

def r2_zero(y_true, y_pred):
    """
    Calculate zero-based R² (baseline prediction is 0 instead of the mean):

        1 - sum((y_true - y_pred)**2) / sum(y_true**2)

    y_true: array-like of true values (N,)
    y_pred: array-like of predicted values (N,)

    Returns NaN when ``sum(y_true**2)`` is 0, because the ratio is undefined
    there (previously this produced ±inf/NaN together with a RuntimeWarning).
    """
    # Accept lists / tuples as well as ndarrays.
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)

    rss = np.sum((y_true - y_pred) ** 2)  # residual sum of squares
    tss = np.sum(y_true ** 2)             # total sum of squares around 0
    if tss == 0:
        return np.nan
    return 1 - rss / tss

def _sign_hit_rates(y_true, y_pred):
    """
    Sign-agreement rates for one set of samples.

    Samples whose true value has sign 0 are ignored. Returns
    ``(overall, up, down)``, or None when no sample has a non-zero true sign.
    ``up`` / ``down`` are NaN when there is no truly-positive / truly-negative
    sample to score.
    """
    true_sign = np.sign(y_true)
    pred_sign = np.sign(y_pred)

    nonzero = true_sign != 0
    true_sign = true_sign[nonzero]
    pred_sign = pred_sign[nonzero]
    if true_sign.size == 0:
        return None

    hits = true_sign == pred_sign
    is_up = true_sign > 0
    is_down = true_sign < 0

    overall = np.mean(hits)
    up = np.mean(hits[is_up]) if np.any(is_up) else np.nan
    down = np.mean(hits[is_down]) if np.any(is_down) else np.nan
    return overall, up, down


def _nanmean_or_nan(values):
    """Mean of the non-NaN entries; NaN (without a warning) if there are none."""
    arr = np.asarray(values, dtype=float)
    valid = arr[~np.isnan(arr)]
    return np.mean(valid) if valid.size else np.nan


def calc_directional_metrics(y_true, y_pred, permnos=None):
    """
    Directional (sign) accuracy of predictions.

    - Sample-level sign prediction; samples whose true value is exactly 0 are
      excluded.
    - If ``permnos`` is given, Overall / Up / Down accuracy is calculated for
      each stock and the per-stock values are then averaged (NaN entries are
      ignored in the average).

    Parameters
    ----------
    y_true, y_pred : array-like (N,)
    permnos : array-like (N,) or None
        Group label of each sample.

    Returns
    -------
    (overall_acc, up_acc, down_acc)
        A value is NaN when it is undefined, i.e. when there is no sample to
        score for it. Both branches now behave the same way here: the
        ungrouped branch used to return 0 for an absent class, which is
        indistinguishable from "every prediction was wrong".
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    if permnos is None:
        rates = _sign_hit_rates(y_true, y_pred)
        if rates is None:
            return np.nan, np.nan, np.nan
        return rates

    df = pd.DataFrame({"permno": permnos, "yt": y_true, "yp": y_pred})
    per_stock = []
    for _, g in df.groupby("permno"):
        rates = _sign_hit_rates(g["yt"].values, g["yp"].values)
        if rates is not None:  # skip stocks with no non-zero true sign
            per_stock.append(rates)

    overall_acc = _nanmean_or_nan([r[0] for r in per_stock])
    up_acc = _nanmean_or_nan([r[1] for r in per_stock])
    down_acc = _nanmean_or_nan([r[2] for r in per_stock])

    return overall_acc, up_acc, down_acc


def _metric_values(y_true, y_pred, permnos):
    """
    Return ``(r2_zero, rmse, mae, mse, dir_acc, up_acc, down_acc)`` for one
    set of samples.
    """
    mse = mean_squared_error(y_true, y_pred)
    dir_acc, up_acc, down_acc = calc_directional_metrics(y_true, y_pred, permnos)
    return (
        r2_zero(y_true, y_pred),
        np.sqrt(mse),  # RMSE derived from the MSE computed once above
        mean_absolute_error(y_true, y_pred),
        mse,
        dir_acc,
        up_acc,
        down_acc,
    )


def regression_metrics(y_true, y_pred, k, meta=None, permnos=None):
    """
    Includes:
    - Regression metrics (zero-based R², RMSE, MAE, MSE)
    - Pointwise directional accuracy (overall / up / down)
    - Market cap group metrics (only when ``meta`` has "MKTCAP_PERCENTILE")

    Parameters
    ----------
    y_true, y_pred : array-like (N,)
    k : int
        Not used by the computation; kept so existing callers keep working.
    meta : mapping or DataFrame, optional
        When it contains "MKTCAP_PERCENTILE", the same metrics are also
        reported for samples with percentile >= 0.75 ("Top25_*") and
        <= 0.25 ("Bottom25_*"). A group with no samples is omitted.
        Assumes the percentile values are row-aligned with y_true — TODO confirm.
    permnos : array-like (N,), optional
        Passed to calc_directional_metrics for per-stock averaging.

    Returns
    -------
    dict
        Metric name -> value, in a fixed insertion order.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    if permnos is not None:
        # Boolean-mask indexing below needs an ndarray (a plain list would fail).
        permnos = np.asarray(permnos)

    overall_keys = (
        "R²_zero", "RMSE", "MAE", "MSE",
        "Directional Accuracy", "Up_Directional_Acc", "Down_Directional_Acc",
    )
    metrics = dict(zip(overall_keys, _metric_values(y_true, y_pred, permnos)))

    if meta is not None and "MKTCAP_PERCENTILE" in meta:
        percentile = np.asarray(meta["MKTCAP_PERCENTILE"])
        group_masks = (
            ("Top25", percentile >= 0.75),
            ("Bottom25", percentile <= 0.25),
        )
        group_suffixes = ("R2_zero", "RMSE", "MAE", "MSE", "Dir_Acc", "Up_Acc", "Down_Acc")

        for prefix, mask in group_masks:
            if not np.any(mask):
                continue
            group_permnos = permnos[mask] if permnos is not None else None
            values = _metric_values(y_true[mask], y_pred[mask], group_permnos)
            metrics.update(
                {f"{prefix}_{suffix}": value for suffix, value in zip(group_suffixes, values)}
            )

    return metrics


In [4]:

def save_model(model, name, window, path="models/"):
    """
    Persist a fitted model with joblib as ``<path>/<name>_w<window>.joblib``.

    The target directory is created if it does not exist yet.
    """
    os.makedirs(path, exist_ok=True)
    target_file = os.path.join(path, f"{name}_w{window}.joblib")
    joblib.dump(model, target_file)


def save_metrics(metrics_dict, name, window, path="results.csv"):
    """
    Save evaluation metrics as one row of a CSV file.

    The row is keyed by the ("Model", "Window") pair: if the file already
    exists, any row with the same pair is dropped and the new row is appended
    at the end; otherwise the file is created with this single row.
    """
    new_row = pd.DataFrame([metrics_dict])
    new_row.insert(0, "Model", name)
    new_row.insert(1, "Window", window)

    # First write: nothing to merge with.
    if not os.path.exists(path):
        new_row.to_csv(path, index=False)
        print(f"[Create] New metrics file created with {name} w={window}")
        return

    existing = pd.read_csv(path)
    is_same_run = (existing["Model"] == name) & (existing["Window"] == window)
    merged = pd.concat([existing[~is_same_run], new_row], ignore_index=True)
    merged.to_csv(path, index=False)
    print(f"[Update] Metrics updated for {name} w={window}")

def save_predictions(model_name, window_size, y_true, y_pred, permnos, path="predictions/"):
    """
    Write per-sample predictions to ``<path>/<model_name>_w<window_size>.csv``.

    The CSV has the columns PERMNO, y_true and y_pred (no index column).
    The target directory is created if it does not exist yet.
    """
    os.makedirs(path, exist_ok=True)

    columns = {"PERMNO": permnos, "y_true": y_true, "y_pred": y_pred}
    table = pd.DataFrame(columns)

    filename = f"{model_name}_w{window_size}.csv"
    destination = os.path.join(path, filename)
    table.to_csv(destination, index=False)
    print(f"[Save] {filename}")

In [None]:
# Names of the models whose hyperparameters are searched with Optuna in
# tune_model_with_optuna; train_and_evaluate builds any other name via get_model.
TUNED_MODELS = {"Ridge", "Lasso", "Enet", "PCR", "PLS"}

from sklearn.model_selection import TimeSeriesSplit

def _build_tuned_model(model_name, params):
    """
    Build the (unfitted) estimator for ``model_name`` from a hyperparameter dict.

    Used for both the trial models and the final returned model so the two
    cannot drift apart: the ``max_iter=2000`` used while tuning Lasso and
    Elastic Net is applied to the returned model as well.

    Returns None for a name that is not one of the tunable models.
    """
    if model_name == "Ridge":
        return Ridge(alpha=params["alpha"])
    if model_name == "Lasso":
        return Lasso(alpha=params["alpha"], max_iter=2000)
    if model_name == "Enet":
        return ElasticNet(alpha=params["alpha"], l1_ratio=params["l1_ratio"], max_iter=2000)
    if model_name == "PLS":
        return PLSRegression(n_components=params["n_components"])
    if model_name == "PCR":
        return make_pipeline(
            StandardScaler(),
            PCA(n_components=params["n_components"]),
            LinearRegression(),
        )
    return None


def tune_model_with_optuna(model_name, X, y, permnos=None, n_trials=50):
    """
    Search hyperparameters for one of TUNED_MODELS with Optuna and return an
    unfitted estimator configured with the best parameters found.

    Each trial is scored by the mean MSE over a 5-fold ``TimeSeriesSplit``.

    Parameters
    ----------
    model_name : str
        One of "Ridge", "Lasso", "Enet", "PLS", "PCR".
    X : array-like (n_samples, n_features)
        Assumes rows are in chronological order, as TimeSeriesSplit splits by
        row position — TODO confirm against the preprocessing step.
    y : array-like (n_samples,)
    permnos : optional
        Not used by the search; kept so existing callers keep working.
    n_trials : int
        Number of Optuna trials.

    Returns
    -------
    estimator or None
        None when ``model_name`` is not tunable or no trial completed.
    """
    if model_name not in TUNED_MODELS:
        print(f"[Skip] {model_name} is not a tunable model. Skipped tuning.")
        return None

    # Positional fancy indexing below requires ndarrays.
    X = np.asarray(X)
    y = np.asarray(y)

    tscv = TimeSeriesSplit(n_splits=5)

    # Search range for n_components (PLS / PCR). The lower bound is clamped so
    # it never exceeds the upper bound when X has a single feature; a fixed
    # lower bound of 2 would make suggest_int raise in that case.
    max_components = max(1, min(30, X.shape[1]))
    min_components = min(2, max_components)

    def objective(trial):
        if model_name in ("PLS", "PCR"):
            params = {
                "n_components": trial.suggest_int("n_components", min_components, max_components)
            }
        else:
            params = {"alpha": trial.suggest_float("alpha", 1e-4, 100.0, log=True)}
            if model_name == "Enet":
                params["l1_ratio"] = trial.suggest_float("l1_ratio", 0.05, 0.9)

        model = _build_tuned_model(model_name, params)

        fold_mse = []
        for train_idx, val_idx in tscv.split(X):
            model.fit(X[train_idx], y[train_idx])
            preds = model.predict(X[val_idx])
            fold_mse.append(mean_squared_error(y[val_idx], preds))

        return np.mean(fold_mse)

    # NOTE(review): the objective never calls trial.report()/should_prune(),
    # so the MedianPruner has nothing to act on and no trial is ever pruned.
    study = optuna.create_study(
        direction="minimize",
        sampler=optuna.samplers.TPESampler(seed=42),
        pruner=optuna.pruners.MedianPruner()
    )
    # NOTE(review): with n_jobs=-1 trials run in parallel, so the order in
    # which the seeded sampler sees results can vary between runs; use
    # n_jobs=1 if exactly reproducible tuning is required.
    study.optimize(objective, n_trials=n_trials, n_jobs=-1)

    # study.best_trial raises ValueError (it does not return None) when no
    # trial has completed, so check the trial states explicitly.
    completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
    if not completed:
        print(f"[Skip Model] {model_name} failed to complete any trial. Skipping.")
        return None

    best_params = study.best_params
    best_score = study.best_value
    print(f"[Optuna] {model_name} best_MSE={best_score:.6f}, best_params={best_params}")

    return _build_tuned_model(model_name, best_params)

def get_model(name: str):
    """
    Return a fresh estimator for a model that needs no hyperparameter tuning.

    Only "OLS" (plain LinearRegression) is available; any other name raises
    ValueError.
    """
    if name != "OLS":
        raise ValueError(f"Unexpected model: {name}. Only OLS is supported by default.")
    return LinearRegression()

Although the original paper includes a Generalized Linear Model with Group Lasso (GLM) for cross-sectional prediction, it is not appropriate for our time-series forecasting task. Group Lasso assumes that input features can be divided into meaningful groups, such as different types of firm characteristics (e.g., value, momentum, size), and applies regularization to entire groups of features. This makes sense in cross-sectional settings where features have an inherent grouped structure.

However, in our case, each input sample consists of lagged returns over a time window (e.g., returns from t−21 to t−1). These lagged features do not naturally belong to any specific group—they represent a sequential temporal structure rather than logically separable categories. Applying group regularization to such features would be arbitrary and may even hurt model performance.

Therefore, including GLM in our time-series model pipeline is unnecessary and potentially misleading. Instead, using standard Lasso, Ridge, and Elastic Net regularization is more appropriate for encouraging sparsity and stability in this context.

In [6]:
# ========== 6. Main training logic for models ==========

def train_and_evaluate(model_name, window_size,
                       X_train, y_train, X_test, y_test,
                       permnos_train, permnos_test, meta=None):
    """
    Tune (if applicable), fit and evaluate one model for one window size, then
    save the fitted model, its metrics and its test predictions.

    Parameters
    ----------
    model_name : str
        A name in TUNED_MODELS (tuned with Optuna) or one accepted by get_model.
    window_size : int
        Only used to label the saved artefacts.
    X_train, y_train, X_test, y_test : arrays
        Training / test features and targets.
    permnos_train, permnos_test : arrays
        Stock identifier of each training / test sample.
    meta : optional
        Test-set metadata forwarded to regression_metrics.

    Returns
    -------
    dict or None
        The metrics dict, or None when tuning produced no model and the
        model/window combination was skipped.
    """
    if model_name in TUNED_MODELS:
        model = tune_model_with_optuna(model_name, X_train, y_train, permnos=permnos_train)
    else:
        model = get_model(model_name)

    # tune_model_with_optuna returns None when it gives up on a model; skip
    # explicitly instead of failing with AttributeError on `None.fit`.
    if model is None:
        print(f"[Skip] {model_name} w={window_size}: tuning returned no model; nothing trained or saved.")
        return None

    model.fit(X_train, y_train)

    # Flatten to 1-D: some scikit-learn versions return shape (n, 1) from
    # PLSRegression.predict, which would broadcast to (n, n) against a 1-D
    # y_test in the metrics and cannot be stored as a DataFrame column.
    y_test = np.asarray(y_test).ravel()
    y_pred = np.asarray(model.predict(X_test)).ravel()

    print("\n=== Directional Sanity Check ===")
    print("Pos ratio (y_test):", (y_test > 0).mean())
    print("Neg ratio (y_test):", (y_test < 0).mean())
    sign_pred = np.sign(y_pred)
    print("Pred +1 ratio:", (sign_pred > 0).mean())
    print("Pred -1 ratio:", (sign_pred < 0).mean())

    # Rows: true sign (+1, -1); columns: predicted sign (+1, -1).
    # Only the labels +1 and -1 are tabulated.
    conf = confusion_matrix(np.sign(y_test), sign_pred, labels=[1, -1])
    print("      Pred+  Pred-")
    print("+1 |", conf[0])
    print("-1 |", conf[1])

    metrics = regression_metrics(y_test, y_pred, k=X_test.shape[1], meta=meta, permnos=permnos_test)

    save_model(model, model_name, window_size)
    save_metrics(metrics, model_name, window_size)
    save_predictions(model_name, window_size, y_test, y_pred, permnos_test)

    return metrics

In [7]:
# ========== 7. Main dispatcher function: loop through all models and window sizes ==========

def loop_all_models(npz_path=None, model_list=None, window_sizes=None):
    """
    Train and evaluate every requested model on every requested window size.

    Parameters
    ----------
    npz_path : str, optional
        Archive to load with load_datasets. When omitted, load_datasets is
        called with its own default path.
    model_list : sequence of str, optional
        Model names to run. Defaults to OLS, Ridge, Lasso, Enet, PLS, PCR.
    window_sizes : sequence of int, optional
        Window sizes to run. Defaults to 5, 21, 252, 512.

    For each window the archive must provide ``X_train_<w>``, ``y_train_<w>``,
    ``X_test_<w>``, ``y_test_<w>``, ``meta_train_<w>`` and ``meta_test_<w>``.
    """
    datasets = load_datasets() if npz_path is None else load_datasets(npz_path)
    if model_list is None:
        model_list = ["OLS", "Ridge", "Lasso", "Enet", "PLS", "PCR"]
    if window_sizes is None:
        window_sizes = [5, 21, 252, 512]

    for window in window_sizes:
        X_train = datasets[f"X_train_{window}"]
        y_train = datasets[f"y_train_{window}"]
        X_test = datasets[f"X_test_{window}"]
        y_test = datasets[f"y_test_{window}"]

        # Load train & test meta data. Each is stored as a 0-d object array
        # wrapping a dict, hence .item() to unwrap it.
        meta_train_dict = datasets[f"meta_train_{window}"].item()
        meta_test_dict = datasets[f"meta_test_{window}"].item()

        meta_train = pd.DataFrame.from_dict(meta_train_dict)
        meta_test = pd.DataFrame.from_dict(meta_test_dict)

        # Extract permnos
        permnos_train = meta_train["PERMNO"].values
        permnos_test = meta_test["PERMNO"].values

        for model_name in model_list:
            print(f"Training {model_name} on Window = {window}")
            train_and_evaluate(
                model_name, window,
                X_train, y_train, X_test, y_test,
                permnos_train, permnos_test,
                meta_test
            )


In [8]:

# Entry point: run the full sweep over all models and window sizes when this
# code is executed as the main program.
if __name__ == "__main__" :
    loop_all_models()


Training OLS on Window = 5

=== Directional Sanity Check ===
Pos ratio (y_test): 0.5225259359494813
Neg ratio (y_test): 0.47667117726657643
Pred +1 ratio: 0.8401353179972937
Pred -1 ratio: 0.15986468200270637
      Pred+  Pred-
+1 | [49038  8884]
-1 | [44017  8822]
[Update] Metrics updated for OLS w=5
[Save] OLS_w5.csv
Training Ridge on Window = 5
[Optuna] Ridge best_MSE=0.000298, best_params={'alpha': 97.57863488238746}

=== Directional Sanity Check ===
Pos ratio (y_test): 0.5225259359494813
Neg ratio (y_test): 0.47667117726657643
Pred +1 ratio: 0.840297699594046
Pred -1 ratio: 0.15970230040595398
      Pred+  Pred-
+1 | [49043  8879]
-1 | [44030  8809]
[Update] Metrics updated for Ridge w=5
[Save] Ridge_w5.csv
Training Lasso on Window = 5
[Optuna] Lasso best_MSE=0.000298, best_params={'alpha': 0.00010201015823920868}

=== Directional Sanity Check ===
Pos ratio (y_test): 0.5225259359494813
Neg ratio (y_test): 0.47667117726657643
Pred +1 ratio: 0.9054758682904827
Pred -1 ratio: 0.09452