# How to Integrate Any Data Transformation Step into Sklearn Pipelines With Custom Transformers
## Let's do everything in Sklearn
![](images/unsplash.jpg)
<figcaption style="text-align: center;">
    <strong>
        Photo by 
        <a href='https://unsplash.com/@tetrakiss?utm_source=unsplash&utm_medium=referral&utm_content=creditCopyText'>Arseny Togulev</a>
        on 
        <a href='https://unsplash.com/s/photos/transformer?utm_source=unsplash&utm_medium=referral&utm_content=creditCopyText'>Unsplash.</a> All images are by the author unless specified otherwise.
    </strong>
</figcaption>

# Setup

In [32]:
import logging
import time
import warnings

import catboost as cb
import joblib
import lightgbm as lgbm
import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import seaborn as sns
import xgboost as xgb
from optuna.samplers import TPESampler
from sklearn.compose import (
    ColumnTransformer,
    make_column_selector,
    make_column_transformer,
)
from sklearn.impute import SimpleImputer
from sklearn.metrics import log_loss, mean_squared_error
from sklearn.model_selection import (
    KFold,
    StratifiedKFold,
    cross_validate,
    train_test_split,
)
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder

logging.basicConfig(
    format="%(asctime)s - %(message)s", datefmt="%d-%b-%y %H:%M:%S", level=logging.INFO
)
optuna.logging.set_verbosity(optuna.logging.WARNING)
warnings.filterwarnings("ignore")

# Introduction

Single `fit`, single `predict` - how awesome would that be?

You get the data, fit your pipeline just one time and it takes care of everything - preprocessing, feature engineering, modeling, everything. All you have to do is call `predict` and get the output.

What kind of pipeline is *that* powerful? Yes, Sklearn has many transformers but it doesn't have one for every imaginable preprocessing scenario. So, is such a pipeline a *pipe* dream?

Absolutely not. Today, we will learn how to create custom Sklearn transformers that enable you to integrate virtually any function or data transformation into Sklearn's Pipeline class.

# What are Sklearn pipelines?

Below is a simple pipeline that imputes the missing values in numeric data, scales them and fits an XGBRegressor to `X`, `y`:

```python
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
import xgboost as xgb

xgb_pipe = make_pipeline(
                SimpleImputer(strategy='mean'),
                StandardScaler(),
                xgb.XGBRegressor()
            )

_ = xgb_pipe.fit(X, y)
```

I have talked at length about the nitty-gritty of Sklearn pipelines and their benefits in an [older post](https://towardsdatascience.com/how-to-use-sklearn-pipelines-for-ridiculously-neat-code-a61ab66ca90d). Their most notable advantages are collapsing all preprocessing and modeling steps into a single estimator, preventing data leakage by never calling `fit` on validation sets and, as an added bonus, making the code concise, reproducible and modular.
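
To make the leakage point concrete, here is a minimal sketch on synthetic data. The arrays `X_demo` and `y_demo` and the `Ridge` model are stand-ins chosen for illustration, not part of the original post. Because the whole pipeline is a single estimator, `cross_val_score` refits the imputer and the scaler on the training part of each fold only:

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic regression data (illustrative only)
rng = np.random.default_rng(0)
X_demo = rng.normal(size=(200, 5))
y_demo = X_demo @ np.array([1.0, -2.0, 0.5, 0.0, 3.0]) + rng.normal(scale=0.1, size=200)

# Knock out roughly 10% of the feature values
X_demo[rng.random(X_demo.shape) < 0.1] = np.nan

demo_pipe = make_pipeline(SimpleImputer(strategy="mean"), StandardScaler(), Ridge())

# Each fold fits the imputer and scaler on the training rows only,
# then applies them to the held-out rows before scoring.
scores = cross_val_score(demo_pipe, X_demo, y_demo, cv=5, scoring="r2")
print(scores.mean())
```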

But this whole idea of atomic, neat pipelines breaks when we need to perform operations that are not built into Sklearn as estimators. For example, what if you need to extract regex patterns to clean text data? What do you do if you want to create a new feature combining existing ones based on domain knowledge?

To preserve all the benefits that come with pipelines, you need a way to integrate your custom preprocessing and feature engineering logic into Sklearn. That's where custom transformers come into play.

# Integrating simple functions with `FunctionTransformer`

In this month's (September) TPS (Tabular Playground Series) Competition on Kaggle, one of the ideas that boosted model performance significantly was adding the number of missing values in a row as a new feature. This is a custom operation, not implemented in Sklearn, so let's create a function to achieve that after importing the data:

In [44]:
tps_df = pd.read_csv("data/train.csv")
tps_df.head()

| Unnamed: 0 | id | f1 | f2 | f3 | f4 | f5 | f6 | f7 | f8 | f9 | ... | f110 | f111 | f112 | f113 | f114 | f115 | f116 | f117 | f118 | claim |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0.10859 | 0.004314 | -37.566 | 0.017364 | 0.28915 | -10.251 | 135.12 | 168900.0 | 399240000000000.0 | ... | -12.228 | 1.7482 | 1.9096 | -7.1157 | 4378.8 | 1.2096 | 861340000000000.0 | 140.1 | 1.0177 | 1 |
| 1 | 1 | 0.1009 | 0.29961 | 11822.0 | 0.2765 | 0.4597 | -0.83733 | 1721.9 | 119810.0 | 3874100000000000.0 | ... | -56.758 | 4.1684 | 0.34808 | 4.142 | 913.23 | 1.2464 | 7575100000000000.0 | 1861.0 | 0.28359 | 0 |
| 2 | 2 | 0.17803 | -0.00698 | 907.27 | 0.27214 | 0.45948 | 0.17327 | 2298.0 | 360650.0 | 12245000000000.0 | ... | -5.7688 | 1.2042 | 0.2629 | 8.1312 | 45119.0 | 1.1764 | 321810000000000.0 | 3838.2 | 0.4069 | 1 |
| 3 | 3 | 0.15236 | 0.007259 | 780.1 | 0.025179 | 0.51947 | 7.4914 | 112.51 | 259490.0 | 77814000000000.0 | ... | -34.858 | 2.0694 | 0.79631 | -16.336 | 4952.4 | 1.1784 | 4533000000000.0 | 4889.1 | 0.51486 | 1 |
| 4 | 4 | 0.11623 | 0.5029 | -109.15 | 0.29791 | 0.3449 | -0.40932 | 2538.9 | 65332.0 | 1907200000000000.0 | ... | -13.641 | 1.5298 | 1.1464 | -0.43124 | 3856.5 | 1.483 | -8991300000000.0 | NaN | 0.23049 | 1 |


In [45]:
tps_df.shape

(957919, 120)

In [6]:
# Find the number of missing values across rows
tps_df.isnull().sum(axis=1)

0         1
1         0
2         5
3         2
4         8
         ..
957914    0
957915    4
957916    0
957917    1
957918    4
Length: 957919, dtype: int64

Let's create a function that takes a DataFrame as an input and implements the above operation:

In [52]:
def num_missing_row(X: pd.DataFrame, y=None):
    # Work on a copy so the caller's DataFrame is not modified in place
    X = X.copy()

    # Count the missing values in each row and measure their spread
    num_missing = X.isnull().sum(axis=1)
    num_missing_std = X.isnull().std(axis=1)

    # Add the above series as new features to the df
    X["#missing"] = num_missing
    X["num_missing_std"] = num_missing_std

    return X

Now, adding this function into a pipeline is just as easy as passing it to the `FunctionTransformer`:

In [53]:
from sklearn.preprocessing import FunctionTransformer

num_missing_estimator = FunctionTransformer(num_missing_row)

Passing a custom function to `FunctionTransformer` creates an estimator with `fit`, `transform` and `fit_transform` methods:

In [54]:
# Check number of columns before
print(f"Number of features before preprocessing: {len(tps_df.columns)}")

# Apply the custom estimator
tps_df = num_missing_estimator.transform(tps_df)
print(f"Number of features after preprocessing: {len(tps_df.columns)}")

Number of features before preprocessing: 120
Number of features after preprocessing: 122


Because the function is stateless, there is no need to call `fit`; it just returns the estimator untouched. The only requirement of `FunctionTransformer` is that the passed function accepts the data as its first argument.
The function may also declare an optional `y=None` parameter, but note that `FunctionTransformer` forwards only `X` (plus any `kw_args`) to it, so `y` stays `None` inside the function:

```python
# The function receives X as its first argument; y is optional and stays None
def custom_function(X, y=None):
    ...

estimator = FunctionTransformer(custom_function)  # no errors

custom_pipeline = make_pipeline(StandardScaler(), estimator, xgb.XGBRegressor())
custom_pipeline.fit(X, y)
```
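
As a runnable variant of the idea above, here is a small sketch that adds a domain-knowledge feature inside a pipeline. The `houses` data, the `add_ratio` function and the `price_per_sqm` column are all hypothetical, and `LinearRegression` stands in for the model so the snippet runs without XGBoost:

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer


def add_ratio(X: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical domain feature: price per square metre
    X = X.copy()  # avoid mutating the caller's DataFrame
    X["price_per_sqm"] = X["price"] / X["area"]
    return X


houses = pd.DataFrame(
    {"price": [100.0, 250.0, 90.0, 400.0], "area": [50.0, 100.0, 30.0, 160.0]}
)
rent = np.array([5.0, 11.0, 6.0, 17.0])

ratio_pipe = make_pipeline(FunctionTransformer(add_ratio), LinearRegression())
ratio_pipe.fit(houses, rent)

# The original frame is unchanged; the extra column exists only inside the pipeline
print(houses.columns.tolist())
print(ratio_pipe[0].transform(houses).columns.tolist())
```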

`FunctionTransformer` also accepts an inverse of the passed function if you ever need to revert the changes:

In [55]:
def custom_function(X, y=None):
    ...


def inverse_of_custom(X, y=None):
    ...


estimator = FunctionTransformer(func=custom_function, inverse_func=inverse_of_custom)

Check out the [documentation](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html) for details on other arguments.
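
Two of those arguments are worth a quick look: `kw_args` and `inv_kw_args` forward extra keyword arguments to the function and its inverse. The sketch below uses two made-up functions, `scale_by` and `unscale_by`, purely for illustration:

```python
import numpy as np
from sklearn.preprocessing import FunctionTransformer


def scale_by(X, factor=1.0):
    return X * factor


def unscale_by(X, factor=1.0):
    return X / factor


scaler = FunctionTransformer(
    func=scale_by,
    inverse_func=unscale_by,
    kw_args={"factor": 10.0},  # forwarded to func
    inv_kw_args={"factor": 10.0},  # forwarded to inverse_func
)

X_small = np.array([[1.0, 2.0], [3.0, 4.0]])
X_scaled = scaler.fit_transform(X_small)
X_back = scaler.inverse_transform(X_scaled)

# The inverse should recover the original values
print(np.allclose(X_back, X_small))
```

By default (`check_inverse=True`), `fit` also checks on a subset of the data that `inverse_func` really undoes `func` and warns if it does not.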

# Integrating more complex preprocessing steps with custom transformers

> This section assumes some knowledge of Python object-oriented programming (OOP). Specifically, the basics of creating classes and inheritance. If you are not already down with those, check out my [Python OOP series](https://ibexorigin.medium.com/list/objectoriented-programming-essentials-for-data-scientists-cf2ff3dc9fc9?source=user_lists---------1-------cf2ff3dc9fc9---------------------), written for data scientists.

One of the most common scaling options for skewed data is a logarithmic transform. But here is a caveat: if a feature has even a single 0, `np.log` returns `-inf` for it, and Sklearn's `PowerTransformer` with `method="box-cox"` raises an error because Box-Cox requires strictly positive data.
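
A quick way to see how a zero behaves is to try both tools on a tiny array. This is only a sketch; `X_zero` is a made-up example:

```python
import numpy as np
from sklearn.preprocessing import PowerTransformer

X_zero = np.array([[0.0], [1.0], [2.0], [5.0], [10.0], [50.0], [100.0]])

# np.log does not raise on 0: it emits a RuntimeWarning and yields -inf
with np.errstate(divide="ignore"):
    logged = np.log(X_zero)
print(logged[0, 0])

# Box-Cox requires strictly positive input, so fitting on a 0 raises ValueError
try:
    PowerTransformer(method="box-cox").fit(X_zero)
    box_cox_failed = False
except ValueError as err:
    box_cox_failed = True
    print(err)
```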

So, as a workaround, Kagglers add 1 to all samples and then apply the transformation. If the transformation is performed on the target array, you will also need an inverse transform. For that, after making predictions, you need to use the exponential function and subtract 1. Here is what it looks like in code:

```python
y_transformed = np.log(y + 1)

_ = model.fit(X, y_transformed)
# Undo the transform: exponentiate the predictions, then subtract 1
preds = np.exp(model.predict(X)) - 1
```
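
The same round trip can be written with NumPy's `np.log1p` and `np.expm1`, which compute log(1 + x) and exp(x) - 1 directly. The sketch below uses synthetic data and a `LinearRegression` as stand-ins, both assumptions made for illustration:

```python
import numpy as np
from sklearn.linear_model import LinearRegression

rng = np.random.default_rng(42)
X_demo = rng.normal(size=(100, 3))

# Skewed, non-negative target that contains exact zeros
y_demo = np.expm1(np.abs(X_demo[:, 0]) * 2)
y_demo[:5] = 0.0

y_log = np.log1p(y_demo)  # same as np.log(y_demo + 1)
round_trip_ok = np.allclose(np.expm1(y_log), y_demo)
print(round_trip_ok)

model = LinearRegression().fit(X_demo, y_log)
preds = np.expm1(model.predict(X_demo))  # back on the original scale
print(preds[:3])
```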

This works but we have the same old problem - we can't include this in a pipeline out of the box. Sure, we could use our new-found friend `FunctionTransformer`, but it is not well-suited for more complex preprocessing steps such as this.

Instead, we will implement a custom transformer class and write the `fit` and `transform` methods manually. In the end, we will again have an Sklearn-compatible estimator that we can pass into a pipeline. Let's start:

In [56]:
from sklearn.base import BaseEstimator, TransformerMixin


class CustomLogTransformer(BaseEstimator, TransformerMixin):
    pass

We first create a class that inherits from the `BaseEstimator` and `TransformerMixin` classes of `sklearn.base`. `BaseEstimator` supplies `get_params` and `set_params`, and `TransformerMixin` supplies `fit_transform`, which together let Sklearn pipelines treat our class as a regular estimator.
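
As a side note on what the two parent classes contribute, here is a minimal sketch with a made-up `ShiftTransformer` (not part of the original post). It only defines `__init__`, `fit` and `transform`; everything else is inherited:

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin, clone


class ShiftTransformer(BaseEstimator, TransformerMixin):
    """Hypothetical transformer that adds a constant to every value."""

    def __init__(self, shift=1.0):
        self.shift = shift  # store constructor arguments unchanged

    def fit(self, X, y=None):
        return self  # nothing to learn

    def transform(self, X):
        return np.asarray(X) + self.shift


shifter = ShiftTransformer(shift=2.0)
print(shifter.get_params())  # inherited from BaseEstimator

shifter.set_params(shift=5.0)  # inherited from BaseEstimator
print(clone(shifter).shift)  # clone relies on get_params

print(shifter.fit_transform(np.zeros((2, 2))))  # inherited from TransformerMixin
```

Storing the constructor arguments under the same attribute names is what lets `get_params`, `set_params` and `clone` work, which in turn is what hyperparameter search tools rely on.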

Then, we will implement the `__init__` method, where we just initialize an instance of `PowerTransformer`:

In [57]:
from sklearn.preprocessing import PowerTransformer


class CustomLogTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self._estimator = PowerTransformer()

Next, we write the `fit` where we add 1 to all features in the data and fit the PowerTransformer:

In [58]:
class CustomLogTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self._estimator = PowerTransformer()

    def fit(self, X, y=None):
        X_copy = np.copy(X) + 1
        self._estimator.fit(X_copy)

        return self

The `fit` method should return the transformer itself, which is done by returning `self`. Let's test what we have done so far:

In [59]:
custom_log = CustomLogTransformer()
custom_log.fit(tps_df)

CustomLogTransformer()

Next, we have the `transform`, in which we just use the `transform` method of PowerTransformer after adding 1 to the passed data:

In [60]:
class CustomLogTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self._estimator = PowerTransformer()

    def fit(self, X, y=None):
        X_copy = np.copy(X) + 1
        self._estimator.fit(X_copy)

        return self

    def transform(self, X):
        X_copy = np.copy(X) + 1

        return self._estimator.transform(X_copy)

Let's make another check:

In [22]:
custom_log = CustomLogTransformer()
custom_log.fit(tps_df)

transformed_tps = custom_log.transform(tps_df)

In [27]:
transformed_tps[:5, :5]

array([[ 0.48908946, -2.17126787, -1.79124946, -0.52828469,         nan],
       [ 0.38660665, -0.29384644,  1.31313666,  0.1901713 , -0.34236997],
       [-0.04286469, -0.05047097, -1.16463754,  0.95459266,  1.71830766],
       [-0.584329  , -1.5743182 , -1.02444525, -0.15117546,  0.46952437],
       [-0.87027925, -0.13045462, -0.10489176, -0.36806683,  1.21317668]])

Working as expected. Now, as I said earlier, we need a method for reverting the transform:

In [61]:
class CustomLogTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self._estimator = PowerTransformer()

    def fit(self, X, y=None):
        X_copy = np.copy(X) + 1
        self._estimator.fit(X_copy)

        return self

    def transform(self, X):
        X_copy = np.copy(X) + 1

        return self._estimator.transform(X_copy)

    def inverse_transform(self, X):
        X_reversed = self._estimator.inverse_transform(np.copy(X))

        return X_reversed - 1

Had the forward step been a plain `np.log`, we could have used `np.exp` here instead of `inverse_transform`. Now, let's make a final check:

In [33]:
custom_log = CustomLogTransformer()

tps_transformed = custom_log.fit_transform(tps_df)
tps_inversed = custom_log.inverse_transform(tps_transformed)

But wait! We didn't write `fit_transform` - where did that come from?

It is simple - when you inherit from `BaseEstimator` and `TransformerMixin`, you get a `fit_transform` method for free. After the inverse transform, you can compare it with the original data:

In [41]:
tps_df.values[:5, 5]

array([0.35275, 0.17725, 0.25997, 0.4293 , 0.34079])

In [42]:
tps_inversed[:5, 5]

array([0.35275, 0.17725, 0.25997, 0.4293 , 0.34079])
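
If you don't have the competition data at hand, the same two checks can be sketched on synthetic data. The array `X_demo` below is made up for illustration, and the class is repeated so that the snippet runs on its own:

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import PowerTransformer


class CustomLogTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        self._estimator = PowerTransformer()

    def fit(self, X, y=None):
        self._estimator.fit(np.copy(X) + 1)
        return self

    def transform(self, X):
        return self._estimator.transform(np.copy(X) + 1)

    def inverse_transform(self, X):
        return self._estimator.inverse_transform(np.copy(X)) - 1


rng = np.random.default_rng(0)
X_demo = rng.exponential(scale=2.0, size=(500, 3))  # skewed, non-negative

# The inherited fit_transform is equivalent to calling fit, then transform
one_step = CustomLogTransformer().fit_transform(X_demo)
two_step = CustomLogTransformer().fit(X_demo).transform(X_demo)
print(np.allclose(one_step, two_step))

# inverse_transform should bring the data back to its original scale
custom_log = CustomLogTransformer().fit(X_demo)
restored = custom_log.inverse_transform(custom_log.transform(X_demo))
print(np.allclose(restored, X_demo))
```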

Now, we have a custom transformer ready to be included in a pipeline. Let's put everything together:

In [72]:
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline

xgb_pipe = make_pipeline(
    FunctionTransformer(num_missing_row),
    SimpleImputer(strategy="constant", fill_value=-99999),
    CustomLogTransformer(),
    xgb.XGBClassifier(
        n_estimators=1000, tree_method="gpu_hist", objective="binary:logistic"
    ),
)

X, y = tps_df.drop("claim", axis=1), tps_df[["claim"]].values.flatten()
split = train_test_split(X, y, test_size=0.33, random_state=1121218)
X_train, X_test, y_train, y_test = split

In [73]:
xgb_pipe.fit(X_train, y_train)
preds = xgb_pipe.predict_proba(X_test)

roc_auc_score(y_test, preds[:, 1])



0.7986831816726399

Even though the log transform actually hurt the score, we got our custom pipeline working!

In short, the signature of your custom transformer class should be like this:

In [74]:
class CustomTransformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        # Learn whatever is needed from X, then return the transformer itself
        return self

    def transform(self, X):
        pass

    def inverse_transform(self, X):
        pass

This way, you get `fit_transform` for free from `TransformerMixin`. It calls your `fit` and `transform` in turn, so those two methods are required. `__init__` and `inverse_transform` are optional, so omit them if you don't need them. The logic of these methods is entirely up to your needs and coding skills.
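
To show the skeleton filled in with both hyperparameters and learned state, here is a sketch of a made-up `QuantileClipper`. The class name, its parameters and the data are hypothetical and only illustrate the usual Sklearn conventions:

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_is_fitted


class QuantileClipper(BaseEstimator, TransformerMixin):
    """Hypothetical transformer that clips each column to quantiles learned in fit."""

    def __init__(self, lower=0.01, upper=0.99):
        # Hyperparameters are stored as-is so get_params/set_params work
        self.lower = lower
        self.upper = upper

    def fit(self, X, y=None):
        X = np.asarray(X, dtype=float)
        # Learned state carries a trailing underscore by Sklearn convention
        self.low_ = np.nanquantile(X, self.lower, axis=0)
        self.high_ = np.nanquantile(X, self.upper, axis=0)
        return self

    def transform(self, X):
        check_is_fitted(self)  # raises NotFittedError if fit was skipped
        return np.clip(np.asarray(X, dtype=float), self.low_, self.high_)


rng = np.random.default_rng(1)
X_train = rng.normal(size=(1000, 2))
X_new = np.array([[100.0, -100.0]])  # extreme values unseen during fit

clipper = QuantileClipper(lower=0.05, upper=0.95).fit(X_train)
clipped = clipper.transform(X_new)
print(clipped)  # each value is pulled back to the training quantiles
```

The bounds are learned in `fit` and only applied in `transform`, so inside a pipeline they are computed from the training data alone.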

# Wrapping up...

Writing good code is a skill developed over time. You will realize that a big part of it comes from using the existing tools and libraries at the right time and place, without having to reinvent the wheel.

Sklearn pipelines are one such tool, and custom transformers are just extensions of them. Use them well and you will produce quality code with little effort.