# Imports

In [None]:
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from pandas.tseries.frequencies import to_offset
from pandas.api.types import is_integer_dtype
import statsmodels.api as sm
import math
from itertools import combinations
import ta
import seaborn as sns
import statsmodels.formula.api as smf
import warnings
from statsmodels.tools.sm_exceptions import ValueWarning
warnings.filterwarnings("ignore")
warnings.simplefilter("ignore")
from statsmodels.stats.anova import anova_lm
import torch
from torch import nn
from torch.utils.data import TensorDataset, Dataset, DataLoader
from torch import tensor
from sklearn.ensemble import RandomForestRegressor
from pmdarima import auto_arima    
from statsmodels.tsa.statespace.sarimax import SARIMAX
from itertools import product
import multiprocessing as mp
from copy import deepcopy
import gc
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import Counter, defaultdict
from math import ceil
from numpy import sqrt, exp, log
from scipy.special import ndtr
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

# Introduction

In [None]:
strike_price = 100
premium_call = 4
premium_put = 6
stock_prices = np.linspace(50, 150, 500)

pnl_call = np.maximum(stock_prices - strike_price, 0) - premium_call
pnl_put = np.maximum(strike_price - stock_prices, 0) - premium_put
pnl_straddle = pnl_call + pnl_put

In [None]:
#This produces the plots in Figure 1

plt.style.use('seaborn-v0_8-whitegrid')
call_color = "#1f77b4"
put_color = "#ff7f0e"
straddle_color = "#2ca02c"
strike_color = "#d62728"

#Call option PnL
fig = plt.figure(figsize=(7, 4))
plt.plot(stock_prices, pnl_call, label="Call Option Payoff", color=call_color, linewidth=2)
plt.axhline(0, color='black', linewidth=1, linestyle='--')
plt.axvline(strike_price, color=strike_color, linestyle='--', linewidth=2, label="Strike Price")
plt.title("Payoff of a Call Option", fontsize=30, weight="bold")
plt.xlabel("Stock Price at Expiration", fontsize=25)
plt.ylabel("Profit", fontsize=25)
plt.tick_params(axis="x", colors="black", labelsize=20)
plt.tick_params(axis="y", colors="black", labelsize=20)
plt.legend(fontsize=15, frameon=True, edgecolor='black')
plt.grid(True, linestyle=':', linewidth=0.7)
plt.tight_layout()
plt.show()
plt.close(fig)

#Put option PnL
fig = plt.figure(figsize=(7, 4))
plt.plot(stock_prices, pnl_put, label="Put Option Payoff", color=put_color, linewidth=2)
plt.axhline(0, color='black', linewidth=1, linestyle='--')
plt.axvline(strike_price, color=strike_color, linestyle='--', linewidth=2, label="Strike Price")
plt.title("Payoff of a Put Option", fontsize=30, weight="bold")
plt.xlabel("Stock Price at Expiration", fontsize=25)
plt.ylabel("Profit", fontsize=25)
plt.tick_params(axis="x", colors="black", labelsize=20)
plt.tick_params(axis="y", colors="black", labelsize=20)
plt.legend(fontsize=15, frameon=True, edgecolor='black')
plt.grid(True, linestyle=':', linewidth=0.7)
plt.tight_layout()
plt.show()
plt.close(fig)

#Straddle PnL
fig = plt.figure(figsize=(7, 4))
plt.plot(stock_prices, pnl_straddle, label="Straddle Payoff", color=straddle_color, linewidth=2)
plt.axhline(0, color='black', linewidth=1, linestyle='--')
plt.axvline(strike_price, color=strike_color, linestyle='--', linewidth=2, label="Strike Price")
plt.title("Payoff of a Straddle", fontsize=30, weight="bold")
plt.xlabel("Stock Price at Expiration", fontsize=25)
plt.ylabel("Profit", fontsize=25)
plt.tick_params(axis="x", colors="black", labelsize=20)
plt.tick_params(axis="y", colors="black", labelsize=20)
plt.legend(fontsize=15, frameon=True, edgecolor='black')
plt.grid(True, linestyle=':', linewidth=0.7)
plt.tight_layout()
plt.show()
plt.close(fig)

# Data Collection and Preliminary Checks

In [None]:
#We will perform our analysis on the period 2013-2021 (9 years), 
#and backtest our trading strategy on the period 2022-2024 (3 years)

start_date_analysis, end_date_analysis, end_date_backtest = "2013-01-01", "2022-01-01", "2025-01-01"
tickers = ["AAPL", "ABBV", "AMD", "AMGN", "AMZN", "AVGO", "AXP", "BAC", "BMY", "C", "GILD", "GOOG", "GS", "JNJ", "JPM", 
           "LLY", "MA", "META", "MRK", "MS", "MSFT", "NVDA", "ORCL", "PFE", "REGN", "SCHW", "TSLA", "V", "VRTX", "WFC"]
n_tickers = len(tickers)   #30 tickers in this study

In [None]:
sector_of = {}
for tkr in tickers:
    try:
        sector_of[tkr] = yf.Ticker(tkr).info.get("sector", "Unknown")
    except Exception:
        sector_of[tkr] = "Unknown"

unique_sectors = sorted({s for s in sector_of.values() if s != "Unknown"})
sector_dict = {s: i + 1 for i, s in enumerate(unique_sectors)}
sector_dict["Unknown"] = 0         

In [None]:
frames = []

for tkr in tickers:
    try:
        tmp = (
            yf.Ticker(tkr)
              .history(start=start_date_analysis, end=end_date_analysis, auto_adjust=False)
              .reset_index()
              .assign(
                  Ticker=tkr,
                  Sector=sector_of[tkr]
              )
              .loc[:, ["Date", "Ticker", "Sector",
                       "Open", "High", "Low", "Close", "Volume"]]
        )
        frames.append(tmp)
    except Exception as e:
        print(f"Skipping {tkr} (price fetch failed): {e}")

df = pd.concat(frames, ignore_index=True)

#Convert Date to plain date
df["Date"] = pd.to_datetime(df["Date"]).dt.date

#Map "Sector" to its integer code (filling any unknown sectors with 0)
df["Sector"] = df["Sector"].map(sector_dict).fillna(0).astype(int)

In [None]:
#Number of trading days per ticker
n = round(len(df) / n_tickers)
n

In [None]:
#No missing values
len(df[df.isnull().any(axis=1)])

In [None]:
#We have a single entry with a "Volume" value of 0 (on that day, AMD moved its listing from NYSE to NASDAQ, so no
#trading occurred for this stock)
df[(df[["Close", "High", "Low", "Open", "Volume"]] <= 0).any(axis=1)]

In [None]:
#To avoid issues later, we change this data to the data of the previous trading day
for col in ["Open", "High", "Low", "Close", "Volume"]:
    df.loc[5038, col] = df.loc[5037, col]

In [None]:
#The tickers are grouped and sorted as expected
df["Ticker"].tolist() == [stock for stock in tickers for _ in range(n)]

In [None]:
#Dates are in increasing order for all tickers
check = True
grouped_df = df.groupby("Ticker")
is_date_order_maintained = True
for ticker, group in grouped_df:
    if not group["Date"].is_monotonic_increasing: check = False
check

In [None]:
#All tickers contain the same dates
common_dates = None
for ticker, group in grouped_df:
    unique_dates = set(group["Date"])
    if common_dates is None:
        common_dates = unique_dates
    elif common_dates != unique_dates:
        common_dates = common_dates.intersection(unique_dates)

common_dates == set(df["Date"])

In [None]:
#We check the correlations between pairs of stocks. To do so, we first create a table with 1 + 
#"n_tickers" columns ("Date" + "n_tickers" columns for the close prices of each stock). We name 
#these columns by the corresponding ticker

df_by_ticker = df.pivot(index="Date", columns="Ticker", values="Close")
df_by_ticker.reset_index(inplace=True)

In [None]:
#This creates the plot in Figure 2

stocks_corr_matrix = df_by_ticker.iloc[:, 1:].corr() * 100 
    
fig = plt.figure(figsize=(12, 10))
fig.patch.set_facecolor("white")

sns.heatmap(stocks_corr_matrix, annot=True, fmt='.0f', cmap="coolwarm", cbar=True, 
annot_kws={"color": "black", "fontsize": 14}, linewidths=0.5, linecolor="gray")
    
plt.title("Correlation Matrix of the Different Stocks", color="black", fontsize=20, weight="bold")      
plt.xlabel("Close Price of Different Stocks", color="black", fontsize=20)
plt.ylabel("Close Price of Different Stocks", color="black", fontsize=20)

ax = plt.gca()
ax.set_facecolor("white")
ax.tick_params(axis="x", colors="black", labelrotation=45, labelsize=10)
ax.tick_params(axis="y", colors="black", labelrotation=0, labelsize=10)
for spine in ax.spines.values():
    spine.set_edgecolor("black")
    spine.set_linewidth(1)
plt.show()
plt.close(fig)

# Feature and Target Engineering

### Feature Construction

In [None]:
#Here we define some helper functions to aid in the computation of some predictors in the cell below

x_vector = np.arange(20)
x_vector_mean = x_vector.mean()
x_vector_var = x_vector.var(ddof=0)

def _slope_computer(y):
    y_mean = y.mean()
    cov_xy = ((x_vector - x_vector_mean) * (y - y_mean)).mean()
    return (cov_xy / x_vector_var) / y.mean()  #% per day


triu_mask = np.triu_indices(20, k=1)  #i<j mask for a 20×20 matrix

def _tau_weighted(y):
    """
    Modified Kendall's Tau statistic on a fixed 20-day window of prices.

    C = Σ max(price_j - price_i, 0)      over all i<j
    D = Σ max(price_i - price_j, 0)      over all i<j
    τ = (C - D) / (C + D)                ∈ [-1, +1]
    """
    #pair-wise differences for i<j
    y = np.asarray(y)
    diff = y[None, :] - y[:, None]           #diff[i,j] = price_j − price_i
    diff = diff[triu_mask]                   #keep only i<j half (190 values)

    C = diff[diff > 0].sum()             #concordant (up-moves),   ≥ 0
    D = (-diff[diff < 0]).sum()          #discordant (down-moves), ≥ 0
    denom = C + D
    if denom == 0:
        return np.nan                    #when all prices are identical, return NaN
    return (C - D) / denom

spy_return = yf.Ticker("SPY").history(start=start_date_analysis, end=end_date_analysis, auto_adjust=False)["Close"].pct_change()
spy_return.index = spy_return.index.tz_localize(None)

In [None]:
#This function computes several predictors derived from price and volume data. They are all computed by-ticker

def add_features(df_ticker):
    """
    This function takes a DataFrame for a single ticker and adds several columns, all computed by-ticker
    """
    df = df_ticker.copy()
    prices = df["Close"]

    #Normalised slope of linear regression fitted with y = past prices and x = [0, 1, ..., 19]
    df["Slope"] = prices.rolling(20, min_periods=20).apply(_slope_computer)

    #Magnitude-Aware Kendall’s Tau statistic, computed on past prices
    df["Tau"] = prices.rolling(20, min_periods=20).apply(_tau_weighted)

    #Day-by-day return
    ticker_return = prices.pct_change()
    df["Return"] = ticker_return

    #Coefficient and slope of linear regression fitted with y = past prices and x = past SPY prices on the same dates
    spy = (spy_return.reindex(df["Date"]).reset_index(drop=True))
    spy.index = df.index
    cov_spy_and_ticker = spy.rolling(30, min_periods=30).cov(ticker_return)
    var_spy = spy.rolling(30, min_periods=30).var()
    beta = cov_spy_and_ticker / var_spy.replace(0, np.nan)     #avoid /0
    alpha = ticker_return.rolling(30, min_periods=30).mean() - beta * spy.rolling(30, min_periods=30).mean()
    df["Beta"] = beta
    df["Alpha"] = alpha

    #Approximate number of shares traded
    df["Shares_traded"] = df["Volume"] / prices

    #Exponentially weighted volatility (standard deviation of returns)
    df["Volatility"] = df["Return"].ewm(span=20).std()
    
    #Normalised difference between exponential moving averages
    ema_10 = ta.trend.EMAIndicator(close=df["Close"], window=10, fillna=False).ema_indicator()
    ema_30 = ta.trend.EMAIndicator(close=df["Close"], window=30, fillna=False).ema_indicator()
    df["EMA_diff"] = (ema_10 - ema_30) / prices
    
    #Momentum (using MACD)
    macd_indicator = ta.trend.MACD(
        close=df["Close"], window_slow=24, window_fast=12, window_sign=9, fillna=False)
    hist_series = macd_indicator.macd_diff()
    df["MACD_momentum"] = np.where((hist_series > 0) & (hist_series.diff() > 0), 2,
        np.where((hist_series > 0) & (hist_series.diff() <= 0), 1,
        np.where((hist_series <= 0) & (hist_series.diff() <= 0), -2,
        np.where((hist_series <= 0) & (hist_series.diff() > 0), -1,
                    np.nan))))

    #Relative strength index
    df["RSI"] = ta.momentum.RSIIndicator(close=df["Close"], window=14, fillna=False).rsi()
    
    #Bollinger bands binary variables
    bb_indicator = ta.volatility.BollingerBands(close=df["Close"], window=20, window_dev=2, fillna=False)            
    bb_upper = bb_indicator.bollinger_hband()
    bb_lower = bb_indicator.bollinger_lband()
    df["Bol_up"] = (prices > bb_upper).astype(int)
    df["Bol_down"] = (prices < bb_lower).astype(int)
    
    #Stochastic oscillator signal
    stoch_indicator = ta.momentum.StochasticOscillator(high=df["High"], low=df["Low"], 
                      close=df["Close"], window=14, smooth_window=3, fillna=False)             
    df["Stoch_osc_signal"] = stoch_indicator.stoch() - stoch_indicator.stoch_signal()
    
    #Commodity channel index
    df["CCI"] = cci_indicator = ta.trend.CCIIndicator(
        high=df["High"], low=df["Low"], close=df["Close"], window=20, fillna=False).cci()

    #On-balance volume
    df["OBV"] = ta.volume.OnBalanceVolumeIndicator(close=df["Close"], volume=df["Volume"], 
             fillna=False).on_balance_volume() / df["Volume"]       
    
    #Money flow index
    df["MFI"] = ta.volume.MFIIndicator(high=df["High"], low=df["Low"], close=df["Close"], 
                volume=df["Volume"], window=14, fillna=False).money_flow_index()
    
    #Parabolic Stock and Reverse (SAR)
    df["PSAR"] = ta.trend.PSARIndicator(high=df["High"], low=df["Low"], close=df["Close"]
                 ).psar() / prices
    
    return df

### Target Construction

In [None]:
#This function computes, for each stock on each day, the target variable and several lagged versions of it, to use 
#as predictors. In this analysis, we only work with one predictor, but the framework allows onw to experiment with 
#many targets, each with a different horizon

def add_targets(df_ticker, target_horizons=(1, 5, 10), future=True):
    """
    For each horizon "h" in "target_horizons", add a column that is the maximum absolute % move
    (up or down) in the next (if "future" is True) or previous (if "future" is False) "h" trading days. 
    This column is called "Target<h>" if "future" is True, and "Lagged_target<h>" if "future" is False.
    """
    df = df_ticker.copy()
    high, low, close = df["High"], df["Low"], df["Close"]

    for h in target_horizons:
        if future: #this part is forward-looking (so it is used to compute the target variables)
            highest = high.shift(-1).iloc[::-1].rolling(h, min_periods=h).max().iloc[::-1]
            lowest = low.shift(-1).iloc[::-1].rolling(h, min_periods=h).min().iloc[::-1]

            #largest move in the next "h" days (from tomorrow)
            df[f"Target{h}"] = np.maximum((highest - close).abs() / close, (lowest  - close).abs() / close)

        else: #this part is backward-looking (so it is used to compute the target variables)
            highest = high.rolling(h, min_periods=h).max()
            lowest = low.rolling(h, min_periods=h).min()

            #largest move in the previous "h" days (including today). This could be defined in many other ways
            df[f"Lagged_target{h}"] = np.maximum((close - highest).abs() / highest, (close - lowest).abs() / lowest)

    return df

In [None]:
#Now we apply the above functions to every ticker separately, adding both the predictors and the target to "df".
#We choose to have only one target variable, while we use 5 lagged versions of the target as predictors

horizons = (1, 3, 5, 10, 20)
max_horizon = max(horizons)
target_horizon = 10  #We may experiment with different target variables

df = (df.groupby("Ticker", group_keys=False, sort=False).apply(add_features)
.groupby("Ticker", group_keys=False, sort=False).apply(add_targets, target_horizons=horizons, future=False)
.groupby("Ticker", group_keys=False, sort=False).apply(add_targets, target_horizons=(target_horizon,), future=True))

df = df.rename(columns={f"Target{target_horizon}" : "Target"})

In [None]:
#For each ticker, the first "h-1" values of "Lagged_target<h>" and the last "h" values of "Target<h>" are null
df.head(12)

In [None]:
#Comments on the "Tau" variable. This creates the plots in Figure 3

for i in range(8):
    ticker = tickers[i]
    dfplot = df[df["Ticker"] == ticker]

    fig, ax = plt.subplots(figsize=(12, 4))
    fig.patch.set_facecolor("white")
    
    dates_plot = np.array(dfplot["Date"])
    prices_plot = np.array(dfplot["Close"])
    tau_plot = np.array(dfplot["Tau"])
    discrete_tau_plot = np.where(tau_plot >  0.4,  1, np.where(tau_plot < -0.4, -1, 0))

    color_map = {1: "green", 0: "blue", -1: "red"} #This is the color mapping

    #We iterate through the data and plot segments with different colours
    for i in range(len(dates_plot) - 1):
        plt.plot(dates_plot[i:i+2], prices_plot[i:i+2], color=color_map[discrete_tau_plot[i]])

    legend_elements = [
        Line2D([0], [0], color="green", lw=2, label="Price when Tau > 0.4"),
        Line2D([0], [0], color="blue", lw=2, label="Price when -0.4 ≤ Tau ≤ 0.4"),
        Line2D([0], [0], color="red", lw=2, label="Price when Tau < -0.4")]

    legend = plt.legend(handles=legend_elements, loc="upper left", fontsize=17, frameon=True)
    legend.get_frame().set_edgecolor("black")  
    legend.get_frame().set_linewidth(2)  

    plt.xlabel("Date", color="black", fontsize=28)
    plt.ylabel("Price", color="black", fontsize=28)
    plt.title(f"Close Price of {ticker} Over Time, Coloured by Tau", color="black", 
              fontsize=28, weight="bold")          
    plt.grid(color="gray", linewidth=0.5)

    ax.tick_params(axis="x", colors="black", labelsize=20)
    ax.tick_params(axis="y", colors="black", labelsize=20)

    ax.set_facecolor("white")
    ax.tick_params(axis="x", colors="black")
    ax.tick_params(axis="y", colors="black")

    for spine in ax.spines.values():
        spine.set_edgecolor("black")
        spine.set_linewidth(1)
    
    plt.show()
    plt.close(fig)

# Directional Mean Squared Error Loss Function

In [None]:
#This shows that "Target" is a very good proxy of realised volatility, as "Volatility" has a 75%
#correlation with (a 10-day shifted version of) "Target"

df["Target_shifted_10d"] = df.groupby("Ticker")["Target"].shift(10)
df_clean = df.dropna(subset=["Target_shifted_10d"])
print(df_clean["Volatility"].corr(df_clean["Target_shifted_10d"]))
df.drop(columns=["Target_shifted_10d"], inplace=True)

In [None]:
#Definition of the Directional Mean Squared Error (DMSE) loss function

loss_penalty = 9
class DMSE(nn.Module):
    def __init__(self, over_penalty = loss_penalty, reduction = "mean"):
        super().__init__()
        self.over_penalty = over_penalty
        self.reduction = reduction

    def forward(self, y_hat: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
        diff2 = (y - y_hat) ** 2
        loss = torch.where(y_hat <= y, diff2, diff2 * self.over_penalty)
        return loss.mean() if self.reduction == "mean" else loss.sum()

In [None]:
#This cell creates Figure 4

y = 1.0  #true value
y_hat = np.linspace(0, 2, 400)  #predicted values
loss = np.where(y_hat <= y, (y - y_hat)**2, loss_penalty * (y - y_hat)**2)  #DMSE

#Plot
plt.figure(figsize=(8, 5))
plt.plot(y_hat, loss, label='DMSE Loss', color='blue')
plt.axvline(x=y, color='gray', linestyle='--', label='True Value y = 1.0')
plt.xlabel('Prediction $\hat{y}$', fontsize=23)
plt.ylabel('Loss $L(y, \hat{y})$', fontsize=23)
plt.title('Directional Mean Squared Error Loss Function', color="black", fontsize=20, weight="bold")
plt.tick_params(axis="x", colors="black", labelsize=20)
plt.tick_params(axis="y", colors="black", labelsize=20)

legend = plt.legend(frameon=True, fontsize=16)
legend.get_frame().set_edgecolor('black')
legend.get_frame().set_linewidth(1.2)

plt.grid(True)
plt.show()

# Pre-Modelling Analysis

### Slightly Reduced Dataset

In [None]:
#We exclude from this part of the analysis every row containing null values, keeping the order of the data unchanged

df2 = df.dropna().copy()
df2.columns

### Correlation Between Variables

In [None]:
#We compute and plot the correlation matrix of the variables, which is shown in Figure 5

corr_matrix = df2.drop(["Date", "Ticker"], axis=1).corr() * 100

fig = plt.figure(figsize=(12, 10))
fig.patch.set_facecolor("white")

sns.heatmap(corr_matrix, annot=True, fmt='.0f',cmap='coolwarm', cbar=True, 
            annot_kws={"color": "black", "fontsize": 14}, linewidths=0.5, linecolor="gray")

plt.title("Correlation Matrix of our Variables", color="black", fontsize=30, weight="bold")
plt.xlabel(" ", color="black", fontsize=20)
plt.ylabel(" ", color="black", fontsize=20)

ax = plt.gca()
ax.set_facecolor("white")
ax.tick_params(axis="x", colors="black", labelrotation=90, labelsize=15)
ax.tick_params(axis="y", colors="black", labelsize=15)
for spine in ax.spines.values():
    spine.set_edgecolor("black")
    spine.set_linewidth(1)     
plt.show()
plt.close(fig)

In [None]:
df2.drop(["High", "Low", "Open", "Close", "Volume", "Slope"], axis=1, inplace=True)

### Mixed-Effects Models for Significance Tests

In [None]:
continuous_predictors = ['Tau', 'Return', 'Beta', 'Alpha', 'Shares_traded', 'Volatility', 'EMA_diff', 'RSI', 'Stoch_osc_signal', 
'CCI', 'OBV', 'MFI', 'PSAR', 'Lagged_target1', 'Lagged_target3', 'Lagged_target5', 'Lagged_target10', 'Lagged_target20']
categorical_predictors = ['Sector', 'MACD_momentum', 'Bol_up', 'Bol_down']

In [None]:
#Helper function to label the significance of p-values
def significance_label(p):
    if pd.isna(p): return ""
    elif p <= 0.001: return "***"
    elif p <= 0.01: return "**"
    elif p <= 0.05: return "*"
    else: return ""

In [None]:
#We fit a mixed linear model for each continuous predictor. This cell creates Table 1

pvalue_column = []
group_var_column = []
for var in continuous_predictors:
    formula = f"Target ~ {var}"
    model = smf.mixedlm(formula, df2, groups=df2["Ticker"])
    result = model.fit(reml=False, method='lbfgs', maxiter=1000)
    pvalue_column.append(result.pvalues[var])
    group_var_column.append(float(result.cov_re.iloc[0, 0]))
    #print(result.summary())
        
data = pd.DataFrame({"Variable" : continuous_predictors, "Group Variance" : group_var_column, "P-value" : pvalue_column})
data["Significance"] = data["P-value"].apply(significance_label)
data.sort_values(by="P-value", inplace=True)

data

### Visual Exploration of the Variables

In [None]:
#Density plot of "Target". This creates the plot in Figure 6

plt.figure(figsize=(8, 5))

sns.kdeplot(data=df2["Target"], fill=True, color='turquoise', linewidth=2)

plt.title(f"Density of Target", fontsize=28, weight="bold")
plt.ylabel("Density", fontsize=24)
plt.xlabel("Target", fontsize=24)

ax = plt.gca()
ax.set_facecolor("white")
ax.tick_params(axis="y", colors="black", labelsize=19)
ax.tick_params(axis="x", colors="black", labelsize=19)
for spine in ax.spines.values():
    spine.set_edgecolor("black")
    spine.set_linewidth(1)

plt.grid(False)
plt.tight_layout()
plt.show()

In [None]:
#Distribution of "Target" conditional on the quantiles of each continuous predictor.
#This creates the first part of Figure 7

quantile_bins = [0.2, 0.4, 0.6, 0.8]
palette_red1 = ["lightcoral", "salmon", "coral", "tomato", "orangered"]

for var in continuous_predictors:
    #Compute quantiles for the variable
    quantiles = df2[var].quantile(quantile_bins).values
    bins = [-np.inf] + list(quantiles) + [np.inf]
    labels = [f"<Q20", "[Q20,Q40)", "[Q40,Q60)", "[Q60,Q80)", "≥Q80"]

    #Bin the variable
    df2[f'{var}_bin'] = pd.cut(df2[var], bins=bins, labels=labels, include_lowest=True)

    #Prepare data: list of Target values grouped by binned variable
    data = [df2.loc[df2[f'{var}_bin'] == lab, 'Target'].dropna() for lab in labels]

    #Plot
    plt.figure(figsize=(8, 6))
    bp = plt.boxplot(
        data,
        labels=labels,
        patch_artist=True,
        boxprops=dict(facecolor='lightblue', color='black'),
        medianprops=dict(color='black'),
        whiskerprops=dict(color='black'),
        capprops=dict(color='black'),
        flierprops=dict(marker='o', color='black', markersize=5)
    )

    for box, col in zip(bp['boxes'], palette_red1):
        box.set_facecolor(col)

    plt.title(f"Target by {var} quantiles", fontsize=25, weight="bold")
    plt.xlabel(f"{var} quantile bin", fontsize=30)
    plt.ylabel("Target", fontsize=30)

    ax = plt.gca()
    ax.set_facecolor("white")
    ax.tick_params(axis="y", colors="black", labelsize=21)
    ax.tick_params(axis="x", colors="black", labelsize=20)
    for spine in ax.spines.values():
        spine.set_edgecolor("black")
        spine.set_linewidth(1)

    plt.grid(False)
    plt.tight_layout()
    plt.show()
    df2.drop([f'{var}_bin'], axis=1, inplace=True)

In [None]:
#Distribution of "Target" conditional on the categories of each categorical predictor.
#This creates the second part of Figure 7

palette_red2 = ["mistyrose", "lightcoral", "salmon", "darksalmon", "coral", "tomato", 
                "orangered", "orange", "darkorange", "red", "firebrick", "darkred"]

for var in categorical_predictors:
    categories = sorted(df2[var].dropna().unique())
    data = [df2.loc[df2[var] == cat, 'Target'].dropna() for cat in categories]

    plt.figure(figsize=(8, 6))
    bp = plt.boxplot(data, labels=categories, patch_artist=True, boxprops=dict(facecolor='lightblue', color='black'),
        medianprops=dict(color='black'), whiskerprops=dict(color='black'), capprops=dict(color='black'), 
        flierprops=dict(marker='o', color='black', markersize=5))

    #Colour the boxes
    for idx, box in enumerate(bp['boxes']):
        box.set_facecolor(palette_red2[idx % len(palette_red2)])

    plt.title(f"Target by {var} categories", fontsize=25, weight="bold")
    plt.xlabel(f"{var}", fontsize=30)
    plt.ylabel("Target", fontsize=30)

    ax = plt.gca()
    ax.set_facecolor("white")
    ax.tick_params(axis="y", colors="black", labelsize=21)
    ax.tick_params(axis="x", colors="black", labelsize=20)
    for spine in ax.spines.values():
        spine.set_edgecolor("black")
        spine.set_linewidth(1)

    plt.grid(False)
    plt.tight_layout()
    plt.show()

In [None]:
#Distribution of each continuous predictor conditional on "Target". 
#This creates the first part of Figure 8

bins   = [-np.inf, 0.04, 0.08, 0.12, 0.20, np.inf]
labels = ['≤0.04', '(0.04,0.08]', '(0.08,0.12]', '(0.12,0.20]', '>0.20']

df2['Target_bin'] = pd.cut(df2['Target'], bins=bins, labels=labels, right=True, include_lowest=True)

palette_blue = ["deepskyblue", "dodgerblue", "cornflowerblue", "royalblue", "blue"]

for var in continuous_predictors: 
    plt.figure(figsize=(8, 6))
    data = [df2.loc[df2['Target_bin'] == lab, var].dropna() for lab in labels]

    bp = plt.boxplot(
        data,
        labels=labels,
        patch_artist=True,
        boxprops=dict(facecolor='lightblue', color='black'),
        medianprops=dict(color='black'),
        whiskerprops=dict(color='black'),
        capprops=dict(color='black'),
        flierprops=dict(marker='o', color='black', markersize=5))
    
    
    for box, col in zip(bp['boxes'], palette_blue):
        box.set_facecolor(col)

    plt.title(f"{var} by Target", fontsize=25, weight="bold")
    plt.xlabel("Target", fontsize=30)
    plt.ylabel(var, fontsize=30)

    ax = plt.gca()
    ax.set_facecolor("white")
    ax.tick_params(axis="y", colors="black", labelsize=21)
    ax.tick_params(axis="x", colors="black", labelsize=18)
    for spine in ax.spines.values():
        spine.set_edgecolor("black")
        spine.set_linewidth(1)

    plt.grid(False)
    plt.tight_layout()
    plt.show()

In [None]:
#Distribution of each categorical predictor conditional on "Target". 
#This creates the second part of Figure 8

for var in categorical_predictors:
    plt.figure(figsize=(10, 6))

    count_tbl = df2.groupby(['Target_bin', var], observed=False).size().reset_index(name='count')
    pivot_cnt = (count_tbl
                 .pivot(index=var, columns='Target_bin', values='count')
                 .fillna(0))
    pivot_freq = pivot_cnt.div(pivot_cnt.sum(axis=1), axis=0) 

    ax = pivot_freq.plot(kind='bar', width=0.8, color=palette_blue[:pivot_freq.shape[1]], figsize=(10, 6))

    plt.title(f"{var} by Target", fontsize=25, weight="bold")
    plt.xlabel(var, fontsize=30)
    plt.ylabel("Frequency", fontsize=30)

    ax.set_facecolor("white")
    ax.tick_params(axis="y", colors="black", labelsize=21)
    ax.tick_params(axis="x", colors="black", labelsize=18)
    ax.set_xticklabels(ax.get_xticklabels(), rotation=0)  
    for spine in ax.spines.values():
        spine.set_edgecolor("black")
        spine.set_linewidth(1)

    legend = plt.legend(title="Target",
                        loc='upper right',  
                        frameon=True, fontsize=18)
    legend.get_title().set_fontsize(18)
    legend.get_frame().set_edgecolor("black")

    plt.grid(False)
    plt.tight_layout()
    plt.show()
    plt.close(fig)

In [None]:
df2.drop(["Target_bin"], axis=1, inplace=True)

# Modelling

### Preliminary Steps for Model Training

In [None]:
#One-hot encode the "Sector" variable
sector_dummies = pd.get_dummies(df2["Sector"], prefix="Sector", dtype=int, drop_first=True)
df2 = pd.concat([df2, sector_dummies], axis=1)

#This list contains the names of all the candidate predictors
predictors = continuous_predictors + categorical_predictors + list(sector_dummies.columns)
predictors.remove("Sector")

In [None]:
#Train–test split (first 7 years vs. following 2 years) + per-ticker scaling

df_train = df2[pd.to_datetime(df2["Date"]) < pd.to_datetime("2020-01-01")]
df_test = df2[pd.to_datetime(df2["Date"]) >= pd.to_datetime("2020-01-01")]

#Per-ticker mean and std for every predictor, computed on the train period
scaler_dict_analysis = {} #This will be a nested dictionary indexed by ticker. For every ticker, it will contain the mean and std of each predictor on the train set    
for ticker, g in df_train.groupby("Ticker"): #"g" is a sub-dataframe for one ticker
    mu = g[continuous_predictors].mean().astype("float32") #Pandas Series indexed by the predictors to standardise. The values are the predictors' means
    std = g[continuous_predictors].std(ddof=0).replace(0, 1e-12).astype("float32")  #avoid /0
    scaler_dict_analysis[ticker] = {"mean": mu, "std": std}


#helper function to standardise the continuous predictors by ticker using the means and stds stored in "scaler_dict_analysis"
def standardise(df_part: pd.DataFrame, variables: list[str]) -> pd.DataFrame:
    """
    Return a copy of df_part where the columns in *variables* are
    standardised (per-ticker) using the mean / std computed on the train set.
    All other columns are left unchanged.
    """
    pieces = []
    for ticker, g in df_part.groupby("Ticker", sort=False):
        m = scaler_dict_analysis[ticker]["mean"]
        s = scaler_dict_analysis[ticker]["std"]
        g_std = g.copy()
        g_std[variables] = ((g_std[variables] - m) / s).astype("float32")
        pieces.append(g_std)

    #Re-assemble in original row order
    df_out = pd.concat(pieces).loc[df_part.index]
    return df_out

In [None]:
df_train_stand = standardise(df_train, continuous_predictors)
df_test_stand = standardise(df_test, continuous_predictors)

In [None]:
X_train_stand, y_train = df_train_stand[predictors], df_train["Target"]
X_test_stand, y_test = df_test_stand[predictors], df_test["Target"]

#We prepare the data
X_train_stand_np = X_train_stand.values.astype("float32")
X_test_stand_np = X_test_stand.values.astype("float32")
y_train_np = y_train.values.astype("float32").reshape(-1, 1)
y_test_np = y_test.values.astype("float32").reshape(-1, 1)

In [None]:
#This function trains a model by minimising the DMSE on the given training set

def train_model(model_type, X_train, y_train, *, epochs=70, learn_rate=1e-2, 
                over_penalty=loss_penalty, batch_size=1024, seed=0):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu") #for running-time purposes
    torch.manual_seed(seed)

    Xtr = torch.as_tensor(X_train, dtype=torch.float32, device=device)
    ytr = torch.as_tensor(y_train, dtype=torch.float32, device=device)

    train_dl = DataLoader(
        TensorDataset(Xtr, ytr),
        batch_size=batch_size,
        shuffle=False, 
        pin_memory=False,
        num_workers=0,
        persistent_workers=False)

    if model_type == "linear regression":
        model = nn.Linear(Xtr.shape[1], 1, bias=True)
    elif model_type == "neural network":
        #This builds an arbitrarily deep MLP from a list of widths
        #Each element of "hidden_sizes" represents the width of a particular layer of the network
        hidden_sizes = [1024, 512, 256, 256, 128, 128, 64, 32]
        layers, in_dim = [], Xtr.shape[1]
        for h in hidden_sizes:
            layers.extend([nn.Linear(in_dim, h), nn.ReLU()])  #we use the relu activation function
            in_dim = h
        layers.append(nn.Linear(in_dim, 1))
        model = nn.Sequential(*layers)

    model = model.to(device) 
    criterion = DMSE(over_penalty=over_penalty) #This is the custom loss function defined above
    optim = torch.optim.Adam(model.parameters(), lr=learn_rate)

    model.train()
    for _ in range(epochs):
        for xb, yb in train_dl:
            optim.zero_grad()
            loss = criterion(model(xb), yb)
            loss.backward()
            optim.step()

    return model

### Linear Regression

##### ANOVA

In [None]:
#Type-II ANOVA table. This creates the left-hand part of Table 2.

formula = "Target ~ " + " + ".join(predictors)  #a constant term is added by default
lm = smf.ols(formula, data=df_train_stand).fit()
anova_df_type2 = anova_lm(lm, typ=2).sort_values("PR(>F)", na_position='last')
anova_df_type2["Significance"] = anova_df_type2["PR(>F)"].apply(significance_label)
anova_df_type2

In [None]:
#Using the significance order given by the type-II ANOVA, we fit the usual Type-I (sequential) ANOVA.
#This creates the right-hand part of Table 2.

order = [p for p in anova_df_type2.index if p in predictors]  #this excludes "Residual"
formula_ord = "Target ~ 1 + " + " + ".join(order)

lm_ord = smf.ols(formula_ord, data=df_train_stand).fit()

anova_df_type1 = anova_lm(lm_ord, typ=1)
anova_df_type1["Significance"] = anova_df_type1["PR(>F)"].apply(significance_label)
anova_df_type1

##### AIC and BIC

In [None]:
#Here we define some helper functions to compute the actual loss, our estimate of sigma, 
#the log-likelihood, the AIC and the BIC under the DMSE framework.

def dmse_piecewise_loss(residuals: np.ndarray, alpha: float) -> np.ndarray:
    r2 = residuals**2
    return np.where(residuals >= 0.0, r2, alpha * r2)


def dmse_sigma_hat(residuals: np.ndarray, alpha: float) -> float:
    return dmse_piecewise_loss(residuals, alpha).mean() 


def dmse_loglik(residuals: np.ndarray, alpha: float) -> float:
    """
    Log-likelihood at (β̂, σ̂) under the two-piece Gaussian implied by DMSE.
    Uses σ̂² = mean L_(alpha)(r) and log-lik = n(0.5 log(2/π) - 0.5 log σ̂² - log(1 + alpha^{-0.5})) - n/2.
    """
    n = residuals.size
    L = dmse_piecewise_loss(residuals, alpha)
    sig2_hat = L.mean()                       #σ̂²
    return n * (0.5*np.log(2/np.pi) - 0.5*np.log(sig2_hat) - np.log1p(1/np.sqrt(alpha))) - n/2


def dmse_aic_bic(residuals: np.ndarray, alpha: float, n_params: int):
    n = residuals.shape[0]
    ll = dmse_loglik(residuals, alpha)
    aic = -2*ll + 2*n_params
    bic = -2*ll + n_params * np.log(n)
    return ll, aic, bic

In [None]:
#IRLS for DMSE linear regression
def fit_dmse_linear_irls(X: np.ndarray, y: np.ndarray, *, alpha=9, max_iter=20, tol=1e-8):
    """
    Returns (y_hat, residuals, coef, intercept).
    X: (n, p), y: (n,)
    We include an intercept by augmenting X with a column of 1s.
    """
    n, p = X.shape
    X1 = np.c_[np.ones((n, 1), dtype=np.float64), X.astype(np.float64, copy=False)]
    y  = y.astype(np.float64, copy=False)

    #start from OLS (cheap, good init)
    beta = np.linalg.lstsq(X1, y, rcond=None)[0]  #(p+1,)

    #IRLS loop
    for _ in range(max_iter):
        r = y - X1 @ beta
        w = np.where(r >= 0.0, 1.0, float(alpha))      #DMSE weights
        #WLS normal equations: (X^T W X) beta = X^T W y
        WX  = X1 * w[:, None]
        XT_W_X = X1.T @ WX
        XT_W_y = X1.T @ (w * y)
        beta_new = np.linalg.solve(XT_W_X, XT_W_y)
        if np.max(np.abs(beta_new - beta)) <= tol:
            beta = beta_new
            break
        beta = beta_new

    y_hat = X1 @ beta
    residuals = y - y_hat
    intercept = beta[0]
    coef = beta[1:]
    return y_hat, residuals, coef, intercept

#Scorer for a given subset
class DMSESubsetScorer:
    def __init__(self, X_df, y_series, alpha=9):
        self.X_df = X_df
        self.y = y_series.to_numpy(dtype=np.float64)
        self.alpha = alpha
        self.cache = {} 

    def score(self, feature_list):
        key = frozenset(feature_list)
        if key in self.cache:
            return self.cache[key]

        X = self.X_df[list(feature_list)].to_numpy(dtype=np.float64)
        _, residuals, coef, intercept = fit_dmse_linear_irls(X, self.y, alpha=self.alpha)

        #params = p betas + intercept + sigma
        n_params = len(feature_list) + 2
        ll, aic, bic = dmse_aic_bic(residuals, alpha=self.alpha, n_params=n_params)
        payload = {
            "ll": ll, "aic": aic, "bic": bic,
            "features": list(feature_list),
            "coef": coef, "intercept": intercept
        }
        self.cache[key] = payload
        return payload

#Stepwise search
def stepwise_selection_dmse(
    X_df, y_series, *, criterion="aic", alpha=9, tol=1e-8, forward_only=False):
    """
    - criterion: 'aic' or 'bic'
    - forward_only: if True, skip backward sweeps (much faster)
    """
    scorer = DMSESubsetScorer(X_df, y_series, alpha=alpha)
    remaining = list(X_df.columns)
    selected = []
    current_score = np.inf
    best_payload = None
    rng = np.random.default_rng(0)

    while remaining:
        #Optionally sub-sample candidates to cut work
        cand_pool = remaining

        #forward step
        candidates = []
        for cand in cand_pool:
            trial = selected + [cand]
            payload = scorer.score(trial)
            candidates.append((payload[criterion], cand, payload))
        best_new_score, best_cand, best_payload = min(candidates, key=lambda t: t[0])

        if best_new_score < current_score - tol:
            remaining.remove(best_cand)
            selected.append(best_cand)
            current_score = best_new_score
        else:
            break

        if forward_only:
            continue  #skip backward sweep

        #backward sweep
        improved = True
        while improved and len(selected) > 1:
            improved = False
            drops = []
            for combo in combinations(selected, len(selected) - 1):
                payload = scorer.score(list(combo))
                drops.append((payload[criterion], list(combo), payload))
            best_drop_score, best_combo, best_drop_payload = min(drops, key=lambda t: t[0])
            if best_drop_score < current_score - tol:
                selected = best_combo
                best_payload = best_drop_payload
                current_score = best_drop_score
                improved = True

    #fallback to best singleton if none selected
    if not selected:
        singleton_payloads = [DMSESubsetScorer(X_df, y_series, alpha=alpha).score([c]) for c in X_df.columns]
        best_payload = min(singleton_payloads, key=lambda d: d[criterion])

    return best_payload

In [None]:
X_all = df_train_stand[predictors]
y_all = df_train_stand["Target"]
ALPHA = loss_penalty

best_aic = stepwise_selection_dmse(
    X_all, y_all, criterion="aic", alpha=ALPHA,
    forward_only=False)

best_bic = stepwise_selection_dmse(
    X_all, y_all, criterion="bic", alpha=ALPHA,
    forward_only=False)

print("\nAIC-minimising model")
print(f"Number of predictors: {len(best_aic['features'])}")
print(f"AIC: {best_aic['aic']:.4f}")
print(f"Log-likelihood: {best_aic['ll']:.4f}")
print("Selected features:", best_aic["features"])

print("\nBIC-minimising model")
print(f"Number of predictors: {len(best_bic['features'])}")
print(f"BIC: {best_bic['bic']:.4f}")
print(f"Log-likelihood: {best_bic['ll']:.4f}")
print("Selected features:", best_bic["features"])

##### Out-of-Sample DMSE Estimation for Best Linear Regression Model 

In [None]:
predictors_lr = best_bic["features"]

In [None]:
#We train (on the train set) a single linear regression model on all the stocks. We then test it performance on the test set.
#We only use the predictors selected by the stepwise procedure above.

X_train_lr_subset = X_train_stand[predictors_lr].values.astype("float32")
X_test_lr_subset = X_test_stand[predictors_lr].values.astype("float32")

y_train_col = y_train_np if getattr(y_train_np, "ndim", 1) == 2 else y_train_np.reshape(-1, 1)

final_lr_model = train_model(
        "linear regression",
        X_train_lr_subset,
        y_train_col,
        epochs=15,
        learn_rate=5e-2,
        batch_size=len(X_train_lr_subset)
    )

with torch.no_grad():
    device = next(final_lr_model.parameters()).device
    X_test_t = torch.as_tensor(X_test_lr_subset, dtype=torch.float32, device=device)
    y_test_col = y_test_np if getattr(y_test_np, "ndim", 1) == 2 else y_test_np.reshape(-1, 1)
    y_test_t = torch.as_tensor(y_test_col, dtype=torch.float32, device=device)
    preds = final_lr_model(X_test_t)
    preds = torch.clamp(preds, min=0.0)   #force negative predictions to 0
    final_lr_dmse = DMSE(over_penalty=loss_penalty)(preds, y_test_t).item()

print(f"The global linear regression model achieves a DMSE of {final_lr_dmse:.5f} on the test set") 

In [None]:
#We now plot, for the first 10 stocks, the actual and predicted "Target" over time, using the single linear 
#regression model trained above. This creates the images in Figure 9

plot_df = df_test.loc[:, ["Ticker", "Date"]].copy()
y_true = y_test_t.squeeze(1).cpu().numpy()
y_pred = preds.squeeze(1).cpu().numpy()
plot_df["TrueTarget"] = y_true
plot_df["PredTarget"] = y_pred

for tkr in tickers[:10]:
    sub = plot_df.loc[plot_df["Ticker"] == tkr].sort_values("Date")
    if sub.empty:
        continue

    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(sub["Date"], sub["TrueTarget"], lw=2.0, label="True Target")
    ax.plot(sub["Date"], sub["PredTarget"], lw=2.0, label="Predicted Target")

    ax.set_title(f"{tkr}: Actual and Predicted Target, Linear Regression Trained on all Stocks", fontsize=20, weight="bold")
    ax.set_ylabel("Target", fontsize=25)
    ax.set_xlabel("")
    ax.tick_params(axis="x", colors="black", labelsize=15)
    ax.tick_params(axis="y", colors="black", labelsize=18)
    ax.legend(loc="upper right", frameon=True, fontsize=20)
    ax.set_facecolor("white")    
    ax.grid(True, which="major", linestyle="-", alpha=0.15)

    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    fig.tight_layout()
    plt.show()
    plt.close(fig)

In [None]:
#We now train a different linear regression model for each stock (only the first 10 here). The training is performed
#on the first 7 years, the testing on the subsequent 2 years, and the predictors used are the same as before.

single_lr_models = {}
N_test = len(df_test)
#container for the assembled per-ticker predictions in test-row order
y_pred_single_all = np.full(N_test, np.nan, dtype=np.float32)


for tkr in tickers[:10]:
    idx_tr = np.flatnonzero((df_train["Ticker"].values == tkr))
    idx_te = np.flatnonzero((df_test["Ticker"].values == tkr))

    if len(idx_tr) == 0 or len(idx_te) == 0:
        continue 

    #Slice the (already standardized) features + target for this ticker
    X_tr_tkr = X_train_lr_subset[idx_tr, :]
    y_tr_tkr = y_train_np[idx_tr]
    y_tr_tkr = y_tr_tkr if getattr(y_tr_tkr, "ndim", 1) == 2 else y_tr_tkr.reshape(-1, 1)

    #Train linear regression exactly as before
    model_tkr = train_model(
        "linear regression",
        X_tr_tkr.astype("float32"),
        y_tr_tkr.astype("float32"),
        epochs=15,
        learn_rate=5e-2,
        batch_size=len(X_tr_tkr),
        seed=0
    )
    single_lr_models[tkr] = model_tkr

    #Predict on that ticker's test rows and clamp negatives to 0
    X_te_tkr = X_test_lr_subset[idx_te, :].astype("float32")
    with torch.no_grad():
        device_tkr = next(model_tkr.parameters()).device
        X_te_t = torch.as_tensor(X_te_tkr, dtype=torch.float32, device=device_tkr)
        preds_t = model_tkr(X_te_t)
        preds_t = torch.clamp(preds_t, min=0.0).squeeze(1).cpu().numpy().astype(np.float32)

    #Insert into the right positions of the full test array
    y_pred_single_all[idx_te] = preds_t

mask = ~np.isnan(y_pred_single_all)
with torch.no_grad():
    device_eval = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    y_true_masked = (y_test_np[mask] if getattr(y_test_np, "ndim", 1) == 1 else y_test_np[mask, 0]).reshape(-1, 1)
    y_pred_masked = y_pred_single_all[mask].reshape(-1, 1)

    y_true_t = torch.as_tensor(y_true_masked, dtype=torch.float32, device=device_eval)
    y_pred_t = torch.as_tensor(y_pred_masked, dtype=torch.float32, device=device_eval)
    dmse_single_lr = DMSE(over_penalty=loss_penalty)(y_pred_t, y_true_t).item()

print(f"(Per-ticker) linear regression achieves a DMSE of {dmse_single_lr:.5f} on the test set")

In [None]:
#We now plot, for the first 10 stocks, the actual and predicted "Target" over time (on the test set), using the per-ticker
#linear regression models trained above. The predictors used are the same as above. This creates the images in Figure 10

plot_df_single = df_test.loc[:, ["Ticker", "Date"]].copy()

if "y_true_1d" not in globals():
    y_true_1d = (y_test_np.astype("float32").reshape(-1)
                 if getattr(y_test_np, "ndim", 1) == 1
                 else y_test_np[:, 0].astype("float32"))

#Per-ticker predictions assembled in test-row order
plot_df_single["TrueTarget"]  = y_true_1d
plot_df_single["PredTarget"]  = y_pred_single_all.astype("float32")

for tkr in tickers[:10]:
    sub = plot_df_single.loc[plot_df_single["Ticker"] == tkr].sort_values("Date")
    if sub.empty:
        continue

    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(sub["Date"], sub["TrueTarget"], lw=2.0, label="True Target")
    ax.plot(sub["Date"], sub["PredTarget"], lw=2.0, label="Predicted Target")

    ax.set_title(f"{tkr}: Actual and Predicted Target, Per-Ticker Linear Regression",
                 fontsize=20, weight="bold")
    ax.set_ylabel("Target", fontsize=25)
    ax.set_xlabel("")
    ax.tick_params(axis="x", colors="black", labelsize=15)
    ax.tick_params(axis="y", colors="black", labelsize=18)
    ax.legend(loc="upper right", frameon=True, fontsize=20)
    ax.set_facecolor("white")
    ax.grid(True, which="major", linestyle="-", alpha=0.15)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    fig.tight_layout()
    plt.show()
    plt.close(fig)

### Feedforward Neural Network

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class MLP(nn.Module):
    def __init__(self, in_dim, hidden_sizes, dropout=0.0, batchnorm=False, spectral_norm=False):
        super().__init__()
        layers = []
        d = in_dim
        for h in hidden_sizes:
            linear = nn.Linear(d, h)
            if spectral_norm:
                linear = nn.utils.spectral_norm(linear)
            block = [linear]
            if batchnorm:
                block.append(nn.BatchNorm1d(h))
            block += [nn.ReLU()]
            if dropout and dropout > 0:
                block.append(nn.Dropout(p=dropout))
            layers += block
            d = h
        self.backbone = nn.Sequential(*layers)
        self.head = nn.Sequential(
            nn.Linear(d, 1),
            nn.Softplus(beta=1.0)
        )

    def forward(self, x):
        return self.head(self.backbone(x))


#Elastic penalties (where we exclude biases and normalisation parameters)
def elastic_penalty(model, l1=0.0, l2=0.0):
    if l1 == 0.0 and l2 == 0.0:
        return torch.zeros((), device=next(model.parameters()).device)
    l1_term = torch.zeros((), device=next(model.parameters()).device)
    l2_term = torch.zeros((), device=next(model.parameters()).device)
    for m in model.modules():
        if isinstance(m, nn.Linear):
            W = m.weight
            if l1 > 0:
                l1_term = l1_term + W.abs().sum()
            if l2 > 0:
                l2_term = l2_term + (W * W).sum()
    return l1 * l1_term + l2 * l2_term

@torch.no_grad()
def dmse_score(model, X, y, over_penalty=9):
    model.eval()
    xb = torch.as_tensor(X, dtype=torch.float32, device=device)
    yb = torch.as_tensor(y, dtype=torch.float32, device=device).reshape(-1, 1)
    preds = model(xb)
    diff2 = (yb - preds) ** 2
    loss = torch.where(preds <= yb, diff2, diff2 * over_penalty).mean()
    return float(loss.item())


#This is a helper function to perform the validation split

def chronological_split(X, y, ticker_ids, val_frac=0.2):
    """
    For each ticker (where dates are sorted chronologically),
    take the last val_frac of *each* ticker as validation.
    """
    ticker_ids = np.asarray(ticker_ids)
    idx_tr, idx_va = [], []
    for t in np.unique(ticker_ids):
        idx = np.flatnonzero(ticker_ids == t)  #contiguous block
        k = int(round(len(idx) * (1.0 - val_frac)))
        idx_tr.append(idx[:k])
        idx_va.append(idx[k:])
    idx_tr = np.concatenate(idx_tr); idx_va = np.concatenate(idx_va)
    return (X[idx_tr], y[idx_tr]), (X[idx_va], y[idx_va])

def make_adamw(model, lr, weight_decay):
    decay, no_decay = [], []
    for module in model.modules():
        if isinstance(module, nn.Linear):
            decay.append(module.weight)     #decay on weights
            if module.bias is not None:
                no_decay.append(module.bias)
        elif isinstance(module, (nn.BatchNorm1d, nn.LayerNorm)):
            no_decay += list(module.parameters())
    names = {id(p): n for n, p in model.named_parameters()}
    for p in model.parameters():
        if id(p) not in map(id, decay + no_decay):
            no_decay.append(p)
    return torch.optim.AdamW(
        [{"params": decay,    "weight_decay": weight_decay, "lr": lr},
         {"params": no_decay, "weight_decay": 0.0,          "lr": lr}]
    )

#Training
def train_one_model(
    X_train_np, y_train_np, ticker_ids_train=None,
    hidden_sizes=(512, 256, 128, 64),
    dropout=0.0,
    batchnorm=False,
    spectral_norm=False,
    input_noise_std=0.0,
    #optimisation
    epochs=80,
    batch_size=1024,
    lr=1e-3,
    weight_decay=0.0,   
    l1=0.0,        
    l2=0.0,           
    patience=15,
    over_penalty=9,
    seed=0,
    val_frac=0.2):

    torch.manual_seed(seed)
    n_features = X_train_np.shape[1]

    (Xtr_np, ytr_np), (Xva_np, yva_np) = chronological_split(
        X_train_np, y_train_np, ticker_ids_train, val_frac=val_frac)

    use_val = (Xva_np.size > 0) 

    Xtr = torch.as_tensor(Xtr_np, dtype=torch.float32, device=device)
    ytr = torch.as_tensor(ytr_np, dtype=torch.float32, device=device).reshape(-1, 1)
    if use_val:
        Xva = torch.as_tensor(Xva_np, dtype=torch.float32, device=device)
        yva = torch.as_tensor(yva_np, dtype=torch.float32, device=device).reshape(-1, 1)

    #no shuffling, and don't drop the last partial batch
    train_dl = DataLoader(TensorDataset(Xtr, ytr), batch_size=batch_size,
                          shuffle=False, drop_last=False) 

    model = MLP(n_features, hidden_sizes, dropout, batchnorm, spectral_norm).to(device)
    criterion = DMSE(over_penalty=over_penalty)
    optim = make_adamw(model, lr=lr, weight_decay=weight_decay)
    sched = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optim, mode="min", factor=0.5,
        patience=max(3, patience // 3), verbose=False
    )

    best_val, best_state = float("inf"), None
    history = {"epoch": [], "train_dmse": [], "val_dmse": [], "lr": []}
    no_improve = 0

    for epoch in range(1, epochs + 1):
        model.train()
        for xb, yb in train_dl:
            optim.zero_grad(set_to_none=True)
            xb_aug = xb + torch.randn_like(xb) * input_noise_std if input_noise_std > 0 else xb
            preds = model(xb_aug)
            loss = criterion(preds, yb)
            if l1 > 0 or l2 > 0:
                loss = loss + elastic_penalty(model, l1=l1, l2=l2)
            loss.backward()
            optim.step()

        with torch.no_grad():
            model.eval()
            train_dmse = dmse_score(model, Xtr_np, ytr_np, over_penalty=over_penalty)
            if use_val:
                val_dmse = dmse_score(model, Xva_np, yva_np, over_penalty=over_penalty)
                metric = val_dmse
            else:
                val_dmse = float("nan")
                metric = train_dmse  

            history["epoch"].append(epoch)
            history["train_dmse"].append(train_dmse)
            history["val_dmse"].append(val_dmse)
            history["lr"].append(optim.param_groups[0]["lr"])

            sched.step(metric)

            if use_val:
                if val_dmse + 1e-12 < best_val:
                    best_val = val_dmse
                    best_state = deepcopy(model.state_dict())
                    no_improve = 0
                else:
                    no_improve += 1
                if no_improve >= patience:
                    break

    if use_val and best_state is not None:
        model.load_state_dict(best_state)

    if use_val:
        with torch.no_grad():
            yva_hat = model(torch.as_tensor(Xva_np, dtype=torch.float32, device=device)).cpu().numpy().ravel()
        yva_mean = yva_np.mean() if yva_np.size else 0.0
        mean_ratio = float(yva_hat.mean() / (yva_mean + 1e-8))
        pred_std   = float(yva_hat.std())
        score_out  = best_val
    else:
        mean_ratio = float("nan")
        pred_std   = float("nan")
        score_out  = train_dmse 

    return model, history, score_out, {"mean_ratio": mean_ratio, "pred_std": pred_std}


#This runs the experiments
def run_all(X_train_stand_np, y_train_np, X_test_stand_np, y_test_np, over_penalty=9, ticker_ids_train=None):
    configs = [
        ("SpectralNorm+Dropout", dict(dropout=0.20, batchnorm=False, spectral_norm=True,
                                      input_noise_std=0.0, lr=1e-3, weight_decay=5e-4,
                                      l1=0.0, l2=0.0, hidden_sizes=(512,256,128,64), patience=15, epochs=80)),
        ("ElasticNet+Dropout",   dict(dropout=0.30, batchnorm=False, spectral_norm=False,
                                      input_noise_std=0.0, lr=1e-3, weight_decay=0.0,
                                      l1=1e-6, l2=1e-5, hidden_sizes=(512,256,128,64), patience=15, epochs=80)),
        ("L2+Dropout+BatchNorm", dict(dropout=0.25, batchnorm=True,  spectral_norm=False,
                                      input_noise_std=0.0, lr=1e-3, weight_decay=1e-3,
                                      l1=0.0, l2=0.0, hidden_sizes=(512,256,128,64), patience=15, epochs=80)),
        ("InputNoise+L2",        dict(dropout=0.00, batchnorm=False, spectral_norm=False,
                                      input_noise_std=0.05, lr=1e-3, weight_decay=1e-3,
                                      l1=0.0, l2=0.0, hidden_sizes=(512,256,128,64), patience=15, epochs=80)),
    ]

    models = {}
    rows = []
    for name, kwargs in configs:
        model, history, val_dmse, diags = train_one_model(X_train_stand_np, y_train_np, 
                                          ticker_ids_train=ticker_ids_train, over_penalty=over_penalty, **kwargs)
        models[name] = {"model": model, "history": history, "val_dmse": val_dmse, **diags}
        rows.append((name, val_dmse, abs(diags["mean_ratio"] - 1.0), -diags["pred_std"]))


    #Rank by validation DMSE, then prefer mean close to true mean, then larger prediction spread
    rows.sort(key=lambda t: (t[1], t[2], t[3]))
    best_name, best_val = rows[0][0], rows[0][1]

    #Retrain on the full 7-year window with the chosen hyperparameters
    best_cfg = {name: cfg for (name, cfg) in configs}[best_name]
    best_model, _, _train_dmse_full, _ = train_one_model(
    X_train_stand_np, y_train_np,
    ticker_ids_train=ticker_ids_train,
    over_penalty=over_penalty,
    val_frac=0.0,                #no hold-out
    **best_cfg
)

    test_dmse = dmse_score(best_model, X_test_stand_np, y_test_np, over_penalty=over_penalty)

    df_results = pd.DataFrame({
    "config":   [n for (n, _, _, _) in rows],
    "val_DMSE": [models[n]["val_dmse"] for (n, _, _, _) in rows],
    "mean_ratio": [models[n]["mean_ratio"] for (n, _, _, _) in rows],
    "pred_std":   [models[n]["pred_std"]   for (n, _, _, _) in rows],
    }).sort_values("val_DMSE").reset_index(drop=True)

    df_results["test_DMSE_best"] = None
    df_results.loc[df_results["config"] == best_name, "test_DMSE_best"] = test_dmse

    print(f"Selected best config: {best_name}  |  val DMSE = {best_val:.5f}  |  test DMSE = {test_dmse:.5f}")
    return best_model, best_name, df_results, models

In [None]:
ticker_ids_train = df_train["Ticker"].to_numpy()

best_model, best_name, df_results, models = run_all(
    X_train_stand_np, y_train_np, X_test_stand_np, y_test_np,
    over_penalty=9,
    ticker_ids_train=ticker_ids_train)

best_model.eval() 
with torch.no_grad():
    xb = torch.as_tensor(X_test_stand_np, dtype=torch.float32, device=device)
    preds = best_model(xb).cpu().numpy().ravel()

In [None]:
#Per-ticker plots using predictions from the global feedforward neural network

plot_df_global = df_test.loc[:, ["Ticker", "Date"]].copy()
plot_df_global["TrueTarget"] = y_test_np.ravel()
plot_df_global["PredTarget_GlobalMLP"] = np.asarray(preds).ravel()

for tkr in tickers[10:20]:
    sub = plot_df_global.loc[plot_df_global["Ticker"] == tkr].sort_values("Date")
    if sub.empty:
        continue
    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(sub["Date"], sub["TrueTarget"], lw=2.0, label="True Target")
    ax.plot(sub["Date"], sub["PredTarget_GlobalMLP"], lw=2.0, label="Predicted Target")

    ax.set_title(f"{tkr}: Actual and Predicted Target, Feedforward Neural Network Trained on all Stocks", 
                 fontsize=20, weight="bold")
    ax.set_ylabel("Target", fontsize=25)
    ax.set_xlabel("")
    ax.tick_params(axis="x", colors="black", labelsize=15)
    ax.tick_params(axis="y", colors="black", labelsize=18)
    ax.legend(loc="upper right", frameon=True, fontsize=20)
    ax.set_facecolor("white")
    ax.grid(True, which="major", linestyle="-", alpha=0.15)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    fig.tight_layout()
    plt.show()

In [None]:
per_ticker_r = (
    plot_df_global.dropna(subset=["TrueTarget","PredTarget_GlobalMLP"])
    .groupby("Ticker")
    .apply(lambda d: d["TrueTarget"].corr(d["PredTarget_GlobalMLP"]))
    .sort_values(ascending=False)
)
print(per_ticker_r.round(3))

In [None]:
per_ticker_models = {}
N_test = len(df_test)
y_pred_single_mlp_all = np.full(N_test, np.nan, dtype=np.float32)

for tkr in tickers[10:20]:
    #indices for this ticker in train/test (aligned to X_*_stand_np / y_*_np)
    idx_tr = np.flatnonzero(df_train["Ticker"].values == tkr)
    idx_te = np.flatnonzero(df_test["Ticker"].values  == tkr)
    if len(idx_tr) == 0 or len(idx_te) == 0:
        continue

    #slice arrays
    X_tr = X_train_stand_np[idx_tr, :]
    y_tr = y_train_np[idx_tr]
    X_te = X_test_stand_np[idx_te, :]
    y_te = y_test_np[idx_te]

    #ticker ids for chronological_split inside train_one_model/run_all
    ticker_ids_train_tkr = df_train["Ticker"].values[idx_tr]  #array of the same ticker
    
    best_model_tkr, best_name_tkr, df_results_tkr, models_tkr = run_all(
        X_tr, y_tr, X_te, y_te,
        over_penalty=9,
        ticker_ids_train=ticker_ids_train_tkr
    )

    per_ticker_models[tkr] = {
        "model": best_model_tkr,
        "best_name": best_name_tkr,
        "df_results": df_results_tkr,
        "models": models_tkr
    }
    
    best_model_tkr.eval()
    with torch.no_grad():
        xb_te = torch.as_tensor(X_te, dtype=torch.float32, device=device)
        preds_te = best_model_tkr(xb_te).cpu().numpy().ravel().astype(np.float32)

    #place back in global test order
    y_pred_single_mlp_all[idx_te] = preds_te


mask = ~np.isnan(y_pred_single_mlp_all)
y_true_all = y_test_np[mask].reshape(-1, 1)
y_pred_all = y_pred_single_mlp_all[mask].reshape(-1, 1)

with torch.no_grad():
    dmse_aggregate = DMSE(over_penalty=9)(
        torch.as_tensor(y_pred_all, dtype=torch.float32, device=device),
        torch.as_tensor(y_true_all, dtype=torch.float32, device=device)
    ).item()
print(f"(Per-ticker MLP) Aggregate DMSE over the 10 tickers' test samples: {dmse_aggregate:.5f}")

In [None]:
plot_df_single = df_test.loc[:, ["Ticker", "Date"]].copy()
plot_df_single["TrueTarget"] = y_test_np.ravel()
plot_df_single["PredTarget_PerTickerMLP"] = y_pred_single_mlp_all

for tkr in tickers[10:20]:
    sub = plot_df_single.loc[plot_df_single["Ticker"] == tkr].sort_values("Date")
    if sub.empty:
        continue
    
    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(sub["Date"], sub["TrueTarget"], lw=2.0, label="True Target")
    ax.plot(sub["Date"], sub["PredTarget_PerTickerMLP"], lw=2.0, label="Predicted Target")

    ax.set_title(f"{tkr}: Actual and Predicted Target, Per-Ticker Feedforward Neural Network", 
                 fontsize=20, weight="bold")
    ax.set_ylabel("Target", fontsize=25)
    ax.set_xlabel("")
    ax.tick_params(axis="x", colors="black", labelsize=15)
    ax.tick_params(axis="y", colors="black", labelsize=18)
    ax.legend(loc="upper right", frameon=True, fontsize=20)
    ax.set_facecolor("white")
    ax.grid(True, which="major", linestyle="-", alpha=0.15)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    fig.tight_layout()
    plt.show()

### Random Forest

In [None]:
#For the random forest model, we use non-standardised data
X_train = df_train[predictors]
X_test = df_test[predictors]

#We fit a full random forest model
rf = RandomForestRegressor(n_estimators=100, random_state=1, n_jobs=-1)
rf.fit(X_train, y_train)

#This computes feature importance based on the reduction in impurity attributable to each variable
#This is Table 4
importance_df_rf = pd.DataFrame({"Predictor": predictors, "Importance": rf.feature_importances_})
importance_df_rf.sort_values(by="Importance", ascending=False, inplace=True)
importance_df_rf

In [None]:
sorted_predictors_rf = list(importance_df_rf["Predictor"])

In [None]:
imp = list(importance_df_rf["Importance"])
sum(imp[:6])

In [None]:
X_train_np = X_train.to_numpy(dtype=np.float32, copy=False)
X_test_np  = X_test.to_numpy(dtype=np.float32,  copy=False)
y_train_np = y_train.to_numpy(dtype=np.float32, copy=False)
y_test_np  = y_test.to_numpy(dtype=np.float32,  copy=False)

col_to_idx = {c: i for i, c in enumerate(predictors)}

criterion = DMSE() 
y_test_tensor = torch.as_tensor(y_test_np.reshape(-1, 1), dtype=torch.float32)


performance_rf = {}

for k in range(1, len(sorted_predictors_rf) + 1):
    selected_variables = sorted_predictors_rf[:k]
    idx = [col_to_idx[c] for c in selected_variables]

    X_rf_train_subset = X_train_np[:, idx]
    X_rf_test_subset  = X_test_np[:, idx]

    temporary_model = RandomForestRegressor(n_estimators=100, random_state=1, n_jobs=-1)
    temporary_model.fit(X_rf_train_subset, y_train_np)

    temporary_y_pred = temporary_model.predict(X_rf_test_subset)

    y_pred_tensor = torch.as_tensor(temporary_y_pred.reshape(-1, 1), dtype=torch.float32)
    performance_rf[k] = criterion(y_pred_tensor, y_test_tensor).item()

In [None]:
#This creates Figure 13

fig = plt.figure(figsize=(8, 5)) 
fig.patch.set_facecolor("white") 

plt.plot(list(performance_rf.keys()), list(performance_rf.values()), color="magenta")       
plt.xlabel("Number of Predictors", fontsize=20)
plt.ylabel("DMSE", fontsize=20)
plt.title(f"Out-of-Sample DMSE of Random Forest as a Function of Number of Predictors", 
          fontsize=13, weight="bold")                         
plt.grid(color="gray", linestyle="--", linewidth=0.5)
ax = plt.gca() 
ax.set_facecolor("white") 
ax.tick_params(axis="x", colors="black", labelsize=13) 
ax.tick_params(axis="y", colors="black", labelsize=16)  
ax.set_xticks(range(1, len(sorted_predictors_rf)+1))
for spine in ax.spines.values(): 
    spine.set_edgecolor("black") 
    spine.set_linewidth(1)

plt.show()
plt.close(fig)

In [None]:
#We start by training, on the first 7 years, a global random forest which includes all stocks (and predictors).
#We then test it on the following 2 years

rf_global_model = RandomForestRegressor(
    n_estimators=100, random_state=1, n_jobs=-1 
)
rf_global_model.fit(X_train_np, y_train_np)

#Predict on the test set
rf_global_pred = rf_global_model.predict(X_test_np).astype(np.float32)

#DMSE on the test set
y_pred_tensor = torch.as_tensor(rf_global_pred.reshape(-1, 1), dtype=torch.float32)
rf_global_dmse = criterion(y_pred_tensor, y_test_tensor).item()
print(f"Global RF (all predictors, all stocks) — Test DMSE: {rf_global_dmse:.5f}")

#Per-ticker time-series plots using the global RF predictions
plot_df_rf_global = df_test.loc[:, ["Ticker", "Date"]].copy()
plot_df_rf_global["TrueTarget"] = y_test_np.ravel()
plot_df_rf_global["PredTarget_RF_Global"] = rf_global_pred.ravel()

for tkr in tickers[20:]:
    sub = plot_df_rf_global.loc[plot_df_rf_global["Ticker"] == tkr].sort_values("Date")
    if sub.empty:
        continue
    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(sub["Date"], sub["TrueTarget"], lw=2.0, label="True Target")
    ax.plot(sub["Date"], sub["PredTarget_RF_Global"], lw=2.0, label="Predicted Target")
    
    ax.set_title(f"{tkr}: Actual and Predicted Target, Random Forest Trained on all Stocks", 
                 fontsize=20, weight="bold")
    ax.set_ylabel("Target", fontsize=25)
    ax.set_xlabel("")
    ax.tick_params(axis="x", colors="black", labelsize=15)
    ax.tick_params(axis="y", colors="black", labelsize=18)
    ax.legend(loc="upper right", frameon=True, fontsize=20)
    ax.set_facecolor("white")
    ax.grid(True, which="major", linestyle="-", alpha=0.15)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    fig.tight_layout()
    plt.show()
    plt.close(fig)

In [None]:
#We now train a different random forest model for each stock, performing the same tests as before

rf_models_by_ticker = {}

N_test = len(df_test)
rf_per_ticker_pred_all = np.full(N_test, np.nan, dtype=np.float32)

for tkr in tickers[20:]:
    idx_tr = np.flatnonzero(df_train["Ticker"].values == tkr)
    idx_te = np.flatnonzero(df_test["Ticker"].values  == tkr)
    if len(idx_tr) == 0 or len(idx_te) == 0:
        continue

    X_tr_tkr = X_train_np[idx_tr, :]
    y_tr_tkr = y_train_np[idx_tr]
    X_te_tkr = X_test_np[idx_te, :]

    #Train a fresh RF for this ticker
    rf_tkr = RandomForestRegressor(
        n_estimators=100, random_state=1, n_jobs=-1
    )
    rf_tkr.fit(X_tr_tkr, y_tr_tkr)
    rf_models_by_ticker[tkr] = rf_tkr

    #Predict on this ticker's test rows
    yhat_tkr = rf_tkr.predict(X_te_tkr).astype(np.float32)

    #Place predictions back into the global array using the test indices
    rf_per_ticker_pred_all[idx_te] = yhat_tkr

#Overall DMSE computed only on the last 10 tickers
mask_last10 = np.array(pd.Index(df_test["Ticker"]).isin(tickers[20:])).astype(bool)

y_pred_subset = rf_per_ticker_pred_all[mask_last10]
y_true_subset = y_test_np[mask_last10]

y_pred_tensor_pt = torch.as_tensor(y_pred_subset.reshape(-1, 1), dtype=torch.float32)
y_true_tensor_pt = torch.as_tensor(y_true_subset.reshape(-1, 1), dtype=torch.float32)

rf_per_ticker_dmse = criterion(y_pred_tensor_pt, y_true_tensor_pt).item()
print(f"Per-ticker RFs (last 10 tickers) — Test DMSE: {rf_per_ticker_dmse:.5f}")

In [None]:
#We now produce the same plots as before, using the per-ticker random forests

plot_df_rf_single = df_test.loc[:, ["Ticker", "Date"]].copy()
plot_df_rf_single["TrueTarget"] = y_test_np.ravel()
plot_df_rf_single["PredTarget_RF_PerTicker"] = rf_per_ticker_pred_all.ravel()

for tkr in tickers[20:]:
    sub = plot_df_rf_single.loc[plot_df_rf_single["Ticker"] == tkr].sort_values("Date")
    if sub.empty:
        continue
    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(sub["Date"], sub["TrueTarget"], lw=2.0, label="True Target")
    ax.plot(sub["Date"], sub["PredTarget_RF_PerTicker"], lw=2.0, label="Predicted Target")
    
    ax.set_title(f"{tkr}: Actual and Predicted Target, Per-Ticker Random Forest", 
                 fontsize=20, weight="bold")
    ax.set_ylabel("Target", fontsize=25)
    ax.set_xlabel("")
    ax.tick_params(axis="x", colors="black", labelsize=15)
    ax.tick_params(axis="y", colors="black", labelsize=18)
    ax.legend(loc="upper right", frameon=True, fontsize=20)
    ax.set_facecolor("white")
    ax.grid(True, which="major", linestyle="-", alpha=0.15)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    fig.tight_layout()
    plt.show()
    plt.close(fig)

### Autoregressive Moving-Average Integrated Models

In [None]:
#ARIMA/ARIMAX via H reduced-frequency subseries

H = 10  #horizon

#Some DMSE helper functions
def dmse_np(y_true, y_pred, alpha=9.0):
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    diff2 = (y_true - y_pred) ** 2
    w = (y_pred > y_true).astype(float) * (alpha - 1.0) + 1.0
    return float(np.mean(diff2 * w))

def mse_np(y_true, y_pred):
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    return float(np.mean((y_true - y_pred) ** 2))

#Some fitting helper functions
def _fit_sarimax(endog, exog, order, *, trend=None):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        trend_use = "c" if order[1] == 0 else "n"
        mod = SARIMAX(
            endog=endog, exog=exog, order=order,
            seasonal_order=(0,0,0,0), trend=trend_use,
            enforce_stationarity=False, enforce_invertibility=False,
            measurement_error=False, mle_regression=True,
            missing="drop"              
        )
        res = mod.fit(disp=False, maxiter=50, method="lbfgs")  
    return res

def _bic_of_order_dmse(y_tr, x_tr, order, alpha):
    """
    DMSE-based BIC for a candidate ARIMA/ARIMAX order on the reduced-frequency subseries.
    We fit (Gaussian) SARIMAX to get 1-step-ahead in-sample predictions, compute residuals,
    evaluate the DMSE log-likelihood, and then form BIC = -2*ll + k*log(n),
    with k counting ALL estimated params (AR, MA, intercept, exog, variance).
    """
    try:
        res = _fit_sarimax(y_tr, x_tr, order)  
        yhat = np.asarray(res.fittedvalues, dtype=float)  
        resid = np.asarray(y_tr, dtype=float) - yhat
        ll = dmse_loglik(resid, alpha=alpha)  
        k = int(res.params.size)               #includes intercept, AR, MA, exog, sigma2
        n = resid.size
        bic = -2.0 * ll + k * np.log(n)
        return float(bic) if np.isfinite(bic) else np.inf
    except Exception:
        return np.inf

def _initial_order_bic(y_tr, x_tr=None, alpha=9.0):
    grid = [(p, d, q) for p in range(4) for d in (0, 1) for q in range(4)]
    best, best_bic = None, np.inf
    for od in grid:
        bic = _bic_of_order_dmse(y_tr, x_tr, od, alpha)
        if bic < best_bic:
            best_bic, best = bic, od
    return best if best is not None else (0, 0, 0)

def _local_grid_around(p0, d0, q0):
    P = [x for x in {p0-1, p0, p0+1} if 0 <= x <= 2]   #cap at 2
    Q = [x for x in {q0-1, q0, q0+1} if 0 <= x <= 2]
    D = [0, 1]
    return [(p, d, q) for p in P for d in D for q in Q]

def _fit_and_forecast(y_tr_sub, x_tr_sub, y_va, x_va, order, use_exog):
    """
    Fit on y_tr_sub (optionally with x_tr_sub) and one-step-ahead forecast
    the validation horizon len(y_va) on the reduced-frequency index.
    """
    try:
        res = _fit_sarimax(y_tr_sub, (x_tr_sub if use_exog else None), order)
        res2 = res.apply(endog=y_va, exog=(x_va if use_exog else None), refit=False)
        yhat = np.asarray(res2.fittedvalues, dtype=float)[-len(y_va):]  #1-step ahead for appended
        if yhat.shape[0] != len(y_va):
            return None, None
        return yhat, res
    except Exception:
        return None, None


#Core routine for one ticker
def _predict_one_ticker(df_tr_tkr, df_te_tkr, predictors, H=10, alpha=9.0, val_frac=0.10):
    """
    -Initial order guess on each remainder-class subseries via BIC.
    -Local grid around (p0,d0,q0).
    -Internal DMSE validation (last 10% of the training subseries). For each candidate order, fit both 
    ARIMA (no exog) and ARIMAX (with exog). Keep the (order, model_type) with lowest DMSE; fall back to 
    (0,0,0) if needed.
    -Refit on entire training subseries with the selected choice and forecast the test subseries. Recombine 
    H subseries back to raw test dates.
    """
    if df_te_tkr.empty:
        return np.array([], dtype=float), pd.Series([], dtype="datetime64[ns]")

    #Build a raw-time index t = 0..(n_all-1) across train+test (sorted by Date).
    df_all = pd.concat([df_tr_tkr, df_te_tkr], axis=0, ignore_index=True).sort_values("Date")
    df_all = df_all.reset_index(drop=True)
    df_all["t_raw"] = np.arange(len(df_all), dtype=int)
    #Map raw index back to each split
    df_tr = df_all.iloc[:len(df_tr_tkr)]
    df_te = df_all.iloc[len(df_tr_tkr):]

    #Pre-extract arrays
    y_tr_all = df_tr["Target"].to_numpy(dtype=float)
    y_te_all = df_te["Target"].to_numpy(dtype=float)
    X_tr_all = df_tr[predictors].to_numpy(dtype=float) if predictors else None
    X_te_all = df_te[predictors].to_numpy(dtype=float) if predictors else None
    t_tr = df_tr["t_raw"].to_numpy()
    t_te = df_te["t_raw"].to_numpy()

    #Container for test predictions (aligned to df_te order)
    yhat_te = np.full(shape=(len(df_te),), fill_value=np.nan, dtype=float)
    bic_vals = []
    val_mse_vals = []

    #Work remainder class by remainder class
    for r in range(H):
        #Indices in reduced-frequency subseries (training)
        idx_tr_r = np.flatnonzero(t_tr % H == r)
        if idx_tr_r.size == 0:
            continue
        y_tr_r = y_tr_all[idx_tr_r]
        X_tr_r = None if X_tr_all is None else X_tr_all[idx_tr_r, :]

        #Initial order via BIC on the whole training subseries y^(r)
        p0, d0, q0 = _initial_order_bic(y_tr_r, None, alpha=alpha)

        #Local grid
        candidates = _local_grid_around(p0, d0, q0)

        #Validation split on the subseries (last 10% as validation)
        n = len(y_tr_r)
        n_va = max(1, int(np.floor(val_frac * n)))
        n_tr_sub = max(1, n - n_va)
        y_tr_sub, y_va = y_tr_r[:n_tr_sub], y_tr_r[n_tr_sub:]
        X_tr_sub = None if X_tr_r is None else X_tr_r[:n_tr_sub, :]
        X_va = None if X_tr_r is None else X_tr_r[n_tr_sub:, :]

        #Internal DMSE validation: try ARIMA and ARIMAX for each order
        best_tuple = None  #(dmse, order, use_exog, fitted_result_for_refit_is_irrelevant)
        for od in candidates:
            #ARIMA (no exog)
            yhat_a, _ = _fit_and_forecast(y_tr_sub, None, y_va, None, od, use_exog=False)
            if yhat_a is not None:
                dmse_a = mse_np(y_va, yhat_a)  #validation uses MSE
                if (best_tuple is None) or (dmse_a < best_tuple[0] - 1e-10) or \
                   (abs(dmse_a - best_tuple[0]) <= 1e-10 and best_tuple[2] is True):
                    best_tuple = (dmse_a, od, False)
            #ARIMAX (with contemporaneous X)
            if X_tr_sub is not None:
                yhat_x, _ = _fit_and_forecast(y_tr_sub, X_tr_sub, y_va, X_va, od, use_exog=True)
                if yhat_x is not None:
                    dmse_x = mse_np(y_va, yhat_x)  #validation uses MSE
                    if (best_tuple is None) or (dmse_x < best_tuple[0] - 1e-10) or \
                       (abs(dmse_x - best_tuple[0]) <= 1e-10 and best_tuple[2] is False):
                        best_tuple = (dmse_x, od, True)  #prefer exog on ties  

        #Fallback if nothing converged: try (0,0,0) without exog; if still failing, use mean
        if best_tuple is None:
            od = (0, 0, 0)
            yhat_try, _ = _fit_and_forecast(y_tr_sub, None, y_va, None, od, use_exog=False)
            if yhat_try is None:
                best_tuple = (mse_np(y_va, np.full_like(y_va, y_tr_sub.mean())), od, False)
            else:
                best_tuple = (mse_np(y_va, yhat_try), od, False)

        val_mse_vals.append(best_tuple[0])   #store chosen validation error (MSE)
        _, best_order, best_use_exog = best_tuple

        #Final refit on full training subseries and forecast this remainder's test subseries
        idx_te_r = np.flatnonzero(t_te % H == r)
        if idx_te_r.size == 0:
            continue

        y_te_r = y_te_all[idx_te_r]
        X_te_r = None if X_te_all is None else X_te_all[idx_te_r, :]

        try:
            res_full = _fit_sarimax(y_tr_r, (X_tr_r if best_use_exog else None), best_order)
            res_ext  = res_full.apply(endog=y_te_r, exog=(X_te_r if best_use_exog else None), refit=False)
            yhat_r   = np.asarray(res_ext.fittedvalues, dtype=float)[-len(y_te_r):]
            #DMSE-based BIC for the selected model on the training subseries
            fit_resid = y_tr_r - np.asarray(res_full.fittedvalues, dtype=float)
            ll_r = dmse_loglik(fit_resid, alpha=alpha)
            k_r  = int(res_full.params.size)
            n_r  = fit_resid.size
            bic_r = -2.0 * ll_r + k_r * np.log(n_r)
            bic_vals.append(bic_r)

        except Exception:
            yhat_r = np.full(shape=len(y_te_r), fill_value=float(np.mean(y_tr_r)), dtype=float)
            bic_vals.append(np.nan)  #keep lengths aligned

        #Place predictions back on the raw test rows for this remainder
        yhat_te[idx_te_r] = yhat_r

    #Ensure we produced all predictions (fill any remaining NaNs with subseries mean)
    if np.isnan(yhat_te).any():
        m = np.nanmean(yhat_te)
        yhat_te = np.where(np.isnan(yhat_te), m, yhat_te)

    diag = {
    "bic_mean": float(np.nanmean(bic_vals)) if bic_vals else np.nan,
    "val_mse_mean": float(np.nanmean(val_mse_vals)) if val_mse_vals else np.nan,
    }
    return yhat_te, df_te["Date"].reset_index(drop=True), diag


#Driver over all tickers
def arima_subseries_pipeline(
    df_train, df_test, predictors, H=10, alpha=9.0, val_frac=0.10, do_plots=False, tickers=None
):
    #If no list is provided, use all tickers present in BOTH train and test
    if tickers is None:
        all_train = pd.Index(df_train["Ticker"].unique())
        all_test  = pd.Index(df_test["Ticker"].unique())
        tickers = pd.Index(sorted(all_train.intersection(all_test)))
    else:
        #allow a single string or any iterable of tickers
        if isinstance(tickers, str):
            tickers = [tickers]
        allowed = set(df_train["Ticker"]).intersection(set(df_test["Ticker"]))
        tickers = pd.Index([t for t in tickers if t in allowed])

    out_rows = []
    preds_all = []

    counter = 0
    for tkr in tickers:
        tr = df_train.loc[df_train["Ticker"] == tkr].sort_values("Date").reset_index(drop=True)
        te = df_test.loc[df_test["Ticker"] == tkr].sort_values("Date").reset_index(drop=True)
        if len(tr) == 0 or len(te) == 0:
            continue

        yhat_te, te_dates, diag = _predict_one_ticker(tr, te, predictors, H=H, alpha=alpha, val_frac=val_frac)
        y_te = te["Target"].to_numpy(dtype=float)
        test_err = dmse_np(y_te, yhat_te, alpha=alpha)   #DMSE on test


        #Store rows for global prediction DF
        preds_all.append(pd.DataFrame({
            "Ticker": tkr,
            "Date": te_dates.to_numpy(),
            "TrueTarget": y_te,
            "PredTarget_ARIMAX_subseries": yhat_te
        }))

        out_rows.append({
            "Ticker": tkr,
            "BIC": diag["bic_mean"],
            "Validation Error": diag["val_mse_mean"],  #MSE on validation
            "Test Error": test_err                     #DMSE on test
        })


        if do_plots and counter < 10:
            fig, ax = plt.subplots(figsize=(12, 4))
            ax.plot(te_dates, y_te, lw=2.0, label="True Target")
            ax.plot(te_dates, yhat_te, lw=2.0, label="Predicted Target")
            ax.set_title(f"{tkr}: Actual and Predicted Target, ARIMA/ARIMAX",
                         fontsize=20, weight="bold")
            ax.set_ylabel("Target", fontsize=25)
            ax.set_xlabel("")
            ax.tick_params(axis="x", colors="black", labelsize=15)
            ax.tick_params(axis="y", colors="black", labelsize=18)
            ax.legend(loc="upper right", frameon=True, fontsize=20)
            ax.set_facecolor("white")
            ax.grid(True, which="major", linestyle="-", alpha=0.15)
            ax.spines["top"].set_visible(False)
            ax.spines["right"].set_visible(False)
            fig.tight_layout()
            plt.show()
            plt.close(fig)

    preds_df = pd.concat(preds_all, axis=0, ignore_index=True)
    summary_df = (
        pd.DataFrame(out_rows)[["Ticker", "BIC", "Validation Error", "Test Error"]]
        .sort_values("Test Error")
        .reset_index(drop=True)
    )
    
    return preds_df, summary_df

In [None]:
preds_df_arimax, summary_df_arimax = arima_subseries_pipeline(
    df_train, df_test, predictors, H=H, alpha=9.0, val_frac=0.10, do_plots=True)

display(summary_df_arimax)

df_test_with_arimax = (df_test.merge(preds_df_arimax[["Ticker", "Date", "PredTarget_ARIMAX_subseries"]],
                       on=["Ticker", "Date"], how="left"))

### Long Short-Term Memory Neural Network

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
warnings.filterwarnings("ignore")

SEQ_LEN   = 20
HIDDEN    = 64
LAYERS    = 2  
BATCH_T   = 32
LR        = 3e-3
EPOCHS    = 40
PATIENCE  = 10
ALPHA     = loss_penalty
EPS       = 1e-8
DROPOUT   = 0.20         
BIDIR     = True         
HEAD_H    = 128   

def dmse_loss(y_true, y_pred, alpha=loss_penalty, reduction="mean"):
    """
    Numpy/Pandas-friendly wrapper around the DMSE torch loss.
    Returns a Python float (mean by default).
    """
    #to 1-D torch tensors
    y_true_t = torch.as_tensor(np.asarray(y_true).ravel(), dtype=torch.float32)
    y_pred_t = torch.as_tensor(np.asarray(y_pred).ravel(), dtype=torch.float32)

    #mask out NaN/inf just in case
    m = torch.isfinite(y_true_t) & torch.isfinite(y_pred_t)
    if not torch.any(m):
        return float("nan")

    crit = DMSE(over_penalty=alpha, reduction=reduction)
    with torch.no_grad():
        val = crit(y_hat=y_pred_t[m], y=y_true_t[m]).item()
    return float(val)

def _ensure_dt_naive(df_like):
    df_like["Date"] = pd.to_datetime(df_like["Date"]).dt.tz_localize(None)
    return df_like

for _d in (df2, df_train, df_test):
    _ensure_dt_naive(_d)

ticker_to_idx = {t:i for i,t in enumerate(tickers)}

#Time ranges (inclusive)
train_start = pd.to_datetime(df_train["Date"]).min()
train_end   = pd.to_datetime(df_train["Date"]).max()
test_start  = pd.to_datetime(df_test["Date"]).min()
test_end    = pd.to_datetime(df_test["Date"]).max()

F = len(predictors)
S = len(tickers)

#Build aligned panel tensors: X_panel[T, S, F], y_panel[T, S], dates[T]
def build_panel_arrays(df, tickers, predictors):
    df = df.sort_values(["Date", "Ticker"])
    dates = pd.to_datetime(sorted(df["Date"].unique()))
    T = len(dates)
    X_panel = np.empty((T, len(tickers), len(predictors)), dtype=np.float32)
    y_panel = np.empty((T, len(tickers)), dtype=np.float32)

    #pre-index by date for quick reindex per ticker
    for s, tkr in enumerate(tickers):
        sub = (df.loc[df["Ticker"] == tkr]
                 .set_index("Date")
                 .reindex(dates))  #ensures alignment
        X_panel[:, s, :] = sub[predictors].to_numpy(dtype=np.float32)
        y_panel[:, s]    = sub["Target"].to_numpy(dtype=np.float32)
    return dates, X_panel, y_panel

dates_all, X_panel, y_panel = build_panel_arrays(df2, tickers, predictors)
T_total = len(dates_all)

#Observability mask: Target must be known for all tickers at time t
obs_mask_all = np.all(~np.isnan(y_panel), axis=1)

#Time index helpers
train_time_mask = (dates_all >= train_start) & (dates_all <= train_end)
test_time_mask  = (dates_all >= test_start)  & (dates_all <= test_end)

#For sequence t, we need indices [t-SEQ_LEN, ..., t-1] available.
seq_ok_mask = np.arange(T_total) >= SEQ_LEN

#Standardise predictors using ONLY training history that can appear in input windows
scaler_time_mask = (dates_all <= train_end)
X_mu = np.nanmean(X_panel[scaler_time_mask], axis=(0,1))    
X_sd = np.nanstd (X_panel[scaler_time_mask], axis=(0,1)) + EPS
X_panel = (X_panel - X_mu) / X_sd

#Custom class for the dataset
class PanelSequenceDataset(Dataset):
    """
    Items are time indices t. Each item returns:
      X: (S, SEQ_LEN, F)
      y: (S,)
      t: int  (time index)
    """
    def __init__(self, X_panel, y_panel, dates, time_mask):
        self.X = X_panel
        self.y = y_panel
        self.dates = dates
        base = np.where(time_mask & obs_mask_all & seq_ok_mask)[0]
        self.t_idx = base.astype(np.int32)

    def __len__(self):
        return len(self.t_idx)

    def __getitem__(self, i):
        t = int(self.t_idx[i])
        X_seq = self.X[t-SEQ_LEN:t, :, :]           #(SEQ_LEN, S, F)
        X_seq = np.transpose(X_seq, (1,0,2))        #(S, SEQ_LEN, F)
        y_t   = self.y[t, :]                        #(S,)
        return torch.from_numpy(X_seq), torch.from_numpy(y_t), t  #this is an integer

#Build time masks for train/val/test
#Validation = last 10% of training time indices (after all filters)
train_ds_tmp = PanelSequenceDataset(X_panel, y_panel, dates_all, train_time_mask)
n_train_times = len(train_ds_tmp)
n_val_times   = max(1, int(0.10 * n_train_times))
#Split by raw order:
train_time_idx = train_ds_tmp.t_idx[:-n_val_times] if n_train_times > n_val_times else train_ds_tmp.t_idx[:0]
val_time_idx   = train_ds_tmp.t_idx[-n_val_times:] if n_val_times > 0 else np.array([], dtype=np.int32)

def mask_from_indices(idxs, T_len):
    m = np.zeros(T_len, dtype=bool); m[idxs] = True; return m

train_mask_final = mask_from_indices(train_time_idx, T_total)
val_mask_final   = mask_from_indices(val_time_idx,   T_total)

train_ds = PanelSequenceDataset(X_panel, y_panel, dates_all, train_mask_final)
val_ds   = PanelSequenceDataset(X_panel, y_panel, dates_all, val_mask_final)
test_ds  = PanelSequenceDataset(X_panel, y_panel, dates_all, test_time_mask)

train_loader = DataLoader(train_ds, batch_size=BATCH_T, shuffle=True,  drop_last=False)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_T, shuffle=False, drop_last=False)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_T, shuffle=False, drop_last=False)

print(f"[LSTM] Tickers: {S}, Features: {F}, Train steps: {len(train_ds)}, Val steps: {len(val_ds)}, Test steps: {len(test_ds)}")

#Target standardisation (per-ticker, only on the training set)
train_targets_mat = y_panel[train_ds.t_idx, :]                              #(n_train_times, S)
Y_MU = np.nanmean(train_targets_mat, axis=0).astype(np.float32)             #(S,)
Y_SD = (np.nanstd(train_targets_mat, axis=0) + EPS).astype(np.float32)      #(S,)

Y_MU_T = torch.tensor(Y_MU, dtype=torch.float32, device=device)             #(S,)
Y_SD_T = torch.tensor(Y_SD, dtype=torch.float32, device=device)             #(S,)

print(f"[LSTM] y mean (median across tickers)={np.nanmedian(Y_MU):.5f}, "
      f"y std (median across tickers)={np.nanmedian(Y_SD):.5f}")

#Custom model class
class SharedTickerLSTM(nn.Module):
    """
    Shared per-ticker BiLSTM encoder + attention pooling + MLP head (non-negative output).
    Forward expects X of shape (B, S, SEQ_LEN, F); we flatten to (B*S, SEQ_LEN, F).
    """
    def __init__(self, input_size, hidden_size=HIDDEN, num_layers=LAYERS,
                 bidirectional=BIDIR, dropout=DROPOUT, head_hidden=HEAD_H):
        super().__init__()
        self.bidirectional = bidirectional
        self.D = 2 if bidirectional else 1

        #Normalise features step-wise before the LSTM
        self.in_norm = nn.LayerNorm(input_size)

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bidirectional=bidirectional,
            dropout=(dropout if num_layers > 1 else 0.0),
            batch_first=True
        )

        #Additive attention over the SEQ_LEN dimension
        self.attn = nn.Linear(hidden_size * self.D, 1, bias=False)

        #Head now ends with Softplus to keep y_hat >= 0 during training
        self.head = nn.Sequential(
            nn.Linear(hidden_size * self.D, head_hidden),
            nn.ReLU(),
            nn.Dropout(0.10),
            nn.Linear(head_hidden, 1),
            nn.Softplus() 
        )

    def forward(self, x):  #x: (B, S, SEQ_LEN, F)
        B, S, L, F_ = x.shape
        x = x.reshape(B*S, L, F_)             #(B*S, SEQ_LEN, F)
        x = self.in_norm(x)                   
        out, _ = self.lstm(x)                 #(B*S, SEQ_LEN, H*D)

        scores = self.attn(out).squeeze(-1)   #(B*S, SEQ_LEN)
        weights = torch.softmax(scores, dim=1)
        context = (out * weights.unsqueeze(-1)).sum(dim=1)

        y_hat = self.head(context).squeeze(-1)  #(B*S,)
        return y_hat.view(B, S)


model = SharedTickerLSTM(input_size=F, hidden_size=HIDDEN, num_layers=LAYERS).to(device)
with torch.no_grad():
    #small Xavier on the first head layer is fine by default, set last layer close to mean
    last_linear = model.head[-2]   #the final Linear before Softplus
    last_linear.bias.fill_(float(np.nanmean(Y_MU)))
    last_linear.weight.mul_(0.01)

criterion = DMSE(over_penalty=ALPHA, reduction="mean").to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-5)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.5, patience=2, min_lr=1e-5, verbose=True
)

num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"[LSTM] Trainable parameters: {num_params:,}")

In [None]:
def eval_dmse(loader):
    model.eval()
    tot_loss, n = 0.0, 0
    with torch.no_grad():
        for Xb, yb, _ in loader:
            Xb = Xb.to(device, dtype=torch.float32)
            yb = yb.to(device, dtype=torch.float32) 
            yhat = model(Xb) 
            loss = criterion((yhat - Y_MU_T)/Y_SD_T, (yb - Y_MU_T)/Y_SD_T)
            bs = yb.numel()
            tot_loss += float(loss.item()) * bs
            n += bs
    return tot_loss / max(1, n)

best_val = np.inf
best_state = None
train_curve, val_curve = [], []
epochs_no_improve = 0

for epoch in range(1, EPOCHS+1):
    model.train()
    for Xb, yb, _ in train_loader:
        Xb = Xb.to(device, dtype=torch.float32)
        yb = yb.to(device, dtype=torch.float32)
        optimizer.zero_grad()
        yhat = model(Xb)
        loss = criterion((yhat - Y_MU_T)/Y_SD_T, (yb - Y_MU_T)/Y_SD_T)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
        optimizer.step()

    tr = eval_dmse(train_loader)
    va = eval_dmse(val_loader)
    train_curve.append(tr); val_curve.append(va)

    if va + 1e-10 < best_val:
        best_val = va
        best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1

    print(f"[Epoch {epoch:02d}] train DMSE={tr:.5f} | val DMSE={va:.5f} | best={best_val:.5f}")
    scheduler.step(va)
    if epochs_no_improve >= PATIENCE:
        print(f"Early stopping (patience={PATIENCE})")
        break

if best_state is not None:
    model.load_state_dict(best_state)

In [None]:
model.eval()
test_records = []
with torch.no_grad():
    for Xb, yb, tb in test_loader:          #tb is (B,) tensor of int time indices
        Xb = Xb.to(device, dtype=torch.float32)
        yb = yb.to(device, dtype=torch.float32)
        yhat = model(Xb)
        y_true = yb.cpu().numpy()
        y_pred = np.maximum(yhat.cpu().numpy(), 0.0)
        tb = tb.cpu().numpy().astype(int)  
        for b in range(y_true.shape[0]):
            dt = dates_all[int(tb[b])]    
            for s, tkr in enumerate(tickers):
                test_records.append((dt, tkr, float(y_true[b, s]), float(y_pred[b, s])))

lstm_pred_df = (pd.DataFrame(test_records, columns=["Date","Ticker","True","Pred"])
                  .sort_values(["Ticker","Date"])
                  .reset_index(drop=True))

for tkr in tickers[10:20]:
    sub = lstm_pred_df.loc[lstm_pred_df["Ticker"] == tkr].sort_values("Date")
    if sub.empty:
        continue

    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(sub["Date"], sub["True"], lw=2.0, label="True Target")
    ax.plot(sub["Date"], sub["Pred"], lw=2.0, label="Predicted Target")

    ax.set_title(f"{tkr}: Actual and Predicted Target, LSTM NN Trained on all Stocks",
                 fontsize=20, weight="bold")
    ax.set_ylabel("Target", fontsize=25)
    ax.set_xlabel("")
    ax.tick_params(axis="x", colors="black", labelsize=15)
    ax.tick_params(axis="y", colors="black", labelsize=18)
    ax.legend(loc="upper right", frameon=True, fontsize=20)
    ax.set_facecolor("white")
    ax.grid(True, which="major", linestyle="-", alpha=0.15)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    fig.tight_layout()
    plt.show()
    plt.close(fig)


#Per-ticker DMSE (test) and validation DMSE (computed by re-running eval on val set by ticker)
def per_ticker_dmse(df):
    out = []
    for tkr, g in df.groupby("Ticker"):
        out.append((tkr, dmse_loss(g["True"].to_numpy(), g["Pred"].to_numpy(), alpha=ALPHA)))
    return pd.DataFrame(out, columns=["Ticker","Test_DMSE"])

test_dmse_table = per_ticker_dmse(lstm_pred_df)

#Validation per ticker: pass back through val_loader and collect predictions similarly
val_records = []
with torch.no_grad():
    for Xb, yb, tb in val_loader:
        Xb = Xb.to(device, dtype=torch.float32)
        yb = yb.to(device, dtype=torch.float32)
        yhat = model(Xb)
        y_true = yb.cpu().numpy()
        y_pred = np.maximum(yhat.cpu().numpy(), 0.0)
        tb = tb.cpu().numpy().astype(int)
        for b in range(y_true.shape[0]):
            dt = dates_all[int(tb[b])]
            for s, tkr in enumerate(tickers):
                val_records.append((dt, tkr, float(y_true[b, s]), float(y_pred[b, s])))

lstm_val_df = (pd.DataFrame(val_records, columns=["Date","Ticker","True","Pred"])
                 .sort_values(["Ticker","Date"])
                 .reset_index(drop=True))

overall_val_dmse = dmse_loss(
    lstm_val_df["True"].to_numpy(),
    lstm_val_df["Pred"].to_numpy(),
    alpha=ALPHA)

overall_test_dmse = dmse_loss(
    lstm_pred_df["True"].to_numpy(),
    lstm_pred_df["Pred"].to_numpy(),
    alpha=ALPHA)

print(f"LSTM NN Overall Test DMSE: {overall_test_dmse:.5f}")
print(f"LSTM NN Overall Validation DMSE: {overall_val_dmse:.5f}")

val_dmse_table = per_ticker_dmse(lstm_val_df).rename(columns={"Test_DMSE":"Val_DMSE"})

#Merge and sort
lstm_results_table = (val_dmse_table.merge(test_dmse_table, on="Ticker", how="outer")
                      .sort_values("Test_DMSE")
                      .reset_index(drop=True))

display_cols = ["Ticker","Val_DMSE","Test_DMSE"]
display(lstm_results_table[display_cols])

In [None]:
class SingleTickerSequenceDataset(Dataset):
    """
    Sequences for one ticker s_idx.
    Returns:
      X: (1, SEQ_LEN, F)   y: (1,)   t: int time index (use dates_all[t] to map)
    """
    def __init__(self, X_panel, y_panel, dates, time_mask, s_idx):
        self.Xs = X_panel[:, s_idx, :]           #(T, F)
        self.ys = y_panel[:, s_idx]              #(T,)
        self.dates = dates
        obs_mask_one = ~np.isnan(self.ys)
        base = np.where(time_mask & obs_mask_one & seq_ok_mask)[0]
        self.t_idx = base.astype(np.int32)

    def __len__(self):
        return len(self.t_idx)

    def __getitem__(self, i):
        t = int(self.t_idx[i])
        X_seq = self.Xs[t-SEQ_LEN:t, :].astype(np.float32)   #(SEQ_LEN, F)
        X_seq = X_seq.reshape(1, SEQ_LEN, X_seq.shape[-1])   #add S=1 dim
        y_t   = np.array([self.ys[t]], dtype=np.float32)     #(1,)
        return torch.from_numpy(X_seq), torch.from_numpy(y_t), t

#Train a single model for each ticker
def train_single_ticker_model(s_idx, ticker):
    torch.manual_seed(0)
    train_ds_st = SingleTickerSequenceDataset(X_panel, y_panel, dates_all, train_mask_final, s_idx)
    val_ds_st   = SingleTickerSequenceDataset(X_panel, y_panel, dates_all, val_mask_final,   s_idx)
    test_ds_st  = SingleTickerSequenceDataset(X_panel, y_panel, dates_all, test_time_mask,  s_idx)

    train_loader_st = DataLoader(train_ds_st, batch_size=BATCH_T, shuffle=True,  drop_last=False)
    val_loader_st   = DataLoader(val_ds_st,   batch_size=BATCH_T, shuffle=False, drop_last=False)
    test_loader_st  = DataLoader(test_ds_st,  batch_size=BATCH_T, shuffle=False, drop_last=False)

    model_st = SharedTickerLSTM(input_size=F, hidden_size=HIDDEN, num_layers=LAYERS).to(device)
    with torch.no_grad():
        last_linear = model_st.head[-2]
        init_bias = Y_MU_T[s_idx] if getattr(Y_MU_T, "ndim", 0) > 0 else Y_MU_T
        last_linear.bias.data.fill_(init_bias.item())
        last_linear.weight.data.mul_(0.01)

    criterion_st = DMSE(over_penalty=ALPHA, reduction="mean").to(device)
    optimizer_st = torch.optim.AdamW(model_st.parameters(), lr=LR, weight_decay=1e-3)
    scheduler_st = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer_st, mode="min", factor=0.5, patience=2, min_lr=1e-5, verbose=False
    )

    def _eval(loader):
        model_st.eval()
        tot, n = 0.0, 0
        with torch.no_grad():
            for Xb, yb, _ in loader:
                Xb = Xb.to(device, dtype=torch.float32)   #(B,1,L,F)
                yb = yb.to(device, dtype=torch.float32)   #(B,1)
                yhat = model_st(Xb)                       #(B,1)
                loss = criterion_st((yhat - Y_MU_T)/Y_SD_T, (yb - Y_MU_T)/Y_SD_T)
                bs = yb.numel()
                tot += float(loss.item()) * bs
                n += bs
        return tot / max(1, n)

    best_val, best_state, no_improve = np.inf, None, 0
    for epoch in range(1, EPOCHS+1):
        model_st.train()
        for Xb, yb, _ in train_loader_st:
            Xb = Xb.to(device, dtype=torch.float32)
            yb = yb.to(device, dtype=torch.float32)
            optimizer_st.zero_grad()
            yhat = model_st(Xb)
            loss = criterion_st((yhat - Y_MU_T)/Y_SD_T, (yb - Y_MU_T)/Y_SD_T)
            loss.backward()
            nn.utils.clip_grad_norm_(model_st.parameters(), max_norm=5.0)
            optimizer_st.step()

        va = _eval(val_loader_st)
        scheduler_st.step(va)
        if va + 1e-10 < best_val:
            best_val = va
            best_state = {k: v.detach().cpu().clone() for k, v in model_st.state_dict().items()}
            no_improve = 0
        else:
            no_improve += 1
        if no_improve >= PATIENCE:
            break

    if best_state is not None:
        model_st.load_state_dict(best_state)

    #Test predictions
    preds, trues, dts = [], [], []
    model_st.eval()
    with torch.no_grad():
        for Xb, yb, tb in test_loader_st:
            Xb = Xb.to(device, dtype=torch.float32)
            yb = yb.to(device, dtype=torch.float32)
            yhat = model_st(Xb).clamp_min_(0.0) 
            y_true = yb.cpu().numpy()
            y_pred = yhat.cpu().numpy()
            tb = tb.cpu().numpy().astype(int)
            for b in range(y_true.shape[0]):
                dts.append(dates_all[int(tb[b])])
                trues.append(float(y_true[b, 0]))
                preds.append(float(y_pred[b, 0]))

    pred_df = (pd.DataFrame({"Date": dts, "Ticker": ticker, "True": trues, "Pred": preds})
                 .sort_values("Date").reset_index(drop=True))
    test_dmse = dmse_loss(pred_df["True"].to_numpy(), pred_df["Pred"].to_numpy(), alpha=ALPHA)
    return {"val_dmse": float(best_val), "test_dmse": float(test_dmse)}, pred_df

In [None]:
#Here we produce the data in Table 6

single_results = []
single_preds_by_ticker = {}

for s, tkr in enumerate(tickers):
    stats, pred_df = train_single_ticker_model(s, tkr)
    single_results.append({"Ticker": tkr, "Val_DMSE": stats["val_dmse"], "Test_DMSE": stats["test_dmse"]})
    single_preds_by_ticker[tkr] = pred_df

lstm_single_results_table = pd.DataFrame(single_results).sort_values("Test_DMSE").reset_index(drop=True)
display(lstm_single_results_table)

In [None]:
#This creates the plots in Figure 18

for tkr in tickers[10:20]:
    g = single_preds_by_ticker.get(tkr)
    if g is None or g.empty:
        continue
    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(g["Date"], g["True"], lw=2.0, label="True Target")
    ax.plot(g["Date"], g["Pred"], lw=2.0, label="Predicted Target")
    ax.set_title(f"{tkr}: Actual and Predicted Target, Per-Ticker LSTM NN",
                 fontsize=20, weight="bold")
    ax.set_ylabel("Target", fontsize=25)
    ax.set_xlabel("")
    ax.tick_params(axis="x", colors="black", labelsize=15)
    ax.tick_params(axis="y", colors="black", labelsize=18)
    ax.legend(loc="upper right", frameon=True, fontsize=20)
    ax.set_facecolor("white"); ax.grid(True, which="major", linestyle="-", alpha=0.15)
    ax.spines["top"].set_visible(False); ax.spines["right"].set_visible(False)
    fig.tight_layout()
    plt.show()
    plt.close(fig)

# Backtesting a Trading Strategy

### Creating the Dataset for Backtesting

In [None]:
#We create the dataset that will be used for backtesting. It contains data for the years 2022, 2023, 2024

#We take data from "2021-11-01" to avoid the first days of 2022 being null. We will later drop 2021 data
spy_return = (
    yf.Ticker("SPY")
      .history(start="2021-11-01", end=end_date_backtest, auto_adjust=False)["Close"]
      .pct_change()
)

spy_return.index = spy_return.index.tz_localize(None)

bt_frames = []
for tkr in tickers:
    try:
        tmp = (
            yf.Ticker(tkr)
              .history(start="2021-11-01", end=end_date_backtest, auto_adjust=False)
              .reset_index()
              .assign(Ticker=tkr, Sector=sector_of.get(tkr, "Unknown"))
              .loc[:, ["Date", "Ticker", "Sector", "Open", "High", "Low", "Close", "Volume"]]
        )
        bt_frames.append(tmp)
    except Exception as e:
        print(f"Skipping {tkr} (price fetch failed): {e}")

df_backtest = pd.concat(bt_frames, ignore_index=True)

#Normalise date and sector code
df_backtest["Date"] = pd.to_datetime(df_backtest["Date"]).dt.date
df_backtest["Sector"] = df_backtest["Sector"].map(sector_dict).fillna(0).astype(int)

#Compute features and lagged targets only (not the target variable)
df_backtest = (
    df_backtest
      .groupby("Ticker", group_keys=False, sort=False).apply(add_features)
      .groupby("Ticker", group_keys=False, sort=False).apply(add_targets, target_horizons=horizons, future=False)
)

_cut_date = pd.to_datetime(end_date_analysis).date()
df_backtest = df_backtest[df_backtest["Date"] >= _cut_date].reset_index(drop=True)

### Estimating the Straddle Premia

In [None]:
df_backtest["Date"] = pd.to_datetime(df_backtest["Date"]).dt.date

#Build "risk_free_df" from ^IRX (13-week T-bill)
start_dt = pd.to_datetime(df_backtest["Date"]).min() - pd.Timedelta(days=10)
end_dt   = pd.to_datetime(df_backtest["Date"]).max() + pd.Timedelta(days=10)

irx = yf.download("^IRX", start=start_dt, end=end_dt, progress=False, auto_adjust=False)

if isinstance(irx.columns, pd.MultiIndex):
    irx.columns = irx.columns.get_level_values(0)

#Take the close price, reset the index, and standardise the column names
risk_free_df = irx[["Close"]].copy().reset_index()
risk_free_df.rename(
    columns={risk_free_df.columns[0]: "Date", "Close": "irx_discount_pct"},
    inplace=True
)
risk_free_df["Date"] = pd.to_datetime(risk_free_df["Date"]).dt.date

#Convert bank-discount yield (percent) to bond-equivalent annual simple yield
d = (risk_free_df["irx_discount_pct"] / 100.0).astype(float)     
price = 1.0 - d * (91.0 / 360.0) 
risk_free_df["rf_annual_simple"] = ((1.0 / price - 1.0) * (365.0 / 91.0)).astype(float)

#Keep one row per calendar date
risk_free_df = (
    risk_free_df[["Date", "rf_annual_simple"]]
    .dropna(subset=["rf_annual_simple"])
    .drop_duplicates(subset=["Date"], keep="last")
)

#Merge onto backtest frame
df_backtest = df_backtest.merge(risk_free_df, on="Date", how="left", suffixes=("", "_rf"))

#Normalise to a single column name 'rf_annual_simple'
if "rf_annual_simple_rf" in df_backtest.columns and "rf_annual_simple" in df_backtest.columns:
    df_backtest["rf_annual_simple"] = df_backtest["rf_annual_simple"].fillna(df_backtest["rf_annual_simple_rf"])
    df_backtest.drop(columns=["rf_annual_simple_rf"], inplace=True)
elif "rf_annual_simple_rf" in df_backtest.columns and "rf_annual_simple" not in df_backtest.columns:
    df_backtest.rename(columns={"rf_annual_simple_rf": "rf_annual_simple"}, inplace=True)

#Forward/back-fill gaps. If still missing at the very start, default to 0
df_backtest["rf_annual_simple"] = (
    df_backtest["rf_annual_simple"].astype(float)
    .fillna(method="ffill")
    .fillna(method="bfill")
    .fillna(0.0)
)

#Continuous-compounded risk-free rate for Black–Scholes
df_backtest["rf_cc"] = np.log1p(df_backtest["rf_annual_simple"])

#Annualised volatility
df_backtest["sigma_annual"] = (df_backtest["Volatility"].astype(float) * np.sqrt(252)).clip(lower=1e-6)

#Black–Scholes ATM (K = S) 10-day straddle
T_years = 10.0 / 365.0

S = df_backtest["Close"].astype(float).to_numpy()
r = df_backtest["rf_cc"].astype(float).to_numpy()
sigma = df_backtest["sigma_annual"].astype(float).to_numpy()

sig_sqrtT = np.maximum(sigma * np.sqrt(T_years), 1e-12)  #numerical guard
d1 = ((r + 0.5 * sigma**2) * T_years) / sig_sqrtT
d2 = d1 - sig_sqrtT

Nd1 = ndtr(d1)
Nd2 = ndtr(d2)
disc = np.exp(-r * T_years)

calls = S * Nd1 - S * disc * Nd2
puts  = S * disc * (1.0 - Nd2) - S * (1.0 - Nd1)

df_backtest["Straddle_premium"] = calls + puts

#Clean up helper columns
df_backtest.drop(columns=["rf_annual_simple", "rf_cc", "sigma_annual"], inplace=True, errors="ignore")

### Forecasting "Target"

##### Preliminary Steps

In [None]:
#One-hot encode the "Sector" variable
sector_dummies_backtest = pd.get_dummies(df_backtest["Sector"], prefix="Sector", dtype=int, drop_first=True)
df_backtest = pd.concat([df_backtest, sector_dummies_backtest], axis=1)

In [None]:
#Per-ticker mean and std for every predictor, computed on df2
scaler_dict_backtest = {}
for ticker, g in df2.groupby("Ticker"):
    mu = g[continuous_predictors].mean().astype("float32")
    std = g[continuous_predictors].std(ddof=0).replace(0, 1e-12).astype("float32")  #avoid /0
    scaler_dict_backtest[ticker] = {"mean": mu, "std": std}

#helper that standardises using the stats stored in scaler_dict_backtest
def standardise_df2(df_part: pd.DataFrame, variables: list[str]) -> pd.DataFrame:
    """
    Return a copy of df_part where the columns in *variables* are
    standardised (per-ticker) using the mean / std computed on df2.
    All other columns are left unchanged.
    """
    pieces = []
    for ticker, g in df_part.groupby("Ticker", sort=False):
        m = scaler_dict_backtest[ticker]["mean"]
        s = scaler_dict_backtest[ticker]["std"]
        g_std = g.copy()
        g_std[variables] = ((g_std[variables] - m) / s).astype("float32")
        pieces.append(g_std)

    #Re-assemble in original row order
    df_out = pd.concat(pieces).loc[df_part.index]
    return df_out

#Standardise df2 and df_backtest using df2 stats
df2_stand = standardise_df2(df2, continuous_predictors)
df_backtest_stand = standardise_df2(df_backtest, continuous_predictors)

X_df2, X_df2_stand, y_df2 = df2[predictors], df2_stand[predictors], df2["Target"]
X_backtest, X_backtest_stand = df_backtest[predictors], df_backtest_stand[predictors]

#Cast to NumPy float32 (and 2D targets)
X_df2_np = X_df2.values.astype("float32")
X_df2_stand_np = X_df2_stand.values.astype("float32")

X_backtest_np = X_backtest.values.astype("float32")
X_backtest_stand_np = X_backtest_stand.values.astype("float32")

y_df2_np = y_df2.values.astype("float32").reshape(-1, 1)

##### Linear Regression

In [None]:
X_df2_lr = df2_stand[predictors_lr].values.astype("float32")
X_backtest_lr = df_backtest_stand[predictors_lr].values.astype("float32")

y_df2_col = y_df2_np if getattr(y_df2_np, "ndim", 1) == 2 else y_df2_np.reshape(-1, 1)

backtest_lr_model = train_model(
    "linear regression",
    X_df2_lr,
    y_df2_col,
    epochs=15,
    learn_rate=5e-2,
    batch_size=len(X_df2_lr)
)

with torch.no_grad():
    device = next(backtest_lr_model.parameters()).device
    X_bt_t = torch.as_tensor(X_backtest_lr, dtype=torch.float32, device=device)
    preds_bt = backtest_lr_model(X_bt_t)
    preds_bt = torch.clamp(preds_bt, min=0.0)   
    preds_bt_np = preds_bt.squeeze(-1).detach().cpu().numpy()

df_backtest.loc[:, "Predictions_lr"] = preds_bt_np.astype("float32")

##### Feedforward Neural Network

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

try:
    ticker_ids_df2 = df2_stand["Ticker"].to_numpy()
except Exception:
    try:
        ticker_ids_df2 = df2["Ticker"].to_numpy()
    except Exception:
        try:
            ticker_ids_df2 = df_train["Ticker"].to_numpy()
        except Exception:
            ticker_ids_df2 = None  

configs = [
    ("SpectralNorm+Dropout", dict(dropout=0.20, batchnorm=False, spectral_norm=True,
                                  input_noise_std=0.0, lr=1e-3, weight_decay=5e-4,
                                  l1=0.0, l2=0.0, hidden_sizes=(512,256,128,64), patience=15, epochs=80)),
    ("ElasticNet+Dropout",   dict(dropout=0.30, batchnorm=False, spectral_norm=False,
                                  input_noise_std=0.0, lr=1e-3, weight_decay=0.0,
                                  l1=1e-6, l2=1e-5, hidden_sizes=(512,256,128,64), patience=15, epochs=80)),
    ("L2+Dropout+BatchNorm", dict(dropout=0.25, batchnorm=True,  spectral_norm=False,
                                  input_noise_std=0.0, lr=1e-3, weight_decay=1e-3,
                                  l1=0.0, l2=0.0, hidden_sizes=(512,256,128,64), patience=15, epochs=80)),
    ("InputNoise+L2",        dict(dropout=0.00, batchnorm=False, spectral_norm=False,
                                  input_noise_std=0.05, lr=1e-3, weight_decay=1e-3,
                                  l1=0.0, l2=0.0, hidden_sizes=(512,256,128,64), patience=15, epochs=80)),
]

#Select the best config by validation DMSE, then retrain on full data
best_name, best_val = None, float("inf")
best_cfg = None
for name, cfg in configs:
    model, _hist, val_dmse, _diags = train_one_model(
        X_df2_stand_np, y_df2_np,
        ticker_ids_train=ticker_ids_df2,
        over_penalty=loss_penalty,
        **cfg
    )
    if val_dmse < best_val:
        best_val = val_dmse
        best_name = name
        best_cfg = cfg

#Retrain best config on the full df2 window (no hold-out)
best_backtest_nn_model, _hist_full, _score_out, _diags_full = train_one_model(
    X_df2_stand_np, y_df2_np,
    ticker_ids_train=ticker_ids_df2,
    over_penalty=loss_penalty,
    val_frac=0.0,  #no validation hold-out
    **best_cfg
)

#Predict on the standardised backtest matrix
with torch.no_grad():
    xb = torch.as_tensor(X_backtest_stand_np, dtype=torch.float32, device=device)
    preds_bt = best_backtest_nn_model(xb)
    preds_bt = torch.clamp(preds_bt, min=0.0)
    preds_bt_np = preds_bt.squeeze(-1).detach().cpu().numpy().astype("float32")

#Attach predictions to df_backtest, keyed by Date+Ticker
_pred_frame = df_backtest.loc[:, ["Date", "Ticker"]].copy()
_pred_frame["Predictions_nn"] = preds_bt_np

df_backtest = (
    df_backtest.drop(columns=["Predictions_nn"], errors="ignore")
               .merge(_pred_frame, on=["Date", "Ticker"], how="left")
)

##### Random Forest

In [None]:
#Train the global random forest (non-standardised features)
rf_backtest_model = RandomForestRegressor(
    n_estimators=100, random_state=1, n_jobs=-1
)
rf_backtest_model.fit(X_df2_np, y_df2_np)

#Predict on the backtest design matrix
rf_backtest_pred = rf_backtest_model.predict(X_backtest_np).astype(np.float32)
rf_backtest_pred = np.asarray(rf_backtest_pred).reshape(-1)  #ensure 1D for column assignment

#Attach predictions to df_backtest, keyed by Date+Ticker
_pred_frame = df_backtest.loc[:, ["Date", "Ticker"]].copy()
_pred_frame["Predictions_rf"] = rf_backtest_pred.astype("float32")

df_backtest = (
    df_backtest.drop(columns=["Predictions_rf"], errors="ignore")
               .merge(_pred_frame, on=["Date", "Ticker"], how="left")
)

##### ARIMA/ARIMAX

In [None]:
#Build per-row-aligned train and backtest frames from the arrays
df_tr_arima = df2_stand.loc[:, ["Date", "Ticker"]].copy()
df_tr_arima = pd.concat(
    [df_tr_arima, pd.DataFrame(X_df2_stand_np, columns=predictors, index=df_tr_arima.index)],
    axis=1
)
df_tr_arima["Target"] = np.asarray(y_df2_np, dtype=float).ravel()

df_bt_arima = df_backtest_stand.loc[:, ["Date", "Ticker"]].copy()
df_bt_arima = pd.concat(
    [df_bt_arima, pd.DataFrame(X_backtest_stand_np, columns=predictors, index=df_bt_arima.index)],
    axis=1
)

#Per-ticker ARIMA/ARIMAX using the 4-step selection on reduced-frequency subseries,
#but forecasting on the backtest window (no access to future y).
def _predict_one_ticker_forecast(df_tr_tkr, df_bt_tkr, predictors, H=10, alpha=9.0, val_frac=0.10):
    df_tr_tkr = df_tr_tkr.copy()
    df_bt_tkr = df_bt_tkr.copy()
    df_tr_tkr["Date"] = pd.to_datetime(df_tr_tkr["Date"])
    df_bt_tkr["Date"] = pd.to_datetime(df_bt_tkr["Date"])

    if df_bt_tkr.empty:
        return np.array([], dtype=float), pd.Series([], dtype="datetime64[ns]")

    #Raw time index across train+backtest
    df_all = (pd.concat([df_tr_tkr, df_bt_tkr], axis=0, ignore_index=True)
                .sort_values("Date").reset_index(drop=True))
    df_all["t_raw"] = np.arange(len(df_all), dtype=int)

    df_tr = df_all.iloc[:len(df_tr_tkr)].copy()
    df_bt = df_all.iloc[len(df_tr_tkr):].copy()

    y_tr_all = df_tr["Target"].to_numpy(dtype=float)
    X_tr_all = df_tr[predictors].to_numpy(dtype=float) if predictors else None
    X_bt_all = df_bt[predictors].to_numpy(dtype=float) if predictors else None
    t_tr = df_tr["t_raw"].to_numpy()
    t_bt = df_bt["t_raw"].to_numpy()

    yhat_bt = np.full(shape=(len(df_bt),), fill_value=np.nan, dtype=float)

    for r in range(H):
        #Training subseries for remainder class r
        idx_tr_r = np.flatnonzero(t_tr % H == r)
        if idx_tr_r.size == 0:
            continue
        y_tr_r = y_tr_all[idx_tr_r]
        X_tr_r = None if X_tr_all is None else X_tr_all[idx_tr_r, :]

        #Initial order via BIC on full training subseries
        p0, d0, q0 = _initial_order_bic(y_tr_r, None, alpha=alpha)

        #Local grid
        candidates = _local_grid_around(p0, d0, q0)

        #Internal validation on the subseries (last 10%)
        n = len(y_tr_r)
        n_va = max(1, int(np.floor(val_frac * n)))
        n_tr_sub = max(1, n - n_va)
        y_tr_sub, y_va = y_tr_r[:n_tr_sub], y_tr_r[n_tr_sub:]
        X_tr_sub = None if X_tr_r is None else X_tr_r[:n_tr_sub, :]
        X_va     = None if X_tr_r is None else X_tr_r[n_tr_sub:, :]

        best_tuple = None  #(mse, order, use_exog)
        for od in candidates:
            #ARIMA
            yhat_a, _ = _fit_and_forecast(y_tr_sub, None, y_va, None, od, use_exog=False)
            if yhat_a is not None:
                mse_a = mse_np(y_va, yhat_a)
                if (best_tuple is None) or (mse_a < best_tuple[0] - 1e-10) or \
                   (abs(mse_a - best_tuple[0]) <= 1e-10 and best_tuple[2] is True):
                    best_tuple = (mse_a, od, False)
            #ARIMAX
            if X_tr_sub is not None:
                yhat_x, _ = _fit_and_forecast(y_tr_sub, X_tr_sub, y_va, X_va, od, use_exog=True)
                if yhat_x is not None:
                    mse_x = mse_np(y_va, yhat_x)
                    if (best_tuple is None) or (mse_x < best_tuple[0] - 1e-10) or \
                       (abs(mse_x - best_tuple[0]) <= 1e-10 and best_tuple[2] is False):
                        best_tuple = (mse_x, od, True)

        if best_tuple is None:
            #Fallback to (0,0,0) ARIMA on subseries mean behaviour
            best_tuple = (np.inf, (0, 0, 0), False)

        _, best_order, best_use_exog = best_tuple

        #Final refit on full training subseries and out-of-sample forecast for this remainder
        idx_bt_r = np.flatnonzero(t_bt % H == r)
        if idx_bt_r.size == 0:
            continue

        X_bt_r = None if X_bt_all is None else X_bt_all[idx_bt_r, :]

        try:
            res_full = _fit_sarimax(y_tr_r, (X_tr_r if best_use_exog else None), best_order)
            if best_use_exog:
                fc = res_full.get_forecast(steps=len(idx_bt_r), exog=X_bt_r)
            else:
                fc = res_full.get_forecast(steps=len(idx_bt_r))
            yhat_r = np.asarray(fc.predicted_mean, dtype=float)
        except Exception:
            #Robust fallback: use the mean of the training subseries
            yhat_r = np.full(shape=len(idx_bt_r), fill_value=float(np.mean(y_tr_r)), dtype=float)

        yhat_bt[idx_bt_r] = yhat_r

    #If any NaNs remain (e.g., empty subseries), fill with the overall training mean
    if np.isnan(yhat_bt).any():
        fill_val = float(np.nanmean(yhat_bt)) if np.isfinite(np.nanmean(yhat_bt)) else float(np.nanmean(y_tr_all))
        yhat_bt = np.where(np.isnan(yhat_bt), fill_val, yhat_bt)

    return yhat_bt, df_bt["Date"].reset_index(drop=True)

#Run per-ticker and collect predictions
tickers_bt = sorted(set(df_tr_arima["Ticker"]).intersection(set(df_bt_arima["Ticker"])))
pred_rows = []
for tkr in tickers_bt:
    tr_tkr = df_tr_arima.loc[df_tr_arima["Ticker"] == tkr].sort_values("Date").reset_index(drop=True)
    bt_tkr = df_bt_arima.loc[df_bt_arima["Ticker"] == tkr].sort_values("Date").reset_index(drop=True)
    if tr_tkr.empty or bt_tkr.empty:
        continue
    yhat_bt, bt_dates = _predict_one_ticker_forecast(tr_tkr, bt_tkr, predictors, H=H, alpha=9.0, val_frac=0.10)
    pred_rows.append(pd.DataFrame({
        "Ticker": tkr,
        "Date":   bt_dates.to_numpy(),
        "Predictions_arima": yhat_bt.astype(np.float32)
    }))

#Merge predictions into df_backtest by (Date, Ticker)
if pred_rows:
    preds_df_arima = pd.concat(pred_rows, axis=0, ignore_index=True)
    
    if pd.api.types.is_datetime64_any_dtype(df_backtest["Date"]):
        preds_df_arima["Date"] = pd.to_datetime(preds_df_arima["Date"])
    else:
        preds_df_arima["Date"] = pd.to_datetime(preds_df_arima["Date"]).dt.date

    df_backtest = (
        df_backtest.drop(columns=["Predictions_arima"], errors="ignore")
                   .merge(preds_df_arima, on=["Date", "Ticker"], how="left")
    )
else:
    df_backtest["Predictions_arima"] = np.nan

##### Long Short-Term Memory Neural Network

In [None]:
device    = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SEQ_LEN   = 20
HIDDEN    = 64
LAYERS    = 2
BIDIR     = True
DROPOUT   = 0.20
HEAD_H    = 128
BATCH_T   = 64
LR        = 3e-3
EPOCHS    = 40
PATIENCE  = 10
ALPHA     = loss_penalty

F = len(predictors)

#Sequence builders
def _sorted_group_indices(df):
    dft = df.loc[:, ["Date", "Ticker"]].copy()
    dft["Date"] = pd.to_datetime(dft["Date"])
    pos = pd.Series(np.arange(len(dft), dtype=int), index=dft.index)  
    idxs = {}
    for tkr, g in dft.groupby("Ticker", sort=False, as_index=False):
        g_sorted = g.sort_values("Date")
        idxs[tkr] = pos.loc[g_sorted.index].to_numpy()  
    return idxs

def make_sequences_from_panel(df_ref, X2d, y2d_or_none, seq_len):
    """
    Build rolling (seq_len)-step sequences per ticker from flat 2D arrays aligned to df_ref rows.
    Returns:
      X_seq : (N_samples, seq_len, F)
      y_seq : (N_samples, 1) or None
      meta  : DataFrame with columns ["Date","Ticker"] for the sequence *end* row
    """
    tkr_to_idxs = _sorted_group_indices(df_ref)
    X_seq_list, y_seq_list, meta_rows = [], [], []

    for tkr, idx in tkr_to_idxs.items():
        if idx.size <= seq_len:
            continue
        #rolling windows that *end* at position j (seq covers rows j-seq_len ... j-1)
        for j in range(seq_len, idx.size):
            win = idx[j-seq_len:j]
            end_row = idx[j]
            X_seq_list.append(X2d[win, :])
            if y2d_or_none is not None:
                y_seq_list.append(y2d_or_none[end_row])
            meta_rows.append((df_ref.iloc[end_row]["Date"], df_ref.iloc[end_row]["Ticker"]))

    X_seq = np.stack(X_seq_list, axis=0).astype("float32") if X_seq_list else np.empty((0, seq_len, F), dtype="float32")
    y_seq = (np.vstack(y_seq_list).astype("float32") if y_seq_list else None)
    meta = pd.DataFrame(meta_rows, columns=["Date","Ticker"])
    return X_seq, y_seq, meta

def make_backtest_sequences_with_history(df_hist, X_hist, df_bt, X_bt, seq_len):
    """
    For each ticker, take the last `seq_len` rows from df_hist, then all rows from df_bt.
    Build one sequence per backtest row whose window ends at that backtest date.
    Returns X_seq (N_bt, seq_len, F) and meta (Date, Ticker) for the backtest rows.
    """
    dfh = df_hist.loc[:, ["Date","Ticker"]].copy(); dfh["Date"] = pd.to_datetime(dfh["Date"])
    dfb = df_bt.loc[:, ["Date","Ticker"]].copy();  dfb["Date"] = pd.to_datetime(dfb["Date"])

    pos_h = pd.Series(np.arange(len(dfh), dtype=int), index=dfh.index)   #label -> position
    pos_b = pd.Series(np.arange(len(dfb), dtype=int), index=dfb.index)

    X_seq_list, meta_rows = [], []
    F = X_hist.shape[1]

    for tkr, g_b in dfb.groupby("Ticker", sort=False):
        g_b = g_b.sort_values("Date")
        g_h = dfh[dfh["Ticker"] == tkr].sort_values("Date")
        if g_b.empty or g_h.empty:
            continue

        h_pos = pos_h.loc[g_h.index].to_numpy()
        b_pos = pos_b.loc[g_b.index].to_numpy()

        #last seq_len rows from history (pad by repeating earliest if needed)
        if h_pos.size < seq_len:
            pad = seq_len - h_pos.size
            h_tail = np.pad(h_pos, (pad, 0), mode="edge")
        else:
            h_tail = h_pos[-seq_len:]

        #extended feature block: [history tail | entire backtest]
        X_ext = np.vstack([X_hist[h_tail, :], X_bt[b_pos, :]])
        T_hist = seq_len  #length of the history tail inside X_ext

        #one sequence per backtest date; window ends at each backtest row
        for k in range(len(b_pos)):
            end = T_hist + k         #exclusive
            start = end - seq_len
            X_seq_list.append(X_ext[start:end, :])
            meta_rows.append((g_b["Date"].iloc[k], tkr))

    X_seq = np.stack(X_seq_list, axis=0).astype("float32") if X_seq_list else np.empty((0, seq_len, F), dtype="float32")
    meta = pd.DataFrame(meta_rows, columns=["Date","Ticker"])
    return X_seq, meta


#Build train sequences from df2 (uses y) and backtest sequences from df_backtest (no y)
X_train_seq, y_train_seq, meta_train = make_sequences_from_panel(df2, X_df2_np, y_df2_np, SEQ_LEN)
X_bt_seq, meta_bt = make_backtest_sequences_with_history(df2, X_df2_np, df_backtest, X_backtest_np, SEQ_LEN)

#Chronological split (per ticker)
def chrono_val_split_indices(meta_df, val_frac=0.10):
    """
    Given meta_df with columns ["Date","Ticker"] (one row per sequence end),
    return (train_idx, val_idx) indices implementing a per-ticker chronological split
    where the last val_frac sequences for each ticker go to validation.
    """
    meta = meta_df.copy()
    meta["Date"] = pd.to_datetime(meta["Date"])
    tr_idx, va_idx = [], []
    for tkr, g in meta.groupby("Ticker", sort=False):
        g_sorted = g.sort_values("Date")
        n = len(g_sorted)
        k = max(1, int(np.floor(n * (1.0 - val_frac))))
        idx_sorted = g_sorted.index.to_numpy()
        tr_idx.append(idx_sorted[:k])
        va_idx.append(idx_sorted[k:])
    tr_idx = np.concatenate(tr_idx) if tr_idx else np.array([], dtype=int)
    va_idx = np.concatenate(va_idx) if va_idx else np.array([], dtype=int)
    return tr_idx, va_idx

tr_idx, va_idx = chrono_val_split_indices(meta_train, val_frac=0.10)

#Tensor datasets/loaders
Xtr_np, ytr_np = X_train_seq[tr_idx], y_train_seq[tr_idx]
Xva_np, yva_np = X_train_seq[va_idx], y_train_seq[va_idx]

train_dl = DataLoader(TensorDataset(torch.from_numpy(Xtr_np), torch.from_numpy(ytr_np)),
                      batch_size=BATCH_T, shuffle=True, drop_last=False)
val_dl   = DataLoader(TensorDataset(torch.from_numpy(Xva_np), torch.from_numpy(yva_np)),
                      batch_size=BATCH_T, shuffle=False, drop_last=False)

#Model
class LSTMRegressor(nn.Module):
    """
    BiLSTM over the SEQ_LEN window with a non-negative Softplus head.
    Input:  (B, SEQ_LEN, F)
    Output: (B,)
    """
    def __init__(self, input_size, hidden_size=HIDDEN, num_layers=LAYERS,
                 bidirectional=BIDIR, dropout=DROPOUT, head_hidden=HEAD_H):
        super().__init__()
        self.D = 2 if bidirectional else 1
        self.in_norm = nn.LayerNorm(input_size)
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bidirectional=bidirectional,
            dropout=(dropout if num_layers > 1 else 0.0),
            batch_first=True
        )
        self.head = nn.Sequential(
            nn.Linear(hidden_size * self.D, head_hidden),
            nn.ReLU(),
            nn.Dropout(0.10),
            nn.Linear(head_hidden, 1),
            nn.Softplus()     #keep predictions >= 0
        )

    def forward(self, x):               #x: (B, SEQ_LEN, F)
        x = self.in_norm(x)
        out, _ = self.lstm(x)           #(B, SEQ_LEN, H*D)
        h_last = out[:, -1, :]          #last timestep summary
        return self.head(h_last).squeeze(-1)

model = LSTMRegressor(input_size=F).to(device)
criterion = DMSE(over_penalty=ALPHA).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.5, patience=2, min_lr=1e-5, verbose=False
)

def eval_dmse(loader):
    model.eval()
    tot_loss, n = 0.0, 0
    with torch.no_grad():
        for xb, yb in loader:
            xb = xb.to(device)
            yb = yb.to(device)
            yhat = model(xb).unsqueeze(-1)
            loss = criterion(y_hat=yhat, y=yb)
            tot_loss += float(loss.item()) * yb.size(0)
            n += yb.size(0)
    return tot_loss / max(1, n)

#Train
best_val = float("inf")
best_state = None
no_improve = 0

for epoch in range(1, EPOCHS + 1):
    model.train()
    for xb, yb in train_dl:
        xb = xb.to(device)
        yb = yb.to(device)
        optimizer.zero_grad(set_to_none=True)
        yhat = model(xb).unsqueeze(-1)
        loss = criterion(y_hat=yhat, y=yb)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
        optimizer.step()

    #Validate
    val_loss = eval_dmse(val_dl) if len(Xva_np) > 0 else eval_dmse(train_dl)
    scheduler.step(val_loss)
    if val_loss + 1e-10 < best_val:
        best_val = val_loss
        best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
        no_improve = 0
    else:
        no_improve += 1
        if no_improve >= PATIENCE:
            break

if best_state is not None:
    model.load_state_dict(best_state)

#Predict on backtest sequences
model.eval()
with torch.no_grad():
    xb = torch.from_numpy(X_bt_seq).to(device)
    preds_seq = model(xb).clamp_min(0.0)           
    preds_seq_np = preds_seq.detach().cpu().numpy().astype("float32")

#Attach to df_backtest
preds_df = meta_bt.copy()
if pd.api.types.is_datetime64_any_dtype(df_backtest["Date"]):
    preds_df["Date"] = pd.to_datetime(preds_df["Date"])
else:
    preds_df["Date"] = pd.to_datetime(preds_df["Date"]).dt.date

preds_df["Predictions_lstmnn"] = preds_seq_np

df_backtest = (
    df_backtest.drop(columns=["Predictions_lstmnn"], errors="ignore")
               .merge(preds_df, on=["Date","Ticker"], how="left")
)

### Trading Strategy Simulation

##### Definition

In [None]:
#Using the predictions obtained above, we compute the associated implied expected profits from 10-day ATM straddles
models_names = ["lr", "nn", "rf", "arima", "lstmnn"]
for model in models_names:
    df_backtest[f"EP_{model}"] = df_backtest[f"Predictions_{model}"] * df_backtest["Close"] - df_backtest["Straddle_premium"]

#We eliminate the columns we do not need any more
df_backtest.drop(columns = predictors + ["Sector", "Volume", "Slope"] + [f"Predictions_{model}" for model in models_names],
                 axis=1, inplace=True, errors="ignore")

In [None]:
@dataclass
class Position:
    ticker: str
    entry_date: pd.Timestamp
    expiry_deadline: pd.Timestamp   
    strike: float                 
    premium_per_share: float     
    ep_per_share: float        
    tp_intrinsic_per_share: float 
    n_contracts: int    
    gross_cost: float    
    total_cost_out: float       
    open: bool = True


def backtest_strategy(
    input_df: pd.DataFrame,
    *,
    model: str,                                #"lr", "nn", "rf", "arima", or "lstmnn"
    capital: float,
    capital_fraction: Optional[np.ndarray] = None,
    commissions: float = 0.01,      
    threshold: float = 0.10,  
    do_plot: bool = False,       
    plot_number: int = 1,
    seed: int = 0
) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, float]]:
    """
    Backtests an ATM straddle strategy with exercise-only exits.

    Key rules implemented:
    - Per-ticker capital buckets (fixed at start via capital_fraction; equal weights if None).
    - Entry (right before the close): buy an ATM straddle iff EP_per_share >= threshold * premium_per_share,
      apply commissions on buy.
    - Expiry: at the first trading day with date >= entry_date + 10 calendar days we exercise at close intrinsic.
    - Portfolio value is marked daily
    - Option contract multiplier = 100.
    - One new entry per ticker per day at most.
    """
    rng = np.random.default_rng(seed)

    df = input_df.copy()
    tickers = df["Ticker"].drop_duplicates().tolist()
    n_tickers = len(tickers)
    model = model.lower()
    ep_col = f"EP_{model}"

    #Build fast per-ticker frames (Date as index)
    per_ticker: Dict[str, pd.DataFrame] = {
        t: g.set_index("Date")[["High", "Low", "Close", "Straddle_premium", ep_col]].sort_index()
        for t, g in df.groupby("Ticker", sort=False)
    }

    #Use the shared calendar of dates 
    all_dates = df["Date"].drop_duplicates().sort_values().tolist()
    first_date, last_date = all_dates[0], all_dates[-1]

    #Capital buckets
    if capital_fraction is None:
        weights = np.full(n_tickers, 1.0 / n_tickers, dtype=float)
    else:
        cf = np.asarray(capital_fraction, dtype=float)
        if cf.shape[0] != n_tickers:
            raise ValueError("capital_fraction length must equal the number of tickers in df_backtest.")
        total = cf.sum()
        if total <= 0:
            raise ValueError("capital_fraction must sum to a positive value.")
        weights = cf / total

    starting_cash: Dict[str, float] = {t: float(capital * w) for t, w in zip(tickers, weights)}
    free_cash: Dict[str, float] = starting_cash.copy()

    #Positions and trade log
    open_positions: Dict[str, List[Position]] = {t: [] for t in tickers}
    closed_trades: List[dict] = []

    def _get_row(t: str, d: pd.Timestamp):
        row = per_ticker[t].loc[d]
        #row is a Series with indices: High, Low, Close, Straddle_premium, ep_col
        return float(row["High"]), float(row["Low"]), float(row["Close"]), float(row["Straddle_premium"]), float(row[ep_col])

    #Main backtest loop over dates
    equity_dates = []
    equity_values = []

    for current_date in all_dates:
        #Handle exits first 
        for t in tickers:
            if current_date not in per_ticker[t].index:
                continue
            high, low, close, _, _ = _get_row(t, current_date)

            #iterate over a copy to allow removal
            remaining_positions: List[Position] = []
            for pos in open_positions[t]:
                if not pos.open:
                    continue

                exercised = False
                payout = 0.0

                #If still alive in calendar-time
                if current_date <= pos.expiry_deadline:
                    max_intraday_intrinsic = max(max(high - pos.strike, 0.0),
                                                 max(pos.strike - low, 0.0))
                    if max_intraday_intrinsic >= pos.tp_intrinsic_per_share:
                        realized_per_share = pos.tp_intrinsic_per_share
                        payout = realized_per_share * 100.0 * pos.n_contracts
                        cash_in = max(payout - 1 * pos.n_contracts, 0.0) * (1.0 - commissions)
                        free_cash[t] += cash_in
                        exercised = True

                        pnl = cash_in - pos.total_cost_out
                        closed_trades.append({
                            "Ticker": t,
                            "EntryDate": pos.entry_date,
                            "ExitDate": current_date,
                            "DaysHeld": (current_date - pos.entry_date).days,
                            "Strike": pos.strike,
                            "Premium_per_share": pos.premium_per_share,
                            "Contracts": pos.n_contracts,
                            "ExitReason": "TakeProfit",
                            "GrossPayout": payout,
                            "PnL": pnl
                        })

                #If not already exercised, check expiration
                if (not exercised) and (current_date >= pos.expiry_deadline):
                    intrinsic_at_close = max(abs(close - pos.strike), 0.0)
                    payout = intrinsic_at_close * 100.0 * pos.n_contracts
                    cash_in = max(payout - 1*pos.n_contracts, 0.0) * (1.0 - commissions)
                    free_cash[t] += cash_in
                    exercised = True

                    pnl = cash_in - pos.total_cost_out
                    closed_trades.append({
                        "Ticker": t,
                        "EntryDate": pos.entry_date,
                        "ExitDate": current_date,
                        "DaysHeld": (current_date - pos.entry_date).days,
                        "Strike": pos.strike,
                        "Premium_per_share": pos.premium_per_share,
                        "Contracts": pos.n_contracts,
                        "ExitReason": "Expiry",
                        "GrossPayout": payout,
                        "PnL": pnl
                    })

                if exercised:
                    pos.open = False
                else:
                    remaining_positions.append(pos)

            open_positions[t] = remaining_positions

        for t in tickers:
            if current_date not in per_ticker[t].index:
                continue
            high, low, close, premium_ps, _ep_today = _get_row(t, current_date)
            ep_ps = float(per_ticker[t].loc[current_date, ep_col])
            if np.isnan(ep_ps) or np.isnan(premium_ps):
                continue

            #Entry condition: EP >= threshold * premium
            if ep_ps >= threshold * premium_ps:
                #Budget exactly 10% of the ticker's current free cash for this trade
                allow_cash = 0.1 * free_cash[t]
                if allow_cash <= 0:
                    continue

                cost_per_contract_out = premium_ps * 100.0 * (1.0 + commissions)
                n_contracts = int(allow_cash // cost_per_contract_out)
                if n_contracts < 1:
                    continue
                
                tp_intrinsic_ps = ((1.08 * premium_ps * 100.0 * (1.0 + commissions)) / (1.0 - commissions) + 1.0) / 100.0
                gross_cost = premium_ps * 100.0 * n_contracts
                total_cost_out = gross_cost * (1.0 + commissions)
                free_cash[t] -= total_cost_out

                pos = Position(
                    ticker=t,
                    entry_date=current_date,
                    expiry_deadline=current_date + pd.Timedelta(days=10),
                    strike=close,
                    premium_per_share=premium_ps,
                    ep_per_share=ep_ps,
                    tp_intrinsic_per_share=tp_intrinsic_ps,
                    n_contracts=n_contracts,
                    gross_cost=gross_cost,
                    total_cost_out=total_cost_out,
                    open=True
                )
                open_positions[t].append(pos)

        equity = sum(free_cash.values())
        for t in tickers:
            if current_date not in per_ticker[t].index:
                continue
            _, _, close, _, _ = _get_row(t, current_date)
            for pos in open_positions[t]:
                if pos.open:
                    intrinsic = max(abs(close - pos.strike), 0.0)
                    equity += intrinsic * 100.0 * pos.n_contracts

        equity_dates.append(current_date)
        equity_values.append(equity)

    #Results assembly
    equity_df = pd.DataFrame({"Date": equity_dates, "Equity": equity_values})
    equity_df.set_index("Date", inplace=True)

    trades_df = pd.DataFrame(closed_trades)
    if not trades_df.empty:
        trades_df.sort_values(["ExitDate", "Ticker", "EntryDate"], inplace=True)

    #Statistics
    final_equity = float(equity_df["Equity"].iloc[-1])
    total_pnl_pct = (final_equity - capital) / capital

    #Daily log returns and risk metrics
    eq = equity_df["Equity"].astype(float)
    rets = np.log(eq/eq.shift(1)).dropna()

    #Risk-free rate assumption for 2022-2024 period
    annual_rf_rate = 0.04  #4% annual risk-free rate 
    daily_rf_rate = annual_rf_rate / 252  #Convert to daily rate
    daily_rf_log = np.log(1 + daily_rf_rate)  #Convert to log-return equ

    daily_vol = rets.std(ddof=1)           #std of daily log-returns
    ann_vol = daily_vol * np.sqrt(252)     #annualised volatility

    sharpe = np.nan
    if daily_vol > 0:
        #Subtract log risk-free rate from log-returns
        excess_daily_log_return = rets.mean() - daily_rf_log
        sharpe = (excess_daily_log_return / daily_vol) * np.sqrt(252)
    
    #CAGR
    n_days = (equity_df.index[-1] - equity_df.index[0]).days
    cagr = np.nan
    if n_days > 0 and capital > 0:
        years = n_days / 365.25
        cagr = (final_equity / capital) ** (1 / years) - 1

    #Max drawdown
    roll_max = eq.cummax()
    dd = eq / roll_max - 1.0
    max_dd = dd.min()
    max_dd = float(max_dd) if pd.notna(max_dd) else np.nan

    #Trade stats
    n_trades = int(trades_df.shape[0]) if not trades_df.empty else 0
    gross_win = trades_df.loc[trades_df["PnL"] > 0, "PnL"].sum() if n_trades > 0 else 0.0
    gross_loss = -trades_df.loc[trades_df["PnL"] < 0, "PnL"].sum() if n_trades > 0 else 0.0
    profit_factor = (gross_win / gross_loss) if gross_loss > 0 else np.inf

    expectancy = trades_df["PnL"].mean() if n_trades > 0 else np.nan  #average PnL per trade

    stats = {
        "Start Date": str(first_date),
        "End Date": str(last_date),
        "Initial Equity": round(capital, 2),
        "Final Equity": round(final_equity, 2),
        "PnL": f"{round(total_pnl_pct*100, 2)}%",
        "Annualised Volatility": f"{round(100 * ann_vol, 2)}%" if not np.isnan(ann_vol) else np.nan,
        "Sharpe Ratio": round(sharpe, 3) if not np.isnan(sharpe) else np.nan,
        "CAGR": f"{round(100 * cagr, 2)}%" if not np.isnan(cagr) else np.nan,
        "Maximum Drawdown": f"{round(100 * max_dd, 2)}%" if not np.isnan(max_dd) else np.nan,
        "Profit Factor": (round(profit_factor, 3) if np.isfinite(profit_factor) else "inf"),
        "Per-Trade Expectancy": round(expectancy, 2) if not np.isnan(expectancy) else np.nan
    }

    if do_plot:
        model_names_dict = {"lr":"Linear Regression", "nn":"Feedforward Neural Network", "rf":"Random Forest", 
                        "arima":"ARIMA(X)", "lstmnn":"Long Short-Term Memory Neural Network"}
        plt.figure(figsize=(10, 6))
        plt.plot(equity_df.index, equity_df["Equity"], color="green")
        plt.title(f"Capital Curve – {model_names_dict[model]}", fontsize=22, weight="bold")
        plt.xlabel("Date", fontsize=25)
        plt.ylabel("Total Capital", fontsize=25)
        plt.tick_params(axis="x", colors="black", labelsize=13)
        plt.tick_params(axis="y", colors="black", labelsize=16)
        plt.grid(True, which="major", linestyle="-", alpha=0.15)
        plt.tight_layout()
        plt.show()
        plt.close()

    stats_df = pd.DataFrame(list(stats.items()), columns=["Metric", "Value"]).iloc[2:].reset_index(drop=True)
    return stats_df

##### Results

In [None]:
model_names = ["lr", "nn", "rf", "arima", "lstmnn"]
l = [x/100 for x in range(5,201,5)]
models_best_threshold = {"lr":0.05, "nn":0.05, "rf":0.05, "arima":0.05, "lstmnn":0.05}

In [None]:
for model_name in model_names:
    print("-------------------------------------------")
    print(f"Model = {model_name}")
    best_sharpe = -np.inf
    for x in l:
        sharpe_ratio = backtest_strategy(input_df=df_backtest, model=model_name, capital=100000, 
                       capital_fraction=None, commissions=0.01, threshold=x, do_plot=False).iloc[4,1]
        print(f"threshold = {x}, sharpe = {sharpe_ratio}")
        if sharpe_ratio > best_sharpe:
            models_best_threshold[model_name] = x
            best_sharpe = sharpe_ratio

In [None]:
models_best_threshold

In [None]:
backtest_strategy(input_df=df_backtest, model="lr", capital=100000, capital_fraction=None, 
                  commissions=0.01, threshold=models_best_threshold["lr"], do_plot=True, plot_number=153)

In [None]:
backtest_strategy(input_df=df_backtest, model="nn", capital=100000, capital_fraction=None, 
                  commissions=0.01, threshold=models_best_threshold["nn"], do_plot=True, plot_number=154)

In [None]:
backtest_strategy(input_df=df_backtest, model="rf", capital=100000, capital_fraction=None, 
                  commissions=0.01, threshold=models_best_threshold["rf"], do_plot=True, plot_number=155)

In [None]:
backtest_strategy(input_df=df_backtest, model="arima", capital=100000, capital_fraction=None, 
                  commissions=0.01, threshold=models_best_threshold["arima"], do_plot=True, plot_number=156)

In [None]:
backtest_strategy(input_df=df_backtest, model="lstmnn", capital=100000, capital_fraction=None, 
                  commissions=0.01, threshold=models_best_threshold["lstmnn"], do_plot=True, plot_number=157)