# Pipelines: Or How I Learned to Stop Duplicating Work and Love the Convenience of Storing Multiple Steps in a Single Object

Original author: [Cristian E. Nuno](https://github.com/cenuno)

## Load necessary modules

In [1]:
import numpy as np
import pandas as pd
import random
from typing import List, Optional
from sklearn.base import BaseEstimator
from sklearn.compose import ColumnTransformer
from sklearn.datasets import load_iris
from sklearn.metrics.classification import classification_report
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.tree import DecisionTreeClassifier

## Load necessary data

In [2]:
# pull the bundled iris data and split it into feature matrix / target vector
iris = load_iris()
X, y = iris["data"], iris["target"]

# snake_case the measurement names, drop the "(cm)" unit suffix,
# and tack on the name of the target column
feature_names = [
    name.replace(" ", "_").replace("_(cm)", "")
    for name in [*iris["feature_names"], "species"]
]

# glue the target onto the feature matrix as the last column
iris_df = pd.DataFrame(np.hstack([X, y[:, np.newaxis]]), columns=feature_names)

# the stacked target is float-valued, hence the float keys in the lookup
class_names = {0.0: "setosa", 1.0: "versicolor", 2.0: "virginica"}
iris_df["species"] = iris_df["species"].map(class_names)

# peek at the first rows
iris_df.head()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
0,5.1,3.5,1.4,0.2,setosa
1,4.9,3.0,1.4,0.2,setosa
2,4.7,3.2,1.3,0.2,setosa
3,4.6,3.1,1.5,0.2,setosa
4,5.0,3.6,1.4,0.2,setosa


### For educational purposes only, let's replace some values with `NaN`

In [3]:
# seed the stdlib RNG so the same rows are picked on every run
random.seed(2019)

# draw 20 row labels (repeats are possible, so fewer than 20 distinct rows may be hit)
random_ints = [random.randrange(len(iris_df)) for _ in range(20)]

# blank out both sepal measurements on the sampled rows in one assignment
iris_df.loc[random_ints, ["sepal_length", "sepal_width"]] = np.nan

# additionally blank "sepal_width" on rows labelled 0 through 25
# (.loc slices include the end label, so this is 26 rows)
iris_df.loc[:25, "sepal_width"] = np.nan

iris_df.head(10)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
0,5.1,,1.4,0.2,setosa
1,4.9,,1.4,0.2,setosa
2,4.7,,1.3,0.2,setosa
3,4.6,,1.5,0.2,setosa
4,5.0,,1.4,0.2,setosa
5,5.4,,1.7,0.4,setosa
6,4.6,,1.4,0.3,setosa
7,5.0,,1.5,0.2,setosa
8,,,1.4,0.2,setosa
9,4.9,,1.5,0.1,setosa


## Split `iris_df` into training and testing sets

In [4]:
# hold out 30% of the rows for testing; the fixed seed keeps the split reproducible
X_train, X_test, y_train, y_test = train_test_split(
    iris_df.drop(columns="species"),
    iris_df["species"],
    test_size=0.3,
    random_state=2019,
)

## Prepare data for modeling by cleaning it

1. Flag which records contain `NaN` values in the `sepal_length` and `sepal_width` features;

In [5]:
def is_missing(x: float) -> int:
    """Identifies if an element is missing

    Uses ``pd.isna`` rather than ``np.isnan`` so that ``None`` counts as
    missing and non-numeric values (e.g. strings) are reported as present
    instead of raising ``TypeError``.

    Args:
        - x (float): element from a series
    Returns:
        int: 1 if the element is missing; 0 otherwise
    """
    # pd.isna returns a boolean for scalars; int() turns it into the 0/1 flag
    return int(pd.isna(x))

In [6]:
# add a 0/1 flag per sepal column (note: this writes onto X_train itself)
X_train["sepal_length_missing"] = X_train["sepal_length"].map(is_missing)
X_train["sepal_width_missing"] = X_train["sepal_width"].map(is_missing)
X_train.head(14)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,sepal_length_missing,sepal_width_missing
73,6.1,2.8,4.7,1.2,0,0
30,4.8,3.1,1.6,0.2,0,0
98,5.1,2.5,3.0,1.1,0,0
36,5.5,3.5,1.3,0.2,0,0
1,4.9,,1.4,0.2,0,1
149,5.9,3.0,5.1,1.8,0,0
4,5.0,,1.4,0.2,0,1
100,6.3,3.3,6.0,2.5,0,0
64,5.6,2.9,3.6,1.3,0,0
135,7.7,3.0,6.1,2.3,0,0


2. Impute the median value for `NaN` values in the `sepal_length` and `sepal_width` features

In [7]:
# Series.median() skips NaN by default, so these are the medians of the observed values only
print(f"Median value for sepal length from X_train: {X_train['sepal_length'].median()}")
print(f"Median value for sepal width from X_train: {X_train['sepal_width'].median()}")

Median value for sepal length from X_train: 5.7
Median value for sepal width from X_train: 3.0


In [8]:
# learn each column's median from the training rows only
imp = SimpleImputer(strategy="median", missing_values=np.nan).fit(X_train)

# re-label the imputer output with X_train's own index and columns
X_train_clean = pd.DataFrame(
    imp.transform(X_train),
    index=X_train.index,
    columns=X_train.columns,
)
X_train_clean.head(14)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,sepal_length_missing,sepal_width_missing
73,6.1,2.8,4.7,1.2,0.0,0.0
30,4.8,3.1,1.6,0.2,0.0,0.0
98,5.1,2.5,3.0,1.1,0.0,0.0
36,5.5,3.5,1.3,0.2,0.0,0.0
1,4.9,3.0,1.4,0.2,0.0,1.0
149,5.9,3.0,5.1,1.8,0.0,0.0
4,5.0,3.0,1.4,0.2,0.0,1.0
100,6.3,3.3,6.0,2.5,0.0,0.0
64,5.6,2.9,3.6,1.3,0.0,0.0
135,7.7,3.0,6.1,2.3,0.0,0.0


As we can see from above, we've already satisfied the 3rd piece of logic, which was:

3. Return all other columns - `petal_length` & `petal_width` - as is.


## Now let's fit our data to a model and see how it does

In [10]:
# build classifier
dt_clf = DecisionTreeClassifier(random_state=2019,
                                criterion="gini")

# fit on the cleaned training frame (four measurements plus the two missing flags)
dt_clf.fit(X_train_clean, y_train)

# left disabled on purpose: X_test has not been flagged/imputed yet,
# so it does not match the data the tree was fit on
# print(classification_report(y_test, dt_clf.predict(X_test)))

DecisionTreeClassifier(class_weight=None, criterion='gini', max_depth=None,
                       max_features=None, max_leaf_nodes=None,
                       min_impurity_decrease=0.0, min_impurity_split=None,
                       min_samples_leaf=1, min_samples_split=2,
                       min_weight_fraction_leaf=0.0, presort=False,
                       random_state=2019, splitter='best')

### What happened?

Calling `dt_clf.predict()` requires that `X_test` be cleaned/processed in the same way as the data that `dt_clf` was fit on (e.g. `X_train_clean`).

#### Brute force

One way to resolve this is by copying and pasting our cleaning logic and swapping out `X_train` for `X_test`:

```python
# flag missing values
X_test["sepal_length_missing"] = X_test["sepal_length"].apply(is_missing)
X_test["sepal_width_missing"] = X_test["sepal_width"].apply(is_missing)

# impute the median for missing values
X_test_clean = pd.DataFrame(data=imp.transform(X_test),
                             columns=X_test.columns,
                             index=X_test.index)

# see how well the model does
print(classification_report(y_test, dt_clf.predict(X_test_clean)))
```

The flaw with this strategy is that your code starts to have repetition in that you are duplicating logic (a violation of the DRY principle).

#### Instead, wrap your logic into smaller functions

By breaking the cleaning steps into smaller functions, you make it easier for your future self to debug and maintain existing code.

In [11]:
def make_missing_flags(df: pd.DataFrame, col_names: List[str]) -> pd.DataFrame:
    """Add a 0/1 ``<column>_missing`` flag column for each requested column

    Note: the flag columns are written onto ``df`` itself (no copy is made),
    so pass in a copy if the caller's data frame must stay untouched.

    Args:
        - df: train/test data frame
        - col_names: names of the columns to flag
    Returns:
        The same data frame object, now carrying the missing flag columns
    Raises:
        KeyError: if a name in ``col_names`` is not a column of ``df``
    """
    for col_name in col_names:
        # vectorised null check; int64 keeps the flags as plain 0/1 integers
        df[f"{col_name}_missing"] = df[col_name].isna().astype("int64")

    return df
    
def impute_missing_values(df: pd.DataFrame, imp: SimpleImputer) -> pd.DataFrame:
    """Run an imputer over a data frame while keeping the frame's labels

    Args:
        - df:  data frame whose missing values should be filled
        - imp: a SimpleImputer object; only ``transform`` is called on it here,
               so it must have been fit beforehand
    Returns:
        New data frame of imputed values with the same columns and index as df
    """
    # re-attach the original row/column labels to the imputer output
    filled_values = imp.transform(df)
    return pd.DataFrame(filled_values, index=df.index, columns=df.columns)

#### Once your smaller functions are done, you can then place them into one larger function

In [12]:
def clean_data(train_df: pd.DataFrame,
               col_names: List[str],
               test_df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
    """Cleans either the train or test set

    The imputer is always fit on the training data only. It is fit on a copy
    of ``train_df`` that already carries the missing flags, so the columns seen
    during ``fit`` match the columns passed to ``transform`` whether or not the
    caller added the flag columns to ``train_df`` beforehand. Neither input
    data frame is modified.

    Args:
        - train_df:  training data frame
        - col_names: list of column names that contain missing values
        - test_df:   testing data frame
    Return:
        if train_df is supplied but not test_df, cleaned train_df will be returned;
        if both train and test df are supplied, cleaned test_df will be returned
    """
    # flag a copy of the training set first so the imputer is fit on the same
    # set of columns it will later be asked to transform
    flagged_train = make_missing_flags(df=train_df.copy(),
                                       col_names=col_names)

    # learn the medians from the (flagged) training set only
    imp = SimpleImputer(missing_values=np.nan, strategy="median")
    imp.fit(flagged_train)

    # pick the frame to clean: the test set when given, otherwise the train set
    if test_df is None:
        target_df = flagged_train
    else:
        target_df = make_missing_flags(df=test_df.copy(),
                                       col_names=col_names)

    # replace missing values with the training medians
    return impute_missing_values(df=target_df, imp=imp)

### Let's use our preprocessor function to clean `X_test` and then retry evaluating our model

In [13]:
# the imputer is fit on X_train; the flags and imputation are applied to a copy of X_test
X_test_clean = clean_data(train_df=X_train,
                          col_names=["sepal_length", "sepal_width"],
                          test_df=X_test)

# dt_clf here is the tree that was fit on X_train_clean earlier
print(classification_report(y_test, dt_clf.predict(X_test_clean)))

              precision    recall  f1-score   support

      setosa       1.00      1.00      1.00        19
  versicolor       0.90      0.90      0.90        10
   virginica       0.94      0.94      0.94        16

    accuracy                           0.96        45
   macro avg       0.95      0.95      0.95        45
weighted avg       0.96      0.96      0.96        45



## Introducing `sklearn.pipeline.Pipeline()` object

![kid transformer](visuals/transformer.gif)

### Definition of a pipeline

We'll store these steps in a `Pipeline` object. From the [docs](https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn-pipeline-pipeline):

> The purpose of the pipeline is to assemble several steps that can be cross-validated together while setting different parameters. [The `Pipeline` object] sequentially applies a list of transforms and a final estimator. Intermediate steps of the pipeline must be ‘transforms’ (i.e. transformers), that is, they must implement fit and transform methods. 

### Broad generalization of a pipeline

The key here is we need to specify a specific column, pass its "transformer" (i.e. `SimpleImputer`, `OneHotEncoder`, `StandardScaler`), and determine if the transformation belongs in its own new column or if it's more appropriate for the transformed column to overwrite the input column.

### Benefits of the `Pipeline`

From the [User Guide](https://scikit-learn.org/stable/modules/compose.html#pipeline-chaining-estimators):
> * **Convenience and encapsulation**
>     + You only have to call fit and predict once on your data to fit a whole sequence of estimators.
> * **Joint parameter selection**
>     + You can grid search over parameters of all estimators in the pipeline at once.
> * **Safety**
>     + Pipelines help avoid leaking statistics from your test data into the trained model in cross-validation, by ensuring that the same samples are used to train the transformers and predictors. 

Here, we'll create a few `Pipeline` objects to do the following:

* flag which records contain `NaN` values in the `sepal_length` and `sepal_width` features;
* impute the median value for `NaN` values in the `sepal_length` and `sepal_width` features; and
* return all other columns - `petal_length` & `petal_width` - as is

### Create custom transformer that identifies records that have `NaN` values

Sometimes we need to add new features to our existing feature space. In this case, we can't rely on importing a traditional transformer (i.e. `StandardScaler`, `OneHotEncoder`, etc.).

Instead, we'll need to create our own custom transformer. We will do this by creating a new class that implements both `.fit()` and `.transform()` methods. 

*Shoutout to [Sebastian Raschka](https://sebastianraschka.com/) for helping me out on [Twitter](https://twitter.com/cenuno_/status/1179855832374099968) to figure this out!*

In [15]:
class IsMissing(BaseEstimator):
    """Creates a new column flagging if any values from one column are missing

    Note: this class will be used inside a scikit-learn Pipeline

    Attributes:
        col_name (str): name of the column to check for missing values

    Methods:
        fit(): learns nothing from the data and simply returns self

        _is_missing(): returns 1 if a single value is NaN; 0 if else

        transform(): returns a copy of X with a `<col_name>_missing`
                     column of 0/1 flags added
    """

    def __init__(self, col_name):
        self.col_name = col_name

    def fit(self, X, y=None):
        """No-op fit; present so the object can be used as a Pipeline step"""
        return self

    def _is_missing(self, X):
        """Flag if a single value is NaN (scalar helper)"""
        return int(pd.isna(X))

    def transform(self, X, y=None):
        """Copies X and creates a new column before returning X_new"""
        new_col = self.col_name + "_missing"
        # work on a copy so the caller's data frame is left untouched
        X_new = X.copy()
        # vectorised null check; int64 keeps the flags as plain 0/1 integers
        X_new[new_col] = X_new[self.col_name].isna().astype("int64")
        return X_new
    
    

## Create first `Pipeline`

In [16]:
# chain one IsMissing step per sepal column; each step adds its own "<col>_missing" flag
missing_mapper = Pipeline(steps=[
    ("missing_sl", IsMissing(col_name="sepal_length")),
    ("missing_sw", IsMissing(col_name="sepal_width"))
])

## Inspect results

In [17]:
# X_train already carries both *_missing columns from the earlier flagging cell,
# so here the steps recompute them on a copy rather than adding new columns
missing_mapper.fit(X_train).transform(X_train).head(15)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,sepal_length_missing,sepal_width_missing
73,6.1,2.8,4.7,1.2,0,0
30,4.8,3.1,1.6,0.2,0,0
98,5.1,2.5,3.0,1.1,0,0
36,5.5,3.5,1.3,0.2,0,0
1,4.9,,1.4,0.2,0,1
149,5.9,3.0,5.1,1.8,0,0
4,5.0,,1.4,0.2,0,1
100,6.3,3.3,6.0,2.5,0,0
64,5.6,2.9,3.6,1.3,0,0
135,7.7,3.0,6.1,2.3,0,0


## Create a pipeline to impute the median values for `sepal_length` and `sepal_width`

In [18]:
print(f"Median value for sepal length from X_train: {X_train['sepal_length'].median()}")
print(f"Median value for sepal width from X_trani: {X_train['sepal_width'].median()}")

Median value for sepal length from X_train: 5.7
Median value for sepal width from X_trani: 3.0


In [19]:
# median-impute only the two sepal columns; every other column is handed through as is
impute_mapper = ColumnTransformer(
    transformers=[
        (
            "impute",
            SimpleImputer(missing_values=np.nan, strategy="median"),
            ["sepal_length", "sepal_width"],
        ),
    ],
    remainder="passthrough",
)

_Note: we're setting `remainder` to "passthrough" so that all non `sepal_length` and `sepal_width` columns are returned without any transformations_

In [20]:
# the result shown below is a plain array (no column labels): the two imputed
# sepal columns come first, followed by the passthrough columns
impute_mapper.fit(X_train).transform(X_train)[0:15]

array([[6.1, 2.8, 4.7, 1.2, 0. , 0. ],
       [4.8, 3.1, 1.6, 0.2, 0. , 0. ],
       [5.1, 2.5, 3. , 1.1, 0. , 0. ],
       [5.5, 3.5, 1.3, 0.2, 0. , 0. ],
       [4.9, 3. , 1.4, 0.2, 0. , 1. ],
       [5.9, 3. , 5.1, 1.8, 0. , 0. ],
       [5. , 3. , 1.4, 0.2, 0. , 1. ],
       [6.3, 3.3, 6. , 2.5, 0. , 0. ],
       [5.6, 2.9, 3.6, 1.3, 0. , 0. ],
       [7.7, 3. , 6.1, 2.3, 0. , 0. ],
       [5.4, 3. , 1.7, 0.4, 0. , 1. ],
       [5.3, 3.7, 1.5, 0.2, 0. , 0. ],
       [6.4, 2.8, 5.6, 2.2, 0. , 0. ],
       [5.7, 3. , 1.5, 0.2, 1. , 1. ],
       [5.7, 3. , 3.9, 1.1, 1. , 1. ]])

From above, notice the 14th and 15th records: their sepal values were previously `NaN`. After using the `SimpleImputer` transformer, the `NaN` values were replaced with the median values of `sepal_length` and `sepal_width`.

## Combine the two pipelines into one

In [21]:
# order matters: the flags must be computed before imputation fills in the NaN values
data_prep_mapper = Pipeline(steps=[
    ("missing", missing_mapper),
    ("impute", impute_mapper)
])

In [22]:
# run both preprocessing stages (flag, then impute) and show the first 15 rows of the result
data_prep_mapper.fit(X_train).transform(X_train)[0:15]

array([[6.1, 2.8, 4.7, 1.2, 0. , 0. ],
       [4.8, 3.1, 1.6, 0.2, 0. , 0. ],
       [5.1, 2.5, 3. , 1.1, 0. , 0. ],
       [5.5, 3.5, 1.3, 0.2, 0. , 0. ],
       [4.9, 3. , 1.4, 0.2, 0. , 1. ],
       [5.9, 3. , 5.1, 1.8, 0. , 0. ],
       [5. , 3. , 1.4, 0.2, 0. , 1. ],
       [6.3, 3.3, 6. , 2.5, 0. , 0. ],
       [5.6, 2.9, 3.6, 1.3, 0. , 0. ],
       [7.7, 3. , 6.1, 2.3, 0. , 0. ],
       [5.4, 3. , 1.7, 0.4, 0. , 1. ],
       [5.3, 3.7, 1.5, 0.2, 0. , 0. ],
       [6.4, 2.8, 5.6, 2.2, 0. , 0. ],
       [5.7, 3. , 1.5, 0.2, 1. , 1. ],
       [5.7, 3. , 3.9, 1.1, 1. , 1. ]])

## Now let's take this a step further by performing all the preprocessing steps prior to building a `DecisionTreeClassifier` model

This model will help us classify which species a flower is from `X_test`

In [23]:
# build classifier
# NOTE: this rebinds dt_clf to a new, unfitted tree, replacing the one fit earlier;
# min_samples_split=2 restates the default shown in the earlier repr output
dt_clf = DecisionTreeClassifier(random_state=2019,
                                min_samples_leaf=30,
                                criterion="gini",
                                min_samples_split=2)

In [24]:
# build pipeline: preprocessing first, the decision tree as the final estimator
pipe = Pipeline(steps=[
    ("dataprep", data_prep_mapper),
    ("model", dt_clf)
])

Fit `X_train` and `y_train` onto the `pipe` object

In [25]:
# a single call fits the flagging steps, the imputer, and the tree in sequence
pipe.fit(X_train, y_train)

Pipeline(memory=None,
         steps=[('dataprep',
                 Pipeline(memory=None,
                          steps=[('missing',
                                  Pipeline(memory=None,
                                           steps=[('missing_sl',
                                                   IsMissing(col_name='sepal_length')),
                                                  ('missing_sw',
                                                   IsMissing(col_name='sepal_width'))],
                                           verbose=False)),
                                 ('impute',
                                  ColumnTransformer(n_jobs=None,
                                                    remainder='passthrough',
                                                    sparse_threshold=0.3,
                                                    transformer_weights=None,
                                                    transformers=[('i...
                                

Use the `pipe` object to make predictions on `X_test`

In [26]:
# X_test is passed in raw: the pipeline adds the flags and imputes before the tree predicts
y_pred = pipe.predict(X_test)

In [27]:
# compare the held-out labels with the pipeline's predictions, class by class
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

      setosa       1.00      1.00      1.00        19
  versicolor       0.90      0.90      0.90        10
   virginica       0.94      0.94      0.94        16

    accuracy                           0.96        45
   macro avg       0.95      0.95      0.95        45
weighted avg       0.96      0.96      0.96        45



## Conclusion

You now know how to take advantage of `Pipeline` objects to stop duplicating your preprocessing steps in the training and testing sets. I hope this helps you take advantage of yet another module built into `scikit-learn` to help improve your machine learning workflow!