Let's consider a scenario in which we're working with a dataset that has both numerical and categorical features. We want to build a pipeline that cleans the data, processes each type of feature appropriately, and then fits a classifier to predict customer churn. For the custom transformer, we'll create one that adds a new, domain-specific feature: for example, a customer interaction score derived from multiple numerical features.

In [1]:
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from joblib import dump, load


In [2]:
class InteractionScoreTransformer(BaseEstimator, TransformerMixin):
    """Append a scaled interaction score built from the first two columns.

    For every row the score is ``X[:, 0] * X[:, 1]`` divided by
    ``max_interaction``, the largest single value found in the first two
    columns of the data given to :meth:`fit`. The score is appended as a new
    last column; the incoming columns are passed through unchanged.

    Attributes
    ----------
    max_interaction
        Maximum over the first two columns of the fitting data. Set by
        :meth:`fit`.
    """

    @staticmethod
    def _to_dense(X):
        """Return *X* as a NumPy array, densifying sparse-like inputs."""
        # Column slicing plus element-wise ``*`` and ``np.hstack`` below need a
        # plain ndarray. Anything exposing ``toarray`` (presumably a scipy
        # sparse matrix coming out of an upstream transformer) is densified.
        if hasattr(X, "toarray"):
            X = X.toarray()
        return np.asarray(X)

    def fit(self, X, y=None):
        """Record the scaling constant from the first two columns of *X*.

        Parameters
        ----------
        X : array-like with at least two columns
            Training data; only the first two columns are inspected.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        self
        """
        X = self._to_dense(X)
        # Assuming the first two columns are the ones we're interested in
        self.max_interaction = X[:, :2].max()
        return self

    def transform(self, X):
        """Return *X* with the interaction score appended as the last column.

        Parameters
        ----------
        X : array-like with at least two columns

        Returns
        -------
        numpy.ndarray
            Array with one more column than *X*.
        """
        X = self._to_dense(X)
        # A zero maximum (e.g. both columns constant after standardisation)
        # would turn the score into NaN/inf and break downstream estimators,
        # so fall back to the unscaled product in that case.
        scale = self.max_interaction if self.max_interaction != 0 else 1.0
        # Create an interaction score: a simple product of the first two columns
        interaction_score = (X[:, 0] * X[:, 1]) / scale
        return np.hstack((X, interaction_score[:, None]))


In [3]:
# Column names, grouped by how they have to be preprocessed
num_features = ['age', 'balance', 'num_contacts']
cat_features = ['job', 'marital_status']

# Numerical columns: fill gaps with the median, then standardise
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical columns: fill gaps with a 'missing' label, then one-hot encode.
# Categories unseen during fit are ignored at predict time instead of raising.
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
])

# Route each column group through its own transformer.
# sparse_threshold=0 forces a dense ndarray: the 'interaction' step below
# slices columns and multiplies them element-wise, which does not work on the
# sparse matrix ColumnTransformer returns by default once the one-hot block
# makes the combined output sparse enough.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, num_features),
        ('cat', categorical_transformer, cat_features),
    ],
    sparse_threshold=0,
)

# Full pipeline: preprocessing -> interaction feature -> classifier.
# The numeric block comes first in the preprocessor output, so the interaction
# score is built from the scaled 'age' and 'balance' columns.
pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('interaction', InteractionScoreTransformer()),
    ('classifier', RandomForestClassifier(random_state=42)),
])

In [4]:
# Simulate numerical data
np.random.seed(42)
X_num = np.random.rand(100, 3)

# Simulate categorical data
X_cat = np.random.choice(['admin', 'technician', 'retired', 'management'], size=(100, 2))

# Combined raw array, kept for reference only: np.hstack promotes the floats
# to strings here, so X must not be used as the source of numeric values.
X = np.hstack((X_num, X_cat))

# Generate a binary target variable for churn (1 for churned, 0 for not churned)
y = np.random.choice([0, 1], size=(100,))

# Build the DataFrame from one frame per dtype so the numeric columns stay
# float64 instead of becoming strings that have to be re-parsed downstream.
feature_names = num_features + cat_features
X_df = pd.concat(
    [
        pd.DataFrame(X_num, columns=num_features),
        pd.DataFrame(X_cat, columns=cat_features),
    ],
    axis=1,
)

In [6]:
# Fit the pipeline: every step is fitted in order on the simulated features
# X_df, with y as the churn target for the final classifier.
pipeline.fit(X_df, y)

In [None]:
# Save the fitted pipeline to disk with joblib. The path is relative, so the
# file 'complex_pipeline.joblib' is written to the current working directory.
dump(pipeline, 'complex_pipeline.joblib')

In [None]:
# Load the pipeline from disk.
# NOTE: joblib files are pickle-based, so only load files from a trusted
# source, and make sure the custom InteractionScoreTransformer class is
# defined/importable in the session that loads the file.
loaded_pipeline = load('complex_pipeline.joblib')

In [None]:
# Validate the loaded pipeline: the in-memory pipeline and the one restored
# from disk must produce identical predictions on the same input.
original_preds = pipeline.predict(X_df)
loaded_preds = loaded_pipeline.predict(X_df)
# Element-wise comparison of the two prediction arrays; fails loudly on any mismatch.
assert np.array_equal(original_preds, loaded_preds), "The loaded pipeline's predictions differ from the original!"