In [None]:
#| include: false
%load_ext autoreload
%autoreload 2

In [None]:
#| default_exp preprocessing

In [None]:
#| include: false
from nbdev.showdoc import *

In [None]:
#| export
import os
import time
import warnings
import numpy as np
import pandas as pd
import datetime as dt
import pandas_ta as ta
from tqdm.auto import tqdm
from functools import wraps
from scipy.stats import rankdata
from abc import ABC, abstractmethod
from rich import print as rich_print
from typing import Union, Tuple, List
from multiprocessing.pool import Pool
from sklearn.linear_model import Ridge
from sklearn.mixture import BayesianGaussianMixture
from sklearn.preprocessing import QuantileTransformer, MinMaxScaler

from numerblox.numerframe import NumerFrame, create_numerframe

## 0. Base

These objects will provide a base for all pre- and post-processing functionality and log relevant information.

## 0.1. BaseProcessor

`BaseProcessor` defines common functionality for `preprocessing` and `postprocessing` (Section 5).

Every Preprocessor should inherit from `BaseProcessor` and implement the `.transform` method.

In [None]:
#| export
class BaseProcessor(ABC):
    """Common functionality for preprocessors and postprocessors."""

    def __init__(self):
        ...

    @abstractmethod
    def transform(
        self, dataf: Union[pd.DataFrame, NumerFrame], *args, **kwargs
    ) -> NumerFrame:
        ...

    def __call__(
        self, dataf: Union[pd.DataFrame, NumerFrame], *args, **kwargs
    ) -> NumerFrame:
        return self.transform(dataf=dataf, *args, **kwargs)

## 0.2. Logging

We would like to keep an overview of which steps are done in a data pipeline and where processing bottlenecks occur.
The decorator below will display for a given function/method:
1. When it has finished.
2. What the output shape of the data is.
3. How long it took to finish.

To use this functionality, simply add `@display_processor_info` as a decorator to the function/method you want to track.

We will use this decorator throughout the pipeline (`preprocessing`, `model` and `postprocessing`).

Inspiration for this decorator: [Calmcode Pandas Pipe Logs](https://calmcode.io/pandas-pipe/logs.html)

In [None]:
#| export
def display_processor_info(func):
    """Fancy console output for data processing."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        tic = dt.datetime.now()
        result = func(*args, **kwargs)
        time_taken = str(dt.datetime.now() - tic)
        class_name = func.__qualname__.split(".")[0]
        rich_print(
            f":white_check_mark: Finished step [bold]{class_name}[/bold]. Output shape={result.shape}. Time taken for step: [blue]{time_taken}[/blue]. :white_check_mark:"
        )
        return result

    return wrapper

In [None]:
#| echo: false
class TestDisplay:
    """
    Small test for logging.
    Output should mention 'TestDisplay',
    Return output shape of (10, 314) and
    time taken for step should be close to 2 seconds.
    """

    def __init__(self, dataf: NumerFrame):
        self.dataf = dataf

    @display_processor_info
    def test(self) -> NumerFrame:
        time.sleep(2)
        return self.dataf


dataf = create_numerframe("test_assets/mini_numerai_version_2_data.parquet")
TestDisplay(dataf).test()

Unnamed: 0_level_0,era,data_type,feature_dichasial_hammier_spawner,feature_rheumy_epistemic_prancer,feature_pert_performative_hormuz,feature_hillier_unpitied_theobromine,feature_perigean_bewitching_thruster,feature_renegade_undomestic_milord,feature_koranic_rude_corf,feature_demisable_expiring_millepede,...,target_paul_20,target_paul_60,target_george_20,target_george_60,target_william_20,target_william_60,target_arthur_20,target_arthur_60,target_thomas_20,target_thomas_60
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
n559bd06a8861222,297,train,0.25,0.75,0.25,0.75,0.25,0.5,1.0,0.25,...,0.0,0.5,0.25,0.5,0.0,0.5,0.166667,0.5,0.333333,0.5
n9d39dea58c9e3cf,3,train,0.75,0.5,0.75,1.0,0.5,0.25,0.5,0.0,...,0.5,0.75,0.5,0.5,0.666667,0.666667,0.5,0.666667,0.5,0.666667
nb64f06d3a9fc9f1,472,train,1.0,1.0,1.0,0.5,0.0,1.0,0.25,0.5,...,0.0,0.25,0.5,0.5,0.333333,0.333333,0.333333,0.333333,0.333333,0.333333
n1927b4862500882,265,train,0.0,0.0,0.25,0.0,1.0,0.0,0.0,0.0,...,0.75,0.75,0.5,0.75,0.833333,0.833333,0.666667,0.833333,0.666667,0.666667
nc3234b6eeacd6b7,299,train,0.75,0.25,0.0,0.75,1.0,0.25,0.0,0.0,...,0.25,0.5,0.5,0.5,0.166667,0.666667,0.333333,0.5,0.5,0.666667
n1b41d583e12f051,9,train,0.0,0.5,0.5,0.25,0.25,0.5,0.5,1.0,...,0.5,0.25,0.5,0.0,0.5,0.333333,0.5,0.333333,0.5,0.333333
n116898cdc07d4e2,13,train,0.5,1.0,1.0,0.75,0.0,1.0,0.5,0.75,...,0.5,0.75,0.5,0.5,0.5,0.666667,0.5,0.666667,0.5,0.666667
nb0a7aef640025dc,232,train,0.25,0.25,0.5,0.0,1.0,0.0,0.5,0.0,...,0.5,0.25,0.5,0.0,0.5,0.166667,0.5,0.0,0.666667,0.166667
n12466a161ab0a24,92,train,0.5,0.75,1.0,0.5,0.25,0.25,0.25,0.75,...,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5
n40132f4765f9185,270,train,0.5,0.5,0.0,0.5,0.75,0.5,0.0,0.25,...,0.25,0.5,0.0,0.5,0.333333,0.333333,0.333333,0.333333,0.333333,0.5


## 1. Common preprocessing steps


This section implements commonly used preprocessing for Numerai. We invite the Numerai community to develop new preprocessors.

## 1.0 Tournament agnostic

Preprocessors that can be applied for both Numerai Classic and Numerai Signals.

### 1.0.1. CopyPreProcessor

The first and obvious preprocessor is copying, which is implemented as a default in `ModelPipeline` (Section 4) to avoid manipulation of the original DataFrame or `NumerFrame` that you load in.

In [None]:
#| export
class CopyPreProcessor(BaseProcessor):
    """Copy DataFrame to avoid manipulation of original DataFrame."""

    def __init__(self):
        super().__init__()

    @display_processor_info
    def transform(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        return NumerFrame(dataf.copy())

In [None]:
dataset = create_numerframe(
    "test_assets/mini_numerai_version_2_data.parquet"
)
copied_dataset = CopyPreProcessor().transform(dataset)
assert np.array_equal(copied_dataset.values, dataset.values)
assert dataset.meta == copied_dataset.meta

### 1.0.2. FeatureSelectionPreProcessor

`FeatureSelectionPreProcessor` will keep all features that you pass + keeps all other columns that are not features.

In [None]:
#| export
class FeatureSelectionPreProcessor(BaseProcessor):
    """
    Keep only features given + all target, predictions and aux columns.
    """

    def __init__(self, feature_cols: Union[str, list]):
        super().__init__()
        self.feature_cols = feature_cols

    @display_processor_info
    def transform(self, dataf: NumerFrame) -> NumerFrame:
        keep_cols = (
            self.feature_cols
            + dataf.target_cols
            + dataf.prediction_cols
            + dataf.aux_cols
        )
        dataf = dataf.loc[:, keep_cols]
        return NumerFrame(dataf)

In [None]:
selected_dataset = FeatureSelectionPreProcessor(
    feature_cols=["feature_dichasial_hammier_spawner"]
).transform(dataset)

assert selected_dataset.get_feature_data.shape[1] == 1
assert dataset.meta == selected_dataset.meta

In [None]:
selected_dataset.head(2)

Unnamed: 0_level_0,feature_dichasial_hammier_spawner,target,target_nomi_20,target_nomi_60,target_jerome_20,target_jerome_60,target_janet_20,target_janet_60,target_ben_20,target_ben_60,...,target_george_20,target_george_60,target_william_20,target_william_60,target_arthur_20,target_arthur_60,target_thomas_20,target_thomas_60,era,data_type
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
n559bd06a8861222,0.25,0.25,0.25,0.5,0.0,0.5,0.5,0.5,0.25,0.5,...,0.25,0.5,0.0,0.5,0.166667,0.5,0.333333,0.5,297,train
n9d39dea58c9e3cf,0.75,0.5,0.5,0.75,0.5,0.75,0.5,0.5,0.5,0.5,...,0.5,0.5,0.666667,0.666667,0.5,0.666667,0.5,0.666667,3,train


### 1.0.3. TargetSelectionPreProcessor

`TargetSelectionPreProcessor` will keep all targets that you pass + all other columns that are not targets.

Not relevant for an inference pipeline, but especially convenient for Numerai Classic training if you train on a subset of the available targets. Can also be applied to Signals if you are using engineered targets in your pipeline.


In [None]:
#| export
class TargetSelectionPreProcessor(BaseProcessor):
    """
    Keep only features given + all target, predictions and aux columns.
    """

    def __init__(self, target_cols: Union[str, list]):
        super().__init__()
        self.target_cols = target_cols

    @display_processor_info
    def transform(self, dataf: NumerFrame) -> NumerFrame:
        keep_cols = (
            self.target_cols
            + dataf.feature_cols
            + dataf.prediction_cols
            + dataf.aux_cols
        )
        dataf = dataf.loc[:, keep_cols]
        return NumerFrame(dataf)

In [None]:
dataset = create_numerframe(
    "test_assets/mini_numerai_version_2_data.parquet"
)
target_cols = ["target", "target_nomi_20", "target_nomi_60"]
selected_dataset = TargetSelectionPreProcessor(target_cols=target_cols).transform(
    dataset
)
assert selected_dataset.get_target_data.shape[1] == len(target_cols)
selected_dataset.head(2)

Unnamed: 0_level_0,target,target_nomi_20,target_nomi_60,feature_dichasial_hammier_spawner,feature_rheumy_epistemic_prancer,feature_pert_performative_hormuz,feature_hillier_unpitied_theobromine,feature_perigean_bewitching_thruster,feature_renegade_undomestic_milord,feature_koranic_rude_corf,...,feature_drawable_exhortative_dispersant,feature_metabolic_minded_armorist,feature_investigatory_inerasable_circumvallation,feature_centroclinal_incentive_lancelet,feature_unemotional_quietistic_chirper,feature_behaviorist_microbiological_farina,feature_lofty_acceptable_challenge,feature_coactive_prefatorial_lucy,era,data_type
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
n559bd06a8861222,0.25,0.25,0.5,0.25,0.75,0.25,0.75,0.25,0.5,1.0,...,1.0,0.0,0.0,0.25,0.0,0.0,1.0,0.25,297,train
n9d39dea58c9e3cf,0.5,0.5,0.75,0.75,0.5,0.75,1.0,0.5,0.25,0.5,...,0.25,0.5,0.0,0.25,0.75,1.0,0.75,1.0,3,train


### 1.0.4. ReduceMemoryProcessor

Numerai datasets can take up a lot of RAM and may put a strain on your compute environment.

For Numerai Classic, many of the feature and target columns can be downscaled to `float16`. `int8` if you are using the Numerai int8 datasets. For Signals it depends on the features you are generating.

`ReduceMemoryProcessor` downscales the type of your numeric columns to reduce the memory footprint as much as possible.

In [None]:
#| export
class ReduceMemoryProcessor(BaseProcessor):
    """
    Reduce memory usage as much as possible.

    Credits to kainsama and others for writing about memory usage reduction for Numerai data:
    https://forum.numer.ai/t/reducing-memory/313

    :param deep_mem_inspect: Introspect the data deeply by interrogating object dtypes.
    Yields a more accurate representation of memory usage if you have complex object columns.
    """

    def __init__(self, deep_mem_inspect=False):
        super().__init__()
        self.deep_mem_inspect = deep_mem_inspect

    @display_processor_info
    def transform(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        dataf = self._reduce_mem_usage(dataf)
        return NumerFrame(dataf)

    def _reduce_mem_usage(self, dataf: pd.DataFrame) -> pd.DataFrame:
        """
        Iterate through all columns and modify the numeric column types
        to reduce memory usage.
        """
        start_memory_usage = (
            dataf.memory_usage(deep=self.deep_mem_inspect).sum() / 1024**2
        )
        rich_print(
            f"Memory usage of DataFrame is [bold]{round(start_memory_usage, 2)} MB[/bold]"
        )

        for col in dataf.columns:
            col_type = dataf[col].dtype.name

            if col_type not in [
                "object",
                "category",
                "datetime64[ns, UTC]",
                "datetime64[ns]",
            ]:
                c_min = dataf[col].min()
                c_max = dataf[col].max()
                if str(col_type)[:3] == "int":
                    if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                        dataf[col] = dataf[col].astype(np.int16)
                    elif (
                        c_min > np.iinfo(np.int16).min
                        and c_max < np.iinfo(np.int16).max
                    ):
                        dataf[col] = dataf[col].astype(np.int16)
                    elif (
                        c_min > np.iinfo(np.int32).min
                        and c_max < np.iinfo(np.int32).max
                    ):
                        dataf[col] = dataf[col].astype(np.int32)
                    elif (
                        c_min > np.iinfo(np.int64).min
                        and c_max < np.iinfo(np.int64).max
                    ):
                        dataf[col] = dataf[col].astype(np.int64)
                else:
                    if (
                        c_min > np.finfo(np.float16).min
                        and c_max < np.finfo(np.float16).max
                    ):
                        dataf[col] = dataf[col].astype(np.float16)
                    elif (
                        c_min > np.finfo(np.float32).min
                        and c_max < np.finfo(np.float32).max
                    ):
                        dataf[col] = dataf[col].astype(np.float32)
                    else:
                        dataf[col] = dataf[col].astype(np.float64)

        end_memory_usage = (
            dataf.memory_usage(deep=self.deep_mem_inspect).sum() / 1024**2
        )
        rich_print(
            f"Memory usage after optimization is: [bold]{round(end_memory_usage, 2)} MB[/bold]"
        )
        rich_print(
            f"[green] Usage decreased by [bold]{round(100 * (start_memory_usage - end_memory_usage) / start_memory_usage, 2)}%[/bold][/green]"
        )
        return dataf

In [None]:
dataf = create_numerframe("test_assets/mini_numerai_version_2_data.parquet")
rmp = ReduceMemoryProcessor()
dataf = rmp.transform(dataf)

In [None]:
#| include: false
dataf.head(2)

Unnamed: 0_level_0,era,data_type,feature_dichasial_hammier_spawner,feature_rheumy_epistemic_prancer,feature_pert_performative_hormuz,feature_hillier_unpitied_theobromine,feature_perigean_bewitching_thruster,feature_renegade_undomestic_milord,feature_koranic_rude_corf,feature_demisable_expiring_millepede,...,target_paul_20,target_paul_60,target_george_20,target_george_60,target_william_20,target_william_60,target_arthur_20,target_arthur_60,target_thomas_20,target_thomas_60
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
n559bd06a8861222,297,train,0.25,0.75,0.25,0.75,0.25,0.5,1.0,0.25,...,0.0,0.5,0.25,0.5,0.0,0.5,0.166626,0.5,0.333252,0.5
n9d39dea58c9e3cf,3,train,0.75,0.5,0.75,1.0,0.5,0.25,0.5,0.0,...,0.5,0.75,0.5,0.5,0.666504,0.666504,0.5,0.666504,0.5,0.666504


### 1.0.6. UMAPFeatureGenerator

Uniform Manifold Approximation and Projection (UMAP) is a dimensionality reduction technique that we can utilize to generate new Numerai features. This processor uses [umap-learn](https://pypi.org/project/umap-learn) under the hood to model the manifold. The dimension of the input data will be reduced to `n_components` number of features.

In [None]:
#| export
class UMAPFeatureGenerator(BaseProcessor):
    """
    Generate new Numerai features using UMAP. Uses umap-learn under the hood: \n
    https://pypi.org/project/umap-learn/
    :param n_components: How many new features to generate.
    :param n_neighbors: Number of neighboring points used in local approximations of manifold structure.
    :param min_dist: How tightly the embedding is allows to compress points together.
    :param metric: Metric to measure distance in input space. Correlation by default.
    :param feature_names: Selection of features used to perform UMAP on. All features by default.
    *args, **kwargs will be passed to initialization of UMAP.
    """

    def __init__(
        self,
        n_components: int = 5,
        n_neighbors: int = 15,
        min_dist: float = 0.0,
        metric: str = "correlation",
        feature_names: list = None,
        *args,
        **kwargs,
    ):
        from umap import UMAP
        super().__init__()
        self.n_components = n_components
        self.n_neighbors = n_neighbors
        self.min_dist = min_dist
        self.feature_names = feature_names
        self.metric = metric
        self.umap = UMAP(
            n_components=self.n_components,
            n_neighbors=self.n_neighbors,
            min_dist=self.min_dist,
            metric=self.metric,
            *args,
            **kwargs,
        )

    def transform(self, dataf: NumerFrame, *args, **kwargs) -> NumerFrame:
        feature_names = self.feature_names if self.feature_names else dataf.feature_cols
        new_feature_data = self.umap.fit_transform(dataf[feature_names])
        umap_feature_names = [f"feature_umap_{i}" for i in range(self.n_components)]
        norm_new_feature_data = MinMaxScaler().fit_transform(new_feature_data)
        dataf.loc[:, umap_feature_names] = norm_new_feature_data
        return NumerFrame(dataf)

In [None]:
# n_components = 3
# umap_gen = UMAPFeatureGenerator(n_components=n_components, n_neighbors=9)
# dataf = create_numerframe("test_assets/mini_numerai_version_2_data.parquet")
# dataf = umap_gen(dataf)

2023-09-04 12:30:52.241984: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-09-04 12:30:52.420233: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-09-04 12:30:53.111178: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: :/opt/miniconda3/envs/classic_prod/lib/
2023-09-04 12:30:53.111304: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plu

The new features will be names with the convention `f"feature_umap_{i}"`.

In [None]:
# umap_features = [f"feature_umap_{i}" for i in range(n_components)]
# dataf[umap_features].head(3)

Unnamed: 0_level_0,feature_umap_0,feature_umap_1,feature_umap_2
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
n559bd06a8861222,0.296676,0.31961,1.0
n9d39dea58c9e3cf,0.535175,0.081603,0.743973
nb64f06d3a9fc9f1,0.0,0.867384,0.378873


## 1.1. Numerai Classic

The Numerai Classic dataset has a certain structure that you may not encounter in the Numerai Signals tournament.
Therefore, this section has all preprocessors that can only be applied to Numerai Classic.

### 1.1.0 Numerai Classic: Version agnostic

Preprocessors that work for all Numerai Classic versions.

#### 1.1.0.1. BayesianGMMTargetProcessor

In [None]:
#| export
class BayesianGMMTargetProcessor(BaseProcessor):
    """
    Generate synthetic (fake) target using a Bayesian Gaussian Mixture model. \n
    Based on Michael Oliver's GitHub Gist implementation: \n
    https://gist.github.com/the-moliver/dcdd2862dc2c78dda600f1b449071c93

    :param target_col: Column from which to create fake target. \n
    :param feature_names: Selection of features used for Bayesian GMM. All features by default.
    :param n_components: Number of components for fitting Bayesian Gaussian Mixture Model.
    """

    def __init__(
        self,
        target_col: str = "target",
        feature_names: list = None,
        n_components: int = 6,
    ):
        super().__init__()
        self.target_col = target_col
        self.feature_names = feature_names
        self.n_components = n_components
        self.ridge = Ridge(fit_intercept=False)
        self.bins = [0, 0.05, 0.25, 0.75, 0.95, 1]

    @display_processor_info
    def transform(self, dataf: NumerFrame, *args, **kwargs) -> NumerFrame:
        all_eras = dataf[dataf.meta.era_col].unique()
        coefs = self._get_coefs(dataf=dataf, all_eras=all_eras)
        bgmm = self._fit_bgmm(coefs=coefs)
        fake_target = self._generate_target(dataf=dataf, bgmm=bgmm, all_eras=all_eras)
        dataf[f"{self.target_col}_fake"] = fake_target
        return NumerFrame(dataf)

    def _get_coefs(self, dataf: NumerFrame, all_eras: list) -> np.ndarray:
        """
        Generate coefficients for BGMM.
        Data should already be scaled between 0 and 1
        (Already done with Numerai Classic data)
        """
        coefs = []
        for era in all_eras:
            features, target = self.__get_features_target(dataf=dataf, era=era)
            self.ridge.fit(features, target)
            coefs.append(self.ridge.coef_)
        stacked_coefs = np.vstack(coefs)
        return stacked_coefs

    def _fit_bgmm(self, coefs: np.ndarray) -> BayesianGaussianMixture:
        """
        Fit Bayesian Gaussian Mixture model on coefficients and normalize.
        """
        bgmm = BayesianGaussianMixture(n_components=self.n_components)
        bgmm.fit(coefs)
        # make probability of sampling each component equal to better balance rare regimes
        bgmm.weights_[:] = 1 / self.n_components
        return bgmm

    def _generate_target(
        self, dataf: NumerFrame, bgmm: BayesianGaussianMixture, all_eras: list
    ) -> np.ndarray:
        """Generate fake target using Bayesian Gaussian Mixture model."""
        fake_target = []
        for era in tqdm(all_eras, desc="Generating fake target"):
            features, _ = self.__get_features_target(dataf=dataf, era=era)
            # Sample a set of weights from GMM
            beta, _ = bgmm.sample(1)
            # Create fake continuous target
            fake_targ = features @ beta[0]
            # Bin fake target like real target
            fake_targ = (rankdata(fake_targ) - 0.5) / len(fake_targ)
            fake_targ = (np.digitize(fake_targ, self.bins) - 1) / 4
            fake_target.append(fake_targ)
        return np.concatenate(fake_target)

    def __get_features_target(self, dataf: NumerFrame, era) -> tuple:
        """Get features and target for one era and center data."""
        sub_df = dataf[dataf[dataf.meta.era_col] == era]
        features = self.feature_names if self.feature_names else sub_df.feature_cols
        target = sub_df[self.target_col].values - 0.5
        features = sub_df[features].values - 0.5
        return features, target

## 1.2. Numerai Signals

Preprocessors that are specific to Numerai Signals.

### 1.2.1. KatsuFeatureGenerator

[Katsu1110](https://www.kaggle.com/code1110) provides an excellent and fast feature engineering scheme in his [Kaggle notebook on starting with Numerai Signals](https://www.kaggle.com/code1110/numeraisignals-starter-for-beginners). It is surprisingly effective, fast and works well for modeling. This preprocessor is based on his feature engineering setup in that notebook.

Features generated:
1. MACD and MACD signal
2. RSI
3. Percentage rate of return
4. Volatility
5. MA (moving average) gap


In [None]:
#| export
class KatsuFeatureGenerator(BaseProcessor):
    """
    Effective feature engineering setup based on Katsu's starter notebook.
    Based on source by Katsu1110: https://www.kaggle.com/code1110/numeraisignals-starter-for-beginners

    :param windows: Time interval to apply for window features: \n
    1. Percentage Rate of change \n
    2. Volatility \n
    3. Moving Average gap \n
    :param ticker_col: Columns with tickers to iterate over. \n
    :param close_col: Column name where you have closing price stored.
    """

    warnings.filterwarnings("ignore")

    def __init__(
        self,
        windows: list,
        ticker_col: str = "ticker",
        close_col: str = "close",
        num_cores: int = None,
    ):
        super().__init__()
        self.windows = windows
        self.ticker_col = ticker_col
        self.close_col = close_col
        self.num_cores = num_cores if num_cores else os.cpu_count()

    @display_processor_info
    def transform(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        """Multiprocessing feature engineering."""
        tickers = dataf.loc[:, self.ticker_col].unique().tolist()
        rich_print(
            f"Feature engineering for {len(tickers)} tickers using {self.num_cores} CPU cores."
        )
        dataf_list = [
            x
            for _, x in tqdm(
                dataf.groupby(self.ticker_col), desc="Generating ticker DataFrames"
            )
        ]
        dataf = self._generate_features(dataf_list=dataf_list)
        return NumerFrame(dataf)

    def feature_engineering(self, dataf: pd.DataFrame) -> pd.DataFrame:
        """Feature engineering for single ticker."""
        close_series = dataf.loc[:, self.close_col]
        for x in self.windows:
            dataf.loc[
                :, f"feature_{self.close_col}_ROCP_{x}"
            ] = close_series.pct_change(x)

            dataf.loc[:, f"feature_{self.close_col}_VOL_{x}"] = (
                np.log1p(close_series).pct_change().rolling(x).std()
            )

            dataf.loc[:, f"feature_{self.close_col}_MA_gap_{x}"] = (
                close_series / close_series.rolling(x).mean()
            )

        dataf.loc[:, "feature_RSI"] = self._rsi(close_series)
        macd, macd_signal = self._macd(close_series)
        dataf.loc[:, "feature_MACD"] = macd
        dataf.loc[:, "feature_MACD_signal"] = macd_signal
        return dataf.bfill()

    def _generate_features(self, dataf_list: list) -> pd.DataFrame:
        """Add features for list of ticker DataFrames and concatenate."""
        with Pool(self.num_cores) as p:
            feature_datafs = list(
                tqdm(
                    p.imap(self.feature_engineering, dataf_list),
                    desc="Generating features",
                    total=len(dataf_list),
                )
            )
        return pd.concat(feature_datafs)

    @staticmethod
    def _rsi(close: pd.Series, period: int = 14) -> pd.Series:
        """
        See source https://github.com/peerchemist/finta
        and fix https://www.tradingview.com/wiki/Talk:Relative_Strength_Index_(RSI)
        """
        delta = close.diff()
        up, down = delta.copy(), delta.copy()
        up[up < 0] = 0
        down[down > 0] = 0

        gain = up.ewm(com=(period - 1), min_periods=period).mean()
        loss = down.abs().ewm(com=(period - 1), min_periods=period).mean()

        rs = gain / loss
        return pd.Series(100 - (100 / (1 + rs)))

    def _macd(
        self, close: pd.Series, span1=12, span2=26, span3=9
    ) -> Tuple[pd.Series, pd.Series]:
        """Compute MACD and MACD signal."""
        exp1 = self.__ema1(close, span1)
        exp2 = self.__ema1(close, span2)
        macd = 100 * (exp1 - exp2) / exp2
        signal = self.__ema1(macd, span3)
        return macd, signal

    @staticmethod
    def __ema1(series: pd.Series, span: int) -> pd.Series:
        """Exponential moving average"""
        a = 2 / (span + 1)
        return series.ewm(alpha=a).mean()

Let's create a simple synthetic dataset to test preprocessors on. Many preprocessor require at least `ticker`, `date` and `close` columns. More advanced feature engineering preprocessors should also have `open`, `high`, `low` and `volume` columns.

In [None]:
instances = []
tickers = ["ABC.US", "DEF.US", "GHI.US"]
for ticker in tickers:
    price = np.random.randint(10, 100)
    for i in range(100):
        price += np.random.uniform(-1, 1)
        instances.append(
            {
                "ticker": ticker,
                "date": pd.Timestamp("2020-01-01") + pd.Timedelta(days=i),
                "open": price - 0.05,
                "high": price + 0.02,
                "low": price - 0.01,
                "close": price,
                "volume": np.random.randint(1000, 10000),
            }
        )
dummy_df = NumerFrame(instances)

In [None]:
dummy_df.head(2)

Unnamed: 0,ticker,date,open,high,low,close,volume
0,ABC.US,2020-01-01,57.454395,57.524395,57.494395,57.504395,4981
1,ABC.US,2020-01-02,56.730869,56.800869,56.770869,56.780869,4270


In [None]:
dataf = NumerFrame(dummy_df)
dataf.loc[:, "friday_date"] = dataf["date"]

In [None]:
kfpp = KatsuFeatureGenerator(windows=[20, 40, 60], num_cores=8)
new_dataf = kfpp.transform(dataf)

Generating ticker DataFrames:   0%|          | 0/3 [00:00<?, ?it/s]

Generating features:   0%|          | 0/3 [00:00<?, ?it/s]

12 features are generated in this test (3*3 window features + 3 non window features).

In [None]:
new_dataf.sort_values(["ticker", "date"]).get_feature_data.tail(2)

Unnamed: 0,feature_close_ROCP_20,feature_close_VOL_20,feature_close_MA_gap_20,feature_close_ROCP_40,feature_close_VOL_40,feature_close_MA_gap_40,feature_close_ROCP_60,feature_close_VOL_60,feature_close_MA_gap_60,feature_RSI,feature_MACD,feature_MACD_signal
298,-0.02449,0.001454,0.981365,-0.02597,0.001475,0.971712,-0.001261,0.001416,0.974322,38.102198,-0.946766,-0.589103
299,-0.018306,0.001406,0.980066,-0.026745,0.001476,0.970185,-0.002713,0.001417,0.972187,37.035251,-0.961635,-0.663609


### 1.2.2. EraQuantileProcessor

Numerai Signals' objective is predicting a ranking of equities. Therefore, we can benefit from creating rankings out of the features. Doing this reduces noise and works as a normalization mechanism for your features. `EraQuantileProcessor` bins features in a given number of quantiles for each era in the dataset.

In [None]:
#| export
class EraQuantileProcessor(BaseProcessor):
    """
    Transform features into quantiles on a per-era basis

    :param num_quantiles: Number of buckets to split data into. \n
    :param era_col: Era column name in the dataframe to perform each transformation. \n
    :param features: All features that you want quantized. All feature cols by default. \n
    :param num_cores: CPU cores to allocate for quantile transforming. All available cores by default. \n
    :param random_state: Seed for QuantileTransformer. \n
    :param batch_size: How many feature to process at the same time.
    For Numerai Signals scale data it is advisable to process features one by one. 
    This is the default setting.
    """

    def __init__(
        self,
        num_quantiles: int = 50,
        era_col: str = "friday_date",
        features: list = None,
        num_cores: int = None,
        random_state: int = 0,
        batch_size: int = 1
    ):
        super().__init__()
        self.num_quantiles = num_quantiles
        self.era_col = era_col
        self.num_cores = num_cores if num_cores else os.cpu_count()
        self.features = features 
        self.random_state = random_state
        self.batch_size = batch_size 

    def _process_eras(self, groupby_object):
        quantizer = QuantileTransformer(
            n_quantiles=self.num_quantiles, random_state=self.random_state
        )
        qt = lambda x: quantizer.fit_transform(x.values.reshape(-1, 1)).ravel()

        column = groupby_object.transform(qt)
        return column

    @display_processor_info
    def transform(
        self,
        dataf: Union[pd.DataFrame, NumerFrame],
    ) -> NumerFrame:
        """Multiprocessing quantile transforms by era."""
        features = self.features if self.features else dataf.feature_cols
        rich_print(
            f"Quantiling for {len(features)} features using {self.num_cores} CPU cores."
        )

        date_groups = dataf.groupby(self.era_col)
        for batch_start in tqdm(range(0, len(features), self.batch_size), total=len(features)):
            # Create batch of features. Default is to process features on by one.
            batch_end = min(batch_start + self.batch_size, len(features))
            batch_features = features[batch_start:batch_end]
            groupby_objects = [date_groups[feature] for feature in batch_features]

            with Pool() as p:
                results = list(
                        p.imap(self._process_eras, groupby_objects),
                )

            quantiles = pd.concat(results, axis=1)
            dataf[
                [f"{feature}_quantile{self.num_quantiles}" for feature in batch_features]
            ] = quantiles
            return NumerFrame(dataf)

In [None]:
era_quantiler = EraQuantileProcessor(num_quantiles=50)
era_dataf = era_quantiler.transform(new_dataf)

  0%|          | 0/12 [00:00<?, ?it/s]

In [None]:
era_dataf.get_feature_data.tail(2)

Unnamed: 0,feature_close_ROCP_20,feature_close_VOL_20,feature_close_MA_gap_20,feature_close_ROCP_40,feature_close_VOL_40,feature_close_MA_gap_40,feature_close_ROCP_60,feature_close_VOL_60,feature_close_MA_gap_60,feature_RSI,feature_MACD,feature_MACD_signal,feature_close_ROCP_20_quantile50
298,-0.02449,0.001454,0.981365,-0.02597,0.001475,0.971712,-0.001261,0.001416,0.974322,38.102198,-0.946766,-0.589103,0.5
299,-0.018306,0.001406,0.980066,-0.026745,0.001476,0.970185,-0.002713,0.001417,0.972187,37.035251,-0.961635,-0.663609,0.0


### 1.2.3. TickerMapper

Numerai Signals data APIs may work with different ticker formats. Our goal with `TickerMapper` is to map `ticker_col` to `target_ticker_format`.

In [None]:
#| export
class TickerMapper(BaseProcessor):
    """
    Map ticker from one format to another. \n
    :param ticker_col: Column used for mapping. Must already be present in the input data. \n
    :param target_ticker_format: Format to map tickers to. Must be present in the ticker map. \n
    For default mapper supported ticker formats are: ['ticker', 'bloomberg_ticker', 'yahoo'] \n
    :param mapper_path: Path to CSV file containing at least ticker_col and target_ticker_format columns. \n
    Can be either a web link of local path. Numerai Signals mapping by default.
    """

    def __init__(
        self, ticker_col: str = "ticker", target_ticker_format: str = "bloomberg_ticker",
        mapper_path: str = "https://numerai-signals-public-data.s3-us-west-2.amazonaws.com/signals_ticker_map_w_bbg.csv"
    ):
        super().__init__()
        self.ticker_col = ticker_col
        self.target_ticker_format = target_ticker_format

        self.signals_map_path = mapper_path
        self.ticker_map = pd.read_csv(self.signals_map_path)

        assert (
            self.ticker_col in self.ticker_map.columns
        ), f"Ticker column '{self.ticker_col}' is not available in ticker mapping."
        assert (
            self.target_ticker_format in self.ticker_map.columns
        ), f"Target ticker column '{self.target_ticker_format}' is not available in ticker mapping."

        self.mapping = dict(
            self.ticker_map[[self.ticker_col, self.target_ticker_format]].values
        )

    @display_processor_info
    def transform(
        self, dataf: Union[pd.DataFrame, NumerFrame], *args, **kwargs
    ) -> NumerFrame:
        dataf[self.target_ticker_format] = dataf[self.ticker_col].map(self.mapping)
        return NumerFrame(dataf)

Use default signals mapping to convert between Numerai ticker, Bloomberg ticker and Yahoo ticker formats.

In [None]:
test_dataf = pd.DataFrame(["AAPL", "MSFT"], columns=["ticker"])
mapper = TickerMapper()
mapper.transform(test_dataf)

Unnamed: 0,ticker,bloomberg_ticker
0,AAPL,AAPL US
1,MSFT,MSFT US


You can also use a CSV file for mapping. For example, the mapping Numerai user degerhan provides in [dsignals](https://github.com/degerhan/dsignals) for EOD data.

In [None]:
test_dataf = pd.DataFrame(["LLB SW", "DRAK NA", "SWB MK", "ELEKTRA* MF", "NOT_A_TICKER"], columns=["bloomberg_ticker"])
mapper = TickerMapper(ticker_col="bloomberg_ticker", target_ticker_format="signals_ticker",
                      mapper_path="test_assets/eodhd-map.csv")
mapper.transform(test_dataf)

Unnamed: 0,bloomberg_ticker,signals_ticker
0,LLB SW,LLB.SW
1,DRAK NA,DRAK.AS
2,SWB MK,5211.KLSE
3,ELEKTRA* MF,ELEKTRA.MX
4,NOT_A_TICKER,


### 1.2.4. SignalsTargetProcessor


Numerai provides [targets for 5000 stocks](https://docs.numer.ai/numerai-signals/signals-overview#universe) that are neutralized against all sorts of factors. However, it can be helpful to experiment with creating your own targets. You might want to explore different windows, different target binning and/or neutralization. `SignalsTargetProcessor` engineers 3 different targets for every given windows:
- `_raw`: Raw return based on price movements.
- `_rank`: Ranks of raw return.
- `_group`: Binned returns based on rank.

Note that Numerai provides targets based on 4-day returns and 20-day returns. While you can explore any window you like, it makes sense to start with `windows` close to these timeframes.

For the `bins` argument there are also many options possible. The followed are commonly used binning:
- Nomi bins: `[0, 0.05, 0.25, 0.75, 0.95, 1]`
- Uniform bins: `[0, 0.20, 0.40, 0.60, 0.80, 1]`

In [None]:
#| export
class SignalsTargetProcessor(BaseProcessor):
    """
    Engineer targets for Numerai Signals. \n
    More information on implements Numerai Signals targets: \n
    https://forum.numer.ai/t/decoding-the-signals-target/2501

    :param price_col: Column from which target will be derived. \n
    :param windows: Timeframes to use for engineering targets. 10 and 20-day by default. \n
    :param bins: Binning used to create group targets. Nomi binning by default. \n
    :param labels: Scaling for binned target. Must be same length as resulting bins (bins-1). Numerai labels by default.
    """

    def __init__(
        self,
        price_col: str = "close",
        windows: list = None,
        bins: list = None,
        labels: list = None,
    ):
        super().__init__()
        self.price_col = price_col
        self.windows = windows if windows else [10, 20]
        self.bins = bins if bins else [0, 0.05, 0.25, 0.75, 0.95, 1]
        self.labels = labels if labels else [0, 0.25, 0.50, 0.75, 1]

    @display_processor_info
    def transform(self, dataf: NumerFrame) -> NumerFrame:
        for window in tqdm(self.windows, desc="Signals target engineering windows"):
            dataf.loc[:, f"target_{window}d_raw"] = (
                dataf[self.price_col].pct_change(periods=window).shift(-window)
            )
            era_groups = dataf.groupby(dataf.meta.era_col)

            dataf.loc[:, f"target_{window}d_rank"] = era_groups[
                f"target_{window}d_raw"
            ].rank(pct=True, method="first")
            dataf.loc[:, f"target_{window}d_group"] = era_groups[
                f"target_{window}d_rank"
            ].transform(
                lambda group: pd.cut(
                    group, bins=self.bins, labels=self.labels, include_lowest=True
                )
            )
        return NumerFrame(dataf)

In [None]:
stp = SignalsTargetProcessor()
era_dataf.meta.era_col = "date"
new_target_dataf = stp.transform(era_dataf)
new_target_dataf.get_target_data.head(2)

Signals target engineering windows:   0%|          | 0/2 [00:00<?, ?it/s]

Unnamed: 0,target_10d_raw,target_10d_rank,target_10d_group,target_20d_raw,target_20d_rank,target_20d_group
0,0.016032,0.666667,0.5,-0.005306,0.666667,0.5
1,0.029885,1.0,1.0,0.007374,1.0,1.0


### 1.2.5. LagPreProcessor

Many models like Gradient Boosting Machines (GBMs) don't learn any time-series patterns by itself. However, if we create lags of our features the models will pick up on time dependencies between features. `LagPreProcessor` create lag features for given features and windows.

In [None]:
#| export
class LagPreProcessor(BaseProcessor):
    """
    Add lag features based on given windows.

    :param windows: All lag windows to process for all features. \n
    [5, 10, 15, 20] by default (4 weeks lookback) \n
    :param ticker_col: Column name for grouping by tickers. \n
    :param feature_names: All features for which you want to create lags. All features by default.
    """

    def __init__(
        self,
        windows: list = None,
        ticker_col: str = "bloomberg_ticker",
        feature_names: list = None,
    ):
        super().__init__()
        self.windows = windows if windows else [5, 10, 15, 20]
        self.ticker_col = ticker_col
        self.feature_names = feature_names

    @display_processor_info
    def transform(self, dataf: NumerFrame, *args, **kwargs) -> NumerFrame:
        feature_names = self.feature_names if self.feature_names else dataf.feature_cols
        ticker_groups = dataf.groupby(self.ticker_col)
        for feature in tqdm(feature_names, desc="Lag feature generation"):
            feature_group = ticker_groups[feature]
            for day in self.windows:
                shifted = feature_group.shift(day, axis=0)
                dataf.loc[:, f"{feature}_lag{day}"] = shifted
        return NumerFrame(dataf)

In [None]:
lpp = LagPreProcessor(ticker_col="ticker", feature_names=["close", "volume"])
dataf = lpp(dataf)

Lag feature generation:   0%|          | 0/2 [00:00<?, ?it/s]

All lag features will contain `lag` in the column name.

In [None]:
dataf.get_pattern_data("lag").tail(2)

Unnamed: 0,close_lag5,close_lag10,close_lag15,close_lag20,volume_lag5,volume_lag10,volume_lag15,volume_lag20
298,92.045443,94.146706,93.692671,93.266366,8499.0,8281.0,1624.0,4747.0
299,91.149458,94.065231,93.90132,92.471683,9741.0,3112.0,6851.0,8207.0


### 1.2.6. DifferencePreProcessor

After creating lags with the `LagPreProcessor`, it may be useful to create new features that calculate the difference between those lags. Through this process in `DifferencePreProcessor`, we can provide models with more time-series related patterns.

In [None]:
#| export
class DifferencePreProcessor(BaseProcessor):
    """
    Add difference features based on given windows. Run LagPreProcessor first.

    :param windows: All lag windows to process for all features. \n
    :param feature_names: All features for which you want to create differences. All features that also have lags by default. \n
    :param pct_change: Method to calculate differences. If True, will calculate differences with a percentage change. Otherwise calculates a simple difference. Defaults to False \n
    :param abs_diff: Whether to also calculate the absolute value of all differences. Defaults to True \n
    """

    def __init__(
        self,
        windows: list = None,
        feature_names: list = None,
        pct_diff: bool = False,
        abs_diff: bool = False,
    ):
        super().__init__()
        self.windows = windows if windows else [5, 10, 15, 20]
        self.feature_names = feature_names
        self.pct_diff = pct_diff
        self.abs_diff = abs_diff

    @display_processor_info
    def transform(self, dataf: NumerFrame, *args, **kwargs) -> NumerFrame:
        feature_names = self.feature_names if self.feature_names else dataf.feature_cols
        for feature in tqdm(feature_names, desc="Difference feature generation"):
            lag_columns = dataf.get_pattern_data(f"{feature}_lag").columns
            if not lag_columns.empty:
                for day in self.windows:
                    differenced_values = (
                        (dataf[feature] / dataf[f"{feature}_lag{day}"]) - 1
                        if self.pct_diff
                        else dataf[feature] - dataf[f"{feature}_lag{day}"]
                    )
                    dataf[f"{feature}_diff{day}"] = differenced_values
                    if self.abs_diff:
                        dataf[f"{feature}_absdiff{day}"] = np.abs(
                            dataf[f"{feature}_diff{day}"]
                        )
            else:
                rich_print(
                    f":warning: WARNING: Skipping {feature}. Lag features for feature: {feature} were not detected. Have you already run LagPreProcessor? :warning:"
                )
        return NumerFrame(dataf)

In [None]:
dpp = DifferencePreProcessor(
    feature_names=["close", "volume"], windows=[5, 10, 15, 20], pct_diff=True
)
dataf = dpp.transform(dataf)

Difference feature generation:   0%|          | 0/2 [00:00<?, ?it/s]

All difference features will contain `diff` in the column name.

In [None]:
dataf.get_pattern_data("diff").tail(2)

Unnamed: 0,close_diff5,close_diff10,close_diff15,close_diff20,volume_diff5,volume_diff10,volume_diff15,volume_diff20
298,-0.01155,-0.033611,-0.028928,-0.02449,-0.735498,-0.728535,0.384236,-0.526438
299,-0.004065,-0.034937,-0.033252,-0.018306,-0.751668,-0.222686,-0.646913,-0.705252


### 1.2.7. PandasTaFeatureGenerator

This generator takes in a [pandas-ta](https://github.com/twopirllc/pandas-ta) strategy and processing them on multiple cores. There is a simple default strategy available with RSI features for 14 and 60 rows.

To learn more about defining pandas-ta strategies. Check [this section of the pandas-ta README](https://github.com/twopirllc/pandas-ta#pandas-ta-strategies).

In [None]:
#| export
class PandasTaFeatureGenerator:
    """
    Generate features with pandas-ta.
    https://github.com/twopirllc/pandas-ta

    :param strategy: Valid Pandas Ta strategy. \n
    For more information on creating a strategy, see: \n
    https://github.com/twopirllc/pandas-ta#pandas-ta-strategy \n
    By default, a strategy with RSI(14) and RSI(60) is used. \n
    :param ticker_col: Column name for grouping by tickers. \n
    :param num_cores: Number of cores to use for multiprocessing. \n
    By default, all available cores are used. \n
    """
    def __init__(self, 
                 strategy: ta.Strategy = None,
                 ticker_col: str = "ticker",
                 num_cores: int = None,
    ):
        super().__init__()
        self.ticker_col = ticker_col
        self.num_cores = num_cores if num_cores else os.cpu_count()
        standard_strategy = ta.Strategy(name="standard", 
                                        ta=[{"kind": "rsi", "length": 14, "col_names": ("feature_RSI_14")},
                                            {"kind": "rsi", "length": 60, "col_names": ("feature_RSI_60")}])
        self.strategy = strategy if strategy is not None else standard_strategy

    @display_processor_info
    def transform(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        """
        Main feature generation method. \n 
        :param dataf: DataFrame with columns: [ticker, date, open, high, low, close, volume] \n
        :return: DataFrame with features added.
        """
        dataf_list = [
            x
            for _, x in tqdm(
                dataf.groupby(self.ticker_col), desc="Generating ticker DataFrames"
            )
        ]
        dataf = self._generate_features(dataf_list=dataf_list)
        return NumerFrame(dataf)
    
    def _generate_features(self, dataf_list: List[pd.DataFrame]) -> pd.DataFrame:
        """
        Add features for list of ticker DataFrames and concatenate.
        :param dataf_list: List of DataFrames for each ticker.
        :return: Concatenated DataFrame for all full list with features added.
        """
        with Pool(self.num_cores) as p:
            feature_datafs = list(
                tqdm(
                    p.imap(self.add_features, dataf_list),
                    desc="Generating pandas-ta features",
                    total=len(dataf_list),
                )
            )
        return pd.concat(feature_datafs)

    def add_features(self, ticker_df: pd.DataFrame) -> pd.DataFrame:
        """ 
        The TA strategy is applied to the DataFrame here.
        :param ticker_df: DataFrame for a single ticker.
        :return: DataFrame with features added.
        """
        # We use a different multiprocessing engine so shutting off pandas_ta's multiprocessing
        ticker_df.ta.cores = 0
        ticker_df.ta.strategy(self.strategy)
        return ticker_df

In [None]:
pta = PandasTaFeatureGenerator()
new_pta_df = pta.transform(dummy_df)
new_pta_df.tail(2)

Generating ticker DataFrames:   0%|          | 0/3 [00:00<?, ?it/s]

Generating pandas-ta features:   0%|          | 0/3 [00:00<?, ?it/s]

Unnamed: 0,ticker,date,open,high,low,close,volume,friday_date,feature_RSI_14,feature_RSI_60
298,GHI.US,2020-04-08,90.932313,91.002313,90.972313,90.982313,2248,2020-04-08,38.102198,44.985589
299,GHI.US,2020-04-09,90.728894,90.798894,90.768894,90.778894,2419,2020-04-09,37.035251,44.61622


The feature data can be selected directly through a `NumerFrame` convenience method called `.get_feature_data`.

In [None]:
new_pta_df.get_feature_data.tail(2)

Unnamed: 0,feature_RSI_14,feature_RSI_60
298,38.102198,44.985589
299,37.035251,44.61622


A custom `pandas-ta` strategy can be defined as follows. Check the [pandas-ta docs](https://github.com/twopirllc/pandas-ta#indicators-by-category) for more information on available indicators and arguments.

`ta` takes in a list of dictionaries defining indicators and optional additional arguments. We use `col_names` for convenience so features are prefixed by `feature_` and can be easily retrieved within a `NumerFrame`.

In [None]:
strategy = ta.Strategy(name="mystrategy",
                       ta=[{"kind": "cmo", "col_names": ("feature_CMO")}, # Chande Momentum Oscillator
                           {"kind": "rsi", "length": 60, "col_names": ("feature_RSI_60")} # Relative Strength Index
                           ])

In [None]:
pta = PandasTaFeatureGenerator(strategy=strategy)
new_pta_df = pta.transform(dummy_df)
new_pta_df.get_feature_data.tail(5)

Generating ticker DataFrames:   0%|          | 0/3 [00:00<?, ?it/s]

Generating pandas-ta features:   0%|          | 0/3 [00:00<?, ?it/s]

Unnamed: 0,feature_CMO,feature_RSI_60
295,-43.025817,42.903224
296,-46.13618,42.191717
297,-30.607037,44.09839
298,-23.795604,44.985589
299,-25.929498,44.61622


## 2. Custom preprocessors

There are an almost unlimited number of ways to preprocess (selection, engineering and manipulation). We have only scratched the surface with the preprocessors currently implemented. We invite the Numerai community to develop Numerai Classic and Numerai Signals preprocessors.

A new Preprocessor should inherit from `BaseProcessor` and implement a `transform` method. For efficient implementation, we recommend you use `NumerFrame` functionality for preprocessing. You can also support Pandas DataFrame input as long as the `transform` method returns a `NumerFrame`. This ensures that the Preprocessor still works within a full `numerai-blocks` pipeline. A template for new preprocessors is given below.

To enable fancy logging output. Add the `@display_processor_info` decorator to the `transform` method.

In [None]:
#| export
class AwesomePreProcessor(BaseProcessor):
    """ TEMPLATE - Do some awesome preprocessing. """
    def __init__(self):
        super().__init__()

    @display_processor_info
    def transform(self, dataf: NumerFrame, *args, **kwargs) -> NumerFrame:
        # Do processing
        ...
        # Parse all contents of NumerFrame to the next pipeline step
        return NumerFrame(dataf)

-------------------------------------------