In [1]:
# hide
%load_ext autoreload
%autoreload 2
%load_ext nb_black
%load_ext lab_black

<IPython.core.display.Javascript object>

In [2]:
# default_exp preprocessing

<IPython.core.display.Javascript object>

# Preprocessing
> Feature/target selection, engineering and manipulation.

## Overview
This section provides functionality for all data manipulation steps that are needed before data is passed into a model for prediction. We group all these steps under Preprocessing. This includes feature/target selection, feature/target engineering and feature/target manipulation.

Some preprocessors work with both Pandas DataFrames and NumerFrames. Most preprocessors use specific `NumerFrame` functionality.

In the last section we explain how you can implement your own Preprocessor that integrates well with the rest of this framework.

In [3]:
# hide
from nbdev.showdoc import *

<IPython.core.display.Javascript object>

In [1]:
# export
import os
import sdv
import time
import warnings
import numpy as np
import pandas as pd
import datetime as dt
from pathlib import Path
from tqdm.auto import tqdm
from functools import wraps
from scipy.stats import rankdata
from typeguard import typechecked
from abc import ABC, abstractmethod
from rich import print as rich_print
from typing import Union, List, Tuple
from multiprocessing.pool import Pool
from sklearn.linear_model import Ridge
from sklearn.mixture import BayesianGaussianMixture
from sklearn.preprocessing import QuantileTransformer

from numerblox.download import NumeraiClassicDownloader
from numerblox.numerframe import NumerFrame, create_numerframe

## 0. Base

These objects will provide a base for all pre- and post-processing functionality and log relevant information.

## 0.1. BaseProcessor

`BaseProcessor` defines common functionality for `preprocessing` and `postprocessing` (Section 5).

Every Preprocessor should inherit from `BaseProcessor` and implement the `.transform` method.

In [5]:
# export
class BaseProcessor(ABC):
    """Common functionality for preprocessors and postprocessors."""

    def __init__(self):
        ...

    @abstractmethod
    def transform(
        self, dataf: Union[pd.DataFrame, NumerFrame], *args, **kwargs
    ) -> NumerFrame:
        ...

    def __call__(
        self, dataf: Union[pd.DataFrame, NumerFrame], *args, **kwargs
    ) -> NumerFrame:
        return self.transform(dataf=dataf, *args, **kwargs)

<IPython.core.display.Javascript object>

## 0.2. Logging

We would like to keep an overview of which steps are done in a data pipeline and where processing bottlenecks occur.
The decorator below will display for a given function/method:
1. When it has finished.
2. What the output shape of the data is.
3. How long it took to finish.

To use this functionality, simply add `@display_processor_info` as a decorator to the function/method you want to track.

We will use this decorator throughout the pipeline (`preprocessing`, `model` and `postprocessing`).

Inspiration for this decorator: [Calmcode Pandas Pipe Logs](https://calmcode.io/pandas-pipe/logs.html)

In [6]:
# export
def display_processor_info(func):
    """Fancy console output for data processing."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        tic = dt.datetime.now()
        result = func(*args, **kwargs)
        time_taken = str(dt.datetime.now() - tic)
        class_name = func.__qualname__.split(".")[0]
        rich_print(
            f":white_check_mark: Finished step [bold]{class_name}[/bold]. Output shape={result.shape}. Time taken for step: [blue]{time_taken}[/blue]. :white_check_mark:"
        )
        return result

    return wrapper

<IPython.core.display.Javascript object>

In [7]:
# hide_input
class TestDisplay:
    """
    Small test for logging.
    Output should mention 'TestDisplay',
    Return output shape of (10, 314) and
    time taken for step should be close to 2 seconds.
    """

    def __init__(self, dataf: NumerFrame):
        self.dataf = dataf

    @display_processor_info
    def test(self) -> NumerFrame:
        time.sleep(2)
        return self.dataf


dataf = create_numerframe("test_assets/mini_numerai_version_1_data.csv")
TestDisplay(dataf).test()

Unnamed: 0,id,era,data_type,feature_intelligence1,feature_intelligence2,feature_intelligence3,feature_intelligence4,feature_intelligence5,feature_intelligence6,feature_intelligence7,...,feature_wisdom38,feature_wisdom39,feature_wisdom40,feature_wisdom41,feature_wisdom42,feature_wisdom43,feature_wisdom44,feature_wisdom45,feature_wisdom46,target
0,n000315175b67977,era1,train,0.0,0.5,0.25,0.0,0.5,0.25,0.25,...,1.0,1.0,0.75,0.5,0.75,0.5,1.0,0.5,0.75,0.5
1,n0014af834a96cdd,era1,train,0.0,0.0,0.0,0.25,0.5,0.0,0.0,...,1.0,1.0,0.0,0.0,0.75,0.25,0.0,0.25,1.0,0.25
2,n001c93979ac41d4,era1,train,0.25,0.5,0.25,0.25,1.0,0.75,0.75,...,0.25,0.5,0.0,0.0,0.5,1.0,0.0,0.25,0.75,0.25
3,n0034e4143f22a13,era1,train,1.0,0.0,0.0,0.5,0.5,0.25,0.25,...,1.0,1.0,0.75,0.75,1.0,1.0,0.75,1.0,1.0,0.25
4,n00679d1a636062f,era1,train,0.25,0.25,0.25,0.25,0.0,0.25,0.5,...,0.75,0.75,0.25,0.5,0.75,0.0,0.5,0.25,0.75,0.75
5,n009aa2d32389eca,era1,train,0.5,0.5,0.25,0.25,0.75,0.75,0.75,...,0.75,0.75,0.0,0.0,0.75,0.5,0.0,0.25,0.0,0.5
6,n009ef1a5fe009b6,era1,train,0.5,0.25,0.25,0.75,1.0,1.0,1.0,...,1.0,1.0,0.5,0.5,0.75,0.5,0.5,0.5,1.0,0.25
7,n00ae5d51f55fb0f,era1,train,0.25,1.0,1.0,0.75,1.0,0.75,0.75,...,0.5,0.25,0.75,0.75,0.0,0.25,0.75,0.5,0.25,0.25
8,n00b0ac86d77aed7,era1,train,0.5,0.5,0.5,1.0,1.0,0.25,0.5,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.5
9,n00c63366aeaf76a,era1,train,0.5,1.0,1.0,0.25,0.75,0.25,0.25,...,0.0,0.0,1.0,1.0,0.75,0.5,1.0,1.0,0.75,0.75


<IPython.core.display.Javascript object>

## 1. Common preprocessing steps


This section implements commonly used preprocessing for Numerai. We invite the Numerai community to develop new preprocessors.

## 1.0 Tournament agnostic

Preprocessors that can be applied for both Numerai Classic and Numerai Signals.

### 1.0.1. CopyPreProcessor

The first and obvious preprocessor is copying, which is implemented as a default in `ModelPipeline` (Section 4) to avoid manipulation of the original DataFrame or `NumerFrame` that you load in.

In [8]:
# export
@typechecked
class CopyPreProcessor(BaseProcessor):
    """Copy DataFrame to avoid manipulation of original DataFrame."""

    def __init__(self):
        super().__init__()

    @display_processor_info
    def transform(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        return NumerFrame(dataf.copy())

<IPython.core.display.Javascript object>

In [9]:
dataset = create_numerframe(
    "test_assets/mini_numerai_version_1_data.csv", metadata={"version": 1}
)
copied_dataset = CopyPreProcessor().transform(dataset)
assert np.array_equal(copied_dataset.values, dataset.values)
assert dataset.meta == copied_dataset.meta

<IPython.core.display.Javascript object>

### 1.0.2. FeatureSelectionPreProcessor

`FeatureSelectionPreProcessor` will keep all features that you pass + keeps all other columns that are not features.

In [10]:
# export
@typechecked
class FeatureSelectionPreProcessor(BaseProcessor):
    """
    Keep only features given + all target, predictions and aux columns.
    """

    def __init__(self, feature_cols: Union[str, list]):
        super().__init__()
        self.feature_cols = feature_cols

    @display_processor_info
    def transform(self, dataf: NumerFrame) -> NumerFrame:
        keep_cols = (
            self.feature_cols
            + dataf.target_cols
            + dataf.prediction_cols
            + dataf.aux_cols
        )
        dataf = dataf.loc[:, keep_cols]
        return NumerFrame(dataf)

<IPython.core.display.Javascript object>

In [11]:
selected_dataset = FeatureSelectionPreProcessor(
    feature_cols=["feature_wisdom1"]
).transform(dataset)

assert selected_dataset.get_feature_data.shape[1] == 1
assert dataset.meta == selected_dataset.meta

<IPython.core.display.Javascript object>

In [12]:
selected_dataset.head(2)

Unnamed: 0,feature_wisdom1,target,id,era,data_type
0,0.25,0.5,n000315175b67977,era1,train
1,0.5,0.25,n0014af834a96cdd,era1,train


<IPython.core.display.Javascript object>

### 1.0.3. TargetSelectionPreProcessor

`TargetSelectionPreProcessor` will keep all targets that you pass + all other columns that are not targets.

Not relevant for an inference pipeline, but especially convenient for Numerai Classic training if you train on a subset of the available targets. Can also be applied to Signals if you are using engineered targets in your pipeline.


In [13]:
# export
@typechecked
class TargetSelectionPreProcessor(BaseProcessor):
    """
    Keep only features given + all target, predictions and aux columns.
    """

    def __init__(self, target_cols: Union[str, list]):
        super().__init__()
        self.target_cols = target_cols

    @display_processor_info
    def transform(self, dataf: NumerFrame) -> NumerFrame:
        keep_cols = (
            self.target_cols
            + dataf.feature_cols
            + dataf.prediction_cols
            + dataf.aux_cols
        )
        dataf = dataf.loc[:, keep_cols]
        return NumerFrame(dataf)

<IPython.core.display.Javascript object>

In [14]:
dataset = create_numerframe(
    "test_assets/mini_numerai_version_2_data.parquet", metadata={"version": 2}
)
target_cols = ["target", "target_nomi_20", "target_nomi_60"]
selected_dataset = TargetSelectionPreProcessor(target_cols=target_cols).transform(
    dataset
)
assert selected_dataset.get_target_data.shape[1] == len(target_cols)
selected_dataset.head(2)

Unnamed: 0_level_0,target,target_nomi_20,target_nomi_60,feature_dichasial_hammier_spawner,feature_rheumy_epistemic_prancer,feature_pert_performative_hormuz,feature_hillier_unpitied_theobromine,feature_perigean_bewitching_thruster,feature_renegade_undomestic_milord,feature_koranic_rude_corf,...,feature_drawable_exhortative_dispersant,feature_metabolic_minded_armorist,feature_investigatory_inerasable_circumvallation,feature_centroclinal_incentive_lancelet,feature_unemotional_quietistic_chirper,feature_behaviorist_microbiological_farina,feature_lofty_acceptable_challenge,feature_coactive_prefatorial_lucy,era,data_type
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
n559bd06a8861222,0.25,0.25,0.5,0.25,0.75,0.25,0.75,0.25,0.5,1.0,...,1.0,0.0,0.0,0.25,0.0,0.0,1.0,0.25,297,train
n9d39dea58c9e3cf,0.5,0.5,0.75,0.75,0.5,0.75,1.0,0.5,0.25,0.5,...,0.25,0.5,0.0,0.25,0.75,1.0,0.75,1.0,3,train


<IPython.core.display.Javascript object>

### 1.0.4. ReduceMemoryProcessor

Numerai datasets can take up a lot of RAM and may put a strain on your compute environment.

For Numerai Classic, many of the feature and target columns can be downscaled to `float16`. `int8` if you are using the Numerai int8 datasets. For Signals it depends on the features you are generating.

`ReduceMemoryProcessor` downscales the type of your numeric columns to reduce the memory footprint as much as possible.

In [15]:
# export
class ReduceMemoryProcessor(BaseProcessor):
    """
    Reduce memory usage as much as possible.

    Credits to kainsama and others for writing about memory usage reduction for Numerai data:
    https://forum.numer.ai/t/reducing-memory/313

    :param deep_mem_inspect: Introspect the data deeply by interrogating object dtypes.
    Yields a more accurate representation of memory usage if you have complex object columns.
    """
    def __init__(self, deep_mem_inspect = False):
        super().__init__()
        self.deep_mem_inspect = deep_mem_inspect

    @display_processor_info
    def transform(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        dataf = self._reduce_mem_usage(dataf)
        return NumerFrame(dataf)

    def _reduce_mem_usage(self, dataf: pd.DataFrame) -> pd.DataFrame:
        """
        Iterate through all columns and modify the numeric column types
        to reduce memory usage.
        """
        start_memory_usage = dataf.memory_usage(deep=self.deep_mem_inspect).sum() / 1024**2
        rich_print(f"Memory usage of DataFrame is [bold]{round(start_memory_usage, 2)} MB[/bold]")

        for col in dataf.columns:
            col_type = dataf[col].dtype.name

            if col_type not in ['object', 'category', 'datetime64[ns, UTC]','datetime64[ns]']:
                c_min = dataf[col].min()
                c_max = dataf[col].max()
                if str(col_type)[:3] == 'int':
                    if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                        dataf[col] = dataf[col].astype(np.int16)
                    elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                        dataf[col] = dataf[col].astype(np.int16)
                    elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                        dataf[col] = dataf[col].astype(np.int32)
                    elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                        dataf[col] = dataf[col].astype(np.int64)
                else:
                    if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                        dataf[col] = dataf[col].astype(np.float16)
                    elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                        dataf[col] = dataf[col].astype(np.float32)
                    else:
                        dataf[col] = dataf[col].astype(np.float64)

        end_memory_usage = dataf.memory_usage(deep=self.deep_mem_inspect).sum() / 1024**2
        rich_print(f"Memory usage after optimization is: [bold]{round(end_memory_usage, 2)} MB[/bold]")
        rich_print(f"[green] Usage decreased by [bold]{round(100 * (start_memory_usage - end_memory_usage) / start_memory_usage, 2)}%[/bold][/green]")
        return dataf

<IPython.core.display.Javascript object>

In [16]:
dataf = create_numerframe("test_assets/mini_numerai_version_2_data.parquet")
rmp = ReduceMemoryProcessor()
dataf = rmp.transform(dataf)

<IPython.core.display.Javascript object>

In [17]:
#hide
dataf.head(2)

Unnamed: 0_level_0,era,data_type,feature_dichasial_hammier_spawner,feature_rheumy_epistemic_prancer,feature_pert_performative_hormuz,feature_hillier_unpitied_theobromine,feature_perigean_bewitching_thruster,feature_renegade_undomestic_milord,feature_koranic_rude_corf,feature_demisable_expiring_millepede,...,target_paul_20,target_paul_60,target_george_20,target_george_60,target_william_20,target_william_60,target_arthur_20,target_arthur_60,target_thomas_20,target_thomas_60
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
n559bd06a8861222,297,train,0.25,0.75,0.25,0.75,0.25,0.5,1.0,0.25,...,0.0,0.5,0.25,0.5,0.0,0.5,0.166626,0.5,0.333252,0.5
n9d39dea58c9e3cf,3,train,0.75,0.5,0.75,1.0,0.5,0.25,0.5,0.0,...,0.5,0.75,0.5,0.5,0.666504,0.666504,0.5,0.666504,0.5,0.666504


<IPython.core.display.Javascript object>

### 1.0.5. SyntheticDataGenerator

In [None]:
# export
class SyntheticDataGenerator(BaseProcessor):
    """
    Generate synthetic eras of data.
    Uses SDV (sdv.dev) under the hood.

    :param model_name: Exact class name of a model supported on sdv.
    :param model_path: Path to trained model if you have so.
    By default, initializes and fits a new model.
    """
    SUPPORTED_MODELS = ["GaussianCopula", "CTGAN", "CopulaGAN"]
    def __init__(self,
                 model_path: str,
                 model_name = "GaussianCopula",
                 eras_to_add: int = 1):
        super().__init__()
        self.model_name = model_name
        assert self.model_name in self.SUPPORTED_MODELS,\
            f"Only models '{self.SUPPORTED_MODELS}' are supported. Got '{self.model_name}'."
        self.model_path = Path(model_path)
        self.eras_to_add = eras_to_add

    @display_processor_info
    def transform(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        model = self.prepare_model(dataf=dataf)
        synth_datafs = []
        for era_n in range(self.eras_to_add):
            synth_era_data = self.get_synthetic_batch(model=model)
            synth_era_data[dataf.meta.era_col] = f"synth_{era_n.zfill(4)}"
            synth_datafs.append(synth_era_data)

        synth_dataf = pd.concat(synth_datafs)


        # Parse all contents of NumerFrame to the next pipeline step
        return NumerFrame(dataf)

    def prepare_model(self, dataf: Union[pd.DataFrame, NumerFrame]) -> Union[SUPPORTED_MODELS]:
        if self.model_path.is_file():
            model = getattr(sdv.tabular, self.model_name).load(self.model_path)
        else:
            rich_print(f":warning: Model path '{self.model_path}' does not point to a file. Initializing, fitting and saving new model. :warning:")
            model = getattr(sdv.tabular, self.model_name)()
            model.fit(dataf=dataf)
            model.save(self.model_path)
        return model

    @staticmethod
    def get_synthetic_batch(model: Union[SUPPORTED_MODELS],
                            num_rows: int = 200) -> pd.DataFrame:
        synthetic_dataf = model.sample(num_rows=num_rows)
        return synthetic_dataf



## 1.1. Numerai Classic

The Numerai Classic dataset has a certain structure that you may not encounter in the Numerai Signals tournament.
Therefore, this section has all preprocessors that can only be applied to Numerai Classic.

### 1.1.0 Numerai Classic: Version agnostic

Preprocessors that work for all Numerai Classic versions.

#### 1.1.0.1. BayesianGMMTargetProcessor

In [18]:
# export
class BayesianGMMTargetProcessor(BaseProcessor):
    """
    Generate synthetic (fake) target using a Bayesian Gaussian Mixture model. \n
    Based on Michael Oliver's GitHub Gist implementation: \n
    https://gist.github.com/the-moliver/dcdd2862dc2c78dda600f1b449071c93

    :param target_col: Column from which to create fake target. \n
    :param n_components: Number of components for fitting Bayesian Gaussian Mixture Model.
    """
    def __init__(self, target_col: str = "target", n_components: int = 6):
        super().__init__()
        self.target_col = target_col
        self.n_components = n_components
        self.ridge = Ridge(fit_intercept=False)
        self.bins = [0, 0.05, 0.25, 0.75, 0.95, 1]

    @display_processor_info
    def transform(self, dataf: NumerFrame, *args, **kwargs) -> NumerFrame:
        all_eras = dataf[dataf.meta.era_col].unique()
        coefs = self._get_coefs(dataf=dataf, all_eras=all_eras)
        bgmm = self._fit_bgmm(coefs=coefs)
        fake_target = self._generate_target(dataf=dataf,
                                            bgmm=bgmm,
                                            all_eras=all_eras)
        dataf[f"fake_{self.target_col}"] = fake_target
        return NumerFrame(dataf)

    def _get_coefs(self, dataf: NumerFrame, all_eras: list) -> np.ndarray:
        """
        Generate coefficients for BGMM.
        Data should already be scaled between 0 and 1
        (Already done with Numerai Classic data)
        """
        coefs = []
        for era in all_eras:
            features, target = self.__get_features_target(dataf=dataf, era=era)
            self.ridge.fit(features, target)
            coefs.append(self.ridge.coef_)
        stacked_coefs = np.vstack(coefs)
        return stacked_coefs

    def _fit_bgmm(self, coefs: np.ndarray) -> BayesianGaussianMixture:
        """
        Fit Bayesian Gaussian Mixture model on coefficients and normalize.
        """
        bggm = BayesianGaussianMixture(n_components=self.n_components)
        bggm.fit(coefs)
        # make probability of sampling each component equal to better balance rare regimes
        bggm.weights_[:] = 1 / self.n_components
        return bggm

    def _generate_target(self, dataf: NumerFrame,
                         bgmm: BayesianGaussianMixture,
                         all_eras: list) -> np.ndarray:
        """ Generate fake target using Bayesian Gaussian Mixture model. """
        fake_target = []
        for era in tqdm(all_eras, desc="Generating fake target"):
            features, _ = self.__get_features_target(dataf=dataf, era=era)
            # Sample a set of weights from GMM
            beta, _ = bgmm.sample(1)
            # Create fake continuous target
            fake_targ = features @ beta[0]
            # Bin fake target like real target
            fake_targ = (rankdata(fake_targ) - .5) / len(fake_targ)
            fake_targ = (np.digitize(fake_targ, self.bins) - 1) / 4
            fake_target.append(fake_targ)
        return np.concatenate(fake_target)

    def __get_features_target(self, dataf: NumerFrame, era) -> tuple:
        """ Get features and target for one era and center data. """
        sub_df = dataf[dataf.era == era]
        features = sub_df.get_feature_data
        target = sub_df[self.target_col]
        features = features.values - .5
        target = target.values - .5
        return features, target


<IPython.core.display.Javascript object>

In [19]:
# hide
directory = "bgmm_test/"
downloader = NumeraiClassicDownloader(directory_path=directory)
downloader.download_single_dataset(filename="numerai_validation_data.parquet",
                                   dest_path=directory + "numerai_validation_data.parquet")
val_dataf = create_numerframe("bgmm_test/numerai_validation_data.parquet")

2022-04-04 21:28:27,368 INFO numerapi.utils: starting download
bgmm_test/numerai_validation_data.parquet: 228MB [01:36, 2.35MB/s]                            


<IPython.core.display.Javascript object>

In [20]:
bgmm = BayesianGMMTargetProcessor()
val_dataf = bgmm(val_dataf)
val_dataf[['target', 'fake_target']].head(3)

Generating fake target:   0%|          | 0/105 [00:00<?, ?it/s]

Unnamed: 0_level_0,target,fake_target
id,Unnamed: 1_level_1,Unnamed: 2_level_1
n000777698096000,0.25,0.5
n0009793a3b91c27,0.5,0.75
n00099ccd6698ab0,0.0,0.25


<IPython.core.display.Javascript object>

In [21]:
# hide
downloader.remove_base_directory()

<IPython.core.display.Javascript object>

### 1.1.1. Numerai Classic: Version 1 specific

Preprocessors that only work for version 1 (legacy data).
When using version 1 preprocessor it is recommended that the input `NumerFrame` has `version` in its metadata.
This avoids using version 1 preprocessors on version 2 data and encountering confusing error messages.

As a new user we recommend to start modeling the version 2 data and avoid version 1.
The preprocessors below are only there for legacy and compatibility reasons.

#### 1.1.1.1. GroupStatsPreProcessor

The version 1 legacy data has 6 groups of features which allows us to calculate aggregate features.

In [22]:
# export
class GroupStatsPreProcessor(BaseProcessor):
    """
    WARNING: Only supported for Version 1 (legacy) data. \n
    Calculate group statistics for all data groups. \n
    | :param groups: Groups to create features for. All groups by default.
    """

    def __init__(self, groups: list = None):
        super().__init__()
        self.all_groups = [
            "intelligence",
            "wisdom",
            "charisma",
            "dexterity",
            "strength",
            "constitution",
        ]
        self.group_names = groups if groups else self.all_groups

    @display_processor_info
    def transform(self, dataf: NumerFrame, *args, **kwargs) -> NumerFrame:
        """Check validity and add group features."""
        self._check_data_validity(dataf=dataf)
        dataf = dataf.pipe(self._add_group_features)
        return NumerFrame(dataf)

    def _add_group_features(self, dataf: pd.DataFrame) -> pd.DataFrame:
        """Mean, standard deviation and skew for each group."""
        for group in self.group_names:
            cols = [col for col in dataf.columns if group in col]
            dataf[f"feature_{group}_mean"] = dataf[cols].mean(axis=1)
            dataf[f"feature_{group}_std"] = dataf[cols].std(axis=1)
            dataf[f"feature_{group}_skew"] = dataf[cols].skew(axis=1)
        return dataf

    def _check_data_validity(self, dataf: NumerFrame):
        """Make sure this is only used for version 1 data."""
        assert hasattr(
            dataf.meta, "version"
        ), f"Version should be specified for '{self.__class__.__name__}' This Preprocessor will only work on version 1 data."
        assert (
            getattr(dataf.meta, "version") == 1
        ), f"'{self.__class__.__name__}' only works on version 1 data. Got version: '{getattr(dataf.meta, 'version')}'."

<IPython.core.display.Javascript object>

In [23]:
dataf = create_numerframe(
    "test_assets/mini_numerai_version_1_data.csv", metadata={"version": 1}
)
group_features_dataf = GroupStatsPreProcessor().transform(dataf)
group_features_dataf.head(2)
assert group_features_dataf.meta.version == 1

<IPython.core.display.Javascript object>

In [24]:
# hide
new_cols = [
    "feature_intelligence_mean",
    "feature_intelligence_std",
    "feature_intelligence_skew",
    "feature_wisdom_mean",
    "feature_wisdom_std",
    "feature_wisdom_skew",
    "feature_charisma_mean",
    "feature_charisma_std",
    "feature_charisma_skew",
    "feature_dexterity_mean",
    "feature_dexterity_std",
    "feature_dexterity_skew",
    "feature_strength_mean",
    "feature_strength_std",
    "feature_strength_skew",
    "feature_constitution_mean",
    "feature_constitution_std",
    "feature_constitution_skew",
]
assert set(group_features_dataf.columns).intersection(new_cols)
group_features_dataf.get_feature_data[new_cols].head(2)

Unnamed: 0,feature_intelligence_mean,feature_intelligence_std,feature_intelligence_skew,feature_wisdom_mean,feature_wisdom_std,feature_wisdom_skew,feature_charisma_mean,feature_charisma_std,feature_charisma_skew,feature_dexterity_mean,feature_dexterity_std,feature_dexterity_skew,feature_strength_mean,feature_strength_std,feature_strength_skew,feature_constitution_mean,feature_constitution_std,feature_constitution_skew
0,0.333333,0.246183,0.558528,0.668478,0.236022,-0.115082,0.438953,0.25991,-0.004783,0.696429,0.200446,-0.60762,0.480263,0.292829,-0.372064,0.427632,0.27572,0.276155
1,0.208333,0.234359,0.382554,0.559783,0.358177,-0.062362,0.485465,0.252501,-0.021737,0.267857,0.249312,0.382267,0.407895,0.309866,0.220625,0.644737,0.33408,-0.794938


<IPython.core.display.Javascript object>

`GroupStatsPreProcessor` should break if `version != 1`.

In [25]:
# hide
def test_invalid_version(dataf: NumerFrame):
    copied_dataf = dataf.copy()
    copied_dataf.version = 2
    try:
        GroupStatsPreProcessor().transform(copied_dataf)
    except AssertionError:
        return True
    return False


test_invalid_version(dataf)

False

<IPython.core.display.Javascript object>

### 1.1.2. Numerai Classic: Version 2 specific

Preprocessors that are only compatible with version 2 data. If the preprocessor is agnostic to Numerai Classic version implement under heading 1.1.0.

In [26]:
# 1.1.2
# No version 2 specific Numerai Classic preprocessors implemented yet.

<IPython.core.display.Javascript object>

## 1.2. Numerai Signals

Preprocessors that are specific to Numerai Signals.

### 1.2.1. TA-Lib Features (TalibFeatureGenerator)

[TA-Lib](https://mrjbq7.github.io/ta-lib) is an optimized technical analysis library. It is based on Cython and includes 150+ indicators. We have selected features based on feature importances, SHAP and correlation with the Numerai Signals target. If you want to implement other features check out the [TA-Lib documentation](https://mrjbq7.github.io/ta-lib/index.html).

Installation of TA-Lib is a bit more involved than just a pip install and is an optional dependency for this library. Visit the [installation documentation](https://mrjbq7.github.io/ta-lib/install.html) for instructions.

In [27]:
# export
class TalibFeatureGenerator(BaseProcessor):
    """
    Generate relevant features available in TA-Lib. \n
    More info: https://mrjbq7.github.io/ta-lib \n
    Input DataFrames for these functions should have the following columns defined:
    ['open', 'high', 'low', 'close', 'volume'] \n
    Make sure that all values are sorted in chronological order (by ticker). \n
    :param windows: List of ranges for window features.
    Windows will be applied for all features specified in self.window_features. \n
    :param ticker_col: Which column to groupby for feature generation.
    """

    def __init__(self, windows: List[int], ticker_col: str = "bloomberg_ticker"):
        self.__check_talib_import()
        super().__init__()

        self.windows = windows
        self.ticker_col = ticker_col
        self.window_features = [
            "NATR",
            "ADXR",
            "AROONOSC",
            "DX",
            "MFI",
            "MINUS_DI",
            "MINUS_DM",
            "MOM",
            "ROCP",
            "ROCR100",
            "PLUS_DI",
            "PLUS_DM",
            "BETA",
            "RSI",
            "ULTOSC",
            "TRIX",
            "ADXR",
            "CCI",
            "CMO",
            "WILLR",
        ]
        self.no_window_features = ["AD", "OBV", "APO", "MACD", "PPO"]
        self.hlocv_cols = ["open", "high", "low", "close", "volume"]

    def get_no_window_features(self, dataf: pd.DataFrame):
        for func in tqdm(self.no_window_features, desc="No window features"):
            dataf.loc[:, f"feature_{func}"] = (
                dataf.groupby(self.ticker_col)
                .apply(lambda x: pd.Series(self._no_window(x, func)).bfill())
                .values.astype(np.float32)
            )
        return dataf

    def get_window_features(self, dataf: pd.DataFrame):
        for win in tqdm(self.windows, position=0, desc="Window features"):
            for func in tqdm(self.window_features, position=1):
                dataf.loc[:, f"feature_{func}_{win}"] = (
                    dataf.groupby(self.ticker_col)
                    .apply(lambda x: pd.Series(self._window(x, func, win)).bfill())
                    .values.astype(np.float32)
                )
        return dataf

    def get_all_features(self, dataf: pd.DataFrame) -> pd.DataFrame:
        dataf = self.get_no_window_features(dataf)
        dataf = self.get_window_features(dataf)
        return dataf

    def transform(self, dataf: pd.DataFrame, *args, **kwargs) -> NumerFrame:
        return NumerFrame(self.get_all_features(dataf=dataf))

    def _no_window(self, dataf: pd.DataFrame, func) -> pd.Series:
        from talib import abstract as tab

        inputs = self.__get_inputs(dataf)
        if func in ["MACD"]:
            # MACD outputs tuple of 3 elements (value, signal and hist)
            return tab.Function(func)(inputs["close"])[0]
        else:
            return tab.Function(func)(inputs)

    def _window(self, dataf: pd.DataFrame, func, window: int) -> pd.Series:
        from talib import abstract as tab

        inputs = self.__get_inputs(dataf)
        if func in ["ULTOSC"]:
            # ULTOSC requires 3 timeperiods as input
            return tab.Function(func)(
                inputs["high"],
                inputs["low"],
                inputs["close"],
                timeperiod1=window,
                timeperiod2=window * 2,
                timeperiod3=window * 4,
            )
        else:
            return tab.Function(func)(inputs, timeperiod=window)

    def __get_inputs(self, dataf: pd.DataFrame) -> dict:
        return {col: dataf[col].values.astype(np.float64) for col in self.hlocv_cols}

    @staticmethod
    def __check_talib_import():
        try:
            from talib import abstract as tab
        except ImportError:
            raise ImportError(
                "TA-Lib is not installed for this environment. If you are using this class make sure to have TA-Lib installed. check https://mrjbq7.github.io/ta-lib/install.html for instructions on installation."
            )

<IPython.core.display.Javascript object>

In [28]:
# hide
# Example usage
# dataf = pd.DataFrame() # Your Signals DataFrame here.
# tfg = TalibFeatureGenerator(windows=[10, 20, 40], ticker_col="bloomberg_ticker")
# ta_dataf = tfg.transform(dataf=dataf)
# ta_dataf.head(2)

<IPython.core.display.Javascript object>

### 1.2.2. KatsuFeatureGenerator

[Katsu1110](https://www.kaggle.com/code1110) provides an excellent and fast feature engineering scheme in his [Kaggle notebook on starting with Numerai Signals](https://www.kaggle.com/code1110/numeraisignals-starter-for-beginners). It is surprisingly effective, fast and works well for modeling. This preprocessor is based on his feature engineering setup in that notebook.

Features generated:
1. MACD and MACD signal
2. RSI
3. Percentage rate of return
4. Volatility
5. MA (moving average) gap


In [29]:
# export
class KatsuFeatureGenerator(BaseProcessor):
    """
    Effective feature engineering setup based on Katsu's starter notebook.
    Based on source by Katsu1110: https://www.kaggle.com/code1110/numeraisignals-starter-for-beginners

    :param windows: Time interval to apply for window features: \n
    1. Percentage Rate of change \n
    2. Volatility \n
    3. Moving Average gap \n
    :param ticker_col: Columns with tickers to iterate over. \n
    :param close_col: Column name where you have closing price stored.
    """

    warnings.filterwarnings("ignore")

    def __init__(
        self,
        windows: list,
        ticker_col: str = "ticker",
        close_col: str = "close",
        num_cores: int = None,
    ):
        super().__init__()
        self.windows = windows
        self.ticker_col = ticker_col
        self.close_col = close_col
        self.num_cores = num_cores if num_cores else os.cpu_count()

    @display_processor_info
    def transform(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        """Multiprocessing feature engineering."""
        tickers = dataf.loc[:, self.ticker_col].unique().tolist()
        rich_print(
            f"Feature engineering for {len(tickers)} tickers using {self.num_cores} CPU cores."
        )
        dataf_list = [
            x
            for _, x in tqdm(
                dataf.groupby(self.ticker_col), desc="Generating ticker DataFrames"
            )
        ]
        dataf = self._generate_features(dataf_list=dataf_list)
        return NumerFrame(dataf)

    def feature_engineering(self, dataf: pd.DataFrame) -> pd.DataFrame:
        """Feature engineering for single ticker."""
        close_series = dataf.loc[:, self.close_col]
        for x in self.windows:
            dataf.loc[
                :, f"feature_{self.close_col}_ROCP_{x}"
            ] = close_series.pct_change(x)

            dataf.loc[:, f"feature_{self.close_col}_VOL_{x}"] = (
                np.log1p(close_series).pct_change().rolling(x).std()
            )

            dataf.loc[:, f"feature_{self.close_col}_MA_gap_{x}"] = (
                close_series / close_series.rolling(x).mean()
            )

        dataf.loc[:, "feature_RSI"] = self._rsi(close_series)
        macd, macd_signal = self._macd(close_series)
        dataf.loc[:, "feature_MACD"] = macd
        dataf.loc[:, "feature_MACD_signal"] = macd_signal
        return dataf.bfill()

    def _generate_features(self, dataf_list: list) -> pd.DataFrame:
        """Add features for list of ticker DataFrames and concatenate."""
        with Pool(self.num_cores) as p:
            feature_datafs = list(
                tqdm(
                    p.imap(self.feature_engineering, dataf_list),
                    desc="Generating features",
                    total=len(dataf_list),
                )
            )
        return pd.concat(feature_datafs)

    @staticmethod
    def _rsi(close: pd.Series, period: int = 14) -> pd.Series:
        """
        See source https://github.com/peerchemist/finta
        and fix https://www.tradingview.com/wiki/Talk:Relative_Strength_Index_(RSI)
        """
        delta = close.diff()
        up, down = delta.copy(), delta.copy()
        up[up < 0] = 0
        down[down > 0] = 0

        gain = up.ewm(com=(period - 1), min_periods=period).mean()
        loss = down.abs().ewm(com=(period - 1), min_periods=period).mean()

        rs = gain / loss
        return pd.Series(100 - (100 / (1 + rs)))

    def _macd(
        self, close: pd.Series, span1=12, span2=26, span3=9
    ) -> Tuple[pd.Series, pd.Series]:
        """Compute MACD and MACD signal."""
        exp1 = self.__ema1(close, span1)
        exp2 = self.__ema1(close, span2)
        macd = 100 * (exp1 - exp2) / exp2
        signal = self.__ema1(macd, span3)
        return macd, signal

    @staticmethod
    def __ema1(series: pd.Series, span: int) -> pd.Series:
        """Exponential moving average"""
        a = 2 / (span + 1)
        return series.ewm(alpha=a).mean()

<IPython.core.display.Javascript object>

In [30]:
# other
from numerblox.download import KaggleDownloader
# Get price data from Kaggle
home_dir = "katsu_features_test/"
kd = KaggleDownloader(home_dir)
kd.download_training_data("code1110/yfinance-stock-price-data-for-numerai-signals")

<IPython.core.display.Javascript object>

In [31]:
# other
dataf = create_numerframe(f"{home_dir}/full_data.parquet")
dataf.loc[:, 'friday_date'] = dataf['date']
# Take 500 ticker sample for test
dataf = dataf[dataf['ticker'].isin(dataf['ticker'].unique()[:500])]

<IPython.core.display.Javascript object>

In [32]:
# other
kfpp = KatsuFeatureGenerator(windows=[20, 40, 60], num_cores=8)
new_dataf = kfpp.transform(dataf)

Generating ticker DataFrames:   0%|          | 0/500 [00:00<?, ?it/s]

2022-04-04 21:35:36,083 INFO numexpr.utils: Note: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
2022-04-04 21:35:36,083 INFO numexpr.utils: Note: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.


Generating features:   0%|          | 0/500 [00:00<?, ?it/s]

2022-04-04 21:35:36,083 INFO numexpr.utils: Note: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
2022-04-04 21:35:36,088 INFO numexpr.utils: NumExpr defaulting to 8 threads.
2022-04-04 21:35:36,089 INFO numexpr.utils: NumExpr defaulting to 8 threads.
2022-04-04 21:35:36,089 INFO numexpr.utils: NumExpr defaulting to 8 threads.
2022-04-04 21:35:36,088 INFO numexpr.utils: Note: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
2022-04-04 21:35:36,093 INFO numexpr.utils: NumExpr defaulting to 8 threads.
2022-04-04 21:35:36,095 INFO numexpr.utils: Note: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
2022-04-04 21:35:36,102 INFO numexpr.utils: NumExpr defaulting to 8 threads.
2022-04-04 21:35:36,105 INFO numexpr.utils: Note: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
2022-04-04 21:35:36,113 INFO numexpr.utils: NumExpr

<IPython.core.display.Javascript object>

12 features are generated in this test (3*3 window features + 3 non window features).

In [33]:
# other
new_dataf.sort_values(["ticker", "date"]).get_feature_data.tail(2)

Unnamed: 0,feature_close_ROCP_20,feature_close_VOL_20,feature_close_MA_gap_20,feature_close_ROCP_40,feature_close_VOL_40,feature_close_MA_gap_40,feature_close_ROCP_60,feature_close_VOL_60,feature_close_MA_gap_60,feature_RSI,feature_MACD,feature_MACD_signal
19849675,-0.04,0.01197,0.99348,-0.150442,0.009611,0.941269,-0.185059,0.008468,0.899522,43.858443,-2.812436,-2.858512
19854897,-0.012097,0.012178,1.014808,-0.141856,0.00973,0.964709,-0.169492,0.008587,0.921139,48.401512,-2.428545,-2.772518


<IPython.core.display.Javascript object>

### 1.2.3. EraQuantileProcessor

In [34]:
# export
class EraQuantileProcessor(BaseProcessor):
    """
    Transform features into quantiles on a per-era basis

    :param num_quantiles: Number of buckets to split data into: \n
    :param era_col: Era column name in the dataframe to perform each transformation \n
    """

    def __init__(
        self,
        num_quantiles: int = 50,
        era_col: str = "friday_date",
        features: list = None,
        num_cores: int = None,
    ):
        super().__init__()
        self.num_quantiles = num_quantiles
        self.era_col = era_col
        self.num_cores = num_cores if num_cores else os.cpu_count()
        self.features = features

    def _process_eras(self, groupby_object):
        quantizer = QuantileTransformer(n_quantiles=self.num_quantiles, random_state=0)
        qt = lambda x: quantizer.fit_transform(x.values.reshape(-1, 1)).ravel()

        column = groupby_object.transform(qt)
        return column

    @display_processor_info
    def transform(
        self,
        dataf: Union[pd.DataFrame, NumerFrame],
    ) -> NumerFrame:
        """Multiprocessing quantile transforms by era."""
        self.features = self.features if self.features else dataf.feature_cols
        rich_print(
            f"Quantiling for {len(self.features)} features using {self.num_cores} CPU cores."
        )

        date_groups = dataf.groupby(self.era_col)
        groupby_objects = [date_groups[feature] for feature in self.features]

        with Pool() as p:
            results = list(
                tqdm(
                    p.imap(self._process_eras, groupby_objects),
                    total=len(groupby_objects),
                )
            )

        quantiles = pd.concat(results, axis=1)
        dataf[[f"{feature}_quantile" for feature in self.features]] = quantiles
        return NumerFrame(dataf)

<IPython.core.display.Javascript object>

In [35]:
# other
era_quantiler = EraQuantileProcessor(num_quantiles=50)
era_dataf = era_quantiler.transform(new_dataf)

  0%|          | 0/12 [00:00<?, ?it/s]

<IPython.core.display.Javascript object>

In [36]:
# other
era_dataf.get_feature_data.tail(2)

Unnamed: 0,feature_close_ROCP_20,feature_close_VOL_20,feature_close_MA_gap_20,feature_close_ROCP_40,feature_close_VOL_40,feature_close_MA_gap_40,feature_close_ROCP_60,feature_close_VOL_60,feature_close_MA_gap_60,feature_RSI,...,feature_close_MA_gap_20_quantile,feature_close_ROCP_40_quantile,feature_close_VOL_40_quantile,feature_close_MA_gap_40_quantile,feature_close_ROCP_60_quantile,feature_close_VOL_60_quantile,feature_close_MA_gap_60_quantile,feature_RSI_quantile,feature_MACD_quantile,feature_MACD_signal_quantile
19849675,-0.04,0.01197,0.99348,-0.150442,0.009611,0.941269,-0.185059,0.008468,0.899522,43.858443,...,0.246373,0.03487,0.960689,0.069401,0.106101,0.960712,0.045926,0.177787,0.029794,0.041381
19854897,-0.012097,0.012178,1.014808,-0.141856,0.00973,0.964709,-0.169492,0.008587,0.921139,48.401512,...,0.454079,0.035387,0.962643,0.157835,0.117604,0.9613,0.079544,0.326628,0.040558,0.039853


<IPython.core.display.Javascript object>

In [37]:
# other
# hide
kd.remove_base_directory()

<IPython.core.display.Javascript object>

## 2. Custom preprocessors

There are an almost unlimited number of ways to preprocess (selection, engineering and manipulation). We have only scratched the surface with the preprocessors currently implemented. We invite the Numerai community to develop Numerai Classic and Numerai Signals preprocessors.

A new Preprocessor should inherit from `BaseProcessor` and implement a `transform` method. For efficient implementation, we recommend you use `NumerFrame` functionality for preprocessing. You can also support Pandas DataFrame input as long as the `transform` method returns a `NumerFrame`. This ensures that the Preprocessor still works within a full `numerai-blocks` pipeline. A template for new preprocessors is given below.

To enable fancy logging output. Add the `@display_processor_info` decorator to the `transform` method.

In [38]:
# export
class AwesomePreProcessor(BaseProcessor):
    """ TEMPLATE - Do some awesome preprocessing. """
    def __init__(self):
        super().__init__()

    @display_processor_info
    def transform(self, dataf: NumerFrame, *args, **kwargs) -> NumerFrame:
        # Do processing
        ...
        # Parse all contents of NumerFrame to the next pipeline step
        return NumerFrame(dataf)

<IPython.core.display.Javascript object>

-------------------------------------------

In [39]:
# hide
# Run this cell to sync all changes with library
from nbdev.export import notebook2script

notebook2script()

Converted 00_misc.ipynb.
Converted 01_download.ipynb.
Converted 02_numerframe.ipynb.
Converted 03_preprocessing.ipynb.
Converted 04_model.ipynb.
Converted 05_postprocessing.ipynb.
Converted 06_modelpipeline.ipynb.
Converted 07_evaluation.ipynb.
Converted 08_key.ipynb.
Converted 09_submission.ipynb.
Converted 10_staking.ipynb.
Converted index.ipynb.


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>