<a href="https://colab.research.google.com/github/Victor-Manach/numerai/blob/main/lgbm_walk_forward.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Register the NVIDIA OpenCL driver library as an OpenCL ICD so OpenCL consumers can find the GPU.
# NOTE(review): presumably for a GPU/OpenCL LightGBM build — the "device": "gpu" parameter is currently commented out.
!mkdir -p /etc/OpenCL/vendors && echo "libnvidia-opencl.so.1" > /etc/OpenCL/vendors/nvidia.icd

In [None]:
# Pinned dependency versions for reproducible training and pickling.
# pip reports a conflict with the preinstalled dask (requires cloudpickle>=3.0.0); the
# cloudpickle==2.2.1 pin is presumably required by the environment that loads the pickle — confirm.
!pip install -q numerapi pandas pyarrow matplotlib lightgbm==4.0.0 xgboost==1.7.5 scikit-learn cloudpickle==2.2.1 scipy==1.10.1 umap-learn

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.9/58.9 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m47.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.3/200.3 MB[0m [31m7.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.4/34.4 MB[0m [31m36.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m88.8/88.8 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.9/56.9 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dask 2024.10.0 requires cloudpickle>=3.0.0, but you have cloudpickle 2.2.1 which is incompatible.[0m

In [None]:
# --no-deps: install numerai-tools without letting pip re-resolve (and possibly upgrade) the pinned packages above.
!pip install -q --no-deps numerai-tools

In [None]:
import re
from numerapi import NumerAPI
import numpy as np
import json
from pathlib import Path
import pandas as pd
from numerai_tools.scoring import numerai_corr
import cloudpickle
import lightgbm as lgb

from typing import List, Dict, Any, Optional, Callable
from tqdm import tqdm
from datetime import datetime
import os

import pyarrow.dataset as ds
import pyarrow as pa

import gc

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [None]:
def has_gpu() -> bool:
    """Return True when `nvidia-smi` is installed and reports a working NVIDIA driver.

    Runs `nvidia-smi` as a subprocess. This works outside IPython and fails safe:
    a missing binary, a non-zero exit code, a timeout, or output mentioning
    'failed' all count as "no GPU".
    """
    import shutil
    import subprocess

    # Without the binary there is no NVIDIA GPU to talk to ("command not found" contains no 'failed').
    if shutil.which('nvidia-smi') is None:
        return False
    try:
        result = subprocess.run(
            ['nvidia-smi'], capture_output=True, text=True, timeout=10
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    output = result.stdout + result.stderr
    # nvidia-smi prints "... has failed ..." when the driver is unreachable.
    return result.returncode == 0 and 'failed' not in output

gpu_available = has_gpu()
gpu_available

True

---

In [None]:
def format_era_list(era_list:List[int]) -> List[str]:
    """Convert integer era numbers to 4-digit zero-padded labels, e.g. 7 -> '0007'."""
    pad_to_four = "{:04d}".format
    return list(map(pad_to_four, era_list))

In [None]:
def get_training_steps(
    total_eras: int,
    eras_per_chunk: int,
    purge_eras: int,
    eras_overlap: int = 0,
    increment_start_era: bool = False
) -> List[Dict[str, Any]]:
    """Build walk-forward train / purge / validation era windows over eras 1..total_eras.

    Each step trains on `eras_per_chunk - purge_eras` eras (or on a growing prefix
    when `increment_start_era` is False). It then skips `purge_eras` eras and
    validates on the next `eras_per_chunk` eras, clamped to `total_eras`. The last
    step trains up to `total_eras` and has empty purge/validation lists.

    Args:
        total_eras: highest era number; eras are numbered from 1.
        eras_per_chunk: length of a training window plus its purge gap, and of a validation window.
        purge_eras: eras skipped between training and validation to avoid target overlap.
        eras_overlap: with `increment_start_era`, how many eras consecutive windows
            share (the start advances by `eras_per_chunk - eras_overlap`). Ignored otherwise.
        increment_start_era: slide the whole window forward (True) or only extend its end (False).

    Returns:
        One dict per step with keys 'train_eras', 'validation_eras', 'purge_eras'
        (zero-padded labels), 'train_end_era', 'validation_start_era' and
        'validation_end_era' (ints, or None when there is no validation window).

    Raises:
        ValueError: if the parameters would produce empty training windows or a
            window that never advances (infinite loop).
    """
    if purge_eras < 0:
        raise ValueError(f"purge_eras must be non-negative, got {purge_eras}")
    if eras_per_chunk <= purge_eras:
        raise ValueError(
            f"eras_per_chunk ({eras_per_chunk}) must exceed purge_eras ({purge_eras}) "
            "so every training window contains at least one era"
        )
    if increment_start_era and eras_overlap >= eras_per_chunk:
        raise ValueError(
            f"eras_overlap ({eras_overlap}) must be smaller than eras_per_chunk ({eras_per_chunk}); "
            "otherwise the training window never moves forward"
        )

    steps = []
    train_start_era = 1
    train_window_size = eras_per_chunk - purge_eras
    train_end_era = train_start_era + train_window_size - 1

    while train_start_era <= total_eras:
        train_end_era = min(train_end_era, total_eras)

        purge_start_era = train_end_era + 1
        # Clamp the purge window to the data we actually have
        purge_end_era = min(train_end_era + purge_eras, total_eras)

        validation_start_era = purge_end_era + 1
        validation_end_era = validation_start_era + eras_per_chunk - 1

        # No validation window once the purge gap reaches the last era
        if validation_start_era > total_eras:
            validation_eras = []
        else:
            validation_end_era = min(validation_end_era, total_eras)
            validation_eras = list(range(validation_start_era, validation_end_era + 1))

        train_eras = list(range(train_start_era, train_end_era + 1))
        purge_eras_list = list(range(purge_start_era, purge_end_era + 1))

        steps.append({
            'train_eras': format_era_list(train_eras),
            'validation_eras': format_era_list(validation_eras),
            'train_end_era': train_end_era,
            'validation_start_era': validation_start_era if validation_eras else None,
            'validation_end_era': validation_end_era if validation_eras else None,
            'purge_eras': format_era_list(purge_eras_list)
        })

        # The step that trains on the last era is the final one
        if train_end_era >= total_eras:
            break

        if increment_start_era:
            # Slide the whole training window forward (step size > 0 guaranteed above)
            train_start_era += eras_per_chunk - eras_overlap
            train_end_era = train_start_era + train_window_size - 1
        else:
            # Expanding window: keep the start, extend the end
            train_end_era += eras_per_chunk

    return steps


In [None]:
def downsample_dataset(dataset, columns_to_read:List[str], n:int, eras_to_embargo:Optional[List[str]]=None) -> pd.DataFrame:
    """Load `columns_to_read` for every n-th era of a pyarrow dataset.

    Args:
        dataset: pyarrow dataset exposing an 'era' column (or partition key).
        columns_to_read: columns to materialize.
        n: keep one era out of every `n` (1 keeps all eras); must be >= 1.
        eras_to_embargo: era labels removed before the every-n-th selection.

    Returns:
        pandas DataFrame with the rows of the selected eras.

    Raises:
        ValueError: if `n` is smaller than 1.
    """
    if n < 1:
        raise ValueError(f"n must be a positive integer, got {n}")

    era_values = dataset.to_table(columns=['era']).column('era').to_pandas()
    # Sort so the selection does not depend on the order fragments are scanned in.
    # Era labels in this notebook are 4-digit zero-padded strings, so the string
    # order matches the numeric order.
    unique_eras = sorted(era_values.unique().tolist())
    if eras_to_embargo:
        embargoed = set(eras_to_embargo)  # O(1) membership instead of scanning a list
        unique_eras = [era_id for era_id in unique_eras if era_id not in embargoed]

    downsampled_eras = unique_eras[::n]
    era_filter = ds.field('era').isin(downsampled_eras)
    return dataset.to_table(filter=era_filter, columns=columns_to_read).to_pandas()

In [None]:
def save_predict_func_to_pkl_file(
    pkl_file_path: Path,
    predict_func: Callable[[pd.DataFrame], pd.DataFrame],
) -> None:
    """Serialize `predict_func` with cloudpickle and write the bytes to `pkl_file_path`.

    The function is pickled before the file is opened, so a pickling error leaves
    any existing file untouched.
    """
    serialized_func = cloudpickle.dumps(predict_func)
    with open(pkl_file_path, "wb") as out_file:
        out_file.write(serialized_func)

def save_predict_func_to_file_and_check_on_data(
    pkl_file_path: Path,
    predict_func: Callable[[pd.DataFrame], pd.DataFrame],
    input_data: pd.DataFrame,
) -> None:
    """Pickle `predict_func` to disk, reload it, and check it reproduces the in-memory output.

    Raises:
        AssertionError: (from pandas.testing) if the reloaded function's output on
            `input_data` differs from the original function's output.
    """
    save_predict_func_to_pkl_file(pkl_file_path, predict_func)
    expected_output = predict_func(input_data)

    with open(pkl_file_path, "rb") as in_file:
        reloaded_func = cloudpickle.load(in_file)

    pd.testing.assert_frame_equal(expected_output, reloaded_func(input_data))

In [None]:
def compute_sample_weights(target: pd.Series) -> pd.Series:
    """Inverse-frequency weight for every row of `target`, rescaled to sum to len(target).

    Each distinct target value gets a per-row weight of 1 / (its relative frequency),
    so before rescaling every distinct value contributes the same total weight.

    Args:
        target: Series of target values, treated as discrete classes.

    Returns:
        Series aligned with `target`. Missing targets map to NaN, and the sum skips them.
    """
    relative_freq = target.value_counts(normalize=True)
    per_row_weight = target.map(relative_freq.rdiv(1.0))
    return per_row_weight / per_row_weight.sum() * len(target)

In [None]:
def _era_span(eras: List[str]) -> str:
    """Format a list of era labels as 'first-last', or 'none' when it is empty."""
    return f"{eras[0]}-{eras[-1]}" if eras else "none"


def _mean_per_era_corr(df: pd.DataFrame, target_col: str) -> float:
    """Average over eras of numerai_corr between df['y_pred'] and df[target_col]."""
    per_era_corr = df.groupby("era").apply(
        lambda x: numerai_corr(x[["y_pred"]].dropna(), x[target_col].dropna()),
        include_groups=False
    )
    # Predictions were passed as the single column 'y_pred'; average that column over eras
    return float(per_era_corr.mean().iloc[0])


def train_model_with_walk_forward(
        lgb_params,
        train_dataset,
        training_steps,
        columns_to_read,
        feature_set,
        target_col,
        path_to_save_model,
        model_name,
        save_model:bool=True
    ):
    """Fit one LightGBM regressor across walk-forward steps, continuing boosting at each step.

    For every step, the training eras are loaded from `train_dataset`. The model is
    fit with inverse-frequency sample weights, starting from the previous step's
    booster. The mean per-era correlation on the training eras (and on the
    validation eras, when the step has any) is shown on the progress bar.

    Args:
        lgb_params: keyword arguments for lgb.LGBMRegressor.
        train_dataset: pyarrow dataset with an 'era' column or partition key.
        training_steps: step dicts with keys 'train_eras', 'validation_eras' and
            'purge_eras' (as produced by get_training_steps).
        columns_to_read: columns to load; must include 'era', `target_col` and `feature_set`.
        feature_set: feature column names fed to the model.
        target_col: name of the target column.
        path_to_save_model: directory (str or Path) for the booster text file.
        model_name: file stem of the saved booster.
        save_model: when True, write `<path_to_save_model>/<model_name>.txt` after training.

    Returns:
        The fitted lgb.LGBMRegressor; its booster holds the trees from every step.

    Raises:
        ValueError: if `training_steps` is empty (nothing to fit or save).
    """
    if not training_steps:
        raise ValueError("training_steps is empty; nothing to train")

    model = lgb.LGBMRegressor(**lgb_params)
    previous_model = None

    # Context manager closes the progress bar even if a step raises
    with tqdm(training_steps, desc='Training Phases', total=len(training_steps)) as pbar:
        for step in pbar:
            train_eras = step['train_eras']
            validation_eras = step['validation_eras']
            purge_eras_range = step['purge_eras']

            df_train = train_dataset.to_table(
                filter=ds.field('era').isin(train_eras), columns=columns_to_read
            ).to_pandas()
            X_train = df_train[feature_set]
            y_train = df_train[target_col]

            model.fit(
                X_train, y_train,
                sample_weight=compute_sample_weights(y_train),
                # None on the first step; afterwards boosting continues from the previous booster
                init_model=previous_model
            )

            df_train['y_pred'] = model.predict(X_train)
            # Mean (not sum) over eras so the figure is comparable across window sizes
            train_corr = _mean_per_era_corr(df_train, target_col)

            postfix = {'Train Eras': _era_span(train_eras)}
            if validation_eras:
                df_validation = train_dataset.to_table(
                    filter=ds.field('era').isin(validation_eras), columns=columns_to_read
                ).to_pandas()
                df_validation['y_pred'] = model.predict(df_validation[feature_set])
                valid_corr = _mean_per_era_corr(df_validation, target_col)

                # The purge list is empty when purge_eras == 0; _era_span handles that
                postfix['Purge Eras'] = _era_span(purge_eras_range)
                postfix['Valid Eras'] = _era_span(validation_eras)
                postfix['Train Corr'] = f"{train_corr:.4f}"
                postfix['Valid Corr'] = f"{valid_corr:.4f}"
                del df_validation
            else:
                postfix['Train Corr'] = f"{train_corr:.4f}"
            pbar.set_postfix(postfix)

            previous_model = model.booster_

            # Release this step's frames before the next window is loaded
            del df_train, X_train, y_train
            gc.collect()

    if save_model:
        model.booster_.save_model(Path(path_to_save_model) / f'{model_name}.txt')
    return model

In [None]:
def compute_and_plot_correlation_with_target(
        model,
        dataset,
        columns_to_read,
        feature_set,
        target_col:str,
        mode:str,
        model_name:str,
        n=None
    ):
    """Predict on `dataset` and plot per-era and cumulative numerai_corr against `target_col`.

    Args:
        model: fitted model exposing `.predict(DataFrame)`.
        dataset: pyarrow dataset with an 'era' column or partition key.
        columns_to_read: columns to load; must include 'era', `target_col` and `feature_set`.
        feature_set: feature columns passed to the model.
        target_col: name of the target column.
        mode: label used in the plot titles (e.g. 'train', 'validation').
        model_name: printed in the header line.
        n: if None, use every era; otherwise keep every n-th era (see downsample_dataset).

    Returns:
        None. Prints prediction summary statistics and draws two plots.
    """
    print(f'Metrics for model `{model_name}`...')

    if n is None:
        # to_table returns a pyarrow Table; convert so column selection/assignment work below
        df = dataset.to_table(columns=columns_to_read).to_pandas()
    else:
        df = downsample_dataset(dataset, columns_to_read, n=n)

    df['y_pred'] = model.predict(df[feature_set])
    print(df['y_pred'].describe())

    per_era_corr = df.groupby("era").apply(
        lambda x: numerai_corr(x[["y_pred"]].dropna(), x[target_col].dropna()),
        include_groups=False
    )
    per_era_corr.plot(
        title=f"{mode.capitalize()} CORR",
        kind="bar",
        figsize=(9, 4),
        xticks=[],
        legend=False,
        snap=False
    )
    per_era_corr.cumsum().plot(
        title=f"Cumulative {mode.capitalize()} CORR",
        kind="line",
        figsize=(8, 4),
        legend=False
    )

    del df, per_era_corr
    gc.collect()
    return None

In [None]:
def get_max_era(dataset, data_types):
    """Return the largest integer era among fragments whose data_type partition is in `data_types`.

    Fragment paths are expected in hive layout `.../data_type=<type>/era=<era>/...`.
    Both keys are matched as whole path segments, so e.g. `data_type=training_extra`
    does not count as `train`.

    Args:
        dataset: object whose `get_fragments()` yields items with a `.path` string
            (e.g. a pyarrow dataset).
        data_types: collection of data_type values to include; a single string is also accepted.

    Returns:
        int: the maximum era number found.

    Raises:
        ValueError: if no matching fragment carries an era value, or (from int())
            if a matching era value is not an integer.
    """
    if isinstance(data_types, str):
        data_types = {data_types}
    wanted_types = set(data_types)

    # Anchor on a path separator (or string start) so keys match whole segments only
    data_type_pattern = re.compile(r'(?:^|[/\\])data_type=([^/\\]+)')
    era_pattern = re.compile(r'(?:^|[/\\])era=([^/\\]+)')

    era_numbers = set()
    for fragment in dataset.get_fragments():
        path = fragment.path
        type_match = data_type_pattern.search(path)
        if type_match is None or type_match.group(1) not in wanted_types:
            continue
        era_match = era_pattern.search(path)
        if era_match:
            era_numbers.add(int(era_match.group(1)))

    if not era_numbers:
        raise ValueError('No era values found')
    return max(era_numbers)

In [None]:
def train_validation_split_on_eras(max_era:int, split:float=.75, first_era:int=0)->tuple[list[str],list[str]]:
    """Split eras first_era..max_era chronologically into train and validation labels.

    The first `int(n_eras * split)` eras go to train and the rest to validation.
    Labels are 4-digit zero-padded strings.

    Note: with the default `first_era=0` the train list starts with '0000'.
    `get_training_steps` in this notebook numbers eras from 1, so pass `first_era=1`
    for a matching split.

    Args:
        max_era: last era number (inclusive).
        split: fraction of eras assigned to train, within [0, 1].
        first_era: first era number (inclusive).

    Returns:
        (train_eras, validation_eras) as lists of zero-padded strings.

    Raises:
        ValueError: if `split` is outside [0, 1].
    """
    if not 0.0 <= split <= 1.0:
        raise ValueError(f"split must be within [0, 1], got {split}")
    all_eras = list(range(first_era, max_era + 1))
    split_point = int(len(all_eras) * split)
    train_eras = [f"{era:04d}" for era in all_eras[:split_point]]
    validation_eras = [f"{era:04d}" for era in all_eras[split_point:]]

    return train_eras, validation_eras

In [None]:
def neutralize(
    df: pd.DataFrame,
    neutralizers: pd.DataFrame,
    proportion: float = 1.0,
) -> pd.DataFrame:
    """Neutralize each column of `df` against the columns of `neutralizers`.

    Least squares (via a pseudo-inverse) projects each column of `df` onto the span
    of the neutralizer columns plus an intercept. `proportion` times that projection
    is then subtracted from the column.

    Arguments:
        df: pd.DataFrame - the data with columns to neutralize (not modified).
        neutralizers: pd.DataFrame - the neutralizer data with features as columns;
            must share `df`'s index and contain no NaNs.
        proportion: float - the degree to which neutralization occurs
            (0 leaves `df` unchanged, 1 removes the full projection).

    Returns:
        pd.DataFrame - the neutralized data with `df`'s index and columns. Columns of
        `df` with zero standard deviation come back as NaN.

    Raises:
        AssertionError: if `neutralizers` contains NaNs or the indices differ.
    """
    assert not neutralizers.isna().any().any(), "Neutralizers contain NaNs"
    assert len(df.index) == len(neutralizers.index), "Indices don't match"
    assert (df.index == neutralizers.index).all(), "Indices don't match"
    # Work on a copy so the NaN assignment below does not mutate the caller's frame
    df = df.copy()
    df[df.columns[df.std() == 0]] = np.nan
    df_arr = df.values
    neutralizer_arr = np.hstack(
        # Intercept column of ones, so a constant offset is part of the projection too
        (neutralizers.values, np.ones((len(neutralizers), 1)))
    )
    inverse_neutralizers = np.linalg.pinv(neutralizer_arr, rcond=1e-6)
    adjustments = proportion * neutralizer_arr.dot(inverse_neutralizers.dot(df_arr))
    neutral = df_arr - adjustments
    return pd.DataFrame(neutral, index=df.index, columns=df.columns)

---

In [None]:
# Numerai dataset version; selects the local data directory and the live-data download path.
DATA_VERSION = "v5.0"

In [None]:
# True: use a tiny LightGBM config and skip saving/downloading the model artifacts.
TEST_RUN = False

In [None]:
# Name parts for the saved artifacts (booster .txt and predict-function .pkl).
MODEL_NAME = 'walk_forward_feature_neutral'
MODEL_VERSION = 'large'

In [None]:
# All artifacts live on Google Drive.
# NOTE(review): assumes Drive is already mounted at /content/drive — no mount cell is shown.
PATH_TO_DATA = Path(f'/content/drive/MyDrive/numerai/data/{DATA_VERSION}')
PATH_TO_MODELS = Path('/content/drive/MyDrive/numerai/models/')
PATH_TO_PREDICT_FUNCS = Path('/content/drive/MyDrive/numerai/predict_funcs/')

## LightGBM parameters

In [None]:
if TEST_RUN:
    # Tiny configuration for a quick end-to-end smoke test.
    lgb_params = {
        "n_estimators": 2,
        "learning_rate": 0.01,
        "max_depth": 5,
        "num_leaves": 2**5,
        "colsample_bytree": 0.1,
        "verbose": -1,
    }
else:
    max_depth = 6
    lgb_params = {
        # Boosting rounds per fit() call; each walk-forward step calls fit again
        # with init_model, so the final booster accumulates trees from every step.
        "n_estimators": 2_000,
        "learning_rate": 0.01,
        "max_depth": max_depth,
        # 2**max_depth is the most leaves a tree of this depth can have.
        "num_leaves": 2**max_depth,
        # Each tree samples 10% of the feature columns.
        "colsample_bytree": 0.1,
        # "device": "gpu",  # NOTE(review): presumably needs an OpenCL/GPU LightGBM build — confirm before enabling
        "verbose": -1,
    }

---

In [None]:
# Client created without API keys; used below to list and download dataset files.
napi = NumerAPI()

In [None]:
# Available dataset files, for reference; `all_datasets` is not used further below.
all_datasets = napi.list_datasets()

In [None]:
# Feature-set metadata shipped with the dataset, and the column the model is trained on.
features_path = PATH_TO_DATA / 'features.json'
target_col = 'target'

In [None]:
# Close the file handle deterministically instead of leaking it.
with open(features_path) as features_file:
    feature_metadata = json.load(features_file)
feature_sets = feature_metadata["feature_sets"]
# Model inputs: 'medium' plus 'rain'. dict.fromkeys drops any feature listed in both
# sets while keeping first-seen order, so no column is selected twice.
features = list(dict.fromkeys(feature_sets['medium'] + feature_sets['rain']))
# Predictions are partially neutralized against the 'rain' features in `predict`.
to_neutralize_feats = feature_sets['rain']

In [None]:
# Hive-style directory layout `.../data_type=<type>/era=<era>/...`; both keys are read as strings.
partitioning = ds.partitioning(
    schema=pa.schema([
        ('data_type', pa.string()),
        ('era', pa.string())
    ]),
    flavor='hive'
)

In [None]:
# Lazily scanned, partitioned parquet dataset; rows are only read by to_table() calls.
dataset = ds.dataset(PATH_TO_DATA/'pdata', format='parquet', partitioning=partitioning)
# Same dataset restricted to the train and validation partitions (any other data_type is excluded).
filtered_dataset = dataset.filter(ds.field('data_type').isin(['train', 'validation']))

In [None]:
# Highest era number found in the train/validation partition paths.
max_era = get_max_era(dataset, {'train', 'validation'})
max_era

1138

In [None]:
teras, veras = train_validation_split_on_eras(max_era)
# NOTE(review): the walk-forward training below runs up to `last_train_era`. With
# `max_era` it also trains on the `veras` eras, so the "Validation" section further
# down is in-sample. Use the commented line to hold those eras out.
# last_train_era = int(teras[-1])
last_train_era = max_era

In [None]:
# Number of eras in the train / validation split.
len(teras), len(veras)

(854, 285)

In [None]:
eras_per_chunk = 156  # training window + purge gap per step, and the validation window length
eras_overlap = eras_per_chunk // 2  # consecutive training windows share half a chunk
purge_eras = 8  # Or 16 for 60D targets — gap between training and validation eras

# Sliding (not expanding) windows, because increment_start_era=True.
training_steps = get_training_steps(last_train_era, eras_per_chunk, purge_eras, eras_overlap, increment_start_era=True)

In [None]:
# Artifact stem shared by the plots and the pickled predict function.
model_name = f'{MODEL_NAME}_v{MODEL_VERSION}'

In [None]:
# Walk-forward training over the steps built above. The booster is saved in a later
# cell, hence save_model=False. Train only on the train/validation partitions:
# steps select rows by era alone, so other partitions must be excluded up front.
model = train_model_with_walk_forward(
    lgb_params,
    filtered_dataset,
    training_steps,
    [target_col, 'era']+features,
    features,
    target_col,
    PATH_TO_MODELS,
    model_name,
    save_model=False
)

Training Phases:  14%|█▍        | 2/14 [17:58<1:50:50, 554.23s/it, Train Eras=0079-0226, Purge Eras=0227-0234, Valid Eras=0235-0390, Train Corr=80.7497, Valid Corr=3.7908]

In [None]:
# Rows whose era is in the training split (filter is on era only, across all data_type partitions).
train_dataset = dataset.filter(ds.field('era').isin(teras))

In [None]:
# In-sample metrics on every 2nd training era (n=2).
compute_and_plot_correlation_with_target(
    model, train_dataset, [target_col, 'era']+features, features, target_col, 'train', model_name=model_name, n=2)

---

## Validation

In [None]:
# Rows whose era is in the validation split (filter is on era only).
validation_dataset = dataset.filter(ds.field('era').isin(veras))

In [None]:
# Metrics on every 4th validation era (n=4).
# NOTE(review): in-sample if `last_train_era` covered these eras during training.
compute_and_plot_correlation_with_target(
    model, validation_dataset, [target_col, 'era']+features, features, target_col, 'validation', model_name=model_name, n=4)

---

## Live data

In [None]:
# Download the current live feature file into the data directory.
live_data_path = PATH_TO_DATA/ 'live.parquet'
napi.download_dataset(f"{DATA_VERSION}/live.parquet", str(live_data_path))

In [None]:
# Live features used to sanity-check `predict` before pickling it.
live_data = pd.read_parquet(live_data_path)

In [None]:
def predict(live_features: pd.DataFrame) -> pd.DataFrame:
    """Score live rows, neutralize half of the exposure to the 'rain' features, and rank.

    Reads the notebook globals `model`, `features` and `to_neutralize_feats`.
    NOTE(review): presumably cloudpickle captures these when the function is pickled
    for upload — confirm the reloaded function works in a fresh process.

    Returns:
        DataFrame indexed like `live_features` with one column "prediction" holding
        percentile ranks in (0, 1].
    """
    live_preds = pd.DataFrame(
        model.predict(live_features[features]),
        index=live_features.index,
        columns=["prediction"]
    )

    # proportion=0.5 removes half of the linear projection onto the neutralizer features
    neutralized = neutralize(live_preds, live_features[to_neutralize_feats], proportion=0.5)
    return neutralized.rank(pct=True)

In [None]:
# Sanity check: values should be percentile ranks in (0, 1] with no NaNs.
a = predict(live_data)
a.describe()

In [None]:
if not TEST_RUN:
    # Use the helper defined earlier in the notebook: pickle `predict`, reload it, and
    # assert the reloaded function reproduces the in-memory predictions on live data.
    save_predict_func_to_file_and_check_on_data(
        PATH_TO_PREDICT_FUNCS/f'{MODEL_NAME}_v{MODEL_VERSION}.pkl',
        predict,
        live_data,
    )

In [None]:
if not TEST_RUN:
    # Plain-text LightGBM booster, stored next to other models with an 'lgbm_' prefix.
    model.booster_.save_model(PATH_TO_MODELS / f'lgbm_{MODEL_NAME}_v{MODEL_VERSION}.txt')

In [None]:
# Colab-only: send the pickled predict function to the browser for upload.
from google.colab import files

if not TEST_RUN:
    files.download(PATH_TO_PREDICT_FUNCS/f'{MODEL_NAME}_v{MODEL_VERSION}.pkl')