## 🏆 Jane Street Real-Time Market Data Forecasting

* Author: **Roll20** (*creative-ataraxia.com*)
* Date: Jan 21st, 2025
* Objective: Design a quantitative model to forecast a financial responder value from `107` features
* Results: Placed in the top **8.77%** of the leaderboard

### 1. Imports and Utilities


#### Imports

In [None]:
# Standard Library Imports**
import copy
import gc
import json
import os
import random
import shutil
import subprocess
import tempfile
import time
from functools import partial
from pathlib import Path

# Third-Party Libraries**
import catboost as cb
import joblib
import lightgbm as lgb
import numpy as np
import pandas as pd
import polars as pl
import tensorflow as tf
import torch
import torch.nn as nn
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin, clone
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import TimeSeriesSplit, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.utils import shuffle
from scipy import stats
from tensorflow.keras import Input, Model, callbacks, layers, losses, models, optimizers
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm
import xgboost as xgb
import matplotlib.pyplot as plt
from kaggle_evaluation import jane_street_inference_server


#### Utility Functions

* A set of utility functions for executing shell commands and creating folders. 
* These functions help automate various tasks during setup and deployment.


In [None]:
def run_shell_command(command: str, cwd: str = None) -> None:
    """
    Executes a shell command and prints the output in real-time.
    """
    try:
        print(f"Running command: {command}")
        process = subprocess.Popen(
            command,
            cwd=cwd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
            universal_newlines=True
        )
        while True:
            output = process.stdout.readline()
            if output == '' and process.poll() is not None:
                break
            if output:
                print(output.strip())
        stderr = process.stderr.read()
        if stderr:
            print("Errors:\n", stderr)
    except Exception as e:
        print(f"An error occurred: {e}")

def create_folder(path: str, rm: bool = False) -> None:
    """
    Ensure that the directory ``path`` exists, creating parents as needed.

    When ``rm`` is True and ``path`` already exists, it is deleted
    recursively first, so the caller ends up with an empty directory.
    """
    should_wipe = rm and os.path.exists(path)
    if should_wipe:
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)


#### Custom Polars Transformer

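A custom transformer that uses Polars to preprocess data. It clips the time feature (`feature_time_id`) to the range seen during fitting, fills missing feature values with 0, and standardizes features with the mean and standard deviation learned on the training data.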

In [None]:
class PolarsTransformer:
    """
    A custom transformer for preprocessing data using Polars.

    Steps, each toggled in the constructor and applied by ``transform`` in
    this order:

    * ``clip_time``: clip ``feature_time_id`` to the [min, max] range seen so
      far (``transform(..., refit=True)`` first widens the range with ``df``).
    * ``fillnull``: replace nulls in ``features`` with 0.0.
    * ``scale``: standardise ``features`` with the mean/std learned by
      ``fit_transform``.

    ``fit_transform`` learns the statistics and applies fill + scale. It does
    not clip, because the range is computed from that same frame. When
    scaling, a null mean is treated as 0.0 and a null/NaN/zero std as 1.0, so
    all-null or constant columns do not turn into inf/NaN/null values. This
    guard runs at transform time, so it also covers statistics loaded from a
    previously saved transformer.
    """

    # Columns clipped to the range observed during fitting.
    TIME_COLUMNS = ["feature_time_id"]

    def __init__(self, features: list = None, fillnull: bool = True, scale: bool = True, clip_time: bool = True) -> None:
        """
        Arguments:
            features: Columns to fill and scale (can also be set via ``set_features``).
            fillnull: Replace nulls in ``features`` with 0.0.
            scale: Standardise ``features`` with the fitted mean/std.
            clip_time: Clip ``feature_time_id`` to the fitted [min, max] range.
        """
        self.features = features
        self.fillnull = fillnull
        self.scale = scale
        self.clip_time = clip_time
        # {column: {"mean": ..., "std": ...}}, learned by fit_transform.
        self.statistics_mean_std = None
        # {"feature_time_id": {"min": ..., "max": ...}}, learned by fit_transform.
        self.statistics_min_max = None

    def set_features(self, features: list) -> None:
        """Set the columns to fill and scale."""
        self.features = features

    def fit_transform(self, df: pl.DataFrame) -> pl.DataFrame:
        """
        Learn scaling statistics and the time range from ``df``, then fill and scale it.

        Mean/std are computed before null filling (Polars aggregations skip nulls).
        """
        if self.scale:
            self.statistics_mean_std = {
                column: {"mean": df[column].mean(), "std": df[column].std()}
                for column in self.features
            }

        if self.clip_time:
            self.statistics_min_max = {
                column: {"min": df[column].min(), "max": df[column].max()}
                for column in self.TIME_COLUMNS
            }

        return self._fill_and_scale(df)

    def transform(self, df: pl.DataFrame, refit: bool = False) -> pl.DataFrame:
        """
        Apply clipping, null filling and scaling with the fitted statistics.

        Arguments:
            df: Frame to transform.
            refit: If True (and ``clip_time``), first widen the stored time
                range so that it also covers ``df``.
        """
        if refit and self.clip_time:
            self._widen_time_range(df)

        if self.clip_time:
            df = df.with_columns([
                pl.col(column).clip(self.statistics_min_max[column]["min"],
                                    self.statistics_min_max[column]["max"])
                for column in self.TIME_COLUMNS
            ])

        return self._fill_and_scale(df)

    def _widen_time_range(self, df: pl.DataFrame) -> None:
        """Extend the stored [min, max] of each time column to cover ``df`` (nulls ignored)."""
        ranges = self.statistics_min_max if self.statistics_min_max is not None else {}
        for column in self.TIME_COLUMNS:
            previous = ranges.get(column, {"min": None, "max": None})
            ranges[column] = {
                "min": self._merge_bound(previous["min"], df[column].min(), min),
                "max": self._merge_bound(previous["max"], df[column].max(), max),
            }
        self.statistics_min_max = ranges

    @staticmethod
    def _merge_bound(current, candidate, pick):
        """Combine two bounds with ``pick`` (min or max), treating None as 'no value'."""
        if candidate is None:
            return current
        if current is None:
            return candidate
        return pick(candidate, current)

    @staticmethod
    def _safe_mean_std(stats: dict) -> tuple:
        """Return (mean, std) with a null mean -> 0.0 and a null/NaN/zero std -> 1.0."""
        mean, std = stats["mean"], stats["std"]
        if mean is None:
            mean = 0.0
        if std is None or std == 0 or np.isnan(std):
            std = 1.0
        return mean, std

    def _fill_and_scale(self, df: pl.DataFrame) -> pl.DataFrame:
        """Fill nulls with 0.0 and/or standardise ``features``, according to the flags."""
        if self.fillnull:
            df = df.with_columns([pl.col(column).fill_null(0.0) for column in self.features])

        if self.scale:
            expressions = []
            for column in self.features:
                mean, std = self._safe_mean_std(self.statistics_mean_std[column])
                expressions.append((pl.col(column) - mean) / std)
            df = df.with_columns(expressions)

        return df


#### Metrics

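* A weighted R² metric, implemented in both NumPy and PyTorch, matching the competition's evaluation metric: $R^2 = 1 - \frac{\sum_i w_i (\hat{y}_i - y_i)^2}{\sum_i w_i y_i^2}$.
* A custom PyTorch loss (`WeightedR2Loss`) returns $1 - R^2$ (the ratio above), so minimizing it lets our models optimize this metric directly.
* The same weighted R² is also exposed as custom evaluation metrics for LightGBM and CatBoost, alongside a Gini stability metric used for ensemble selection.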


In [None]:
def r2_weighted(y_true: np.array, y_pred: np.array, sample_weight: np.array) -> float:
    """
    Compute the weighted R² score.
    """
    r2 = 1 - np.average((y_pred - y_true) ** 2, weights=sample_weight) / (
        np.average((y_true) ** 2, weights=sample_weight) + 1e-38
    )
    return r2

def r2_weighted_torch(y_true: torch.Tensor, y_pred: torch.Tensor, sample_weight: torch.Tensor) -> torch.Tensor:
    """
    Weighted R² on tensors: 1 - Σ w (ŷ - y)² / (Σ w y² + 1e-38).

    Built only from tensor operations, so the 0-d result stays on the inputs'
    device and in the autograd graph.
    """
    weighted_sse = (sample_weight * (y_pred - y_true) ** 2).sum()
    weighted_energy = (sample_weight * y_true ** 2).sum() + 1e-38
    return 1 - weighted_sse / weighted_energy

class WeightedR2Loss(nn.Module):
    """
    PyTorch loss equal to 1 - weighted R²:

        loss = Σ w (ŷ - y)² / (Σ w y² + epsilon)

    Minimising it maximises the competition's weighted R².
    """

    def __init__(self, epsilon: float = 1e-38) -> None:
        """
        Arguments:
            epsilon: Added to the denominator to avoid division by zero. Note
                that the default 1e-38 is below float32's smallest normal number
                (~1.18e-38), so for float32 inputs it is only a subnormal nudge.
        """
        super().__init__()
        self.epsilon = epsilon

    def forward(self, y_pred: torch.Tensor, y_true: torch.Tensor, weights: torch.Tensor) -> torch.Tensor:
        """Return Σ w (y_pred - y_true)² / (Σ w y_true² + epsilon) as a 0-d tensor."""
        numerator = torch.sum(weights * (y_pred - y_true) ** 2)
        denominator = torch.sum(weights * y_true ** 2) + self.epsilon
        return numerator / denominator

def gs_metric(
        true: np.ndarray,
        pred: np.ndarray,
        week: np.ndarray,
        penalty: bool = False,
        verbose: bool = False,
) -> tuple[float, dict]:
    """
    Calculate the Gini Stability metric.

    See details here:
    www.kaggle.com/competitions/home-credit-credit-risk-model-stability/overview/evaluation

    For every week, gini = 2 * AUC - 1 (0.0 when the week contains a single
    label value). A line is fitted through the weekly ginis and the score is
    ``mean(gini) - 0.5 * std(residuals)``, plus ``88 * min(0, slope)`` when
    ``penalty`` is set. With a single week there is no trend: slope is 0.

    Arguments:
        true: array with labels
        pred: array with predictions
        week: array with week numbers
        penalty: whether to apply a slope penalty
        verbose: whether to print the results (and plot the weekly ginis)
    Returns:
        out: the Gini Stability score
        out_dict: {"ginis": weekly ginis, "slope": 88 * fitted slope,
                   "std": std of the regression residuals}
    """
    true = np.asarray(true)
    pred = np.asarray(pred)
    week = np.asarray(week)

    # Sort by week so each week's samples are contiguous.
    order = np.argsort(week)
    week_sorted = week[order]
    true_sorted = true[order]
    pred_sorted = pred[order]

    # Group by week: week_start[i] is the first sorted index of week_unique[i].
    week_unique, week_start = np.unique(week_sorted, return_index=True)
    grouped_true = np.split(true_sorted, week_start[1:])
    grouped_pred = np.split(pred_sorted, week_start[1:])

    # Gini for each week; AUC is undefined when a week contains only one class.
    ginis = np.zeros(len(week_unique))
    for i, (week_true, week_pred) in enumerate(zip(grouped_true, grouped_pred)):
        if len(np.unique(week_true)) > 1:
            ginis[i] = roc_auc_score(week_true, week_pred) * 2 - 1

    # Linear trend of the weekly ginis (linregress raises when all x are identical).
    if len(week_unique) > 1:
        slope, intercept, _, _, _ = stats.linregress(week_unique, ginis)
    else:
        slope, intercept = 0.0, float(np.mean(ginis))
    residuals = ginis - (slope * week_unique + intercept)
    residual_std = np.std(residuals)

    out = np.mean(ginis) - 0.5 * residual_std
    if penalty:
        out += 88.0 * min(0, slope)

    if verbose:
        print(
            f"Stability gini: {out:.3f}"
            f", gini: {np.mean(ginis):.3f}"
            f", slope: {88.0 * slope:.3f}"
            f", std: {0.5 * residual_std:.3f}"
        )
        plt.plot(ginis)

    out_dict = {
        "ginis": ginis,
        "slope": 88.0 * slope,
        "std": residual_std,
    }
    return out, out_dict


def r2_lgb(preds, train_data):
    """
    LightGBM ``feval`` callback computing the weighted R² score.

    Parameters:
        preds (np.array): Model predictions for the dataset.
        train_data (lightgbm.Dataset): Supplies labels via ``get_label()`` and
            optional sample weights via ``get_weight()``; missing or empty
            weights are treated as all ones.

    Returns:
        tuple: ``('r2', score, True)``, where the final ``True`` tells LightGBM
        that larger scores are better.
    """
    y = train_data.get_label()
    w = train_data.get_weight()
    missing_weights = w is None or len(w) == 0
    if missing_weights:
        w = np.ones_like(y)

    squared_error = np.average((preds - y) ** 2, weights=w)
    # Small constant keeps the ratio finite when every label is zero.
    squared_label = np.average(y ** 2, weights=w) + 1e-38
    score = 1 - squared_error / squared_label
    return 'r2', score, True


import numpy as np

class R2Cbt:
    """
    Custom CatBoost evaluation metric: weighted R² (higher is better).

    ``evaluate`` returns the pair ``(R² * Σw, Σw)`` and ``get_final_error``
    divides them back out, so the reported value is the weighted R² itself.
    """

    def get_final_error(self, error, weight):
        """
        Turn the ``(error_sum, weight_sum)`` pair into the reported score.

        Returns ``error`` unchanged when the total weight is zero.
        """
        if weight == 0:
            return error
        return error / weight

    def is_max_optimal(self):
        """Larger R² is better."""
        return True

    def evaluate(self, approxes, target, weight):
        """
        Evaluate the weighted R² metric.

        Parameters:
            approxes (list of list): Single-element list whose first entry holds
                the predictions (CatBoost wraps single-output predictions this way).
            target (list): True target values.
            weight (list or None): Optional per-sample weights; None or empty
                means all ones.

        Returns:
            tuple: ``(R² * Σw, Σw)``.
        """
        predictions = np.array(approxes[0])
        labels = np.array(target)

        if weight is not None and len(weight) != 0:
            sample_weight = np.array(weight)
        else:
            sample_weight = np.ones_like(labels)

        squared_error = np.average((predictions - labels) ** 2, weights=sample_weight)
        # Small constant avoids dividing by zero for an all-zero target.
        squared_label = np.average(labels ** 2, weights=sample_weight) + 1e-38
        score = 1 - squared_error / squared_label

        total_weight = np.sum(sample_weight)
        return score * total_weight, total_weight

    def get_description(self):
        """Short name shown by CatBoost."""
        return "R2"

def r2_cbt():
    """
    Factory for the weighted-R² custom CatBoost metric.

    Returns:
        A fresh ``R2Cbt`` instance.
    """
    metric = R2Cbt()
    return metric


### 2. Configurations

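* Defines the environment-specific configuration, including file paths, account settings (W&B project name, Kaggle username), and global constants such as column names and the random seed.
* The configuration works both on Kaggle and on rented GPU machines: any environment without the `KAGGLE_URL_BASE` variable is treated as the rented machine.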


In [None]:
# Global run name and model definitions
RUN_NAME = "full"
# The pretrained models chosen for inference (an ensemble of GRU models)
MODEL_NAMES = ["gru_2.0_700", "gru_2.1_700", "gru_2.2_700", "gru_3.0_700", "gru_3.1_700", "gru_3.2_700"]
# Equal blending weight per model, normalised once so the weights sum to 1.
WEIGHTS = np.ones(len(MODEL_NAMES))
WEIGHTS = WEIGHTS / WEIGHTS.sum()
N_ROLL = 1000
TEST_SIZE = 200
GAP = 0

print("_".join(MODEL_NAMES))
print(WEIGHTS)

# Prepare a small window of raw training data
MAX_DATE = 1698
COLS_ID = ['row_id', 'date_id', 'time_id', 'symbol_id', 'weight', 'is_scored']

DEBUG = False
CNT_DATES = 9
CNT_DATES_NOT_SCORED = 4

# Determine the execution environment based on environment variables.
# KAGGLE_URL_BASE marks a Kaggle session; every other environment is treated
# as the rented GPU machine, so exactly one of these flags is True.
KAGGLE = 'KAGGLE_URL_BASE' in os.environ
VASTAI = not KAGGLE

# Define base paths for different environments
base_paths = {
    "VASTAI": Path("/home/janestreet2024"),
    "VASTAI_DATA": Path("/workspace/kaggle/janestreet"),
    "KAGGLE": Path("/kaggle/input"),
}

# Set paths based on the environment. Because VASTAI == (not KAGGLE), a plain
# if/else covers every case.
if KAGGLE:
    base_path = base_paths["KAGGLE"]
    base_path_data = base_paths["KAGGLE"]
    PATH_DATA = base_path / "jane-street-real-time-market-data-forecasting"
    PATH_MODELS = base_path / "janestreet2025-models"
    PATH_CODE = base_path / "janestreet2025-code/janestreet-0.1-py3-none-any.whl"
else:  # VASTAI: rented GPU
    base_path = base_paths["VASTAI"]
    base_path_data = base_paths["VASTAI_DATA"]
    PATH_DATA = base_path_data / "data"
    PATH_MODELS = base_path_data / "models"
    PATH_CODE = base_path / "dist/janestreet-0.1-py3-none-any.whl"

PATHS_DATA = {
    "train": PATH_DATA / "train",
    "test": PATH_DATA / "test",
}

# Set other configuration variables
# Wandb
WANDB_PROJECT = "kaggle_janestreet"

# Kaggle
KAGGLE_USERNAME = "alexmason11"

# Random seed
RANDOM_SEED = 42

# Data column names
COL_TARGET = "responder_6"
COL_ID = "symbol_id"
COL_DATE = "date_id"
COL_TIME = "time_id"
COL_WEIGHT = "weight"
COL_WEEK = "WEEK_NUM"
COLS_RESPONDERS = [f"responder_{i}" for i in range(11)]

### 3. Models

* Ensemble model selection: classes for building ensemble models, selecting the best ensemble (by score, correlation, or forward selection), and aggregating predictions
* Linear models: logistic regression
* Neural network models: custom recurrent (GRU/LSTM) architectures for time-series forecasting
* Tree-based models: implementations of tree-based regressors such as LightGBM, CatBoost, and XGBoost


In [None]:
class EnsembleModel:
    """
    Record describing one member of an ``Ensemble``.

    Attributes:
        model: The wrapped estimator; ``Ensemble`` calls its ``fit``/``predict``
            and, when ``update`` is True, its ``update`` method.
        name: Identifier of the member.
        weight: Blending weight used by ``Ensemble.predict``.
        update: Whether ``Ensemble.update`` forwards online updates to this member.
    """

    def __init__(self, model, name, weight, update):
        self.model, self.name, self.weight, self.update = model, name, weight, update

class Ensemble:
    """
    Weighted-average ensemble.

    ``models`` is a list of ``EnsembleModel``-like members exposing ``.model``
    (the estimator), ``.weight`` (blending weight) and ``.update`` (whether
    online updates are forwarded to it).
    """

    def __init__(self, models) -> None:
        """
        Initialize the ensemble with a list of members.
        """
        self.models = models

    def fit(self, train_set, val_set, test_set=None, cat_cols: list = None, verbose: bool = False) -> "Ensemble":
        """
        Fit every member, forwarding all arguments positionally. Returns ``self``.
        """
        for member in self.models:
            member.model.fit(train_set, val_set, test_set, cat_cols, verbose)
        return self

    def update(self, X, stocks, y, weights, dates, times, lr) -> "Ensemble":
        """
        Forward an online update to every member whose ``update`` flag is set. Returns ``self``.
        """
        for member in self.models:
            if member.update:
                member.model.update(X, stocks, y, weights, dates, times, lr)
        return self

    def predict(self, X: np.ndarray, *args, **kwargs) -> tuple[np.ndarray, None]:
        """
        Predict with every member and blend the predictions by member weight.

        Returns:
            ``(preds, None)``: the weighted average of member predictions; the
            second element is always None (kept for interface compatibility).
        """
        member_preds = [member.model.predict(X, *args, **kwargs) for member in self.models]
        # Plain Python lists cannot be divided by a scalar; np.average also
        # normalises the weights itself, so no manual division is needed.
        member_weights = np.asarray([member.weight for member in self.models], dtype=float)
        preds = np.average(member_preds, axis=0, weights=member_weights)
        return preds, None

    def set_seed(self, seed: int) -> None:
        """
        Set the seed for the ensemble (no-op: members manage their own seeds).
        """
        pass

    def get_params(self, deep: bool = True):
        """Return the constructor parameters (sklearn-style)."""
        return {
            "models": self.models,
        }

    def set_params(self, **parameters):
        """Set attributes by name (sklearn-style) and return ``self``."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

class EnsembleSelector:
    """
    Select models for the ensemble.

    The ``X`` DataFrame holds one prediction column per candidate model plus
    the ``COL_ID``/``COL_DATE``/``COL_WEEK``/``COL_TARGET`` columns, which are
    excluded from the candidates. Candidates and ensembles (plain column
    means) are scored with ``gs_metric`` (Gini stability) grouped by
    ``X[COL_WEEK]``.
    """

    def __init__(self, params: dict, method: str = "score") -> None:
        """
        Initialize the EnsembleSelector.

        Arguments:
            params: Overrides for the selection method's keyword defaults
                ("corr_threshold", "max_n", "weights", "score_threshold").
            method: Method for the ensemble selection: "score", "corr", "forward".
        """
        self.params = params
        self.method = method
        self.methods_map = {
            "score": self.find_best_ensemble_score,
            # Greedily add the model least correlated with the models already selected.
            "corr": self.find_best_ensemble_corr,
            "forward": self.find_best_ensemble_forward,
        }
        self.find_best_ensemble = self.methods_map[self.method]
        self.selected_models = None

    def fit(self, X: pd.DataFrame, y: pd.Series, verbose: bool = True) -> None:
        """
        Fit the ensemble selector (stores the chosen columns in ``selected_models``).
        """
        self.selected_models = self.find_best_ensemble(X, y)

    def predict(self, X: pd.DataFrame) -> np.array:
        """
        Return an (n, 2) array ``[1 - p, p]``, where ``p`` is the mean of the
        selected model columns.
        """
        preds = X[self.selected_models].mean(axis=1).values
        preds = preds[:, np.newaxis]
        preds = np.hstack((1 - preds, preds))
        return preds

    def find_best_ensemble_score(self, X: pd.DataFrame, y: pd.Series, corr_threshold: float = 0.95, max_n: int = 10) -> list:
        """
        Add models in descending order of individual score, skipping any model
        whose correlation with an already-selected model is >= ``corr_threshold``;
        stop after ``max_n`` models.
        """
        if "corr_threshold" in self.params:
            corr_threshold = self.params["corr_threshold"]
        if "max_n" in self.params:
            max_n = self.params["max_n"]

        cols = [i for i in X.columns if i not in [COL_ID, COL_DATE, COL_WEEK, COL_TARGET]]
        scores = {}
        for col in cols:
            score, _ = gs_metric(y.values, X[col].values, X[COL_WEEK].values, verbose=False, penalty=False)
            scores[col] = score

        correlation_matrix = X[cols].corr()
        selected_models = []
        sorted_models = sorted(scores, key=scores.get, reverse=True)

        i = 0
        for model in sorted_models:
            if all(correlation_matrix[model][selected] < corr_threshold for selected in selected_models):
                selected_models.append(model)
                ensemble_preds = X[selected_models].mean(axis=1).values
                score, _ = gs_metric(y.values, ensemble_preds, X[COL_WEEK].values, verbose=False, penalty=False)
                print(f"Model {i}. Adding {model}: individual score {scores[model]:.4f}, ensemble score {score:.4f}")
                i += 1
                if len(selected_models) >= max_n:
                    break
            else:
                print(f"Skipping {model}: correlation is too high")

        return selected_models

    def find_best_ensemble_forward(self, X: pd.DataFrame, y: pd.Series, max_n: int = 10, weights: bool = False):
        """
        Greedy forward selection: at each step add the candidate that most
        improves the ensemble score; stop when nothing improves it or after
        ``max_n`` additions. With ``weights=True`` a model may be picked more
        than once, which raises its share of the column mean.
        """
        if "max_n" in self.params:
            max_n = self.params["max_n"]
        if "weights" in self.params:
            weights = self.params["weights"]
        cols = [i for i in X.columns if i not in [COL_ID, COL_DATE, COL_WEEK, COL_TARGET]]
        cols_best = []
        score_best = 0.0
        while True:
            cols_left = [i for i in cols if i not in cols_best]
            col_best = None
            for col in tqdm(cols if weights else cols_left):
                cols_tmp = cols_best.copy()
                cols_tmp.append(col)
                preds = X[cols_tmp].mean(axis=1).values
                score, ginis = gs_metric(y.values, preds, X[COL_WEEK].values, verbose=False, penalty=False)
                if score > score_best:
                    print(f"{col}: {score:.4f}, {ginis['slope']:.4f}")
                    score_best = score
                    col_best = col

            if col_best is not None:
                cols_best.append(col_best)
            else:
                print(cols_best)
                break

            if len(cols_best) >= max_n:
                break

        return cols_best

    def find_best_ensemble_corr(self, X: pd.DataFrame, y, score_threshold: float = 0.65, max_n: int = 10):
        """
        Start from the best-scoring model, then repeatedly add the model with
        the lowest mean correlation to the selected ones. Only models scoring
        at least ``score_threshold`` are considered; stop after ``max_n``
        models or when no candidates remain.

        Raises:
            ValueError: if no model reaches ``score_threshold``.
        """
        if "score_threshold" in self.params:
            score_threshold = self.params["score_threshold"]
        if "max_n" in self.params:
            max_n = self.params["max_n"]
        cols = [i for i in X.columns if i not in [COL_ID, COL_DATE, COL_WEEK, COL_TARGET]]
        scores = {}
        for col in cols:
            score, _ = gs_metric(y.values, X[col].values, X[COL_WEEK].values, verbose=False, penalty=False)
            if score >= score_threshold:
                scores[col] = score

        if not scores:
            raise ValueError(f"No model reaches score_threshold={score_threshold}")

        cols = list(scores.keys())
        sorted_models = sorted(scores, key=scores.get, reverse=True)
        selected_models = [sorted_models[0]]
        print(f"Model 0. Starting ensemble with {selected_models[0]}: score {scores[selected_models[0]]:.4f}")

        correlation_matrix = X[cols].corr()

        i = 1
        while len(selected_models) < max_n:
            average_correlations = correlation_matrix[selected_models].mean(axis=1).drop(labels=selected_models, errors='ignore')
            # Every qualifying model is already selected; idxmin() on an empty
            # Series would raise, so this check must come first.
            if average_correlations.empty:
                break
            next_model = average_correlations.idxmin()
            selected_models.append(next_model)

            ensemble_preds = X[selected_models].mean(axis=1).values
            score, _ = gs_metric(y.values, ensemble_preds, X[COL_WEEK].values, verbose=False, penalty=False)
            print(f"Model {i}. Adding {next_model}: score {score:.4f}")

            i += 1

        return selected_models


#### Linear Models

In [None]:
class LogReg(BaseEstimator, ClassifierMixin):
    """Logistic Regression model (thin wrapper around sklearn's ``LogisticRegression``)."""

    def __init__(self, params: dict) -> None:
        """
        Initialize the Logistic Regression model.

        Arguments:
            params: Keyword arguments forwarded to ``LogisticRegression``.
        """
        self.params = params
        self.model = LogisticRegression(**params)
        self.features = None
        self.classes_ = None

    def fit(self, X: pd.DataFrame, y: pd.Series, verbose: bool = False) -> "LogReg":
        """
        Fit the model and record the classes and feature names. Returns ``self``.
        """
        self.model.fit(X, y)
        self.classes_ = self.model.classes_
        self.features = X.columns
        return self

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """
        Predict class labels (``self.model.predict_proba`` gives probabilities).
        """
        return self.model.predict(X)

    def set_seed(self, seed: int) -> None:
        """
        Set the seed for the model.

        Both ``params`` and the already-built estimator are updated, because
        the estimator is constructed once in ``__init__``.
        """
        self.params["random_state"] = seed
        self.model.set_params(random_state=seed)

    def get_params(self, deep: bool = True) -> dict:
        """
        Get parameters for this estimator.
        """
        return {
            "params": self.params,
        }

    def set_params(self, **parameters):
        """
        Set the parameters of this estimator.

        Replacing ``params`` rebuilds the wrapped ``LogisticRegression`` so the
        new settings take effect.
        """
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        if "params" in parameters:
            self.model = LogisticRegression(**self.params)
        return self


#### NN

In [None]:
def flatten_collate_fn(batch: list) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
    """
    Collate function for DataLoader: concatenate per-item tensors along dim 0.

    Arguments:
        batch: List of ``(X, resp, y, weights)`` tuples, such as the items of
            ``CustomTensorDataset``.
    Returns:
        ``(X, resp, y, weights)``, each the dim-0 concatenation of the
        corresponding entries, so several items are flattened into one set of
        sequences.
    """
    X, resp, y, weights = zip(*batch)
    return (
        torch.cat(X, dim=0),
        torch.cat(resp, dim=0),
        torch.cat(y, dim=0),
        torch.cat(weights, dim=0),
    )


class CustomTensorDataset(Dataset):
    """
    Dataset wrapping tensors, grouped by datetime.

    Each item is one date group ``(X, resp, y, weights)``.

    * ``on_batch=True``: rows are grouped by ``dates``. Within a group they are
      reshaped to ``(n_symbols, n_times, ...)``, which assumes that rows of a
      date are ordered time-major (all symbols of time 0, then of time 1, ...)
      with the same symbols at every time step. Confirm this against the data
      preparation code.
    * ``on_batch=False``: rows are sorted by (symbol, date, time) and folded
      into sequences of exactly ``T`` steps per (symbol, date), so the number
      of rows must be a multiple of ``T``. Items then group those sequences
      by date.
    """

    # Sequence length per (symbol, date) used when on_batch=False.
    T = 968

    def __init__(self, X: np.ndarray, resp: np.ndarray, y: np.ndarray, weights: np.ndarray,
                 symbols: np.ndarray, dates: np.ndarray, times: np.ndarray, on_batch: bool = True):
        self.on_batch = on_batch
        self.num_features = X.shape[1]

        self.X = torch.tensor(X, dtype=torch.float32)
        self.resp = torch.tensor(resp, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)
        self.weights = torch.tensor(weights, dtype=torch.float32)
        self.symbols = torch.tensor(symbols, dtype=torch.int64)
        self.dates = torch.tensor(dates, dtype=torch.int64)
        self.times = torch.tensor(times, dtype=torch.int64)

        # Missing feature values become 0.
        self.X = torch.nan_to_num(self.X, 0)

        self.K = X.shape[1]

        if not self.on_batch:
            T = self.T
            N, K = self.X.shape
            # Chained stable sorts give lexicographic order (symbol, date, time).
            sorted_indices = torch.argsort(self.times, stable=True)
            sorted_indices = sorted_indices[torch.argsort(self.dates[sorted_indices], stable=True)]
            sorted_indices = sorted_indices[torch.argsort(self.symbols[sorted_indices], stable=True)]
            self.X = self.X[sorted_indices]
            self.resp = self.resp[sorted_indices]
            self.dates = self.dates[sorted_indices]
            self.y = self.y[sorted_indices]
            self.weights = self.weights[sorted_indices]
            self.symbols = self.symbols[sorted_indices]
            self.X = self.X.view(N // T, T, K)
            self.resp = self.resp.view(N // T, T, self.resp.shape[-1])
            self.dates = self.dates.view(N // T, T)[:, 0].squeeze()
            self.y = self.y.view(N // T, T)
            self.weights = self.weights.view(N // T, T)
            self.symbols = self.symbols.view(N // T, T)

        self.datetime_ids = self.dates
        self.unique_datetimes, self.inverse_indices, self.counts = torch.unique(
            self.datetime_ids, return_inverse=True, return_counts=True)
        # A stable sort keeps each group's rows in their original order, which
        # the time-major reshape in __getitem__ relies on. An unstable sort may
        # permute rows within a date.
        self.sorted_indices = torch.argsort(self.inverse_indices, stable=True)
        self.group_end_indices = torch.cumsum(self.counts, dim=0)
        self.group_start_indices = torch.cat((torch.tensor([0]), self.group_end_indices[:-1]))

    def __getitem__(self, index: int) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        start = self.group_start_indices[index]
        end = self.group_end_indices[index]
        rows = self.sorted_indices[start:end]
        X = self.X[rows]
        resp = self.resp[rows]
        y = self.y[rows]
        weights = self.weights[rows]
        if self.on_batch:
            # Number of time steps in this date (time ids assumed to start at 0).
            T = int(self.times[rows].max()) + 1
            X = X.reshape(T, -1, self.K).swapaxes(0, 1)
            resp = resp.reshape(T, -1, resp.shape[1]).swapaxes(0, 1)
            y = y.reshape(T, -1).swapaxes(0, 1)
            weights = weights.reshape(T, -1).swapaxes(0, 1)
        return X, resp, y, weights

    def __len__(self) -> int:
        return len(self.unique_datetimes)


class ModelRBase(nn.Module):
    """
    Base recurrent model.

    A stack of single-layer recurrent layers (``nn.GRU`` or ``nn.LSTM``), each
    followed by dropout, plus an MLP head that maps every time step's output
    to one value.
    """

    def __init__(self, input_size: int, hidden_sizes: list, dropout_rates: list,
                 hidden_sizes_linear: list, dropout_rates_linear: list, model_type: str) -> None:
        """
        Arguments:
            input_size: Number of input features per time step.
            hidden_sizes: Hidden size of each recurrent layer. An empty list
                means no recurrent layers, and the head reads the raw features.
            dropout_rates: Dropout after each recurrent layer (one per entry of
                ``hidden_sizes``).
            hidden_sizes_linear: Widths of the MLP head's hidden layers. Falsy
                means a single Linear layer to 1 output.
            dropout_rates_linear: Dropout after each MLP hidden layer.
            model_type: "gru" or "lstm".
        Raises:
            ValueError: if ``model_type`` is neither "gru" nor "lstm". This is
                only checked when ``hidden_sizes`` is non-empty.
        """
        super(ModelRBase, self).__init__()
        self.num_layers = len(hidden_sizes)
        # Despite the name, holds the recurrent layers for both "gru" and "lstm".
        self.gru_layers = nn.ModuleList()
        # Despite the name, holds nn.Dropout modules rather than rates.
        # NOTE(review): attribute names are presumably relied on by saved models; keep them.
        self.dropout_rates = nn.ModuleList()
        for i in range(self.num_layers):
            input_dim = input_size if i == 0 else hidden_sizes[i - 1]
            if model_type == "gru":
                layer = nn.GRU(input_dim, hidden_sizes[i], num_layers=1, batch_first=True)
            elif model_type == "lstm":
                layer = nn.LSTM(input_dim, hidden_sizes[i], num_layers=1, batch_first=True)
            else:
                raise ValueError("Unknown model type")
            self.gru_layers.append(layer)
            self.dropout_rates.append(nn.Dropout(dropout_rates[i]))
        # The head reads the last recurrent layer's output, or the raw features if there is none.
        n_input_linear = input_size if self.num_layers == 0 else hidden_sizes[-1]
        fc_layers = []
        if hidden_sizes_linear:
            # Linear -> ReLU -> Dropout for each hidden width, then a final Linear to 1 output.
            for i in range(len(hidden_sizes_linear)):
                in_features = n_input_linear if i == 0 else hidden_sizes_linear[i - 1]
                fc_layers.append(nn.Linear(in_features, hidden_sizes_linear[i]))
                fc_layers.append(nn.ReLU())
                fc_layers.append(nn.Dropout(dropout_rates_linear[i]))
            fc_layers.append(nn.Linear(hidden_sizes_linear[-1], 1))
        else:
            fc_layers.append(nn.Linear(n_input_linear, 1))
        self.fc = nn.Sequential(*fc_layers)

    def forward(self, x: torch.Tensor, hidden: list | None = None) -> tuple[torch.Tensor, list]:
        """
        Run the recurrent stack and the head on every time step.

        Arguments:
            x: Input of shape (D, T, input_size), batch first, since the layers
                are built with ``batch_first=True``.
            hidden: Per-layer initial states, or None to start from PyTorch's
                default zero state. When a list is given, it is updated in place.
        Returns:
            (out, hidden): ``out`` has shape (D, T). ``hidden`` holds each
            layer's final state (for LSTM, the ``(h, c)`` tuple).
        """
        D, T, _ = x.shape
        if hidden is None:
            hidden = [None] * self.num_layers
        for i, gru in enumerate(self.gru_layers):
            x, h = gru(x, hidden[i])
            # NOTE(review): the hasattr guard presumably keeps older pickled models
            # without ``dropout_rates`` usable; confirm before removing.
            if hasattr(self, "dropout_rates"):
                x = self.dropout_rates[i](x)
            hidden[i] = h
        # Apply the head to every (sample, time step) pair at once.
        x = x.reshape(D * T, -1)
        x = self.fc(x)
        x = x.reshape(D, T)
        return x, hidden


class ModelR(nn.Module):
    """
    Recurrent model with auxiliary targets.

    ``num_resp`` (4) independent ``ModelRBase`` heads each produce one output
    per time step, and a Linear(4 -> 1) layer combines them into the main
    prediction. The per-head outputs are also returned; presumably they are
    trained against auxiliary responder targets (confirm in the training loop).
    """

    def __init__(self, input_size: int, hidden_sizes: list, dropout_rates: list,
                 hidden_sizes_linear: list, dropout_rates_linear: list, model_type: str):
        """
        All arguments are forwarded unchanged to each ``ModelRBase`` head.
        """
        super(ModelR, self).__init__()
        self.num_resp = 4
        self.grus = nn.ModuleList()
        # NOTE(review): not used in forward; presumably kept for compatibility
        # with saved models. Confirm before removing.
        self.fcs = nn.ModuleList()
        for _ in range(self.num_resp):
            self.grus.append(
                ModelRBase(input_size, hidden_sizes, dropout_rates, hidden_sizes_linear, dropout_rates_linear, model_type)
            )
        self.out = nn.Sequential(nn.Linear(self.num_resp, 1),)

    def forward(self, x: torch.Tensor, hidden: list | None = None) -> tuple[torch.Tensor, torch.Tensor, list]:
        """
        Arguments:
            x: Input of shape (D, T, input_size).
            hidden: One per-layer state list per head (as returned by a previous
                call), or None to start fresh. A given list is updated in place.
        Returns:
            (y, out_resp, hidden): ``y`` has shape (D, T). ``out_resp`` has shape
            (D, T, num_resp), one channel per head. ``hidden`` holds each head's
            final states.
        """
        D, T, _ = x.shape
        if hidden is None:
            hidden = [None] * self.num_resp
        out = []
        for i in range(len(self.grus)):
            z, h = self.grus[i](x, hidden[i])
            out.append(z)
            # Flatten each head's (D, T) output to a (D*T, 1) column.
            out[i] = out[i].reshape(D * T, -1)
            hidden[i] = h
        # (D*T, num_resp) -> Linear -> (D*T, 1)
        out_resp = torch.cat(out, dim=-1)
        y = self.out(out_resp)
        out_resp = out_resp.reshape(D, T, -1)
        y = y.reshape(D, T)
        return y, out_resp, hidden


class NN:
    """Neural network model for time series data with auxiliary targets."""

    def __init__(self, model_type: str = None, hidden_sizes: list = None, dropout_rates: list = None,
                 hidden_sizes_linear: list = None, dropout_rates_linear: list = None, lr: float = 0.001,
                 batch_size: int = 1, epochs: int = 100, early_stopping_patience: int = 10, early_stopping: bool = True,
                 lr_patience: int = 2, lr_factor: float = 0.5, lr_refit: float = 0.001, random_seed: int = 42) -> None:
        self.model_type = model_type
        self.hidden_sizes = hidden_sizes
        self.dropout_rates = dropout_rates
        self.hidden_sizes_linear = hidden_sizes_linear
        self.dropout_rates_linear = dropout_rates_linear
        self.lr = lr
        self.batch_size = batch_size
        self.epochs = epochs
        self.early_stopping_patience = early_stopping_patience
        self.early_stopping = early_stopping
        self.lr_patience = lr_patience
        self.lr_factor = lr_factor
        self.lr_refit = lr_refit
        self.random_seed = random_seed
        self.criterion = WeightedR2Loss()
        self.device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
        self.model = None
        self.optimizer = None
        self.best_epoch = None
        self.features = None

    def fit(self, train_set: tuple, val_set: tuple, verbose: bool = False) -> None:
        torch.manual_seed(self.random_seed)
        train_dataset = CustomTensorDataset(*train_set, on_batch=True)
        train_dataloader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True, collate_fn=flatten_collate_fn)
        val_dataset = CustomTensorDataset(*val_set, on_batch=True)
        val_dataloader = DataLoader(val_dataset, batch_size=1, shuffle=False, collate_fn=flatten_collate_fn)
        self.model = ModelR(train_dataset.num_features, self.hidden_sizes, self.dropout_rates,
                            self.hidden_sizes_linear, self.dropout_rates_linear, self.model_type).to(self.device)
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.lr, weight_decay=0.01)
        train_r2s, val_r2s = [], []
        if verbose:
            print(f"Device: {self.device}")
            print(f"{'Epoch':^5} | {'Train Loss':^10} | {'Val Loss':^8} | {'Train R2':^9} | {'Val R2':^7} | {'LR':^7}")
            print("-" * 60)
        min_val_r2 = -np.inf
        best_epoch = 0
        no_improvement = 0
        best_model = None
        for epoch in range(self.epochs):
            train_loss, train_r2 = self.train_one_epoch(train_dataloader, verbose)
            val_loss, val_r2 = self.validate_one_epoch(val_dataloader, verbose)
            lr_last = self.optimizer.param_groups[0]["lr"]
            train_r2s.append(train_r2)
            val_r2s.append(val_r2)
            if verbose:
                print(f"{epoch+1:^5} | {train_loss:^10.4f} | {val_loss:^8.4f} | {train_r2:^9.4f} | {val_r2:^7.4f} | {lr_last:^7.5f}")
            if val_r2 > min_val_r2:
                min_val_r2 = val_r2
                best_model = copy.deepcopy(self.model.state_dict())
                no_improvement = 0
                best_epoch = epoch
            else:
                no_improvement += 1
            if self.early_stopping and no_improvement >= self.early_stopping_patience + 1:
                self.best_epoch = best_epoch + 1
                if verbose:
                    print(f"Early stopping on epoch {best_epoch+1}. Best score: {min_val_r2:.4f}")
                break
        if self.early_stopping:
            self.model.load_state_dict(best_model)

    def train_one_epoch(self, train_dataloader: DataLoader, verbose: bool) -> tuple[float, float]:
        self.model.train()
        total_loss = 0.0
        y_total, weights_total, preds_total = [], [], []
        itr = tqdm(train_dataloader) if verbose else train_dataloader
        for x_batch, resp_batch, y_batch, weights_batch in itr:
            x_batch, resp_batch, y_batch, weights_batch = (item.to(self.device) for item in [x_batch, resp_batch, y_batch, weights_batch])
            self.optimizer.zero_grad()
            out_y, out_resp, _ = self.model(x_batch, None)
            loss1 = self.criterion(out_y.flatten(), y_batch.flatten(), weights_batch.flatten())
            loss2 = self.criterion(out_resp[:, :, 0].flatten(), resp_batch[:, :, -1].flatten(), weights_batch.flatten())
            loss3 = self.criterion(out_resp[:, :, 1].flatten(), resp_batch[:, :, -2].flatten(), weights_batch.flatten())
            loss4 = self.criterion(out_resp[:, :, 2].flatten(), resp_batch[:, :, -3].flatten(), weights_batch.flatten())
            loss5 = self.criterion(out_resp[:, :, 3].flatten(), resp_batch[:, :, -4].flatten(), weights_batch.flatten())
            loss = loss1 + loss2 + loss3 + loss4 + loss5
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            total_loss += loss.item()
            y_total.append(y_batch.flatten())
            weights_total.append(weights_batch.flatten())
            preds_total.append(out_y.detach().flatten())
        y_total = torch.cat(y_total).cpu()
        weights_total = torch.cat(weights_total).cpu()
        preds_total = torch.cat(preds_total).cpu()
        train_r2 = r2_weighted_torch(y_total, preds_total, weights_total).item()
        train_loss = total_loss / len(train_dataloader)
        return train_loss, train_r2

    def validate_one_epoch(self, val_dataloader: DataLoader, verbose=False) -> tuple[float, float]:
        model = copy.deepcopy(self.model)
        losses, all_y, all_weights, all_preds = [], [], [], []
        itr = tqdm(val_dataloader) if verbose else val_dataloader
        for x_batch, resp_batch, y_batch, weights_batch in itr:
            x_batch, resp_batch, y_batch, weights_batch = (item.to(self.device) for item in [x_batch, resp_batch, y_batch, weights_batch])
            with torch.no_grad():
                model.eval()
                preds_batch, _, _ = model(x_batch, None)
                loss = self.criterion(preds_batch.flatten(), y_batch.flatten(), weights_batch.flatten())
                losses.append(loss.item())
                all_y.append(y_batch.flatten())
                all_weights.append(weights_batch.flatten())
                all_preds.append(preds_batch.flatten())
            if self.lr_refit > 0:
                optimizer = torch.optim.AdamW(model.parameters(), lr=self.lr_refit, weight_decay=0.01)
                optimizer.zero_grad()
                model.train()
                out_y, _, _ = model(x_batch, None)
                loss = self.criterion(out_y, y_batch, weights_batch)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
        all_y = torch.cat(all_y)
        all_weights = torch.cat(all_weights)
        all_preds = torch.cat(all_preds)
        loss = np.mean(losses)
        r2 = r2_weighted_torch(all_y, all_preds, all_weights).item()
        return loss, r2

    def update(self, X: np.array, y: np.array, weights: np.array, n_times: int):
        if self.lr_refit == 0.0:
            return
        X = torch.tensor(X, dtype=torch.float32)
        y = torch.tensor(y, dtype=torch.float32)
        weights = torch.tensor(weights, dtype=torch.float32)
        N, K = X.shape
        X = X.view(n_times, N // n_times, K).swapaxes(0, 1).to(self.device)
        y = y.view(n_times, N // n_times).swapaxes(0, 1).to(self.device)
        weights = weights.view(n_times, N // n_times).swapaxes(0, 1).to(self.device)
        self.model.train()
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.lr_refit, weight_decay=0.01)
        optimizer.zero_grad()
        out_y, _, _ = self.model(X, None)
        loss = self.criterion(out_y.flatten(), y.flatten(), weights.flatten())
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
        optimizer.step()

    def predict(self, X: np.array, n_times: int = None, hidden: torch.Tensor | list | None = None) -> tuple[np.array, torch.Tensor | list]:
        X_tensor = torch.tensor(X, dtype=torch.float32)
        N, K = X.shape
        X_tensor = X_tensor.view(n_times, N // n_times, K).swapaxes(0, 1).to(self.device)
        X_tensor = torch.nan_to_num(X_tensor, 0)
        self.model.eval()
        with torch.no_grad():
            preds, _, hidden = self.model(X_tensor, hidden)
            preds = preds.swapaxes(0, 1)
            preds = preds.reshape(-1).cpu().numpy()
        return preds, hidden

    def get_params(self, deep: bool = True):
        return {
            "model_type": self.model_type,
            "hidden_sizes": self.hidden_sizes,
            "dropout_rates": self.dropout_rates,
            "hidden_sizes_linear": self.hidden_sizes_linear,
            "dropout_rates_linear": self.dropout_rates_linear,
            "lr": self.lr,
            "batch_size": self.batch_size,
            "epochs": self.epochs,
            "early_stopping_patience": self.early_stopping_patience,
            "early_stopping": self.early_stopping,
            "lr_patience": self.lr_patience,
            "lr_factor": self.lr_factor,
            "lr_refit": self.lr_refit,
            "random_seed": self.random_seed
        }

    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self


#### Tree-Based Models

In [None]:
class LGBM(BaseEstimator, RegressorMixin):
    """LightGBM model wrapper.

    Wraps ``lgb.LGBMRegressor`` (or ``lgb.LGBMClassifier`` when
    ``classifier=True``) built from ``params``. It trains with the ``r2_lgb``
    eval metric and optional early stopping on a validation set.
    """

    def __init__(self, params: dict, early_stopping: bool = True, early_stopping_rounds: int = 50,
                 test_size: float = 0.05, features: list | None = None, features_cat: list | None = None,
                 shuffle: bool = False, classifier: bool = False, lr_refit: float | None = None) -> None:
        self.params = params
        self.early_stopping = early_stopping
        self.early_stopping_rounds = early_stopping_rounds
        self.test_size = test_size
        self.features = features
        self.features_cat = features_cat
        self.shuffle = shuffle
        # Stored so get_params/clone can rebuild the same estimator type.
        self.classifier = classifier
        # Learning rate used by ``update``; None keeps the model's current rate.
        self.lr_refit = lr_refit
        if classifier:
            self.model = lgb.LGBMClassifier(**params)
        else:
            self.model = lgb.LGBMRegressor(**params)

    @staticmethod
    def _unpack_set(data_set: tuple) -> tuple:
        """Return ``(X, y, weights)`` from a train/validation tuple.

        Accepts the 6-element layout ``(X, y, weights, ...)`` as well as the
        7-element layout built by ``FullPipeline.fit``:
        ``(X, responders, y, weights, stocks, dates, times)``.
        """
        if len(data_set) == 7:
            X, _, y, weights = data_set[:4]
        else:
            X, y, weights, _, _, _ = data_set
        return X, y, weights

    def fit(self, train_set, val_set, verbose: bool = False) -> "LGBM":
        """Fit on ``train_set``; with early stopping, monitor ``val_set`` too."""
        X_train, y_train, weights_train = self._unpack_set(train_set)
        log_every = 50 if verbose else 0
        if not self.early_stopping:
            eval_set = [(X_train, y_train)]
            eval_weights_set = [weights_train]
            fit_callbacks = [lgb.callback.log_evaluation(log_every)]
        else:
            X_val, y_val, weights_val = self._unpack_set(val_set)
            eval_set = [(X_train, y_train), (X_val, y_val)]
            eval_weights_set = [weights_train, weights_val]
            fit_callbacks = [
                lgb.callback.log_evaluation(log_every),
                lgb.callback.early_stopping(stopping_rounds=self.early_stopping_rounds, verbose=verbose)
            ]
        self.model.fit(X_train, y_train,
                       sample_weight=weights_train,
                       eval_metric=[r2_lgb],
                       eval_set=eval_set,
                       eval_sample_weight=eval_weights_set,
                       feature_name=self.features,
                       categorical_feature=self.features_cat,
                       callbacks=fit_callbacks)
        return self

    def update(self, X, stocks, y, weights, dates, times):
        """Continue boosting from the current model on new data.

        ``stocks``, ``dates`` and ``times`` are accepted but unused.

        NOTE(review): ``FullPipeline.update`` calls ``model.update(X, y, weights,
        n_times)``, which does not match this signature.
        """
        # getattr: instances unpickled from before ``lr_refit`` existed lack it.
        lr_refit = getattr(self, "lr_refit", None)
        if lr_refit is not None:
            self.model.learning_rate = lr_refit
        self.model.fit(X, y, sample_weight=weights, feature_name=self.features, init_model=self.model,
                       callbacks=[lgb.callback.log_evaluation(1)])

    def predict(self, X: np.array, *args, **kwargs) -> np.array:
        """Return ``(predictions, None)``; the ``None`` stands in for a hidden state."""
        return self.model.predict(X), None

    def get_feature_importances(self) -> pd.DataFrame:
        """Return feature importances sorted descending, with a cumulative share column."""
        imp_df = pd.DataFrame({
            "feature": self.features,
            "imp": self.model.feature_importances_
        })
        imp_df.sort_values("imp", ascending=False, inplace=True)
        imp_df["imp_acc"] = imp_df["imp"].cumsum() / imp_df["imp"].sum()
        imp_df.set_index("feature", inplace=True)
        return imp_df

    def set_seed(self, seed: int) -> None:
        """Set the random seed in ``params`` and on the already-built estimator."""
        self.params["random_state"] = seed
        self.model.set_params(random_state=seed)

    def get_params(self, deep: bool = True):
        """Return the constructor arguments (used by ``sklearn.base.clone``)."""
        return {
            "params": self.params,
            "early_stopping": self.early_stopping,
            "early_stopping_rounds": self.early_stopping_rounds,
            "test_size": self.test_size,
            "features": self.features,
            "features_cat": self.features_cat,
            "shuffle": self.shuffle,
            "classifier": self.classifier,
            "lr_refit": self.lr_refit,
        }

    def set_params(self, **parameters):
        """Set attributes from keyword arguments and return ``self``."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

class CBM(BaseEstimator, RegressorMixin):
    """CatBoost model wrapper.

    Wraps ``cb.CatBoostRegressor`` configured with the ``r2_cbt`` eval metric
    and ``early_stopping_rounds``. Validation data is only passed to CatBoost
    when ``early_stopping`` is enabled.
    """

    def __init__(self, params: dict, early_stopping: bool = True, early_stopping_rounds: int = 50,
                 test_size: float = 0.05, features: list | None = None, shuffle: bool = False) -> None:
        self.params = params
        self.early_stopping = early_stopping
        self.early_stopping_rounds = early_stopping_rounds
        self.test_size = test_size
        self.features = features
        self.shuffle = shuffle
        self.model = cb.CatBoostRegressor(**params, eval_metric=r2_cbt(), early_stopping_rounds=early_stopping_rounds)
        self.cat_cols = None

    @staticmethod
    def _unpack_set(data_set: tuple) -> tuple:
        """Return ``(X, y, weights)`` from a train/validation tuple.

        Accepts the 6-element layout ``(X, y, weights, ...)`` as well as the
        7-element layout built by ``FullPipeline.fit``:
        ``(X, responders, y, weights, stocks, dates, times)``.
        """
        if len(data_set) == 7:
            X, _, y, weights = data_set[:4]
        else:
            X, y, weights, _, _, _ = data_set
        return X, y, weights

    def fit(self, train_set, val_set, test_set=None, cat_cols: list = None, verbose: bool = False) -> "CBM":
        """Fit on ``train_set``; with early stopping, evaluate on ``val_set``.

        ``test_set`` is not used. ``FullPipeline.fit`` calls
        ``model.fit(train_set, val_set, verbose)``, which puts the verbose flag
        into this slot, so a boolean there is treated as ``verbose``.
        """
        if isinstance(test_set, bool):
            verbose = verbose or test_set
        X_train, y_train, weights_train = self._unpack_set(train_set)
        self.cat_cols = cat_cols
        train_pool = cb.Pool(X_train, y_train, weight=weights_train, cat_features=cat_cols)
        eval_set = None
        if self.early_stopping:
            X_val, y_val, weights_val = self._unpack_set(val_set)
            eval_set = cb.Pool(X_val, y_val, weight=weights_val, cat_features=cat_cols)
        self.model.set_feature_names(self.features)
        self.model.fit(train_pool, eval_set=eval_set, verbose=1 if verbose else 0)
        return self

    def predict(self, X: np.array, *args, **kwargs) -> np.array:
        """Return predictions for ``X``.

        NOTE(review): unlike ``LGBM.predict`` this returns the array alone, not a
        ``(preds, hidden)`` pair, so ``FullPipeline.predict`` cannot unpack it.
        """
        return self.model.predict(X)

    def get_feature_importances(self) -> pd.DataFrame:
        """Return feature importances sorted descending, with a cumulative share column."""
        imp_df = pd.DataFrame({
            "feature": self.features,
            "imp": self.model.feature_importances_
        })
        imp_df.sort_values("imp", ascending=False, inplace=True)
        imp_df["imp_acc"] = imp_df["imp"].cumsum() / imp_df["imp"].sum()
        imp_df.set_index("feature", inplace=True)
        return imp_df

    def set_seed(self, seed: int) -> None:
        """Set the random seed in ``params`` and on the already-built estimator."""
        self.params["random_state"] = seed
        self.model.set_params(random_state=seed)

    def get_params(self, deep: bool = True):
        """Return the constructor arguments (used by ``sklearn.base.clone``)."""
        return {
            "params": self.params,
            "early_stopping": self.early_stopping,
            "early_stopping_rounds": self.early_stopping_rounds,
            "test_size": self.test_size,
            "features": self.features,
            "shuffle": self.shuffle,
        }

    def set_params(self, **parameters):
        """Set attributes from keyword arguments and return ``self``."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

class XGBM(BaseEstimator, RegressorMixin):
    """XGBoost model wrapper.

    Wraps ``xgb.XGBRegressor``. When ``early_stopping`` is set, it carves a
    validation split out of the training data with ``train_test_split``.
    """

    def __init__(self, params: dict, early_stopping: bool = True, early_stopping_rounds=50,
                 test_size: float = 0.05, features: list | None = None, shuffle: bool = False):
        self.params = params
        self.early_stopping = early_stopping
        self.early_stopping_rounds = early_stopping_rounds
        self.test_size = test_size
        self.features = features
        self.shuffle = shuffle
        self.model = xgb.XGBRegressor(**params, early_stopping_rounds=early_stopping_rounds)

    def fit(self, X: np.array, y: np.array, weights: np.array, verbose: bool = False) -> "XGBM":
        """Fit the regressor on ``X``/``y`` with sample ``weights``.

        With ``early_stopping``, a ``test_size`` fraction is held out
        (unshuffled unless ``shuffle``) and used as the early-stopping set.
        Otherwise only the training data is passed as ``eval_set``.

        NOTE(review): ``early_stopping_rounds`` is always set on the estimator,
        so without a held-out split XGBoost monitors the training eval set.
        ``FullPipeline.fit`` calls ``model.fit(train_set, val_set, verbose)``,
        which does not match this signature.
        """
        if not self.early_stopping:
            X_train, y_train, weights_train = X, y, weights
            eval_set = [(X_train, y_train)]
            eval_sample_weight = [weights_train]
        else:
            X_train, X_val, y_train, y_val, weights_train, weights_val = train_test_split(
                X, y, weights, test_size=self.test_size, shuffle=self.shuffle,
                random_state=RANDOM_SEED if self.shuffle else None
            )
            eval_set = [(X_train, y_train), (X_val, y_val)]
            eval_sample_weight = [weights_train, weights_val]
        # XGBRegressor.fit names the per-eval-set weights ``sample_weight_eval_set``
        # and has no ``feature_names`` argument; names stay in ``self.features``.
        self.model.fit(X_train, y_train, sample_weight=weights_train,
                       eval_set=eval_set, sample_weight_eval_set=eval_sample_weight,
                       verbose=50 if verbose else 0)
        return self

    def predict(self, X: np.array, *args, **kwargs) -> np.array:
        """Return predictions for ``X``.

        Extra arguments (e.g. ``hidden``/``n_times``) are accepted and ignored,
        matching the other model wrappers' ``predict`` signature.
        """
        return self.model.predict(X)

    def set_seed(self, seed: int) -> None:
        """Set the random seed in ``params`` and on the already-built estimator."""
        self.params["random_state"] = seed
        self.model.set_params(random_state=seed)

    def get_params(self, deep: bool = True):
        """Return the constructor arguments (used by ``sklearn.base.clone``)."""
        return {
            "params": self.params,
            "early_stopping": self.early_stopping,
            "early_stopping_rounds": self.early_stopping_rounds,
            "test_size": self.test_size,
            "features": self.features,
            "shuffle": self.shuffle,
        }

    def set_params(self, **parameters):
        """Set attributes from keyword arguments and return ``self``."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self


### 4. Custom Pipeline Classes

* These classes manage model training, ensembling, and cross-validation on time-series data:
    * FullPipeline: Manages model training, saving/loading, and real-time updates.
    * PipelineEnsemble: Computes a weighted average of the predictions from multiple models.
    * PipelineCV: Handles time series cross-validation.


In [None]:
class FullPipeline:
    """
    Custom pipeline for model management and time series training.

    Couples a model wrapper (e.g. ``NN``) with an optional preprocessor and
    handles training, clipped prediction, online updates and ``joblib``
    persistence under ``PATH_MODELS/<run_name>``.
    """
    def __init__(self, model: BaseEstimator, preprocessor=None, run_name: str = "", name: str = "",
                 load_model: bool = False, features: list | None = None, save_to_disc: bool = True,
                 refit=True, change_lr=False, col_target=COL_TARGET) -> None:
        self.model = model
        self.preprocessor = preprocessor
        self.name = name
        self.load_model = load_model
        self.features = features
        self.save_to_disc = save_to_disc
        self.refit = refit
        self.change_lr = change_lr
        self.col_target = col_target

        # Auxiliary targets: every responder column except the main target.
        self.responders = [col for col in COLS_RESPONDERS if col != self.col_target]

        # Also derives ``self.path`` (and creates the folder when saving).
        self.set_run_name(run_name)

    def set_run_name(self, run_name: str) -> None:
        """Set the run name and the model folder derived from it."""
        self.run_name = run_name
        self.path = os.path.join(PATH_MODELS, f"{self.run_name}")
        if self.save_to_disc:
            create_folder(self.path)

    @staticmethod
    def _column(df: pl.DataFrame, col: str) -> np.ndarray:
        """Return column ``col`` of ``df`` as a NumPy array."""
        return df.select(col).to_series().to_numpy()

    def fit(self, df: pl.DataFrame | None = None, df_valid: pl.DataFrame | None = None, verbose: bool = False) -> None:
        """Train the model on ``df`` (validating on ``df_valid``), or load it.

        With ``load_model=True`` the frames are ignored and the saved model is
        loaded instead. Otherwise both frames are required.

        Raises:
            ValueError: if training is requested without both frames.
        """
        if self.load_model:
            self.load()
            return
        if df is None or df_valid is None:
            raise ValueError("df and df_valid are required when load_model is False.")

        self.model.features = self.features

        # Metadata is read from the raw frames, before any preprocessing.
        meta_cols = (COL_WEIGHT, COL_ID, COL_DATE, COL_TIME)
        weights_train, stocks_train, dates_train, times_train = (self._column(df, c) for c in meta_cols)
        weights_valid, stocks_valid, dates_valid, times_valid = (self._column(df_valid, c) for c in meta_cols)

        if self.preprocessor is not None:
            df = self.preprocessor.fit_transform(df)
            df_valid = self.preprocessor.transform(df_valid)

        # Layout consumed by the model wrappers:
        # (X, responders, y, weights, stocks, dates, times).
        train_set = (
            df.select(self.features).to_numpy(),
            df.select(self.responders).to_numpy(),
            self._column(df, self.col_target),
            weights_train, stocks_train, dates_train, times_train,
        )
        val_set = (
            df_valid.select(self.features).to_numpy(),
            df_valid.select(self.responders).to_numpy(),
            self._column(df_valid, self.col_target),
            weights_valid, stocks_valid, dates_valid, times_valid,
        )

        # Release the frames before training.
        del df, df_valid
        gc.collect()

        self.model.fit(train_set, val_set, verbose)
        if self.save_to_disc:
            self.save()

    def predict(self, df: pl.DataFrame, hidden: torch.Tensor | list | None = None, n_times: int | None = None) -> tuple[np.ndarray, torch.Tensor | list]:
        """Predict for ``df`` and return ``(preds clipped to [-5, 5], hidden)``.

        ``n_times`` defaults to the number of distinct time ids in ``df``.
        """
        if n_times is None:
            n_times = len(df.select(COL_TIME).unique())
        if self.preprocessor is not None:
            df = self.preprocessor.transform(df)
        X = df.select(self.features).to_numpy()
        preds, hidden = self.model.predict(X, hidden=hidden, n_times=n_times)
        preds = np.clip(preds, -5, 5)  # per competition requirements
        return preds, hidden

    def update(self, df: pl.DataFrame) -> None:
        """Run one online model update on the labelled frame ``df``."""
        weights = self._column(df, COL_WEIGHT)
        n_times = len(df.select(COL_TIME).unique())
        if self.preprocessor is not None:
            df = self.preprocessor.transform(df, refit=True)

        X = df.select(self.features).to_numpy()
        y = self._column(df, self.col_target)
        self.model.update(X, y, weights, n_times)

    def load(self) -> None:
        """Load the model (and preprocessor, if saved) from ``self.path``.

        With ``change_lr`` the current model's ``lr_refit`` is carried over to
        the loaded model.
        """
        if self.change_lr:
            lr_refit = self.model.lr_refit
        self.model = joblib.load(f"{self.path}/model_{self.name}.joblib")
        self.features = self.model.features
        if self.change_lr:
            self.model.lr_refit = lr_refit
        try:
            self.preprocessor = joblib.load(f"{self.path}/preprocessor_{self.name}.joblib")
        except FileNotFoundError:
            self.preprocessor = None
            print("WARNING: Preprocessor not found.")

    def save(self) -> None:
        """Persist the model (and preprocessor, if any) to ``self.path``."""
        joblib.dump(self.model, f"{self.path}/model_{self.name}.joblib")
        if self.preprocessor is not None:
            joblib.dump(self.preprocessor, f"{self.path}/preprocessor_{self.name}.joblib")

    def get_params(self, deep: bool = True) -> dict:
        """Return the constructor arguments (used by ``sklearn.base.clone``)."""
        return {
            "model": self.model,
            "preprocessor": self.preprocessor,
            "run_name": self.run_name,
            "name": self.name,
            "load_model": self.load_model,
            "features": self.features,
            "save_to_disc": self.save_to_disc,
            "refit": self.refit,
            "change_lr": self.change_lr,
            "col_target": self.col_target,
        }

    def set_params(self, **parameters):
        """Set attributes from keyword arguments and return ``self``."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

class PipelineEnsemble:
    """
    Ensemble pipeline for aggregating predictions from multiple models.

    Each member is a pipeline (e.g. ``FullPipeline``) exposing ``fit``,
    ``predict``, ``update``, ``load``, ``save`` and ``set_run_name``. The
    members' predictions are combined with a weighted average.
    """
    def __init__(self, models: list, weights: np.array = None, refit_models: list[bool] = None, col_target: str = COL_TARGET) -> None:
        self.models = models
        self.weights = weights if weights is not None else np.ones(len(self.models))
        self.refit_models = refit_models if refit_models is not None else [True] * len(models)
        self.col_target = col_target
        self.refit = True

    def fit(self, df: pl.DataFrame | None = None, df_valid: pl.DataFrame | None = None, verbose: bool = False) -> None:
        """Normalise the ensemble weights and fit every member."""
        self.weights = np.array(self.weights) / sum(self.weights)
        for model in self.models:
            model.fit(df, df_valid, verbose)

    def set_run_name(self, run_name: str) -> None:
        """Propagate the run name to every member."""
        for model in self.models:
            model.set_run_name(run_name)

    def predict(self, df: pl.DataFrame, hidden_ls=None) -> tuple[np.ndarray, list]:
        """Return ``(weighted-average predictions, per-member hidden states)``.

        A caller-supplied ``hidden_ls`` is updated in place and returned.
        """
        if hidden_ls is None:
            hidden_ls = [None] * len(self.models)

        preds = []
        for i, model in enumerate(self.models):
            preds_i, hidden_ls[i] = model.predict(df, hidden=hidden_ls[i])
            preds.append(preds_i)

        preds = np.average(preds, axis=0, weights=self.weights)
        return preds, hidden_ls

    def update(self, df: pl.DataFrame) -> None:
        """Run an online update on every member flagged in ``refit_models``."""
        for model, do_refit in zip(self.models, self.refit_models):
            if do_refit:
                model.update(df)

    def load(self) -> None:
        """Load every member.

        Members are pipelines that implement ``load`` themselves; their
        ``.model`` attribute is the bare estimator (e.g. ``NN``), which has no
        ``load`` method.
        """
        for model in self.models:
            model.load()

    def save(self) -> None:
        """Save every member (via the pipeline's own ``save``)."""
        for model in self.models:
            model.save()

    def get_params(self, deep: bool = True) -> dict:
        """Return the constructor arguments (used by ``sklearn.base.clone``)."""
        return {
            "models": self.models,
            "weights": self.weights,
            "refit_models": self.refit_models,
            "col_target": self.col_target,
        }

    def set_params(self, **parameters):
        """Set attributes from keyword arguments and return ``self``."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

class PipelineCV:
    """
    Cross-validation pipeline for time series models.

    Splits the sorted unique dates with ``TimeSeriesSplit`` and trains a clone
    of ``model`` per fold. Each fold is scored day by day on its validation
    dates, optionally applying the model's online ``update`` with the
    previous day's data before each day after the first.
    """
    def __init__(self, model: FullPipeline, tracker: WandbTracker, n_splits: int, train_size: int | None = None) -> None:
        self.model = model
        self.tracker = tracker
        self.n_splits = n_splits
        # Passed as ``max_train_size``; None means no cap on the training window.
        self.train_size = train_size
        self.models = []

    @staticmethod
    def _predict_online(model_fold, df: pl.DataFrame, df_valid: pl.DataFrame, dates_valid) -> np.ndarray:
        """Predict each validation date in order with a copy of ``model_fold``.

        When ``model_fold.refit`` is set, the copy is updated with the
        previous date's rows from ``df`` before predicting each date except
        the first. ``model_fold`` itself is left untouched.
        """
        model_online = copy.deepcopy(model_fold)
        preds = []
        for day_idx, date_id in enumerate(tqdm(dates_valid)):
            df_valid_date = df_valid.filter(pl.col(COL_DATE) == date_id)

            if model_fold.refit and day_idx > 0:
                df_upd = df.filter(pl.col(COL_DATE) == date_id - 1)
                if len(df_upd) > 0:
                    model_online.update(df_upd)

            preds_i, _ = model_online.predict(df_valid_date)
            preds.extend(preds_i)
        return np.array(preds)

    def fit(self, df: pl.DataFrame, verbose: bool = False) -> list:
        """Run the time-series CV and return the per-fold weighted R2 scores."""
        # Start fresh so repeated calls do not accumulate models from old runs.
        self.models = []
        dates_unique = df.select(pl.col(COL_DATE).unique().sort()).to_series().to_numpy()

        n_blocks = self.n_splits + 1
        test_size = (
            TEST_SIZE
            if len(dates_unique) > TEST_SIZE * n_blocks
            else len(dates_unique) // n_blocks
        )
        cv = TimeSeriesSplit(
            n_splits=self.n_splits,
            test_size=test_size,
            max_train_size=self.train_size
        )

        scores = []
        for fold, (train_idx, valid_idx) in enumerate(cv.split(dates_unique)):
            dates_train = dates_unique[train_idx]
            dates_valid = dates_unique[valid_idx]

            if verbose:
                print("-" * 20 + f"Fold {fold}" + "-" * 20)
                print(f"Train dates from {dates_train.min()} to {dates_train.max()}")
                print(f"Valid dates from {dates_valid.min()} to {dates_valid.max()}")

            df_train = df.filter(pl.col(COL_DATE).is_in(dates_train))
            df_valid = df.filter(pl.col(COL_DATE).is_in(dates_valid))

            model_fold = clone(self.model)
            model_fold.set_run_name(f"fold{fold}")
            model_fold.fit(df_train, df_valid, verbose=verbose)
            self.models.append(model_fold)

            preds = self._predict_online(model_fold, df, df_valid, dates_valid)

            df_valid = df_valid.fill_null(0.0)
            y_true = df_valid.select(pl.col(model_fold.col_target)).to_series().to_numpy()
            weights = df_valid.select(pl.col(COL_WEIGHT)).to_series().to_numpy()
            score = r2_weighted(y_true, preds, weights)
            scores.append(score)

            print(f"R2: {score:.5f}")
            if self.tracker:
                self.tracker.log_metrics({f"fold_{fold}": score})

        if self.tracker:
            self.tracker.log_metrics({"cv": np.mean(scores)})
        return scores

    def load(self) -> None:
        """Rebuild one pipeline per fold from the saved ``fold{i}`` runs.

        Relies on ``fit()`` loading from disk, i.e. the template pipeline was
        built with ``load_model=True``.
        """
        self.models = []
        for i in range(self.n_splits):
            model = clone(self.model)
            model.set_run_name(f"fold{i}")
            model.fit()
            self.models.append(model)


#### Instantiate Data Processor and Load Model Pipelines

In [None]:
# Instantiate the data processor (handles cleaning and feature engineering)
data_processor = DataProcessor(MODEL_NAMES[0]).load()

# Instantiate the pipeline objects for each model.
# With load_model=True, FullPipeline.fit() skips training and only calls load(),
# restoring the model (and its preprocessor, if one was saved) from
# PATH_MODELS/RUN_NAME. save_to_disc=False keeps set_run_name() from creating folders.
pipelines = {}
for model_name in MODEL_NAMES:
    pipeline = FullPipeline(
        None,
        run_name=RUN_NAME,
        name=model_name,
        load_model=True,
        features=None,
        save_to_disc=False
    )
    pipeline.fit(verbose=True)
    pipelines[model_name] = pipeline

    print("-" * 100)
    print(model_name)
    print(pipeline.model.get_params())
    print(f"Number of features: {len(pipeline.features)}")
    # Assumes every loaded model is an NN wrapper whose .model is a ModelR.
    print(pipeline.model.model.num_resp)

# Seed the rolling history used at inference time with the tail of the training data.
df_raw = pl.scan_parquet(f"{PATH_DATA}/train.parquet")
df_raw = df_raw.filter(pl.col("date_id") >= MAX_DATE - 10)
df_raw = df_raw.collect()
# Add placeholder row_id / is_scored columns (predict() later selects the test
# batch by df_raw.columns) and shift date_id so that MAX_DATE maps to -1.
# NOTE(review): presumably MAX_DATE is the last training date_id, so the
# history ends right before the first test date_id 0 — confirm.
df_raw = df_raw.with_columns(
    pl.lit(-1).cast(pl.Int64).alias("row_id"),
    pl.lit(True).alias("is_scored"),
    (pl.col("date_id") - MAX_DATE - 1).alias("date_id")
)
df_raw = df_raw.select(COLS_ID + data_processor.COLS_FEATURES_INIT)

# Keep only the last few (shifted) dates, ordered by date, time and symbol.
df_raw = (
    df_raw.filter(pl.col("date_id") >= -5)
    .sort(['date_id', 'time_id', 'symbol_id'])
)

# Global variables used by the inference function
# hidden_states: one recurrent state per pipeline, reset at time_id 0.
hidden_states = [None] * len(pipelines)
# dfs: buffer of test batches collected during a day for the next update.
dfs = []

# Timers / counters used for debugging and date bookkeeping in predict().
time_start = time.time()
time_start_not_scored = time.time()
time_est = 0
time_est_not_scored = 0
cnt_dates = 0

def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame | pd.DataFrame:
    """Make a prediction given the latest test data and lag features.

    Called by the inference server once per test batch. Besides predicting,
    it maintains module-level state across calls:

    * ``df_raw``: recent raw rows, trimmed to the last ``N_ROLL`` rows per
      symbol, from which the current batch's features are computed.
    * ``dfs``: processed batches collected since the last ``time_id == 0``.
      At the next ``time_id == 0`` they are joined with the lagged
      ``responder_6`` and passed to every ``pipeline.update``.
    * ``hidden_states``: one hidden state per pipeline (passed to and returned
      by ``pipeline.predict``), reset to ``None`` at ``time_id == 0``.
    * ``cnt_dates`` and the ``time_*`` timers (the timers only when ``DEBUG``).

    Args:
        test: Current batch. Must contain ``date_id``, ``time_id``,
            ``is_scored``, ``symbol_id``, ``row_id`` and every column of
            ``df_raw``.
        lags: Lagged responders. Only read when ``time_id == 0``, and must then
            contain ``responder_6_lag_1``, ``time_id`` and ``symbol_id``.
            NOTE(review): assumed non-None whenever ``time_id == 0``.

    Returns:
        A Polars DataFrame with ``row_id`` and ``responder_6``. For scored
        batches the value is the ``WEIGHTS``-weighted average of all
        pipelines' predictions; otherwise it is ``0.0``.
    """
    start_time = time.time()

    # State shared across calls (initialised at module level).
    global df_raw, hidden_states, pipelines, dfs, time_est, time_start, time_start_not_scored, time_est_not_scored, cnt_dates

    # The batch is assumed to share a single date_id / time_id / is_scored,
    # so they are read from the first row.
    date_id = test["date_id"][0]
    time_id = test["time_id"][0]
    is_scored = test["is_scored"][0]

    # For debugging purposes: measure elapsed time
    if DEBUG:
        # Elapsed seconds since the start of the non-scored / scored period.
        if not is_scored:
            time_est_not_scored = time.time() - time_start_not_scored
        else:
            time_est = time.time() - time_start

        if time_id == 0:
            print("-" * 100)
            # Restart the timers at the first measured date of each period.
            if date_id == 1:
                time_start_not_scored = time.time()
            if date_id == CNT_DATES_NOT_SCORED: 
                time_start = time.time()

    # Reset hidden states and accumulate data for weight updates
    if time_id == 0:
        cnt_dates += 1
        hidden_states = [None for _ in pipelines]
        # responder_6_lag_1 delivered on this date belongs to the previous
        # date, so relabel it with date_id - 1 to line up with the stored rows.
        lags = lags.with_columns(
            pl.col("responder_6_lag_1").alias("responder_6"),
            pl.lit(date_id - 1).cast(pl.Int16).alias("date_id")
        ).select(["date_id", "time_id", "symbol_id", "responder_6"])
        # On the first date there is no previous day to label.
        if cnt_dates > 1:
            df = pl.concat(dfs)  # previous date's processed batches
            dfs = []            # start collecting for the new date
            df = df.join(lags, on=["date_id", "time_id", "symbol_id"], how="left")
            df = df.sort(["date_id", "time_id", "symbol_id"])

    # Append new test data to raw data and trim to the latest N_ROLL observations per symbol
    test = test.select(df_raw.columns)
    df_raw = pl.concat([df_raw, test], how="vertical_relaxed")
    df_raw = df_raw.select(test.columns)
    df_raw = (
        df_raw
        .group_by(["symbol_id"])
        .tail(N_ROLL)
    )

    # Process current features using the data processor
    # (fast=True computes features only for this batch's symbols).
    df_cur = data_processor.process_test_data(df_raw, fast=True, date_id=date_id, time_id=time_id, symbols=test["symbol_id"])
    df_cur = df_cur.sort(["symbol_id"])
    # Keep the processed features (without target) for the next day's update.
    dfs.append(df_cur)
    df_cur = df_cur.with_columns(pl.lit(None).alias("responder_6"))

    # Update model weights with accumulated data (if enough data is available)
    # ``df`` is bound above under the same two conditions.
    if (time_id == 0) & (cnt_dates > 1):
        # NOTE(review): 968 is compared with a row count; confirm the intended unit.
        if len(df) > 968:  # need more than 1 day of data to update
            for i, (name, pipeline) in enumerate(pipelines.items()):
                pipeline.update(df)

    # Generate predictions only if the test sample is scored
    if is_scored:
        preds = []
        for i, (name, pipeline) in enumerate(pipelines.items()):
            # Each pipeline carries its own hidden state from call to call within a date.
            pred, hidden_states[i] = pipeline.predict(df_cur, hidden=hidden_states[i], n_times=1)
            preds.append(pred)
        pred = np.average(preds, axis=0, weights=WEIGHTS)

        df_cur = df_cur.with_columns(pl.Series("responder_6", pred))
        # Re-align predictions to the order of the incoming test rows.
        df_cur = test.select(["date_id", "time_id", "symbol_id"]).join(df_cur, on=["date_id", "time_id", "symbol_id"], how="left")
        predictions = df_cur.select(["row_id", "responder_6"])
    else:
        # Non-scored batches still need a response; return zeros.
        predictions = test.select(
            'row_id',
            pl.lit(0.0).alias('responder_6'),
        )

    if DEBUG:
        if time_id % 100 == 0:
            # Count NaN and null predictions together.
            n_nans = sum(sum(predictions.fill_nan(None).null_count().to_numpy()))
            print(
                f"{date_id} {time_id:3.0f} (is_scored {is_scored}): "
                f"time elps {time.time() - start_time:.4f}, # nans {n_nans}"
            )
    else:
        if (time_id == 0) & (date_id == 0):
            print(predictions)
            print((time_id, time.time() - start_time))

    return predictions

# Wire ``predict`` into the Kaggle evaluation API.
inference_server = jane_street_inference_server.JSInferenceServer(predict)

if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    # Competition re-run: serve requests from the Kaggle gateway.
    inference_server.serve()
elif DEBUG:
    # Local debugging run against the synthetic test/lag files.
    inference_server.run_local_gateway(
        (
            '/kaggle/input/js24-rmf-submission-api-debug-with-synthetic-test/synthetic_test.parquet',
            '/kaggle/input/js24-rmf-submission-api-debug-with-synthetic-test/synthetic_lag.parquet',
        )
    )
else:
    # Local run against the public test/lag files.
    inference_server.run_local_gateway(
        (
            f'{PATH_DATA}/test.parquet',
            f'{PATH_DATA}/lags.parquet',
        )
    )

if DEBUG:
    # Extrapolate the measured seconds per date to a run of 200 / 120 / 240
    # dates (presumably the expected hidden-test horizon), converted to hours.
    n_dates_scored = CNT_DATES - CNT_DATES_NOT_SCORED
    n_dates_not_scored = CNT_DATES_NOT_SCORED - 1
    time_est_cur = time_est / n_dates_scored * 200 / 60 / 60
    time_est_scored = time_est / n_dates_scored * 120 / 60 / 60
    time_est_not_scored = time_est_not_scored / n_dates_not_scored * 240 / 60 / 60
    time_est_final = time_est_scored + time_est_not_scored
    print("-" * 100)
    print(f"Estimated current time: {time_est_cur:.4f}")
    print(f"Estimated final time (is_score=True): {time_est_scored:.4f}")
    print(f"Estimated final time (is_score=False): {time_est_not_scored:.4f}")
    print(f"Estimated final time: {time_est_final:.4f}")


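### 5. Model Analysis

* Functions for in-depth model analysis.
* Perform greedy forward/backward feature selection with a pluggable scoring function (e.g., a neural-network validation score).
* Calculate null importances to help distinguish signal from noise.
* Apply adversarial validation to keep only features that are stable across time periods.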


In [None]:
def select_features(
        features: list,
        score_func: callable,
        features_all: list | None = None,
        method: str = "forward",
        fast: bool = False,
        superfast: bool = False,
        shuffle: bool = False,
        chunk_size: int = 1,
        threshold: float = 0.0
):
    """
    Greedy backward/forward feature selection.

    Each round scores every remaining candidate chunk. In forward mode
    (``method="forward"``) the chunk is added to the current feature set; in
    any other mode it is removed. The chunk that beats the best score so far
    by more than ``threshold`` is accepted (the last such chunk in scan order,
    or the first one when ``fast``). Rounds repeat until no chunk improves
    the score.

    Args:
        features: Starting feature set (copied; not modified).
        score_func: Called as ``score_func(features=[...])``; higher is better.
        features_all: Candidate pool. In forward mode, candidates already in
            ``features`` are skipped. In backward mode every candidate must be
            in ``features`` so that it can be removed; if None, it defaults to
            ``features``.
        method: ``"forward"`` adds features; anything else removes them.
        fast: Accept the first improving chunk instead of scanning all chunks.
        superfast: After an acceptance, drop every chunk scanned in that round
            (not only the accepted one) from the candidate pool.
        shuffle: Shuffle the candidate pool at the start of each round.
        chunk_size: Number of features added/removed together per trial.
        threshold: Minimum score gain required to accept a chunk.

    Returns:
        DataFrame indexed by candidate feature with one column per round,
        holding the score obtained when that feature's chunk was tried.

    Raises:
        ValueError: If ``features_all`` is None in forward mode.
    """
    def chunks(lst, chunk_size):
        # Consecutive slices of ``lst``; the last one may be shorter.
        for i in range(0, len(lst), chunk_size):
            yield lst[i:i + chunk_size]

    features_incl = features.copy()
    features_selected = []

    if features_all is None:
        if method == "forward":
            raise ValueError("features_all is required for forward selection.")
        # Backward elimination: by default try removing each current feature.
        features_all = features

    if method == "forward":
        incl_set = set(features_incl)  # O(1) membership instead of list scans
        features_left = [i for i in features_all if i not in incl_set]
    else:
        # ``list`` (not ``.copy()``) so array-like pools still support ``.remove``.
        features_left = list(features_all)

    res_df = pd.DataFrame(index=features_left)

    if len(features_incl) > 0:
        score = score_func(features=features_incl)
        score_best = score
        print(f"Initial score: {score_best:.4f}")
    else:
        score_best = 0.0
        print(f"No initial features. Initial score: {score_best:.4f}")

    cnt = 0
    while True:
        res_df[cnt] = np.nan
        feature_selected = None
        if shuffle:
            random.shuffle(features_left)
        pbar = tqdm(
            chunks(features_left, chunk_size),
            # Ceiling division so a trailing partial chunk is counted too.
            total=(len(features_left) + chunk_size - 1) // chunk_size
        )
        cnt1 = 0  # number of candidate features scanned in this round
        for f in pbar:
            features_tmp = features_incl.copy()
            if method == "forward":
                features_tmp.extend(f)
            else:
                for i in f:
                    features_tmp.remove(i)
            score = score_func(features=features_tmp)

            res_df.loc[f, cnt] = score

            cnt1 += len(f)

            print(f"score {score:.4f}: {f}")

            if score - score_best > threshold:
                score_best = score
                feature_selected = f
                if fast:
                    break

        if feature_selected is not None:
            if method == "forward":
                features_incl.extend(feature_selected)
            else:
                for i in feature_selected:
                    features_incl.remove(i)
            features_selected.extend(feature_selected)
            if superfast:
                # Drop everything scanned this round, including the accepted chunk.
                features_left = features_left[cnt1:]
            else:
                for i in feature_selected:
                    features_left.remove(i)
            print(f"Score {score_best:.4f}. {feature_selected} was selected.")
            print(features_selected)
        else:
            print(f"Score {score_best:.4f}. No good features left. Stopping.")
            break

        cnt += 1

    return res_df


def get_null_imp(
    df: pd.DataFrame,
    features: list,
    model_name: str,
    params: dict,
    n: int = 10,
    target_col: str = "target"
) -> pd.DataFrame:
    """
    Get null importances for the features.

    For each of ``n`` repetitions, the ``target_col`` column is permuted
    (seeded with the repetition index) into a ``target_shuffled`` column of
    ``df``. Note that ``df`` is modified in place. A LightGBM pipeline with
    300 estimators is then fitted on ``features`` against that shuffled
    target. The resulting importances show what the model assigns to each
    feature by chance.

    Args:
        df: Training data; must contain ``features`` and ``target_col``.
        features: Feature columns used for fitting.
        model_name: Prefix of each run name (``{model_name}_null_{i}``).
        params: LightGBM parameters. They are copied, and ``n_estimators`` is
            overridden to 300.
        n: Number of shuffled-target repetitions.
        target_col: Column whose values are permuted to build the null target.

    Returns:
        DataFrame with one ``imp_{i}`` column per repetition, indexed like the
        model's ``get_feature_importances()`` output (presumably by feature
        name — confirm).
    """
    params_null = params.copy()
    params_null["n_estimators"] = 300
    model = LGBM(
        params_null,
        early_stopping=False,
        test_size=0.01,
        shuffle=True
    )
    imps = []
    for i in range(n):
        # ``.values`` drops the index so the permuted order is assigned positionally.
        df["target_shuffled"] = df[target_col].sample(frac=1, random_state=i).values
        name = f"{model_name}_null_{i}"
        pipeline = FullPipeline(
            Pipeline(steps=[('model', model)]),
            run_name="full",
            name=name,
            load_model=False,
            features=features,
            target_col="target_shuffled",
        )
        pipeline.fit(df, verbose=True)

        feat_imp = pipeline.model["model"].get_feature_importances()
        imps.append(feat_imp[["imp"]].add_suffix(f"_{i}"))

    return pd.concat(imps, axis=1)


def select_features_null_imp(
    version: str,
    df: pd.DataFrame,
    n_splits: int = 2
) -> list:
    """
    Select features based on null importances.

    The candidate features are all columns except ``COL_ID``, ``COL_DATE``,
    ``COL_TIME`` and ``COL_TARGET``. They are shuffled and split into
    ``n_splits`` near-equal groups. For each group:

    * a LightGBM model is fitted on a 50% sample of ``df`` to get importances;
    * ``get_null_imp`` fits 10 more models on a shuffled target to get null
      importances.

    A feature is kept when ``imp / (imp_null_75 + 0.01) > 1`` and ``imp > 1``,
    where ``imp_null_75`` is the 75th percentile of its null importances.
    The full table is written to ``{base_path}/analysis/imp_null_{version}.csv``.

    Args:
        version: Tag used in model names and in the output file name.
        df: Training data with feature and target columns.
        n_splits: Number of feature groups fitted separately.

    Returns:
        Selected feature names, ordered by decreasing ``imp_final``.
    """
    features_all = [
        i for i in df.columns if i not in [COL_ID, COL_DATE, COL_TIME, COL_TARGET]
    ]
    random.shuffle(features_all)
    # Near-equal groups whose sizes differ by at most one. Empty groups (more
    # splits than features) are skipped because a model needs >= 1 feature.
    feature_sets = [
        part.tolist()
        for part in np.array_split(np.array(features_all), n_splits)
        if len(part) > 0
    ]

    model_name = f"lgb_version_{version}_nullimp"

    params_lgb = {
        'boosting_type': 'gbdt',
        'colsample_bynode': 0.8,
        'colsample_bytree': 0.8,
        'extra_trees': True,
        'learning_rate': 0.1,
        'max_depth': 10,
        'metric': 'auc',
        'n_estimators': 4000,
        'num_leaves': 64,
        'objective': 'binary',
        'random_state': 42,
        'reg_alpha': 10,
        'reg_lambda': 10,
        "device": "gpu",
        'verbose': -1,
        "max_bin": 150,
    }
    imps = []
    imps_null = []
    model = LGBM(
        params_lgb,
        early_stopping_rounds=50,
        test_size=0.01,
        shuffle=True
    )
    for i, features in tqdm(enumerate(feature_sets), total=len(feature_sets)):
        name = f"{model_name}_full_{i}"
        print(f"Number of features {len(features)}")
        pipeline = FullPipeline(
            Pipeline(steps=[('model', model)]),
            run_name="full",
            name=name,
            load_model=False,
            features=features
        )
        df_sample = df.sample(frac=0.5, random_state=i)
        pipeline.fit(df_sample, verbose=True)
        imp_df = pipeline.model["model"].get_feature_importances()
        imps.append(imp_df[["imp"]])

        imp_null_i_df = get_null_imp(
            df_sample,
            features,
            f"{model_name}_full_{i}",
            params_lgb,
            n=10
        )
        imps_null.append(imp_null_i_df)

    imp_df = pd.concat(imps, axis=0)
    imp_null_df = pd.concat(imps_null, axis=0)

    # Ratio of the real importance to the 75th-percentile null importance;
    # the +0.01 avoids division by zero for features never used by null models.
    imp_df["imp_null_75"] = imp_null_df.quantile(0.75, axis=1)
    imp_df["imp_final"] = imp_df["imp"] / (imp_df["imp_null_75"] + 0.01)
    imp_df.sort_values("imp_final", ascending=False, inplace=True)
    features_selected = imp_df[(imp_df["imp_final"] > 1) & (imp_df["imp"] > 1)].index.to_list()

    imp_df.to_csv(f"{base_path}/analysis/imp_null_{version}.csv")

    return features_selected


def select_features_adv(
    version: str,
    df: pd.DataFrame,
    features: list,
    threshold: float = 0.65
) -> list:
    """
    Select features based on adversarial validation.

    Rows with ``WEEK_NUM >= 65`` are labelled 1 in a ``covid`` column and
    earlier rows 0. Note that the column is added to ``df`` in place. For
    each feature, a small LightGBM classifier is trained on that feature
    alone to separate the two periods, and its ROC AUC on a held-out split
    is recorded. Features that separate the periods poorly
    (AUC < ``threshold``) are considered stable.

    The per-feature scores are written to
    ``{base_path}/analysis/adv_val_features_{version}.csv``.

    Args:
        version: Tag used in the output file name.
        df: Data containing ``WEEK_NUM`` and ``features``.
        features: Features to test one at a time.
        threshold: Maximum AUC for a feature to count as stable.

    Returns:
        Stable feature names. Features whose model could not be fitted have no
        score and are excluded.
    """
    df["covid"] = 0
    df.loc[df["WEEK_NUM"] >= 65, "covid"] = 1

    df_train, df_valid = train_test_split(df, random_state=42)

    params_lgb = {
        'boosting_type': 'gbdt',
        'objective': 'binary',
        "n_estimators": 20,
        'random_state': 42,
        "device": "gpu",
        'verbose': -1,
    }
    model = LGBM(
        params_lgb,
        early_stopping_rounds=200,
        test_size=0.01,
        shuffle=True
    )

    print(f"Number of features: {len(features)}")
    # Float dtype so unscored features hold NaN and the threshold comparison is numeric.
    res_df = pd.DataFrame(index=features, columns=["score"], dtype=float)

    for f in tqdm(features):
        pipeline = FullPipeline(
            Pipeline(steps=[('model', model)]),
            run_name="adv",
            name="test",
            load_model=False,
            features=[f],
            target_col="covid"
        )
        try:
            pipeline.fit(df_train)
        except Exception as err:
            # Best effort: skip features that cannot be fitted, but say why,
            # and let KeyboardInterrupt/SystemExit propagate.
            print(f"Skipping feature {f}: {err}")
            continue
        preds = pipeline.predict(df_valid)
        score = roc_auc_score(df_valid["covid"], preds)
        res_df.loc[f, "score"] = score

    res_df.to_csv(f"{base_path}/analysis/adv_val_features_{version}.csv")
    features_stable = res_df.loc[res_df["score"] < threshold].index.to_list()
    return features_stable


### 6. Custom Data Processor and Feature Engineering

* The custom data processor handles feature engineering and, through an optional transformer, additional cleaning/transformation.
* It adds per-symbol differences from rolling averages, rolling standard deviations, and market-level (per date and time) averages, using Polars for fast computation.


In [None]:
class DataProcessor:
    """
    Custom data processor for feature engineering and transformation.

    This class includes methods for adding rolling differences, standard deviations, and market averages.

    Training data is built with ``get_train_data``, which also saves the
    processor with joblib. Live test batches go through ``process_test_data``.
    An optional ``transformer`` is fitted on the training features and applied
    (with ``refit=True``) to test batches.
    """
    # Directory where processors are saved/loaded with joblib.
    PATH = os.path.join(PATH_MODELS, "data_processors")

    # Raw feature columns: feature_00 ... feature_78.
    COLS_FEATURES_INIT = [f"feature_{i:02d}" for i in range(79)]

    # Raw features that get rolling-window and market-average derived features.
    # NOTE(review): the name suggests they were chosen by correlation — confirm.
    COLS_FEATURES_CORR = [
        'feature_06',
        'feature_04',
        'feature_07',
        'feature_36',
        'feature_60',
        'feature_45',
        'feature_56',
        'feature_05',
        'feature_51',
        'feature_19',
        'feature_66',
        'feature_59',
        'feature_54',
        'feature_70',
        'feature_71', 
        'feature_72',
    ]
    # Categorical features; excluded from ``self.features`` in ``__init__``.
    COLS_FEATURES_CAT = ["feature_09", "feature_10", "feature_11"]

    # Rolling window length (rows per symbol), passed as ``n`` by ``_add_features``.
    T = 1000

    def __init__(self, name: str, skip_days: int | None = None, transformer: PolarsTransformer | None = None):
        """
        Args:
            name: File stem used by ``_save``/``load`` under ``PATH``.
            skip_days: If set, training rows with ``date_id < skip_days`` are dropped.
            transformer: Optional transformer. It is fitted on ``self.features``
                in ``get_train_data`` and applied in ``process_test_data``.
        """
        self.name = name
        self.skip_days = skip_days
        self.transformer = transformer

        # Model feature list, in order: raw features, then per-COLS_FEATURES_CORR
        # derived features, then ``feature_time_id`` (a copy of ``time_id``).
        # Categorical columns are removed at the end.
        self.features = list(self.COLS_FEATURES_INIT)
        self.features += [f"{i}_diff_rolling_avg_{self.T}" for i in self.COLS_FEATURES_CORR]
        self.features += [f"{i}_rolling_std_{self.T}" for i in self.COLS_FEATURES_CORR]
        self.features += [f"{i}_avg_per_date_time" for i in self.COLS_FEATURES_CORR]
        self.features += ["feature_time_id"]
        self.features = [i for i in self.features if i not in self.COLS_FEATURES_CAT]

        # Presumably ensures PATH exists (helper defined elsewhere) — confirm.
        utils.create_folder(self.PATH)

    def get_train_data(self) -> pl.DataFrame:
        """
        Load the training data, add derived targets and features, fit the
        optional transformer, and save this processor.

        Adds two derived responders, computed within each symbol:

        * ``responder_9`` = ``responder_8`` plus its value 4 rows later;
        * ``responder_10`` = ``responder_6`` plus its values 20 and 40 rows later.

        Sums with a missing future value (the end of a symbol's history)
        become 0.0. NOTE(review): assumes rows are ordered by date/time within
        each symbol — confirm.
        """
        df = self._load_data().collect()

        df = df.with_columns(
            (
                pl.col("responder_8") + pl.col("responder_8").shift(-4).over("symbol_id")
            ).fill_null(0.0).alias("responder_9"),
            (
                pl.col("responder_6")
                + pl.col("responder_6").shift(-20).over("symbol_id")
                + pl.col("responder_6").shift(-40).over("symbol_id")
            ).fill_null(0.0).alias("responder_10"),
        )

        df = self._add_features(df)

        if self.transformer is not None:
            self.transformer.set_features(self.features)
            df = self.transformer.fit_transform(df)

        # Persist the processor (including the fitted transformer) for inference.
        self._save()
        return df

    def process_test_data(self, df: pl.DataFrame, fast: bool = False, date_id: int = 0, time_id: int = 0, symbols: pl.Series | list | None = None) -> pl.DataFrame:
        """
        Compute features for test data and apply the transformer (``refit=True``).

        With ``fast=True``, one row per symbol in ``symbols`` is returned,
        stamped with ``date_id``/``time_id`` (see ``_get_window_average_std``).
        """
        df = self._add_features(df, fast=fast, date_id=date_id, time_id=time_id, symbols=symbols)
        if self.transformer is not None:
            df = self.transformer.transform(df, refit=True)
        return df

    def _save(self) -> None:
        """Pickle this processor to ``{PATH}/{name}.joblib``."""
        joblib.dump(self, f"{self.PATH}/{self.name}.joblib")

    def load(self) -> "DataProcessor":
        """Return the processor saved under this name; ``self`` is not modified."""
        return joblib.load(f"{self.PATH}/{self.name}.joblib")

    def _load_data(self) -> pl.LazyFrame:
        """Lazily scan ``train.parquet`` without ``partition_id``, skipping the
        first ``skip_days`` dates when set."""
        df = pl.scan_parquet(f'{PATH_DATA}/train.parquet')
        df = df.drop("partition_id")
        if self.skip_days is not None:
            df = df.filter(pl.col("date_id") >= self.skip_days)
        return df

    def _add_features(self, df: pl.DataFrame, fast: bool = False, date_id: int | None = None, time_id: int | None = None, symbols: pl.Series | list | None = None) -> pl.DataFrame:
        """Add rolling-window, market-average and ``feature_time_id`` features."""
        df = self._get_window_average_std(df, self.COLS_FEATURES_CORR, n=self.T, fast=fast, date_id=date_id, time_id=time_id, symbols=symbols)
        df = self._get_market_average(df, self.COLS_FEATURES_CORR, fast=fast)

        df = df.with_columns(pl.col("time_id").alias("feature_time_id"))
        return df

    def _get_window_average_std(self, df: pl.DataFrame, cols: list, n: int = 1000, fast: bool = False, date_id: int | None = None, time_id: int | None = None, symbols: pl.Series | list | None = None) -> pl.DataFrame:
        """
        Add ``{col}_diff_rolling_avg_{n}`` (value minus rolling mean) and
        ``{col}_rolling_std_{n}`` for each column in ``cols``.

        * ``fast=False`` (training): a rolling window of ``n`` rows per symbol.
          With Polars' default ``min_periods``, the first ``n - 1`` rows of
          each symbol are presumably null — confirm for the installed version.
        * ``fast=True`` (inference): the mean/std cover *all* rows of ``df``
          per symbol (``n`` only names the columns; the caller bounds the
          history). The result has one row per symbol in ``symbols``, holding
          the last values of the raw features, ``row_id``, ``weight`` and
          ``is_scored``, with ``date_id``/``time_id`` set to the given values.
        """
        if not fast:
            df = df.with_columns([
                pl.col(col).rolling_mean(window_size=n).over(["symbol_id"]).alias(f"{col}_rolling_avg_{n}")
                for col in cols
            ] + [
                pl.col(col).rolling_std(window_size=n).over(["symbol_id"]).alias(f"{col}_rolling_std_{n}")
                for col in cols
            ])
        else:
            df = df.group_by("symbol_id").agg([
                pl.col(col).mean().alias(f"{col}_rolling_avg_{n}")
                for col in cols
            ] + [
                pl.col(col).std().alias(f"{col}_rolling_std_{n}")
                for col in cols
            ] + [
                pl.col(col).last().alias(col)
                for col in self.COLS_FEATURES_INIT + ["row_id", "weight", "is_scored"]
            ]).filter(pl.col("symbol_id").is_in(symbols))
            df = df.with_columns(
                pl.lit(date_id).cast(pl.Int16).alias("date_id"),
                pl.lit(time_id).cast(pl.Int16).alias("time_id")
            )

        # Keep only the difference from the rolling mean, not the mean itself.
        df = df.with_columns([
            (pl.col(col) - pl.col(f"{col}_rolling_avg_{n}")).alias(f"{col}_diff_rolling_avg_{n}")
            for col in cols
        ])
        df = df.drop([f"{col}_rolling_avg_{n}" for col in cols])
        return df

    def _get_market_average(self, df: pl.DataFrame, cols: list, fast: bool = False) -> pl.DataFrame:
        """
        Add ``{col}_avg_per_date_time``: the cross-symbol mean of each column.

        ``fast=False`` groups by ``(date_id, time_id)``. ``fast=True`` averages
        over the whole frame, which is equivalent when the frame holds a single
        ``(date_id, time_id)``, as the fast window step produces.
        """
        if not fast:
            df = df.with_columns([
                pl.col(col).mean().over(["date_id", "time_id"]).alias(f"{col}_avg_per_date_time")
                for col in cols
            ])
        else:
            df = df.with_columns([
                pl.col(col).mean().alias(f"{col}_avg_per_date_time")
                for col in cols
            ])
        return df


### 7. Datasets Utility Functions

* Helper functions to update and upload Kaggle datasets. 
* These utilities allow you to package the models and code and then update the dataset.


In [None]:
def update_dataset(dataset_id: str, source_path: str) -> None:
    """
    Updates a Kaggle dataset with specific subfolders.

    The ``full`` and ``data_processors`` subfolders of ``source_path`` are
    copied into a staging directory; missing ones are skipped with a warning.
    A ``dataset-metadata.json`` for ``{KAGGLE_USERNAME}/{dataset_id}`` is
    written there, and ``kaggle datasets version`` is run on that directory.
    The staging directory is removed afterwards, even if copying or the
    upload fails.

    Args:
        dataset_id: Kaggle dataset slug (without the user name).
        source_path: Directory containing the model subfolders.
    """
    original_source_path = Path(source_path)
    temp_dir = Path("/tmp/kaggle_dataset_temp/models")

    # Start from a clean staging directory.
    if temp_dir.exists():
        shutil.rmtree(temp_dir)
    temp_dir.mkdir(parents=True)

    try:
        for folder_name in ["full", "data_processors"]:
            src = original_source_path / folder_name
            dest = temp_dir / folder_name
            if src.exists():
                shutil.copytree(src, dest)
            else:
                print(f"Warning: Folder '{src}' does not exist and won't be uploaded.")

        print(f"Files in temp_dir ({temp_dir}):")
        for root, _, files in os.walk(temp_dir):
            for file in files:
                print(os.path.join(root, file))

        metadata = {"id": f"{KAGGLE_USERNAME}/{dataset_id}"}
        metadata_path = temp_dir / 'dataset-metadata.json'
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f)

        # run_shell_command echoes the command and streams the CLI output via
        # print, as upload_code does.
        run_shell_command(
            f"kaggle datasets version -p '{temp_dir}' -m 'Updated dataset' -r zip"
        )
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


def upload_code(dataset_id: str, source_path: str) -> None:
    """
    Uploads a Python package to a Kaggle dataset.

    Steps:

    1. Change the working directory to ``base_path`` (the change persists
       after the call).
    2. Run ``python setup.py sdist bdist_wheel``.
    3. Copy the file at ``source_path`` (relative paths resolve against
       ``base_path``) into a temporary directory, together with a
       ``dataset-metadata.json`` for ``{KAGGLE_USERNAME}/{dataset_id}``.
    4. Run ``kaggle datasets version`` on that directory.

    The temporary directory is always removed, even if the upload fails.

    Args:
        dataset_id: Kaggle dataset slug (without the user name).
        source_path: File to upload, typically the built package.
    """
    # NOTE: process-wide side effect; kept because later relative paths may rely on it.
    os.chdir(base_path)
    run_shell_command("python setup.py sdist bdist_wheel")

    original_source_path = Path(source_path)
    with tempfile.TemporaryDirectory() as temp_dir:
        staging_dir = Path(temp_dir)
        shutil.copy(original_source_path, staging_dir)

        metadata = {"id": f"{KAGGLE_USERNAME}/{dataset_id}"}
        metadata_path = staging_dir / 'dataset-metadata.json'
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f)

        run_shell_command(f"kaggle datasets version -p '{staging_dir}' -m 'Updated dataset'")


### 8. Project Environment, Evaluation, Logging

* This section includes:
    * a function to set up the project environment (loading environment variables and configuring the Kaggle API and Weights & Biases);
    * a custom tracker class for experiment logging.


In [None]:
def setup_environment(track: bool = False):
    """
    Prepare the runtime environment.

    Loads variables from a ``.env`` file, then installs the Kaggle API
    credentials: ``{base_path_data}/kaggle.json`` is copied into
    ``~/.kaggle/`` and made owner-only. When ``track`` is true, it also logs
    in to Weights & Biases with the ``WANDB_TOKEN`` environment variable.
    """
    print("Loading environment variables from .env file...")
    from dotenv import load_dotenv
    load_dotenv()

    print("Setting up Kaggle API credentials...")
    run_shell_command("mkdir -p ~/.kaggle")
    credentials_file = f"{base_path_data}/kaggle.json"
    run_shell_command(f"cp '{credentials_file}' ~/.kaggle/")
    run_shell_command("chmod 600 ~/.kaggle/kaggle.json")

    if not track:
        print("Environment setup complete.")
        return

    print("Setting up Weights & Biases...")
    import wandb
    wandb.login(key=os.environ.get('WANDB_TOKEN'))
    print("Environment setup complete.")


class WandbTracker:
    """
    Custom class for tracking experiments using WandB.

    ``init_run`` starts a W&B run. It logs ``params`` plus some metadata as
    the config, and the feature list as an artifact. The ``log_*``/``save_*``
    and ``alert`` helpers write to the active run. ``update_summary`` and
    ``update_settings`` edit an existing run through the public API at
    ``{run_path_prefix}/{run_id}``.
    """
    def __init__(
        self,
        run_name: str,
        params: dict,
        category: str,
        comment: str,
        run_path_prefix: str = "eivolkova3/kaggle_home_credit",
    ) -> None:
        """
        Args:
            run_name: Name of the W&B run (also used in artifact/alert names).
            params: Model parameters logged as the run config.
            category: Free-form category stored in the config.
            comment: Free-form comment stored in the config and printed.
            run_path_prefix: ``"entity/project"`` of the runs edited by
                ``update_summary``/``update_settings``. NOTE(review): the
                default differs from ``WANDB_PROJECT`` used by ``init_run``;
                pass the correct prefix explicitly.
        """
        # setup_environment imports wandb only into its own local scope, so
        # bind the module globally here for the other methods to use.
        global wandb
        import wandb

        self.run_name = run_name
        self.params = params
        self.category = category
        self.comment = comment
        self.run_path_prefix = run_path_prefix
        self.api = wandb.Api()

    def init_run(self, features: list) -> None:
        """Start the W&B run and upload the feature list as an artifact."""
        config = self.params.copy()
        config.update({
            "model": "lgb",
            "category": self.category,
            "comment": self.comment,
            "n_features": len(features)
        })
        wandb.init(
            project=WANDB_PROJECT,
            name=self.run_name,
            config=config,
            dir=base_path,
            save_code=True
        )
        self.save_features(features)
        print(f"Running {self.run_name} model.")
        print(self.comment)

    def save_features(self, features: list) -> None:
        """Write ``features`` to ``features.txt`` (one per line) and log it as an artifact."""
        feature_file_path = "features.txt"
        with open(feature_file_path, "w", encoding="utf-8") as file:
            for feature in features:
                file.write(f"{feature}\n")
        artifact = wandb.Artifact(name=f"{self.run_name}-feature-list", type="dataset")
        artifact.add_file(feature_file_path)
        wandb.log_artifact(artifact)

    def save_data(self, df: pd.DataFrame, name: str) -> None:
        """Log ``df`` as a W&B table under ``name``."""
        tab = wandb.Table(columns=list(df.columns), data=df.values.tolist())
        wandb.log({name: tab})

    def alert(self, text: str) -> None:
        """Send an INFO-level W&B alert announcing that the run finished."""
        wandb.alert(
            title=f'Run {self.run_name} finished.',
            text=text,
            level=wandb.AlertLevel.INFO
        )

    def log_metrics(self, metrics: dict) -> None:
        """Log a dict of metrics to the active run."""
        wandb.log(metrics)

    def _run_path(self, run_id: str) -> str:
        """Full API path (``entity/project/run_id``) of an existing run."""
        return f"{self.run_path_prefix}/{run_id}"

    def update_summary(self, run_id: str, summary_params: dict) -> None:
        """Set summary values on an existing run."""
        run = self.api.run(self._run_path(run_id))
        for key, val in summary_params.items():
            run.summary[key] = val
        run.summary.update()

    def update_settings(self, run_id: str, settings_params: dict) -> None:
        """Set ``settings`` entries on an existing run and save it."""
        run = self.api.run(self._run_path(run_id))
        for key, val in settings_params.items():
            run.settings[key] = val
        run.update()

    def finish(self) -> None:
        """Finish the active W&B run."""
        wandb.finish()
