
# FTI Design Pattern Pipelines with scikit-learn
## Example with the Wine Dataset

This notebook implements the **FTI design pattern** — **Feature**, **Training**, and **Inference** pipelines — using scikit-learn on the classic **Wine** classification dataset.

- **Feature Pipeline (F)**: builds a reproducible feature table from raw data (e.g., imputing, scaling, encoding), and persists artifacts (features, schemas, transformers).
- **Training Pipeline (T)**: consumes the features/labels (or composes the feature pipeline internally), trains a model with cross-validation, and persists the **trained model** and metrics.
- **Inference Pipeline (I)**: loads the deployed model and applies the **same feature logic** to new/unseen data to produce predictions, enabling batch or online inference.
  
FTI reference: Hopsworks' description of **Feature / Training / Inference pipelines** as a way to structure ML systems.


## Setup

In [None]:

from pathlib import Path
import json
import numpy as np
import pandas as pd

from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import joblib
import matplotlib.pyplot as plt

In [None]:
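def find_repo_root(start: Path, marker_dir: str = "data") -> Path:
    """Return the nearest ancestor of `start` (or `start` itself) that contains `marker_dir`.

    Falls back to the resolved `start` when no ancestor contains `marker_dir`.
    """
    current = start.resolve()
    for parent in [current, *current.parents]:
        if (parent / marker_dir).is_dir():
            return parent
    return current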

RANDOM_STATE = 42

REPO_ROOT = find_repo_root(Path.cwd(), marker_dir="data")

DATA_DIR = REPO_ROOT / 'data'
ARTIFACTS_DIR = DATA_DIR / 'processed' / 'artifacts'
FEATURES_DIR = ARTIFACTS_DIR / 'features'
MODELS_DIR = ARTIFACTS_DIR / 'models'
REPORTS_DIR = ARTIFACTS_DIR / 'reports'

for d in [ARTIFACTS_DIR, FEATURES_DIR, MODELS_DIR, REPORTS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

## Load the Wine dataset

In [None]:

wine = load_wine(as_frame=True)
X = wine.data.copy()
y = wine.target.copy()
feature_names = X.columns.tolist()
target_name = 'target'

print('Features:', feature_names[:5], '...')
print('Target classes:', wine.target_names)

# Persist a raw snapshot for provenance
raw_df = X.copy()
raw_df[target_name] = y
raw_path = FEATURES_DIR / 'wine_raw_snapshot.csv'
raw_df.to_csv(raw_path, index=False)
print('Raw snapshot saved to:', raw_path)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y
)
X_train.head()



## Feature Pipeline (F)

**Goal:** Transform raw inputs into a clean, consistent **feature table** suitable for both training and inference.
- Handles data quality concerns (e.g., missing values) and applies **scaling/encoding**.
- Persists the **feature transformer** and **feature table** to ensure **training/serving parity**.
- Produces: `features_train.npy`, `features_test.npy`, `feature_names.json`, and `feature_transformer.joblib`.

**In this dataset:** All features are numeric, so we will apply `SimpleImputer(strategy='median')` followed by `StandardScaler`.
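
As a toy illustration of what these two steps do, the sketch below runs the same imputer and scaler on a made-up single column with one missing value. The array and the `toy_*` names are hypothetical and are not part of the Wine data, which has no missing values.

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Hypothetical toy column with one missing value (not taken from Wine)
toy_train = np.array([[1.0], [2.0], [np.nan], [4.0]])

toy_transform = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Statistics (median, mean, std) are learned from the training rows only
toy_scaled = toy_transform.fit_transform(toy_train)
learned_median = toy_transform.named_steps['imputer'].statistics_[0]

# A new row with a missing value is filled with the *training* median,
# then scaled with the *training* mean and standard deviation
toy_new_scaled = toy_transform.transform(np.array([[np.nan]]))

print('Learned median:', learned_median)
print('Scaled training column:', toy_scaled.ravel())
print('Scaled new row:', toy_new_scaled.ravel())
```

Because the fitted statistics travel with the transformer, applying it to new rows reuses the training-time values instead of recomputing them, which is what training/serving parity relies on.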


In [None]:

numeric_features = feature_names  # all numeric for Wine
numeric_transform = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

feature_pipeline = ColumnTransformer(
    transformers=[('num', numeric_transform, numeric_features)],
    remainder='drop'
)

# Fit on training data only
X_train_features = feature_pipeline.fit_transform(X_train)
X_test_features  = feature_pipeline.transform(X_test)

# Persist feature artifacts
np.save(FEATURES_DIR / 'features_train.npy', X_train_features)
np.save(FEATURES_DIR / 'features_test.npy',  X_test_features)

with open(FEATURES_DIR / 'feature_names.json', 'w') as f:
    json.dump(numeric_features, f, indent=2)

joblib.dump(feature_pipeline, FEATURES_DIR / 'feature_transformer.joblib')

print('Feature shapes:', X_train_features.shape, X_test_features.shape)
print('Saved feature artifacts to:', FEATURES_DIR)



## Training Pipeline (T)

**Goal:** Train and validate a model using the **feature pipeline**, and persist the **trained model** and **metrics**.

Two common approaches:
1. **Compose** the feature pipeline inside the training pipeline (ensures end-to-end reproducibility from raw data).
2. **Consume** pre-computed features from the Feature pipeline (useful when features are computed once and reused across batch jobs or models).

Here, we demonstrate **Approach 1** by creating a single scikit-learn `Pipeline` that nests the feature transformer and classifier.  
This guarantees the same preprocessing at inference time if we export the **fitted pipeline** as a single artifact.
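
For contrast, Approach 2 might look roughly like the sketch below; the notebook itself continues with Approach 1. This is a minimal sketch under stated assumptions: it writes to a temporary directory instead of `FEATURES_DIR`, uses a bare `StandardScaler` as a stand-in for the feature transformer, and persists the training labels in a hypothetical `labels_train.npy` file that this notebook does not produce.

```python
import tempfile
from pathlib import Path

import joblib
import numpy as np
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

X, y = load_wine(return_X_y=True)
X_tr, X_te, y_tr, y_te = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp)  # stand-in for FEATURES_DIR

    # Feature pipeline side: fit on training rows, persist features and transformer
    scaler = StandardScaler().fit(X_tr)
    np.save(store / 'features_train.npy', scaler.transform(X_tr))
    np.save(store / 'labels_train.npy', y_tr)  # hypothetical file, see lead-in
    joblib.dump(scaler, store / 'feature_transformer.joblib')

    # Training pipeline side: consume the pre-computed features only
    F_tr = np.load(store / 'features_train.npy')
    l_tr = np.load(store / 'labels_train.npy')
    clf = LogisticRegression(max_iter=200, random_state=42).fit(F_tr, l_tr)

    # Inference side: the transformer must be loaded separately and applied first
    transformer = joblib.load(store / 'feature_transformer.joblib')
    approach2_acc = clf.score(transformer.transform(X_te), y_te)

print('Pre-computed feature matrix:', F_tr.shape)
print('Test accuracy (Approach 2 sketch):', approach2_acc)
```

The trade-off is that the model artifact no longer carries its preprocessing, so the inference side has to load and apply the matching transformer itself.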


In [None]:
from datetime import datetime, timezone

# Define the full training pipeline: Feature engineering + Model
full_training_pipeline = Pipeline(steps=[
    ('features', feature_pipeline),
    ('clf', LogisticRegression(max_iter=200, random_state=RANDOM_STATE))
])

# Hyperparameter grid for demonstration
param_grid = {
    'clf__C': [0.1, 1.0, 10.0],
    'clf__penalty': ['l2'],
    'clf__solver': ['lbfgs']  # good default for multinomial problems
}

grid = GridSearchCV(
    estimator=full_training_pipeline,
    param_grid=param_grid,
    scoring='accuracy',
    cv=5,
    n_jobs=-1
)

grid.fit(X_train, y_train)
best_model = grid.best_estimator_
print('Best params:', grid.best_params_)

# Evaluate on test set
y_pred = best_model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print('Test accuracy:', acc)

# Persist model & a simple report
model_path = MODELS_DIR / 'wine_fti_model.joblib'
joblib.dump(best_model, model_path)


report = {
    'timestamp': datetime.now(timezone.utc).isoformat(),  # aware datetime already ends in +00:00
    'best_params': grid.best_params_,
    'test_accuracy': float(acc),
    'classification_report': classification_report(y_test, y_pred, output_dict=True)
}
with open(REPORTS_DIR / 'evaluation_report.json', 'w') as f:
    json.dump(report, f, indent=2)

print('Model saved to:', model_path)
print('Report saved to:', REPORTS_DIR / 'evaluation_report.json')

### (Optional) Quick Confusion Matrix Plot

In [None]:

cm = confusion_matrix(y_test, y_pred)
fig, ax = plt.subplots(figsize=(4, 4))
im = ax.imshow(cm)
ax.set_title('Confusion Matrix')
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
for (i, j), v in np.ndenumerate(cm):
    ax.text(j, i, str(v), ha='center', va='center')
plt.show()



## Inference Pipeline (I)

**Goal:** Load the **deployed model** artifact and apply it to **new/unseen data** to produce predictions.

Key requirements:
- **Same feature logic** as training (we satisfy this by exporting the *entire* fitted pipeline).
- Support **batch** (this notebook) or **online** (e.g., FastAPI) inference.
- Persist predictions if needed.

Below we simulate a batch inference job using a slice of the test set as "new data".


In [None]:

# Load the deployed/fitted pipeline
deployed_pipeline = joblib.load(MODELS_DIR / 'wine_fti_model.joblib')

# Simulate "new" data: take 5 rows from the test set
new_data = X_test.sample(5, random_state=RANDOM_STATE).copy()
new_true = y_test.loc[new_data.index]

preds = deployed_pipeline.predict(new_data)
probs = deployed_pipeline.predict_proba(new_data)

inference_df = new_data.copy()
inference_df['predicted_class'] = preds
inference_df['predicted_proba'] = probs.max(axis=1)  # probability of the predicted class
inference_df['true_class'] = new_true.values

# Persist batch predictions
pred_path = REPORTS_DIR / 'batch_predictions.csv'
inference_df.to_csv(pred_path, index=False)

print('Batch predictions saved to:', pred_path)
inference_df
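
For the online case, the body of a request handler could look something like the sketch below. It is only an illustration: the web framework is left out, `predict_one` and `expected_columns` are hypothetical names, and a pipeline fitted in place stands in for the artifact that would normally come from `joblib.load`.

```python
import pandas as pd
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

wine = load_wine(as_frame=True)

# Stand-in for the deployed artifact (assumption: normally loaded with joblib.load)
pipeline = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(max_iter=200)),
]).fit(wine.data, wine.target)
expected_columns = wine.data.columns.tolist()


def predict_one(record: dict) -> dict:
    """Score a single record given as a {feature_name: value} mapping."""
    missing = [c for c in expected_columns if c not in record]
    if missing:
        raise ValueError(f'missing features: {missing}')
    # One-row frame with columns in the order used at training time
    row = pd.DataFrame([record])[expected_columns]
    probs = pipeline.predict_proba(row)[0]
    return {
        'predicted_class': int(pipeline.predict(row)[0]),
        'probabilities': [float(p) for p in probs],
    }


example = wine.data.iloc[0].to_dict()
result = predict_one(example)
print(result)
```

Returning plain `int` and `float` values keeps the result JSON-serializable, which is convenient if the function is later wrapped in an HTTP endpoint.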



## How the pipelines are **connected**

- **Features**: The **Feature pipeline** (`feature_pipeline`) defines the transformations for the input table.  
  In training, we **compose** it into the `full_training_pipeline` to ensure parity. In inference, we **load the fitted full pipeline** so the exact same transformations are applied.

- **Labels**: The target (`wine.target`) is kept separate from feature engineering, then used in the **Training pipeline** for fitting and evaluation.

- **Models**: The **Training pipeline** produces a single serialized artifact (`wine_fti_model.joblib`) that encapsulates **both preprocessing and the classifier**.  
  The **Inference pipeline** loads this artifact and runs `predict` / `predict_proba` on new data.
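
One way to sanity-check this hand-off is a save/load round trip that compares predictions before and after serialization. The sketch below assumes a temporary file and a small dictionary bundle that also records the scikit-learn version; the bundle layout is an illustrative choice, not what the notebook saves (it dumps the pipeline object directly).

```python
import tempfile
from pathlib import Path

import joblib
import numpy as np
import sklearn
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_wine(return_X_y=True)
fitted = Pipeline(steps=[
    ('features', StandardScaler()),
    ('clf', LogisticRegression(max_iter=200)),
]).fit(X, y)

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / 'model.joblib'
    # Hypothetical bundle: the pipeline plus the library version used to fit it
    joblib.dump({'pipeline': fitted, 'sklearn_version': sklearn.__version__}, path)
    bundle = joblib.load(path)

restored = bundle['pipeline']

# The restored pipeline should reproduce the in-memory pipeline's outputs
same_predictions = np.array_equal(fitted.predict(X), restored.predict(X))
same_probabilities = np.allclose(fitted.predict_proba(X), restored.predict_proba(X))

print('Fitted with scikit-learn', bundle['sklearn_version'])
print('Identical predictions:', same_predictions)
print('Matching probabilities:', same_probabilities)
```

Recording the version is a cheap safeguard, since pickled scikit-learn estimators are not guaranteed to load correctly under a different library version.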



## Artifacts produced

All paths are relative to `data/processed/`.

- `artifacts/features/wine_raw_snapshot.csv` – raw snapshot of features + label
- `artifacts/features/features_train.npy`, `artifacts/features/features_test.npy` – example persisted feature matrices
- `artifacts/features/feature_names.json` – schema of features
- `artifacts/features/feature_transformer.joblib` – feature transformer (if needed separately)
- `artifacts/models/wine_fti_model.joblib` – **deployed** (fitted) pipeline (features + model)
- `artifacts/reports/evaluation_report.json` – metrics and best params
- `artifacts/reports/batch_predictions.csv` – example batch inference output
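
A small helper can confirm that a run produced everything in this list. The sketch below is hypothetical (`missing_artifacts` and `EXPECTED_ARTIFACTS` are not part of the notebook) and demonstrates itself against a temporary directory rather than the real artifacts folder.

```python
import tempfile
from pathlib import Path

# Relative paths taken from the artifact list above
EXPECTED_ARTIFACTS = [
    'features/wine_raw_snapshot.csv',
    'features/features_train.npy',
    'features/features_test.npy',
    'features/feature_names.json',
    'features/feature_transformer.joblib',
    'models/wine_fti_model.joblib',
    'reports/evaluation_report.json',
    'reports/batch_predictions.csv',
]


def missing_artifacts(artifacts_dir):
    """Return the expected artifact paths that are not present as files."""
    root = Path(artifacts_dir)
    return [rel for rel in EXPECTED_ARTIFACTS if not (root / rel).is_file()]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    missing_before = missing_artifacts(root)  # nothing has been written yet
    (root / 'models').mkdir()
    (root / 'models' / 'wine_fti_model.joblib').touch()
    missing_after = missing_artifacts(root)   # the model file is now found

print('Missing before:', len(missing_before))
print('Missing after:', len(missing_after))
```

In the notebook, calling `missing_artifacts(ARTIFACTS_DIR)` after the last cell would be expected to return an empty list.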
