# Preprocessing: Production Pipeline

This notebook creates a **complete sklearn pipeline** for preprocessing the King County house price dataset. The pipeline encapsulates all transformations, enabling:

1. **End-to-end inference**: Transform raw CSV data directly
2. **Reproducibility**: All fitted parameters saved in a single artifact
3. **No data leakage**: Proper fit/transform separation enforced by design

> **Prerequisite:** For step-by-step explanations of each preprocessing step, see `04a-preprocessing-step-by-step.ipynb`.

In [None]:
import numpy as np
import pandas as pd
from pathlib import Path

## 1. Load and Prepare Data

In [None]:
import kagglehub

path = kagglehub.dataset_download("harlfoxem/housesalesprediction")
csv_path = Path(path) / "kc_house_data.csv"
df = pd.read_csv(csv_path)
print(f"Dataset shape: {df.shape}")

Dataset shape: (21613, 21)


### Parse dates for splitting

We parse dates here to enable temporal splitting. The pipeline will handle its own date parsing for end-to-end inference.

In [None]:
df["date_parsed"] = pd.to_datetime(df["date"].str[:8], format="%Y%m%d")
df = df.sort_values("date_parsed").reset_index(drop=True)
print(f"Date range: {df['date_parsed'].min().date()} to {df['date_parsed'].max().date()}")

Date range: 2014-05-02 to 2015-05-27


### Temporal split

In [None]:
def temporal_train_val_test_split(
    df: pd.DataFrame, 
    date_column: str,
    val_size: float = 0.15, 
    test_size: float = 0.15
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split data temporally into train, validation, and test sets."""
    df_sorted = df.sort_values(date_column).reset_index(drop=True)
    n = len(df_sorted)
    train_end = int(n * (1 - val_size - test_size))
    val_end = int(n * (1 - test_size))
    return (
        df_sorted.iloc[:train_end].copy(),
        df_sorted.iloc[train_end:val_end].copy(),
        df_sorted.iloc[val_end:].copy()
    )

train_df, val_df, test_df = temporal_train_val_test_split(df, date_column="date_parsed")

print(f"Train: {len(train_df):,} | Val: {len(val_df):,} | Test: {len(test_df):,}")
print(f"Train dates: {train_df['date_parsed'].min().date()} to {train_df['date_parsed'].max().date()}")
print(f"Val dates:   {val_df['date_parsed'].min().date()} to {val_df['date_parsed'].max().date()}")
print(f"Test dates:  {test_df['date_parsed'].min().date()} to {test_df['date_parsed'].max().date()}")

Train: 15,129 | Val: 3,242 | Test: 3,242
Train dates: 2014-05-02 to 2015-01-16
Val dates:   2015-01-16 to 2015-03-26
Test dates:  2015-03-26 to 2015-05-27
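
The output above shows 2015-01-16 as both the last training date and the first validation date, because the split cuts by row position rather than by calendar date. If sales from a single day should never straddle two splits, one option is to cut on date boundaries instead. The following is a minimal sketch on toy data; `split_by_cutoff_dates` and the cutoff values are hypothetical and not part of this notebook.

```python
import pandas as pd

def split_by_cutoff_dates(df, date_column, val_start, test_start):
    """Split on calendar cutoffs so that no date appears in two splits."""
    val_start = pd.Timestamp(val_start)
    test_start = pd.Timestamp(test_start)
    dates = df[date_column]
    train = df[dates < val_start].copy()
    val = df[(dates >= val_start) & (dates < test_start)].copy()
    test = df[dates >= test_start].copy()
    return train, val, test

# Toy data with two sales on the boundary date
toy_sales = pd.DataFrame({
    "date_parsed": pd.to_datetime(
        ["2015-01-15", "2015-01-16", "2015-01-16", "2015-01-17", "2015-03-26"]
    ),
    "price": [300_000, 310_000, 320_000, 330_000, 340_000],
})

toy_train, toy_val, toy_test = split_by_cutoff_dates(
    toy_sales, "date_parsed", val_start="2015-01-16", test_start="2015-03-26"
)
print(len(toy_train), len(toy_val), len(toy_test))
```

The trade-off is that split sizes are no longer exact fractions of the dataset; they depend on how many sales fall on each side of the chosen cutoffs.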


## 2. Custom Transformers

We create custom sklearn transformers to encapsulate all preprocessing logic. Each transformer follows the sklearn protocol:

- `fit(X, y=None)`: Learn parameters from training data only
- `transform(X)`: Apply transformation using learned parameters
- Fitted parameters end with underscore (e.g., `min_date_`)
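
The protocol above can be illustrated with a deliberately small transformer. `MedianFiller` is a hypothetical example, not one of the transformers used in this notebook; it only shows where hyperparameters and fitted parameters live.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class MedianFiller(BaseEstimator, TransformerMixin):
    """Hypothetical example: fill NaNs in one column with the training median."""

    def __init__(self, column: str = "value"):
        self.column = column  # hyperparameter: stored unchanged in __init__

    def fit(self, X: pd.DataFrame, y=None):
        # Fitted parameter: learned from the data passed to fit, trailing underscore
        self.median_ = X[self.column].median()
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        X = X.copy()  # never mutate the caller's DataFrame
        X[self.column] = X[self.column].fillna(self.median_)
        return X

fit_data = pd.DataFrame({"value": [1.0, 3.0, 5.0]})
new_rows = pd.DataFrame({"value": [None, 10.0]})

filler = MedianFiller().fit(fit_data)
filled = filler.transform(new_rows)
print(filler.median_)
print(filled["value"].tolist())
```

The missing value in `new_rows` is filled with the median learned from `fit_data`, not with a statistic of `new_rows` itself.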

### 2.1 DateParser

Transforms the raw date string into a datetime column. This enables true end-to-end inference from CSV data.

This is a **stateless** transformer (no fitting needed) — it just parses strings.
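
As a quick illustration of the parsing step on its own, the sketch below applies the same slice-and-parse logic to two toy strings. The `errors="coerce"` variant is an assumption about how malformed inputs might be handled at inference time; the transformer in this notebook does not use it and would raise instead.

```python
import pandas as pd

raw_dates = pd.Series(["20141013T000000", "20150225T000000"])

# Keep the first 8 characters (YYYYMMDD) and parse with an explicit format
parsed = pd.to_datetime(raw_dates.str[:8], format="%Y%m%d")
print(parsed.dt.date.tolist())

# Assumption: malformed strings might appear in new data.
# errors="coerce" turns them into NaT instead of raising an exception.
messy_dates = pd.Series(["20150701T000000", "not-a-date"])
parsed_messy = pd.to_datetime(messy_dates.str[:8], format="%Y%m%d", errors="coerce")
print(parsed_messy.isna().tolist())
```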

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

class DateParser(BaseEstimator, TransformerMixin):
    """
    Parse date strings to datetime.
    
    Converts the 'date' column from format "YYYYMMDDTHHMMSS" to datetime.
    This is a stateless transformer (no fitting needed).
    
    Parameters
    ----------
    date_column : str
        Name of the column containing date strings (default: "date").
    output_column : str
        Name for the parsed datetime column (default: "date_parsed").
    """
    
    def __init__(self, date_column: str = "date", output_column: str = "date_parsed"):
        self.date_column = date_column
        self.output_column = output_column
    
    def fit(self, X: pd.DataFrame, y=None):
        """No fitting needed — returns self."""
        return self
    
    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Parse date strings to datetime."""
        X = X.copy()
        X[self.output_column] = pd.to_datetime(
            X[self.date_column].str[:8], 
            format="%Y%m%d"
        )
        return X

### 2.2 FeatureEngineer

Creates all derived features and drops non-predictive columns.

> **Key design:** The `min_date_` is learned during `fit()` from training data, then used in `transform()` for all data. This prevents data leakage in the `days_since_start` feature.

In [None]:
class FeatureEngineer(BaseEstimator, TransformerMixin):
    """
    Feature engineering transformer for house price prediction.
    
    Creates derived features and drops non-predictive columns.
    
    Fitted Parameters
    -----------------
    min_date_ : Timestamp
        Minimum date from training data. Used to compute days_since_start
        consistently across all splits (preventing data leakage).
    
    Features Created
    ----------------
    - days_since_start: Days since min_date_ (temporal trend)
    - house_age: Age of house at sale time
    - was_renovated: Binary indicator
    - years_since_renovation: Years since last renovation
    - basement_ratio: sqft_basement / sqft_living
    - living_vs_neighbors: sqft_living / sqft_living15
    - lot_vs_neighbors: sqft_lot / sqft_lot15
    
    Columns Dropped
    ---------------
    id, date, date_parsed, zipcode, yr_built, yr_renovated
    """
    
    def __init__(self):
        pass
    
    def fit(self, X: pd.DataFrame, y=None):
        """
        Learn reference values from training data.
        
        Stores min_date_ to ensure consistent days_since_start across splits.
        """
        # Validate required columns
        required = ["date_parsed", "yr_built", "yr_renovated", "sqft_basement", 
                    "sqft_living", "sqft_living15", "sqft_lot", "sqft_lot15"]
        missing = set(required) - set(X.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")
        
        # Store minimum date from TRAINING data
        # This prevents data leakage when transforming val/test
        self.min_date_ = X["date_parsed"].min()
        
        return self
    
    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Apply feature engineering transformations.
        
        Uses fitted min_date_ for days_since_start calculation.
        """
        X = X.copy()
        
        # Temporal feature: days since training start
        # Uses fitted min_date_ (NOT recomputed from X)
        X["days_since_start"] = (X["date_parsed"] - self.min_date_).dt.days
        
        # Age features
        sale_year = X["date_parsed"].dt.year
        X["house_age"] = sale_year - X["yr_built"]
        X["was_renovated"] = (X["yr_renovated"] > 0).astype(int)
        X["years_since_renovation"] = np.where(
            X["yr_renovated"] > 0,
            sale_year - X["yr_renovated"],
            0
        )
        
        # Ratio features
        X["basement_ratio"] = X["sqft_basement"] / X["sqft_living"].replace(0, 1)
        X["living_vs_neighbors"] = X["sqft_living"] / X["sqft_living15"].replace(0, 1)
        X["lot_vs_neighbors"] = X["sqft_lot"] / X["sqft_lot15"].replace(0, 1)
        
        # Drop non-predictive columns
        columns_to_drop = [
            "id",           # Property identifier
            "date",         # Original string format
            "date_parsed",  # Used for feature engineering only
            "zipcode",      # High cardinality (using lat/long)
            "yr_built",     # Replaced by house_age
            "yr_renovated", # Replaced by was_renovated, years_since_renovation
        ]
        X = X.drop(columns=columns_to_drop)
        
        return X
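
To see why `min_date_` has to come from `fit()`, the sketch below compares the two ways of computing `days_since_start` on toy dates (the dates are illustrative, not taken from the dataset). Recomputing the reference date from the split being transformed would make every split start at day 0, so the feature would no longer encode position on a shared timeline.

```python
import pandas as pd

toy_train_dates = pd.Series(pd.to_datetime(["2014-05-02", "2014-12-31"]))
toy_val_dates = pd.Series(pd.to_datetime(["2015-02-01", "2015-03-01"]))

# Consistent: reference date learned once from the training split
reference_date = toy_train_dates.min()
val_days_fitted = (toy_val_dates - reference_date).dt.days

# Inconsistent: reference date recomputed from the split being transformed
val_days_refit = (toy_val_dates - toy_val_dates.min()).dt.days

print(val_days_fitted.tolist())
print(val_days_refit.tolist())
```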

## 3. Build Complete Pipeline

The pipeline has three stages:

1. **DateParser**: Raw date string → datetime
2. **FeatureEngineer**: Create derived features, drop non-predictive columns
3. **Numeric preprocessing** (`ColumnTransformer`): Log transform + scaling

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, FunctionTransformer

In [None]:
# Define feature groups (after feature engineering)
log_features = [
    "sqft_living", "sqft_lot", "sqft_above", "sqft_basement",
    "sqft_living15", "sqft_lot15"
]

passthrough_features = ["waterfront", "was_renovated"]

# We need to know all features after FeatureEngineer to define scale_features
# First, get a sample output from FeatureEngineer
temp_engineer = Pipeline([
    ("date_parser", DateParser()),
    ("feature_engineer", FeatureEngineer())
])

# Separate X/y for fitting
X_train_raw = train_df.drop(columns=["price", "date_parsed"])  # Raw features (keep 'date' for pipeline)
y_train = train_df["price"]

# Get sample output to determine all feature names
temp_engineer.fit(X_train_raw)
temp_output = temp_engineer.transform(X_train_raw)

all_features = list(temp_output.columns)
scale_features = [f for f in all_features if f not in log_features + passthrough_features]

print(f"Log+scale ({len(log_features)}): {log_features}")
print(f"Scale only ({len(scale_features)}): {scale_features}")
print(f"Passthrough ({len(passthrough_features)}): {passthrough_features}")

Log+scale (6): ['sqft_living', 'sqft_lot', 'sqft_above', 'sqft_basement', 'sqft_living15', 'sqft_lot15']
Scale only (14): ['bedrooms', 'bathrooms', 'floors', 'view', 'condition', 'grade', 'lat', 'long', 'days_since_start', 'house_age', 'years_since_renovation', 'basement_ratio', 'living_vs_neighbors', 'lot_vs_neighbors']
Passthrough (2): ['waterfront', 'was_renovated']
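
The log features use `np.log1p` rather than a plain logarithm. A likely reason is that columns such as `sqft_basement` contain zeros, for which `log(0)` is undefined while `log1p(0)` is 0. The sketch below runs the same log-then-scale combination on a toy column; the values are made up for illustration.

```python
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, StandardScaler

# Toy column shaped like sqft_basement: zeros plus a long right tail
toy_basement = np.array([[0.0], [0.0], [500.0], [4000.0]])

toy_log_scale = Pipeline([
    ("log", FunctionTransformer(np.log1p)),
    ("scale", StandardScaler()),
])
toy_transformed = toy_log_scale.fit_transform(toy_basement)

# All values stay finite, and the result has zero mean and unit variance
print(np.isfinite(toy_transformed).all())
print(toy_transformed.ravel().round(2))
```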


In [None]:
# Log transformation + scaling pipeline
log_pipeline = Pipeline([
    ("log", FunctionTransformer(np.log1p, validate=True, feature_names_out="one-to-one")),
    ("scale", StandardScaler())
])

# Numeric preprocessor (applied after feature engineering)
numeric_preprocessor = ColumnTransformer([
    ("log", log_pipeline, log_features),
    ("scale", StandardScaler(), scale_features),
    ("passthrough", "passthrough", passthrough_features)
])

# COMPLETE PIPELINE: Raw data → Processed features
full_pipeline = Pipeline([
    ("date_parser", DateParser()),
    ("feature_engineer", FeatureEngineer()),
    ("preprocessing", numeric_preprocessor)
])

print("Complete pipeline structure:")
print(full_pipeline)

Complete pipeline structure:
Pipeline(steps=[('date_parser', DateParser()),
                ('feature_engineer', FeatureEngineer()),
                ('preprocessing',
                 ColumnTransformer(transformers=[('log',
                                                  Pipeline(steps=[('log',
                                                                   FunctionTransformer(feature_names_out='one-to-one',
                                                                                       func=<ufunc 'log1p'>,
                                                                                       validate=True)),
                                                                  ('scale',
                                                                   StandardScaler())]),
                                                  ['sqft_living', 'sqft_lot',
                                                   'sqft_above',
                                                   'sqft_baseme

## 4. Fit and Transform

We fit the complete pipeline on training data, then transform all splits.

In [None]:
# Prepare raw X (without date_parsed, but with date string)
X_train = train_df.drop(columns=["price", "date_parsed"])
y_train = train_df["price"]

X_val = val_df.drop(columns=["price", "date_parsed"])
y_val = val_df["price"]

X_test = test_df.drop(columns=["price", "date_parsed"])
y_test = test_df["price"]

print(f"Raw X shapes: Train={X_train.shape}, Val={X_val.shape}, Test={X_test.shape}")

Raw X shapes: Train=(15129, 20), Val=(3242, 20), Test=(3242, 20)


In [None]:
# FIT on training data only
full_pipeline.fit(X_train, y_train)

# Show fitted parameters
fitted_min_date = full_pipeline.named_steps['feature_engineer'].min_date_
print(f"Fitted min_date from training: {fitted_min_date.date()}")

Fitted min_date from training: 2014-05-02


In [None]:
# Transform all splits
X_train_processed = full_pipeline.transform(X_train)
X_val_processed = full_pipeline.transform(X_val)
X_test_processed = full_pipeline.transform(X_test)

print("Processed shapes:")
print(f"  Train: {X_train_processed.shape}")
print(f"  Val:   {X_val_processed.shape}")
print(f"  Test:  {X_test_processed.shape}")

Processed shapes:
  Train: (15129, 22)
  Val:   (3242, 22)
  Test:  (3242, 22)


In [None]:
# Get feature names
feature_names = full_pipeline.named_steps['preprocessing'].get_feature_names_out()
print(f"\nTotal features: {len(feature_names)}")
print(f"Feature names: {list(feature_names)}")


Total features: 22
Feature names: ['log__sqft_living', 'log__sqft_lot', 'log__sqft_above', 'log__sqft_basement', 'log__sqft_living15', 'log__sqft_lot15', 'scale__bedrooms', 'scale__bathrooms', 'scale__floors', 'scale__view', 'scale__condition', 'scale__grade', 'scale__lat', 'scale__long', 'scale__days_since_start', 'scale__house_age', 'scale__years_since_renovation', 'scale__basement_ratio', 'scale__living_vs_neighbors', 'scale__lot_vs_neighbors', 'passthrough__waterfront', 'passthrough__was_renovated']


## 5. Pipeline Validation

We verify that the pipeline correctly prevents data leakage and works end-to-end.

### 5.1 Verify `days_since_start` consistency

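Because the splits are ordered in time, `days_since_start` should cover successively later ranges in train, validation, and test. If every split instead started at the same minimum, `min_date_` would have been recomputed per split rather than reused from training, which is the leakage this check guards against. Note that the values below are standardized, so the training range does not start at 0.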

In [None]:
# Find the index of days_since_start in the final features
days_idx = list(feature_names).index('scale__days_since_start')

train_days = X_train_processed[:, days_idx]
val_days = X_val_processed[:, days_idx]
test_days = X_test_processed[:, days_idx]

print("days_since_start (SCALED) ranges:")
print(f"  Train: {train_days.min():.2f} to {train_days.max():.2f}")
print(f"  Val:   {val_days.min():.2f} to {val_days.max():.2f}")
print(f"  Test:  {test_days.min():.2f} to {test_days.max():.2f}")

# Validation: val min should be > train min (it's later in time)
assert val_days.min() > train_days.min(), "Data leakage detected in validation set!"
assert test_days.min() > val_days.min(), "Data leakage detected in test set!"
print("\n✓ No data leakage detected: temporal ordering preserved")

days_since_start (SCALED) ranges:
  Train: -1.67 to 1.99
  Val:   1.99 to 2.97
  Test:  2.97 to 3.85

✓ No data leakage detected: temporal ordering preserved


### 5.2 Test end-to-end inference

The pipeline should be able to transform completely raw data (as it would come from a CSV) without any preprocessing.

In [None]:
# Create a synthetic "new" row as it would come from raw CSV
new_data = pd.DataFrame([{
    "id": 9999999,
    "date": "20150701T000000",  # Raw date string!
    "bedrooms": 3,
    "bathrooms": 2.0,
    "sqft_living": 1800,
    "sqft_lot": 5000,
    "floors": 1.0,
    "waterfront": 0,
    "view": 0,
    "condition": 3,
    "grade": 7,
    "sqft_above": 1800,
    "sqft_basement": 0,
    "yr_built": 1990,
    "yr_renovated": 0,
    "zipcode": 98001,
    "lat": 47.5,
    "long": -122.2,
    "sqft_living15": 1750,
    "sqft_lot15": 4800
}])

print("Raw input data:")
print(new_data[["date", "sqft_living", "yr_built"]].to_string(index=False))

Raw input data:
           date  sqft_living  yr_built
20150701T000000         1800      1990


In [None]:
# Transform using the complete pipeline
new_processed = full_pipeline.transform(new_data)

print(f"Processed shape: {new_processed.shape}")
print("Processed values (first 5 features):")
for i, name in enumerate(feature_names[:5]):
    print(f"  {name}: {new_processed[0, i]:.4f}")

print("\n✓ End-to-end inference works!")

Processed shape: (1, 22)
Processed values (first 5 features):
  log__sqft_living: -0.1486
  log__sqft_lot: -0.5295
  log__sqft_above: 0.2176
  log__sqft_basement: -0.8008
  log__sqft_living15: -0.2343

✓ End-to-end inference works!
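
`FeatureEngineer.fit` validates its required columns, but `transform` does not repeat that check, so a malformed input file would surface as a less readable error deeper in the pipeline. One option is to check the raw schema before calling `transform`. The sketch below is an assumption about how that could look; `missing_raw_columns` is a hypothetical helper, and the column list is copied from the synthetic row above.

```python
import pandas as pd

# Raw columns present in the synthetic row used for the end-to-end test
EXPECTED_RAW_COLUMNS = [
    "id", "date", "bedrooms", "bathrooms", "sqft_living", "sqft_lot",
    "floors", "waterfront", "view", "condition", "grade", "sqft_above",
    "sqft_basement", "yr_built", "yr_renovated", "zipcode", "lat", "long",
    "sqft_living15", "sqft_lot15",
]

def missing_raw_columns(df: pd.DataFrame, expected=EXPECTED_RAW_COLUMNS) -> list[str]:
    """Return the expected raw columns that are absent from df, sorted by name."""
    return sorted(set(expected) - set(df.columns))

incomplete = pd.DataFrame([{"id": 1, "date": "20150701T000000", "sqft_living": 1800}])
missing = missing_raw_columns(incomplete)
if missing:
    print(f"Cannot transform: {len(missing)} columns missing, e.g. {missing[:3]}")
```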


## 6. Save Pipeline and Data

In [None]:
import joblib

output_dir = Path("processed_data")
output_dir.mkdir(exist_ok=True)

# Save processed arrays
np.save(output_dir / "X_train.npy", X_train_processed)
np.save(output_dir / "X_val.npy", X_val_processed)
np.save(output_dir / "X_test.npy", X_test_processed)
np.save(output_dir / "y_train.npy", y_train.values)
np.save(output_dir / "y_val.npy", y_val.values)
np.save(output_dir / "y_test.npy", y_test.values)

# Save the COMPLETE pipeline (includes everything!)
joblib.dump(full_pipeline, output_dir / "preprocessor.joblib")

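# Save feature names as a string array; an object-dtype array would be pickled
# and could only be read back with np.load(..., allow_pickle=True)
np.save(output_dir / "feature_names.npy", np.asarray(feature_names, dtype=str))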

print(f"Saved to {output_dir.absolute()}")

Saved to /media/DIURNOext4/alejandro/wip-clase/PIA-SAA/examen-SAA/example_repos/king-county/processed_data
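
Pickled scikit-learn objects are not guaranteed to load correctly under a different scikit-learn version, so it can help to record the version next to the artifact. The sketch below writes feature names and version information to a JSON sidecar. The file name `preprocessor_meta.json` and the metadata layout are assumptions for illustration, and a temporary directory stands in for `processed_data`.

```python
import json
import tempfile
from pathlib import Path

import numpy as np
import sklearn

# Stand-in for the array returned by get_feature_names_out()
toy_feature_names = np.array(["log__sqft_living", "scale__grade"], dtype=object)

with tempfile.TemporaryDirectory() as tmp_dir:
    meta_path = Path(tmp_dir) / "preprocessor_meta.json"  # hypothetical file name
    metadata = {
        "feature_names": [str(name) for name in toy_feature_names],
        "sklearn_version": sklearn.__version__,
    }
    meta_path.write_text(json.dumps(metadata, indent=2))

    # Read it back the way a later inference script might
    restored = json.loads(meta_path.read_text())

print(restored["feature_names"])
```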


## 7. Inference Example

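This section shows how to use the saved pipeline for inference on new data. The saved artifact stores references to the custom `DateParser` and `FeatureEngineer` classes rather than their code, so their definitions must be available in whatever session loads it. Here they are already defined earlier in the notebook.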

In [None]:
# Simulate loading in a new session
loaded_pipeline = joblib.load(output_dir / "preprocessor.joblib")

# Check that fitted parameters are preserved
print(f"Loaded pipeline min_date: {loaded_pipeline.named_steps['feature_engineer'].min_date_.date()}")

Loaded pipeline min_date: 2014-05-02


In [None]:
# Process new data using loaded pipeline
new_data_inference = pd.DataFrame([{
    "id": 8888888,
    "date": "20151015T000000",
    "bedrooms": 4,
    "bathrooms": 2.5,
    "sqft_living": 2500,
    "sqft_lot": 8000,
    "floors": 2.0,
    "waterfront": 0,
    "view": 1,
    "condition": 4,
    "grade": 8,
    "sqft_above": 2000,
    "sqft_basement": 500,
    "yr_built": 2005,
    "yr_renovated": 0,
    "zipcode": 98052,
    "lat": 47.6,
    "long": -122.1,
    "sqft_living15": 2400,
    "sqft_lot15": 7500
}])

X_new = loaded_pipeline.transform(new_data_inference)
print(f"Inference result shape: {X_new.shape}")
print("\n✓ Ready for model prediction!")

Inference result shape: (1, 22)

✓ Ready for model prediction!


## Summary

### Pipeline contents

The saved `preprocessor.joblib` contains:

| Stage | Transformer | Fitted Parameters |
|-------|-------------|-------------------|
| 1 | DateParser | None (stateless) |
| 2 | FeatureEngineer | `min_date_` (from training) |
| 3 | ColumnTransformer | Scaler means/stds |

### How to use for inference

```python
import joblib
import pandas as pd

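# Load pipeline (DateParser and FeatureEngineer must already be defined
# or importable in this session, otherwise unpickling fails)
pipeline = joblib.load("processed_data/preprocessor.joblib")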

# Load raw CSV data
new_data = pd.read_csv("new_houses.csv")

# Transform (handles everything automatically)
X = pipeline.transform(new_data)

# Predict (requires trained model)
model = joblib.load("model.joblib")
predictions = model.predict(X)
```

### Key benefits

- ✅ **No data leakage**: `min_date_` and the scaler statistics are learned from training data only
- ✅ **End-to-end inference**: Accepts raw CSV data with date strings
- ✅ **Single artifact**: All preprocessing in one `joblib` file
- ✅ **sklearn compatible**: Works with `GridSearchCV`, `cross_val_score`, etc.
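
As a sketch of the last point, a preprocessing step and a model can be chained into one pipeline and cross-validated together, so the preprocessing is refit inside every fold. The example below uses synthetic data, with a `StandardScaler` standing in for the full preprocessor and `Ridge` as an arbitrary model. Using `TimeSeriesSplit` is an assumption that fits the temporal setup of this notebook; it is not something the notebook itself runs.

```python
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X_toy = rng.normal(size=(200, 3))  # rows assumed to be sorted by time
y_toy = X_toy @ np.array([1.0, -2.0, 0.5]) + rng.normal(scale=0.1, size=200)

model_pipeline = Pipeline([
    ("preprocessing", StandardScaler()),  # stand-in for the full preprocessor
    ("model", Ridge(alpha=1.0)),
])

# Each fold trains on earlier rows and scores on later ones; the scaler is
# refit on the training part of every fold, so no statistics leak across folds
cv_scores = cross_val_score(
    model_pipeline, X_toy, y_toy, cv=TimeSeriesSplit(n_splits=4), scoring="r2"
)
print(cv_scores.round(3))
```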