## Imports

In [None]:
import os
from pathlib import Path
import datetime
from typing import List

from tqdm import tqdm
from dataclasses import dataclass, asdict

import polars as pl 
import numpy as np
from sklearn.linear_model import Ridge, Lasso, LinearRegression
from sklearn.ensemble import VotingRegressor
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error

import kaggle_evaluation.default_inference_server

## Project Directory Structure

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found under the competition input directory.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for name in files:
        print(os.path.join(root, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Configurations

In [None]:
# ============ PATHS ============
DATA_PATH: Path = Path('/kaggle/input/hull-tactical-market-prediction/')

# ============ RETURNS TO SIGNAL CONFIGS ============
MIN_SIGNAL: float = 0.0                         # Minimum value for the daily signal 
MAX_SIGNAL: float = 2.0                         # Maximum value for the daily signal 
SIGNAL_MULTIPLIER: float = 400.0                # Multiplier of the OLS market forward excess returns predictions to signal 

# ============ MODEL CONFIGS ============
N_SPLITS: int = 5                               # Number of time series cross validation splits
RIDGE_ALPHAS: np.ndarray = np.logspace(-3, 3, 50)  # Ridge regularization parameters to test
LASSO_ALPHAS: np.ndarray = np.logspace(-4, 1, 50)  # Lasso regularization parameters to test
USE_ROBUST_SCALER: bool = True                  # Use RobustScaler (better for outliers) vs StandardScaler
ENSEMBLE_WEIGHTS: dict = {'ridge': 0.5, 'lasso': 0.3, 'ols': 0.2}  # Weights for ensemble

## Dataclass Helpers

In [None]:
@dataclass
class DatasetOutput:
    """Scaled feature frames and target series returned by ``split_dataset``.

    The fitted scaler travels with the data so the identical transformation
    can be re-applied to rows scored later.
    """

    X_train: pl.DataFrame                  # scaled training features
    X_test: pl.DataFrame                   # scaled test features
    y_train: pl.Series                     # training target values
    y_test: pl.Series                      # test target values
    scaler: StandardScaler | RobustScaler  # scaler fitted on the training features

@dataclass
class ModelParameters:
    """Hyper-parameters of the Ridge/Lasso/OLS ensemble, validated on creation.

    Raises:
        ValueError: if ``n_splits`` < 2, or the ensemble weights do not sum
            to 1.0 (within floating-point tolerance).
    """

    n_splits: int                # number of TimeSeriesSplit folds (>= 2)
    ridge_alphas: np.ndarray     # candidate Ridge regularization strengths
    lasso_alphas: np.ndarray     # candidate Lasso regularization strengths
    use_robust_scaler: bool      # RobustScaler if True, otherwise StandardScaler
    ensemble_weights: dict       # estimator name -> voting weight

    def __post_init__(self):
        import math

        if self.n_splits < 2:
            raise ValueError("n_splits must be at least 2 for TimeSeriesSplit")
        # Exact float equality is unreliable here: 0.6 + 0.3 + 0.1 evaluates to
        # 0.9999999999999999, which an `!= 1.0` test would wrongly reject.
        if not math.isclose(sum(self.ensemble_weights.values()), 1.0):
            raise ValueError("Ensemble weights must sum to 1.0")
        
@dataclass(frozen=True)
class RetToSignalParameters:
    """Immutable settings for turning predicted returns into a position signal.

    Raises:
        ValueError: if ``min_signal`` is greater than ``max_signal``.
    """

    signal_multiplier: float          # scale applied to each predicted return
    min_signal: float = MIN_SIGNAL    # lower clip bound of the signal
    max_signal: float = MAX_SIGNAL    # upper clip bound of the signal

    def __post_init__(self):
        # Validate eagerly, like the other parameter dataclass: with an inverted
        # range np.clip performs no check and returns max_signal for every input.
        if self.min_signal > self.max_signal:
            raise ValueError("min_signal must not exceed max_signal")

## Set the Parameters

In [None]:
# Signal conversion: only the multiplier is set; the clip bounds use the
# dataclass defaults.
ret_signal_params = RetToSignalParameters(signal_multiplier=SIGNAL_MULTIPLIER)

# Model hyper-parameters; ModelParameters validates them on construction.
model_params = ModelParameters(
    ensemble_weights=ENSEMBLE_WEIGHTS,
    use_robust_scaler=USE_ROBUST_SCALER,
    lasso_alphas=LASSO_ALPHAS,
    ridge_alphas=RIDGE_ALPHAS,
    n_splits=N_SPLITS,
)

## Dataset Loading/Creating Helper Functions

In [None]:
def load_trainset() -> pl.DataFrame:
    """Read ``train.csv`` and return it ready for feature engineering.

    ``market_forward_excess_returns`` is renamed to ``target``, every column
    except ``date_id`` is cast to Float64 (unparseable values become null
    because ``strict=False``), and the last 10 rows are discarded.
    """
    raw = pl.read_csv(DATA_PATH / "train.csv")
    renamed = raw.rename({'market_forward_excess_returns': 'target'})
    numeric = renamed.with_columns(pl.exclude('date_id').cast(pl.Float64, strict=False))
    # NOTE(review): presumably trims rows whose date_ids also appear in
    # test.csv, since train/test are later separated by date_id — confirm.
    return numeric.head(-10)

def load_testset() -> pl.DataFrame:
    """Read ``test.csv`` and return it ready for feature engineering.

    ``lagged_forward_returns`` is renamed to ``target`` (note: a different
    source column from the training target), and every column except
    ``date_id`` is cast to Float64 with ``strict=False``.
    """
    frame = pl.read_csv(DATA_PATH / "test.csv").rename({'lagged_forward_returns': 'target'})
    return frame.with_columns(pl.exclude('date_id').cast(pl.Float64, strict=False))

def create_example_dataset(df: pl.DataFrame) -> pl.DataFrame:
    """Add engineered features, keep the modelling columns and clean nulls.

    Steps:
      1. Derive U1-U5 from raw columns (all evaluated against the input frame).
      2. Keep ``date_id``, ``target`` and the selected feature columns.
      3. Replace nulls in each feature with ``ewm_mean(com=0.5)`` of that column.
      4. Drop every row still containing a null in any kept column.
    """
    features: list[str] = [
        "S2", "E2", "E3", "P9", "S1", "S5", "I2", "P8",
        "P10", "P12", "P13", "U1", "U2", "U3", "U4", "U5"
    ]
    derived = {
        "U1": pl.col("I2") - pl.col("I1"),
        # NOTE(review): unlike U3 this ratio has no epsilon guard; a zero
        # denominator yields inf, which drop_nulls() does not remove.
        "U2": pl.col("M11") / ((pl.col("I2") + pl.col("I9") + pl.col("I7")) / 3),
        "U3": pl.col("S2") / (pl.col("S1") + 1e-6),
        "U4": pl.col("E2") * pl.col("E3"),
        "U5": pl.col("P9") + pl.col("P10") + pl.col("P12"),
    }

    enriched = df.with_columns([expr.alias(name) for name, expr in derived.items()])
    kept = enriched.select(["date_id", "target"] + features)
    filled = kept.with_columns([
        pl.col(name).fill_null(pl.col(name).ewm_mean(com=0.5))
        for name in features
    ])
    return filled.drop_nulls()
    
def join_train_test_dataframes(train: pl.DataFrame, test: pl.DataFrame) -> pl.DataFrame:
    """Stack ``train`` on top of ``test`` using only the columns both share.

    Shared columns keep the order in which they appear in ``train``.
    """
    test_names = set(test.columns)
    shared = [name for name in train.columns if name in test_names]
    return pl.concat([frame.select(shared) for frame in (train, test)], how="vertical")

def split_dataset(train: pl.DataFrame, test: pl.DataFrame, features: list[str], use_robust: bool = True) -> DatasetOutput:
    """Split frames into features/target and scale the features.

    Args:
        train: training frame containing ``target`` and every name in ``features``.
        test: test frame containing ``target`` and every name in ``features``.
        features: feature column names, in the order the scaled output uses.
        use_robust: fit a ``RobustScaler`` if True, else a ``StandardScaler``.

    Returns:
        DatasetOutput with scaled X frames (columns named ``features``), the raw
        target series, and the scaler fitted on the training features.
    """
    # Select by name so each scaled column is guaranteed to correspond to the
    # name it is labelled with, regardless of the frames' column order or any
    # extra columns they carry.
    X_train_raw = train.select(features)
    X_test_raw = test.select(features)
    y_train = train.get_column('target')
    y_test = test.get_column('target')

    scaler = RobustScaler() if use_robust else StandardScaler()

    # Fit on training data only; the test set reuses the training statistics.
    X_train = pl.from_numpy(scaler.fit_transform(X_train_raw), schema=features)
    X_test = pl.from_numpy(scaler.transform(X_test_raw), schema=features)

    return DatasetOutput(
        X_train=X_train,
        y_train=y_train,
        X_test=X_test,
        y_test=y_test,
        scaler=scaler,
    )

## Converting Return Prediction to Signal

Here is an example of a function that converts a prediction of the market forward excess return into a daily signal position.

In [None]:
def convert_ret_to_signal(
    ret_arr: np.ndarray,
    params: RetToSignalParameters
) -> np.ndarray:
    """Map predicted excess returns to a bounded position signal.

    Each prediction ``r`` becomes ``r * params.signal_multiplier + 1``, clipped
    element-wise to ``[params.min_signal, params.max_signal]``.
    """
    lower, upper = params.min_signal, params.max_signal
    shifted = ret_arr * params.signal_multiplier + 1
    return np.clip(shifted, lower, upper)

## Looking at the Data

In [None]:
train: pl.DataFrame = load_trainset()
test: pl.DataFrame = load_testset()
# Show where the two sets meet: the last training rows, then the first test rows.
for preview in (train.tail(3), test.head(3)):
    print(preview)

## Generating the Train and Test Sets

In [None]:
# Engineer features on the stacked train+test frame so both go through the
# same processing, then split the result back apart by date_id.
# (The filters read the pre-reassignment `train` / `test` frames.)
df: pl.DataFrame = join_train_test_dataframes(train, test)
df = create_example_dataset(df=df)
train: pl.DataFrame = df.filter(pl.col('date_id').is_in(train.get_column('date_id')))
test: pl.DataFrame = df.filter(pl.col('date_id').is_in(test.get_column('date_id')))

# Every remaining column except the id and target is a model feature.
FEATURES: list[str] = [col for col in test.columns if col not in ['date_id', 'target']]

dataset: DatasetOutput = split_dataset(train=train, test=test, features=FEATURES, use_robust=model_params.use_robust_scaler)

X_train: pl.DataFrame = dataset.X_train
X_test: pl.DataFrame = dataset.X_test
# The targets are single columns, i.e. Series, not DataFrames.
y_train: pl.Series = dataset.y_train
y_test: pl.Series = dataset.y_test
scaler: StandardScaler | RobustScaler = dataset.scaler

## Fitting the Model with Ensemble + Time-Series CV

In [None]:
from typing import Tuple

def find_best_alpha_timeseries(X, y, alphas: np.ndarray, model_class, n_splits: int = 5) -> Tuple[float, list]:
    """Choose the ``alpha`` with the lowest mean out-of-fold MSE.

    Cross-validation uses ``TimeSeriesSplit``, so each validation fold lies
    after its training data in sample order.

    Args:
        X: feature matrix of shape (n_samples, n_features); any array-like
            accepted by ``np.asarray`` (NumPy, polars, pandas, ...).
        y: target vector of length n_samples (array-like).
        alphas: candidate values passed as ``model_class(alpha=...)``.
        model_class: estimator class taking an ``alpha`` keyword (e.g. Ridge, Lasso).
        n_splits: number of TimeSeriesSplit folds.

    Returns:
        ``(best_alpha, alpha_scores)`` where ``alpha_scores[i]`` is the mean
        validation MSE for ``alphas[i]``.

    Raises:
        ValueError: if ``alphas`` is empty.
    """
    # Positional row indexing below requires plain arrays; a DataFrame would
    # interpret X[idx] as column selection.
    X = np.asarray(X)
    y = np.asarray(y)
    if len(alphas) == 0:
        raise ValueError("alphas must contain at least one candidate value")

    # Fold boundaries do not depend on alpha, so compute them once.
    folds = list(TimeSeriesSplit(n_splits=n_splits).split(X))

    alpha_scores = []
    for alpha in alphas:
        fold_scores = []
        for train_idx, val_idx in folds:
            model = model_class(alpha=alpha)
            model.fit(X[train_idx], y[train_idx])
            pred = model.predict(X[val_idx])
            fold_scores.append(mean_squared_error(y[val_idx], pred))
        alpha_scores.append(np.mean(fold_scores))

    best_idx = int(np.argmin(alpha_scores))
    return alphas[best_idx], alpha_scores

# Convert to numpy for sklearn
# The sklearn estimators are tuned and fitted on plain NumPy arrays.
X_train_np = X_train.to_numpy()
y_train_np = y_train.to_numpy()

# Tune each regularized member's alpha with time-series CV (OLS has no alpha).
best_ridge_alpha, _ = find_best_alpha_timeseries(X_train_np, y_train_np, model_params.ridge_alphas, Ridge, model_params.n_splits)
best_lasso_alpha, _ = find_best_alpha_timeseries(X_train_np, y_train_np, model_params.lasso_alphas, Lasso, model_params.n_splits)

# Weighted average of three linear models; weights are looked up by member name.
members = [
    ('ridge', Ridge(alpha=best_ridge_alpha)),
    ('lasso', Lasso(alpha=best_lasso_alpha)),
    ('ols', LinearRegression()),
]
model = VotingRegressor(
    estimators=members,
    weights=[model_params.ensemble_weights[member_name] for member_name, _est in members],
)
model.fit(X_train_np, y_train_np)

## Prediction Function via Kaggle Server

In [None]:
def predict(test: pl.DataFrame) -> float:
    """Return the position signal for one test batch served by the gateway.

    The batch gets the same preprocessing as ``load_testset``:
    ``lagged_forward_returns`` is renamed to ``target`` and non-``date_id``
    columns are cast to Float64. It then goes through
    ``create_example_dataset``, is scaled with the training scaler, and is
    scored by the fitted ensemble. Only the first remaining row is scored.

    ``create_example_dataset`` ends with ``drop_nulls()``, so a batch whose row
    still has a null is emptied. Instead of crashing on an empty matrix, the
    signal for a zero predicted excess return is returned.
    """
    test = test.rename({'lagged_forward_returns': 'target'}).with_columns(
        pl.exclude('date_id').cast(pl.Float64, strict=False)
    )
    df: pl.DataFrame = create_example_dataset(test)
    if df.is_empty():
        fallback: np.ndarray = convert_ret_to_signal(np.zeros(1), ret_signal_params)
        return float(fallback[0])

    # The scaler was fitted on a named frame, so it gets one. The model was
    # fitted on NumPy arrays (no feature names), so it is given the NumPy
    # output directly.
    X_test_scaled_np: np.ndarray = scaler.transform(df.select(FEATURES))
    raw_pred: float = model.predict(X_test_scaled_np)[0]
    return float(convert_ret_to_signal(np.array([raw_pred]), ret_signal_params)[0])

## Launch Server

In [None]:
inference_server = kaggle_evaluation.default_inference_server.DefaultInferenceServer(predict)

# A non-empty KAGGLE_IS_COMPETITION_RERUN selects serve(); otherwise the
# pipeline is exercised through the local gateway on the input directory.
if not os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.run_local_gateway(('/kaggle/input/hull-tactical-market-prediction/',))
else:
    inference_server.serve()