# 🚝 Pipelines

<img src="https://i.imgur.com/3BcD2Tf.png"/>

### 🤔 Problem Statement:

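- Cross-validation suffers from data leakage if it's applied to the model only while the preprocessors are fit on the whole dataset. The validation folds then influence preprocessing, which makes the CV estimate unreliable (typically over-optimistic).
- What's the problem with the following code?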

In [25]:
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler

# 1. Load dataset
x_data, y_data = load_iris(return_X_y=True)

# 2. Scale the features using StandardScaler
scaler = StandardScaler()
x_data_scaled = scaler.fit_transform(x_data)

# 3. SVM classifier
svm_classifier = SVC(kernel='linear')

# 4. Cross-validate on the already-scaled data (the scaler was also fit on every validation fold)
scores = cross_val_score(svm_classifier, x_data_scaled, y_data, cv=5)
print("Mean CV accuracy:", np.mean(scores))

Mean CV accuracy: 0.9666666666666668



- Hyperparameter search is limited to a single model, but specific combinations of hyperparameters from different stages of the pipeline may yield better performance.

- The different stages need to be encapsulated together for deployment anyway.

### Basic Setup

In [26]:
from sklearn.datasets import load_iris
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.naive_bayes import GaussianNB
from sklearn.pipeline import make_pipeline
from sklearn.metrics import accuracy_score

x_data, y_data = load_iris(return_X_y=True)
x_train, x_test, y_train, y_test = train_test_split(x_data, y_data, test_size=0.2, random_state=30)

### 🧑‍🌾 Rookie Approach

In [27]:
###> Training
# 1. Standardize features
ss = StandardScaler(with_mean=True, with_std=True)
x_train_scaled = ss.fit_transform(x_train)

# 2. Feature selection using Sequential Feature Selector
gnb = GaussianNB(priors=[1/3, 1/3, 1/3], var_smoothing=0.3)
sfs = SequentialFeatureSelector(gnb, n_features_to_select=3, cv=2)
x_train_selected = sfs.fit_transform(x_train_scaled, y_train)

# 3. Train the model
gnb.fit(x_train_selected, y_train)

###> Testing
# 1. Transform test data
x_test_scaled = ss.transform(x_test)
x_test_selected = sfs.transform(x_test_scaled)
# 2. Make predictions
y_pred = gnb.predict(x_test_selected)

# Evaluate the model
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)

Accuracy: 0.9333333333333333


### 😎 ML Engineer Approach

#### Make the pipeline!


In [28]:
ss = StandardScaler(with_mean=True, with_std=True)
gnb = GaussianNB(priors=[1/3, 1/3, 1/3], var_smoothing=0.3)
sfs = SequentialFeatureSelector(gnb, n_features_to_select=3, cv=2)

pipe = make_pipeline(ss, sfs, gnb)          # equivalent to pipe = Pipeline([("standardscaler", ss), ("sequentialfeatureselector", sfs), ("gaussiannb", gnb)])
pipe
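`make_pipeline` names each step after the lowercased class name of its estimator. That is where keys such as `gaussiannb` come from. The `Pipeline` constructor instead lets you choose the names yourself. Here is a small sketch; the names `scale` and `clf` are arbitrary choices for illustration:

```python
from sklearn.naive_bayes import GaussianNB
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler

# Auto-generated names: lowercased class names
auto_pipe = make_pipeline(StandardScaler(), GaussianNB())
auto_names = [name for name, _ in auto_pipe.steps]
print(auto_names)  # ['standardscaler', 'gaussiannb']

# Explicit names chosen by hand ('scale' and 'clf' are illustrative)
named_pipe = Pipeline([("scale", StandardScaler()), ("clf", GaussianNB())])
print(list(named_pipe.named_steps))  # ['scale', 'clf']

# Each step's hyperparameters are exposed as <step>__<param>
print("clf__var_smoothing" in named_pipe.get_params())  # True
```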

#### Train and predict as if it were a single model (or even cross-validate)!

In [29]:
pipe.fit(x_train, y_train)
y_pred = pipe.predict(x_test)
print(accuracy_score(y_test, y_pred))

0.9333333333333333


In [30]:
scores = cross_val_score(pipe, x_data, y_data, cv=5)      
print("Mean CV accuracy:", np.mean(scores))

Mean CV accuracy: 0.9533333333333334
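Under the hood, `cross_val_score` refits the whole pipeline on each training fold, so the scaler never sees the validation fold. Take the scaler + linear SVC from the problem statement. Written out by hand, the leak-free procedure looks like the sketch below. It assumes stratified 5-fold splits without shuffling, which is what scikit-learn uses for classifiers when `cv=5`. It then checks the hand-rolled loop against the pipeline version:

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

x_data, y_data = load_iris(return_X_y=True)

# Same splits that cross_val_score(..., cv=5) uses for a classifier
skf = StratifiedKFold(n_splits=5)
fold_scores = []
for train_idx, val_idx in skf.split(x_data, y_data):
    # Fit the scaler on the training fold only: no statistics leak from the validation fold
    scaler = StandardScaler().fit(x_data[train_idx])
    model = SVC(kernel="linear").fit(scaler.transform(x_data[train_idx]), y_data[train_idx])
    fold_scores.append(model.score(scaler.transform(x_data[val_idx]), y_data[val_idx]))
print("Manual leak-free CV accuracy:", np.mean(fold_scores))

# The pipeline does the same per-fold refitting in a single call
pipe_scores = cross_val_score(make_pipeline(StandardScaler(), SVC(kernel="linear")),
                              x_data, y_data, cv=5)
print("Pipeline CV accuracy:", np.mean(pipe_scores))
```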


**Pipeline in Detail:**

Recall that estimators in scikit-learn support either a `fit-transform` API (e.g., unsupervised models and preprocessors) or a `fit-predict` API (supervised classification and regression).
- A pipeline is a sequence of $N$ estimators where the first $N-1$ are transformers and the last is a transformer or predictor. The type of the pipeline (transformer or predictor) depends on the type of the last block.

- During training (`pipeline.fit`), it calls `fit_transform` on each of the first $N-1$ blocks, feeding each block's output into the next, and then calls `fit` on the last.

- `pipeline.transform(x_data)` calls `transform(x_prev)` on all blocks, where `x_prev` is the output of the previous block (used for transformer pipelines).

- `pipeline.predict(x_val)` calls `transform(x_prev)` on the first $N-1$ blocks and `predict(x_prev)` on the last (used for predictor pipelines).
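To make these rules concrete, here is a deliberately simplified, hypothetical `MiniPipeline` class. It is not scikit-learn's actual implementation, which also handles step names, caching and `passthrough` steps. It chains the same calls and reproduces the predictions of a real pipeline:

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.naive_bayes import GaussianNB
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler


class MiniPipeline:
    """Hypothetical, stripped-down illustration of Pipeline's fit/predict logic."""

    def __init__(self, steps):
        self.steps = steps  # list of estimators; all but the last must be transformers

    def fit(self, x, y):
        for step in self.steps[:-1]:
            x = step.fit_transform(x, y)  # fit each transformer and feed its output forward
        self.steps[-1].fit(x, y)          # fit the final estimator on the transformed data
        return self

    def _transform_all_but_last(self, x):
        for step in self.steps[:-1]:
            x = step.transform(x)         # transform only: nothing is refit at inference time
        return x

    def predict(self, x):
        return self.steps[-1].predict(self._transform_all_but_last(x))

    def transform(self, x):
        # Only meaningful when the last step is also a transformer
        return self.steps[-1].transform(self._transform_all_but_last(x))


x, y = load_iris(return_X_y=True)
mini = MiniPipeline([StandardScaler(), PCA(n_components=2), GaussianNB()]).fit(x, y)
real = make_pipeline(StandardScaler(), PCA(n_components=2), GaussianNB()).fit(x, y)
print(np.array_equal(mini.predict(x), real.predict(x)))  # True
```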

#### Setting Hyperparameters and Hyperparameter Search

In [31]:
print(pipe.named_steps['gaussiannb'].get_params())
pipe.set_params(gaussiannb__priors=[0.5, 0.3, 0.2])        
pipe.fit(x_train, y_train)
# An external cache (the `memory` argument) avoids refitting the standard scaler when its hyperparameters and input data have not changed:
# https://scikit-learn.org/stable/modules/compose.html#caching-transformers-avoid-repeated-computation

{'priors': [0.3333333333333333, 0.3333333333333333, 0.3333333333333333], 'var_smoothing': 0.3}
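The caching mentioned in the comment above is enabled through the `memory` argument of `Pipeline`/`make_pipeline`. It accepts a directory path or a `joblib.Memory` object. Only the fitted transformers are cached, not the final estimator. Here is a minimal sketch using a throwaway temporary directory:

```python
import shutil
import tempfile

import numpy as np
from sklearn.datasets import load_iris
from sklearn.naive_bayes import GaussianNB
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

x, y = load_iris(return_X_y=True)
cache_dir = tempfile.mkdtemp()  # throwaway cache location for this sketch

# memory=<path> caches each fitted transformer on disk
cached_pipe = make_pipeline(StandardScaler(), GaussianNB(), memory=cache_dir)
cached_pipe.fit(x, y)

# Only the final step changes, so refitting reuses the cached scaler
cached_pipe.set_params(gaussiannb__var_smoothing=1e-3)
cached_pipe.fit(x, y)

# Same result as an uncached pipeline with the same settings
plain_pipe = make_pipeline(StandardScaler(), GaussianNB(var_smoothing=1e-3)).fit(x, y)
same = np.array_equal(cached_pipe.predict(x), plain_pipe.predict(x))
print(same)  # True

shutil.rmtree(cache_dir)  # clean up the cache directory
```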


In [32]:
from sklearn.model_selection import GridSearchCV

param_grid = {
    'gaussiannb__priors': [[1/3, 1/3, 1/3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]],
    'sequentialfeatureselector__n_features_to_select': [1, 2, 3],
    }
grid = GridSearchCV(pipe, param_grid, cv=5, return_train_score=True)
grid.fit(x_train, y_train)

print(f"Best score: {grid.best_score_}")
print(f"Best params: {grid.best_params_}")

Best score: 0.975
Best params: {'gaussiannb__priors': [0.2, 0.3, 0.5], 'sequentialfeatureselector__n_features_to_select': 2}


With this, you can even search over a whole block (e.g., swapping one model for another), but this is left for you to research.
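As a starting point: a step can itself be treated as a hyperparameter. A grid can therefore list entire estimators for a step name, and `'passthrough'` skips an intermediate step. The step names `scale` and `clf` and the candidate models below are illustrative choices:

```python
from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV
from sklearn.naive_bayes import GaussianNB
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

x, y = load_iris(return_X_y=True)

# 'clf' acts as a slot; the grid swaps whole estimators into it
pipe_search = Pipeline([("scale", StandardScaler()), ("clf", GaussianNB())])

# A list of grids: each dict pairs one candidate model with its own hyperparameters
param_grid = [
    {"clf": [GaussianNB()], "clf__var_smoothing": [1e-9, 1e-3]},
    {"clf": [SVC()], "clf__C": [0.1, 1, 10], "clf__kernel": ["linear", "rbf"]},
    # 'passthrough' removes the scaling step for this candidate
    {"scale": ["passthrough"], "clf": [SVC()], "clf__C": [1]},
]

search = GridSearchCV(pipe_search, param_grid, cv=5)
search.fit(x, y)
print(type(search.best_params_["clf"]).__name__, search.best_score_)
```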

#### Save and Load Pipeline

Save the final model once it has passed the final test:

In [33]:
import joblib

joblib.dump(pipe, 'pipeline.joblib')

['pipeline.joblib']

Deployment

In [34]:
loaded_pipeline = joblib.load('pipeline.joblib')
x = input("input the four flower features: ").split()       # In reality, GUI

def CMP_4(x):                                           # In reality POST request
    x = np.array(x, dtype=float)    # input() yields strings; the pipeline needs numbers
    x = x[np.newaxis, :]            # shape (4,)=>(1,4)
    pred = loaded_pipeline.predict(x)
    return pred[0]

CMP_4(x)

2

In [35]:
assert np.array_equal(pipe.predict(x_data), loaded_pipeline.predict(x_data)), \
    "Predictions from the loaded pipeline are not equivalent to the original pipeline."

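Note that there is a deployment caveat when the pipeline contains custom classes, such as the one in the next section. joblib pickles them by reference to their module, so their definitions and any external dependencies must be importable wherever the pipeline is loaded. The details are left for you to figure out later.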

#### Custom Functions

In [36]:
from sklearn.base import BaseEstimator, TransformerMixin

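class Identity(BaseEstimator, TransformerMixin):
    """Toy transformer that returns its input unchanged; `p` is a dummy hyperparameter."""
    def __init__(self, p=1):
        self.p = p                      # store the argument as-is so get_params/set_params/clone work

    def fit(self, x_data, y_data=None):
        return self                     # nothing to learn

    def transform(self, x_data, y_data=None):
        p = self.p
        return x_data * p * 1/p         # multiply then divide by p: identity up to rounding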

identity = Identity(p=7)
pipe = make_pipeline(ss, identity, sfs, gnb) 
pipe.fit(x_train, y_train)
pipe
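`Identity` learns nothing in `fit`. Most useful custom transformers do learn something. Scikit-learn's convention is to store constructor arguments unchanged in `__init__` and to keep fitted state in attributes whose names end with an underscore. Here is a sketch with a hypothetical `PercentileClipper` that learns per-feature clipping bounds from the training data only:

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin


class PercentileClipper(BaseEstimator, TransformerMixin):
    """Hypothetical transformer: clips each feature to percentiles learned in fit."""

    def __init__(self, lower=5, upper=95):
        # Store constructor arguments unchanged so get_params/set_params/clone work
        self.lower = lower
        self.upper = upper

    def fit(self, x_data, y_data=None):
        # Learned state uses a trailing underscore, following scikit-learn convention
        self.low_ = np.percentile(x_data, self.lower, axis=0)
        self.high_ = np.percentile(x_data, self.upper, axis=0)
        return self

    def transform(self, x_data):
        # Only statistics from fit are used, so inside a pipeline the
        # validation folds never influence the bounds
        return np.clip(x_data, self.low_, self.high_)


rng = np.random.default_rng(0)
x_train_demo = rng.normal(size=(100, 3))
clipper = PercentileClipper(lower=10, upper=90).fit(x_train_demo)
x_clipped = clipper.transform(rng.normal(size=(20, 3)) * 5)
print(np.all(x_clipped >= clipper.low_) and np.all(x_clipped <= clipper.high_))  # True
```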

If you have a function that performs a transformation and needs no fitting, you can turn it into a transformer with `sklearn.preprocessing.FunctionTransformer`.
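Here is a minimal sketch, using `np.log1p` as an arbitrary example of a stateless function:

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.naive_bayes import GaussianNB
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer

x, y = load_iris(return_X_y=True)

# Wrap a stateless function as a transformer; fit() learns nothing
log_tf = FunctionTransformer(np.log1p)
x_log = log_tf.fit_transform(x)
print(np.allclose(x_log, np.log1p(x)))  # True

# It can then be used as a pipeline step like any other transformer
log_pipe = make_pipeline(FunctionTransformer(np.log1p), GaussianNB()).fit(x, y)
print([name for name, _ in log_pipe.steps])  # ['functiontransformer', 'gaussiannb']
```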

In [37]:
from sklearn.model_selection import GridSearchCV

param_grid = {
    'gaussiannb__priors': [[1/3, 1/3, 1/3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]],
    'sequentialfeatureselector__n_features_to_select': [1, 2, 3],
    'identity__p': [1,2,3,4,5,6]
    }
grid = GridSearchCV(pipe, param_grid, cv=5, return_train_score=True)
grid.fit(x_train, y_train)

print(f"Best score: {grid.best_score_}")
print(f"Best params: {grid.best_params_}")

Best score: 0.975
Best params: {'gaussiannb__priors': [0.2, 0.3, 0.5], 'identity__p': 1, 'sequentialfeatureselector__n_features_to_select': 2}


### 🧰 Column Transformer

A column transformer applies different transformations to different subsets of columns of the input DataFrame and concatenates the results side by side:

In [41]:
import pandas as pd

x_data = pd.DataFrame(
    {'city': ['London', 'London', 'Paris', 'Sallisaw'],
     'title': ["His Last Bow", "How Watson Learned the Trick", "A Moveable Feast", "The Grapes of Wrath"],
     'expert_rating': [5, 3, 4, 5],
     'user_rating': [4, 5, 4, 3]
     })
y_data = [0, 0, 1, 1]
x_data

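|   | city | title | expert_rating | user_rating |
|---|------|-------|---------------|-------------|
| 0 | London | His Last Bow | 5 | 4 |
| 1 | London | How Watson Learned the Trick | 3 | 5 |
| 2 | Paris | A Moveable Feast | 4 | 4 |
| 3 | Sallisaw | The Grapes of Wrath | 5 | 3 |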


Let's define a column transformer:

In [39]:
from sklearn.compose import make_column_transformer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import MinMaxScaler

preprocessor = make_column_transformer(                 # or ColumnTransformer([('onehotencoder',...
    (OneHotEncoder(drop='first'), ['city']),            # column indices also work (needed for NumPy arrays)
    (CountVectorizer(lowercase=True), 'title'),         # a scalar column name passes 1D data, which CountVectorizer expects
    (MinMaxScaler(), ['expert_rating', 'user_rating']),
    remainder='drop'  # or 'passthrough' or remainder=MinMaxScaler()
)

x_data_p = preprocessor.fit_transform(x_data)
print(type(x_data_p), x_data_p.shape)                   # it can also be forced to output a Pandas DataFrame

<class 'numpy.ndarray'> (4, 17)


It can be embedded in a pipeline, too!

In [40]:
import warnings; warnings.filterwarnings("ignore")

pipe = make_pipeline(preprocessor, sfs, gnb)
pipe.set_params(gaussiannb__priors=[0.5, 0.5])                          # the toy data above has only two classes
pipe.set_params(columntransformer__countvectorizer__lowercase=False)    # hyperparameter search likewise possible
pipe.fit(x_data, y_data)

And we can also embed a transformer pipeline into a column transformer (e.g., imputation + MinMax scaling). Try adding the imputation step in series with MinMax scaling in the example above as an exercise.
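Nesting works the same way for any branch. So as not to give away the exercise, the sketch below uses a different branch on hypothetical data: a text pipeline of `CountVectorizer` followed by `TfidfTransformer`. Together they do what `TfidfVectorizer` does; they are split here only to show nesting:

```python
import pandas as pd
from sklearn.compose import make_column_transformer
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import MinMaxScaler

books = pd.DataFrame({
    "title": ["His Last Bow", "How Watson Learned the Trick", "A Moveable Feast", "The Grapes of Wrath"],
    "expert_rating": [5, 3, 4, 5],
})

# This branch is itself a pipeline: word counts followed by TF-IDF reweighting
text_branch = make_pipeline(CountVectorizer(), TfidfTransformer())

ct = make_column_transformer(
    (text_branch, "title"),              # scalar column name -> 1D input for CountVectorizer
    (MinMaxScaler(), ["expert_rating"]),
)
out = ct.fit_transform(books)
print(out.shape)  # (4, 14): 13 words + 1 scaled rating

# Nested step parameters are addressed by chaining names with "__"
ct.set_params(pipeline__tfidftransformer__use_idf=False)
```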

#### What we covered so far:

1. A sequence of transformers followed by a predictor or transformer can be wrapped in a pipeline

2. The type of the pipeline depends on its last step

3. Pipelines let you train, cross-validate, tune hyperparameters, etc. as if they were a single model

4. They prevent leakage during cross-validation

5. Pipelines can be saved and loaded!

6. A pipeline can include custom transformers

7. A step of a pipeline can also be a column transformer

8. A column transformer applies different transformers to different columns of a DataFrame

9. A branch in a column transformer can itself be a pipeline of transformers