# Step 8: Model Development (Regression)

Goal: Train baseline + stronger regression models for sales forecasting using a time-aware workflow, compare performance on a held-out test window, and save a deployment-ready pipeline.

Rules followed:
- Time-based validation (no shuffling)
- Deterministic runs (fixed random state)
- Outputs and saved reports are ASCII-only
- Text files written with UTF-8 encoding

Primary holdout metrics:
- MAE, RMSE, MAPE (safe: the denominator is floored at a small epsilon to avoid division by zero), R2

Reference formulas:

$$\text{RMSE}=\sqrt{\frac{1}{n}\sum_{i=1}^n (y_i-\hat{y}_i)^2}$$
$$\text{MAE}=\frac{1}{n}\sum_{i=1}^n |y_i-\hat{y}_i|$$


In [1]:
from __future__ import annotations

import json
import time
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.inspection import permutation_importance
from sklearn.linear_model import ElasticNet, Lasso, LinearRegression, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, TimeSeriesSplit
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, HistGradientBoostingRegressor
from sklearn.compose import TransformedTargetRegressor

import joblib

# Fixed seed so repeated runs are deterministic.
RANDOM_STATE = 42
# Seeds NumPy's global random generator.
np.random.seed(RANDOM_STATE)

# Widen pandas console output: up to 200 columns, 140 characters per line.
pd.set_option('display.max_columns', 200)
pd.set_option('display.width', 140)


def find_project_root(start: Path) -> Path:
    """Locate the project root by walking upward from ``start``.

    The root is the nearest directory (``start`` itself or one of its
    ancestors) that contains ``data``, ``models`` and ``reports`` as
    directories.

    Args:
        start: Directory to begin the search from; it is resolved first.

    Returns:
        The first matching directory, or the resolved ``start`` when no
        candidate matches.
    """
    markers = ('data', 'models', 'reports')
    start = start.resolve()
    for candidate in (start, *start.parents):
        # Markers must be directories: a regular file that merely carries one
        # of these names is not a usable project folder.
        if all((candidate / name).is_dir() for name in markers):
            return candidate
    # NOTE(review): when nothing matches, ``start`` is returned. If the caller
    # then creates the marker folders under it, ``start`` will match on later
    # runs even if the real project root is elsewhere -- confirm intended.
    return start


# Resolve the working directory once and derive every project folder from it.
NOTEBOOK_CWD = Path.cwd().resolve()
PROJECT_ROOT = find_project_root(NOTEBOOK_CWD)

DATA_DIR, MODELS_DIR, REPORTS_DIR = (
    PROJECT_ROOT / folder for folder in ('data', 'models', 'reports')
)

# Make sure each folder exists; folders already present are left untouched.
for d in (DATA_DIR, MODELS_DIR, REPORTS_DIR):
    d.mkdir(parents=True, exist_ok=True)

# Run-wide settings read by the helper functions and model builders.
CONFIG = dict(
    use_log_target=True,
    clip_negative_predictions_to_zero=True,
    primary_metric='rmse',
    tscv_splits=3,
    random_state=RANDOM_STATE,
    n_jobs=-1,
)

# Echo the resolved locations and settings, one "label value" pair per line.
print('\n'.join(
    f'{label} {value}'
    for label, value in (
        ('Notebook CWD:', NOTEBOOK_CWD),
        ('Project root:', PROJECT_ROOT),
        ('Data dir:', DATA_DIR),
        ('Models dir:', MODELS_DIR),
        ('Reports dir:', REPORTS_DIR),
        ('CONFIG:', CONFIG),
    )
))


Notebook CWD: C:\Projects\FUTURE_ML_01\notebooks
Project root: C:\Projects\FUTURE_ML_01\notebooks
Data dir: C:\Projects\FUTURE_ML_01\notebooks\data
Models dir: C:\Projects\FUTURE_ML_01\notebooks\models
Reports dir: C:\Projects\FUTURE_ML_01\notebooks\reports
CONFIG: {'use_log_target': True, 'clip_negative_predictions_to_zero': True, 'primary_metric': 'rmse', 'tscv_splits': 3, 'random_state': 42, 'n_jobs': -1}


In [2]:
def safe_mape(y_true: np.ndarray, y_pred: np.ndarray, eps: float = 1e-8) -> float:
    """Return the mean absolute percentage error, expressed in percent.

    Each absolute error is divided by ``max(|y_true|, eps)`` so that a zero
    target cannot cause a division by zero.

    Args:
        y_true: Observed values.
        y_pred: Predicted values.
        eps: Lower bound applied to the absolute target used as denominator.

    Returns:
        The mean of the per-sample relative errors multiplied by 100.
    """
    actual = np.asarray(y_true, dtype=float)
    predicted = np.asarray(y_pred, dtype=float)
    floored_denominator = np.maximum(np.abs(actual), eps)
    relative_error = np.abs((actual - predicted) / floored_denominator)
    return float(np.mean(relative_error) * 100.0)


def evaluate_regression(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    """Compute the holdout regression metrics for one set of predictions.

    Args:
        y_true: Observed values.
        y_pred: Predicted values.

    Returns:
        A dict with the keys ``mae``, ``rmse``, ``mape_percent`` and ``r2``,
        each mapped to a plain Python float.
    """
    actual = np.asarray(y_true, dtype=float)
    predicted = np.asarray(y_pred, dtype=float)

    # RMSE is the square root of the mean squared error.
    return {
        'mae': float(mean_absolute_error(actual, predicted)),
        'rmse': float(np.sqrt(mean_squared_error(actual, predicted))),
        'mape_percent': float(safe_mape(actual, predicted)),
        'r2': float(r2_score(actual, predicted)),
    }


def clip_preds(y_pred: np.ndarray) -> np.ndarray:
    """Floor predictions at zero when the run configuration asks for it.

    Args:
        y_pred: Predicted values.

    Returns:
        ``y_pred`` with negative entries replaced by 0.0 when
        ``CONFIG['clip_negative_predictions_to_zero']`` is truthy (or absent);
        otherwise ``y_pred`` unchanged.
    """
    clipping_enabled = CONFIG.get('clip_negative_predictions_to_zero', True)
    if clipping_enabled:
        return np.maximum(y_pred, 0.0)
    return y_pred


def write_text(path: Path, text: str) -> None:
    """Write ``text`` to ``path`` as UTF-8 with LF line endings.

    Missing parent directories are created first. An existing file at
    ``path`` is overwritten.

    Args:
        path: Destination file.
        text: Content to write.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Path.write_text() only accepts ``newline`` from Python 3.10 onward;
    # Path.open() takes it on every supported version, so the forced '\n'
    # line ending works regardless of the interpreter.
    with path.open('w', encoding='utf-8', newline='\n') as handle:
        handle.write(text)


def timer() -> float:
    """Return the current reading of ``time.perf_counter()``."""
    reading = time.perf_counter()
    return reading


def elapsed_seconds(start: float) -> float:
    """Return the seconds elapsed since ``start``.

    Args:
        start: An earlier ``time.perf_counter()`` reading.

    Returns:
        The current ``time.perf_counter()`` reading minus ``start``.
    """
    now = time.perf_counter()
    return float(now - start)


# Confirmation that the helper definitions in this cell have been executed.
print('Helper functions ready.')


Helper functions ready.


## Load prepared train/test (preferred)

This project already persisted the Step 7 artifacts. We will use them for modeling and also load the saved train/test dates to support time-aware plots and checks.


In [3]:
# Locations of the train/test artifacts expected under DATA_DIR.
X_TRAIN_PATH = DATA_DIR / 'X_train.csv'
X_TEST_PATH = DATA_DIR / 'X_test.csv'
Y_TRAIN_PATH = DATA_DIR / 'y_train.csv'
Y_TEST_PATH = DATA_DIR / 'y_test.csv'
TRAIN_DATES_PATH = DATA_DIR / 'train_dates.csv'
TEST_DATES_PATH = DATA_DIR / 'test_dates.csv'

# Fail fast, listing every missing file at once rather than only the first.
required_paths = [X_TRAIN_PATH, X_TEST_PATH, Y_TRAIN_PATH, Y_TEST_PATH, TRAIN_DATES_PATH, TEST_DATES_PATH]
missing = [p for p in required_paths if not p.exists()]
if missing:
    raise FileNotFoundError('Missing required Step 7 artifacts: ' + ', '.join(str(p) for p in missing))

X_train = pd.read_csv(X_TRAIN_PATH)
X_test = pd.read_csv(X_TEST_PATH)

# A single-column target file collapses to a Series.
y_train = pd.read_csv(Y_TRAIN_PATH).squeeze('columns')
y_test = pd.read_csv(Y_TEST_PATH).squeeze('columns')

train_dates = pd.read_csv(TRAIN_DATES_PATH)
test_dates = pd.read_csv(TEST_DATES_PATH)

# Normalize date column name (Step 7 saved a single date column)
train_date_col = train_dates.columns[0]
test_date_col = test_dates.columns[0]
train_dates_series = pd.to_datetime(train_dates[train_date_col])
test_dates_series = pd.to_datetime(test_dates[test_date_col])

print('X_train:', X_train.shape, 'X_test:', X_test.shape)
print('y_train:', y_train.shape, 'y_test:', y_test.shape)
print('Train dates range:', train_dates_series.min(), 'to', train_dates_series.max())
print('Test dates range:', test_dates_series.min(), 'to', test_dates_series.max())

# Leakage sanity check (strictly earlier train than test).
# These checks raise explicitly instead of using ``assert`` so that they are
# still enforced when Python runs with -O, which strips assert statements.
if not (train_dates_series.max() < test_dates_series.min()):
    raise ValueError('Time split violation: train overlaps test')
if not (len(X_train) == len(y_train) == len(train_dates_series)):
    raise ValueError(
        'Train row count mismatch: '
        f'X_train={len(X_train)}, y_train={len(y_train)}, train_dates={len(train_dates_series)}'
    )
if not (len(X_test) == len(y_test) == len(test_dates_series)):
    raise ValueError(
        'Test row count mismatch: '
        f'X_test={len(X_test)}, y_test={len(y_test)}, test_dates={len(test_dates_series)}'
    )

print('Time split checks passed.')


FileNotFoundError: Missing required Step 7 artifacts: C:\Projects\FUTURE_ML_01\notebooks\data\X_train.csv, C:\Projects\FUTURE_ML_01\notebooks\data\X_test.csv, C:\Projects\FUTURE_ML_01\notebooks\data\y_train.csv, C:\Projects\FUTURE_ML_01\notebooks\data\y_test.csv, C:\Projects\FUTURE_ML_01\notebooks\data\train_dates.csv, C:\Projects\FUTURE_ML_01\notebooks\data\test_dates.csv

## Preprocessing pipeline (ColumnTransformer)

We build a deployment-ready preprocessing step that can handle numeric and categorical columns safely.


In [None]:
def infer_feature_types(df_features: pd.DataFrame) -> tuple[list[str], list[str]]:
    """Split the columns of ``df_features`` into numeric and non-numeric names.

    Args:
        df_features: Feature table to inspect.

    Returns:
        ``(numeric_cols, categorical_cols)``. Numeric columns are those pandas
        selects for ``np.number``; every remaining column is treated as
        categorical. Both lists keep the frame's column order.
    """
    numeric_frame = df_features.select_dtypes(include=[np.number])
    numeric_names = numeric_frame.columns.tolist()

    other_names = []
    for column in df_features.columns:
        if column not in numeric_names:
            other_names.append(column)
    return numeric_names, other_names


def make_preprocess(scale_numeric: bool) -> ColumnTransformer:
    """Build the preprocessing ColumnTransformer for the training features.

    Column lists are inferred from the module-level ``X_train`` frame.

    Args:
        scale_numeric: When true, numeric columns are standardized after
            imputation.

    Returns:
        An unfitted ColumnTransformer with a ``num`` branch (median imputation,
        optional StandardScaler) and a ``cat`` branch (most-frequent imputation
        followed by one-hot encoding that ignores unknown categories). Columns
        in neither list are dropped.
    """
    numeric_cols, categorical_cols = infer_feature_types(X_train)

    # The scaler is appended only on request; imputation always runs first.
    optional_scaler = [('scaler', StandardScaler())] if scale_numeric else []
    numeric_branch = Pipeline(
        steps=[('imputer', SimpleImputer(strategy='median')), *optional_scaler]
    )

    categorical_branch = Pipeline(
        steps=[
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('onehot', OneHotEncoder(handle_unknown='ignore')),
        ]
    )

    branches = [
        ('num', numeric_branch, numeric_cols),
        ('cat', categorical_branch, categorical_cols),
    ]
    return ColumnTransformer(
        transformers=branches,
        remainder='drop',
        verbose_feature_names_out=False,
    )


def assert_schema_compatible(train_df: pd.DataFrame, test_df: pd.DataFrame) -> None:
    """Verify that train and test frames expose the same set of columns.

    Column order is not compared; only the presence of each name.

    Args:
        train_df: Training feature table.
        test_df: Test feature table.

    Raises:
        ValueError: If a training column is absent from the test frame or the
            test frame has a column the training frame lacks. The message
            lists both groups.
    """
    train_columns = train_df.columns
    test_columns = test_df.columns

    missing_in_test = [name for name in train_columns if name not in test_columns]
    extra_in_test = [name for name in test_columns if name not in train_columns]

    # Nothing to report when both difference lists are empty.
    if not missing_in_test and not extra_in_test:
        return

    detail = f'missing_in_test={missing_in_test}, extra_in_test={extra_in_test}'
    raise ValueError('Schema mismatch between train and test. ' + detail)


# Stop here with a ValueError if the train and test column sets differ.
assert_schema_compatible(X_train, X_test)
print('Schema check passed. Columns:', len(X_train.columns))


## Target transform (optional)

Sales can be heavy-tailed. We optionally apply a log1p transform to the target during training and invert it (with expm1) at prediction time.


In [None]:
def maybe_wrap_target_transform(regressor: Pipeline | object) -> object:
    """Optionally wrap ``regressor`` so it trains on a log1p-transformed target.

    Args:
        regressor: Estimator or pipeline to wrap.

    Returns:
        A TransformedTargetRegressor applying ``np.log1p`` to the target and
        ``np.expm1`` to predictions when ``CONFIG['use_log_target']`` is
        truthy (or absent); otherwise ``regressor`` itself.
    """
    if CONFIG.get('use_log_target', True):
        # log1p is valid for non-negative targets; sales should be >= 0
        return TransformedTargetRegressor(
            regressor=regressor,
            func=np.log1p,
            inverse_func=np.expm1,
            check_inverse=False,
        )
    return regressor


# Report whether the log1p target transform is switched on for this run.
print('Target transform enabled:', CONFIG['use_log_target'])


## Baseline + model runner

We use a single runner that:
- fits on the training window only
- evaluates on the untouched test window
- records metrics and training time


In [None]:
@dataclass
class ModelResult:
    """Outcome of fitting one model and scoring it on the holdout window."""
    # Display name of the model.
    name: str
    # Holdout metrics keyed by metric name (as returned by evaluate_regression).
    metrics: dict
    # Seconds spent inside the estimator's fit call.
    train_seconds: float
    # The fitted estimator.
    model: object
    # Optional free-form label; empty by default.
    notes: str = ''


def fit_evaluate_holdout(
    name: str,
    estimator: object,
    Xtr: pd.DataFrame,
    ytr: pd.Series,
    Xte: pd.DataFrame,
    yte: pd.Series,
    notes: str = '',
) -> ModelResult:
    """Fit ``estimator`` on the training window and score it on the holdout.

    Args:
        name: Label stored on the result.
        estimator: Object exposing ``fit`` and ``predict``; it is fitted in place.
        Xtr: Training features.
        ytr: Training target.
        Xte: Holdout features.
        yte: Holdout target.
        notes: Optional label stored on the result.

    Returns:
        A ModelResult holding the holdout metrics, the time spent in ``fit``,
        and the fitted estimator.
    """
    # Only the fit call is timed; prediction and scoring are excluded.
    started_at = timer()
    estimator.fit(Xtr, ytr)
    fit_duration = elapsed_seconds(started_at)

    raw_predictions = np.asarray(estimator.predict(Xte), dtype=float)
    holdout_metrics = evaluate_regression(yte, clip_preds(raw_predictions))

    return ModelResult(
        name=name,
        metrics=holdout_metrics,
        train_seconds=fit_duration,
        model=estimator,
        notes=notes,
    )


def make_pipeline(model: object, scale_numeric: bool) -> object:
    """Chain preprocessing and ``model``, then apply the optional target wrapper.

    Args:
        model: Final estimator placed after the preprocessing step.
        scale_numeric: Passed to ``make_preprocess`` to toggle numeric scaling.

    Returns:
        Whatever ``maybe_wrap_target_transform`` returns for the two-step
        pipeline (steps named ``preprocess`` and ``model``).
    """
    stages = [
        ('preprocess', make_preprocess(scale_numeric=scale_numeric)),
        ('model', model),
    ]
    return maybe_wrap_target_transform(Pipeline(steps=stages))


# Accumulates one ModelResult per evaluated model.
results: list[ModelResult] = []

# Baseline: linear regression on scaled numeric features, scored on the holdout.
baseline = make_pipeline(LinearRegression(), scale_numeric=True)
results.append(fit_evaluate_holdout('LinearRegression', baseline, X_train, y_train, X_test, y_test, notes='baseline'))

print('Baseline metrics:', results[-1].metrics)
