# Lab 08: Regularization — LASSO, Ridge, and Elastic Net

**BSAD 8310: Business Forecasting | University of Nebraska at Omaha**

## Objectives

1. Build a lag-feature matrix for RSXFS retail sales
2. Fit Ridge, LASSO, and Elastic Net regressions with a leakage-free `sklearn` Pipeline
3. Tune `λ` via `TimeSeriesSplit` cross-validation
4. Visualise Ridge and LASSO coefficient paths
5. Compare regularised forecasts against the SARIMA baseline

## Packages Required
```
numpy, pandas, matplotlib, scikit-learn, statsmodels
pandas_datareader (optional — FRED data)
```

In [None]:
# =============================================================================
# Section 1: Setup
# =============================================================================
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import warnings
warnings.filterwarnings('ignore')

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import (
    Ridge, Lasso, ElasticNet,
    RidgeCV, LassoCV, ElasticNetCV,
    lasso_path
)
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
from sklearn.metrics import mean_squared_error

# Seed NumPy's global RNG so any randomised step in the lab is repeatable.
np.random.seed(42)

# UNO color palette (hex codes keyed by a short colour name)
UNO = dict(
    blue='#005CA9',
    red='#E41C38',
    gray='#525252',
    green='#15803d',
    lightblue='#cce0f5',
    lightred='#fde8eb',
    lightgreen='#dcfce7',
    lightgray='#e5e5e5',
)

# Publication-ready plot defaults, applied one rcParam at a time
for rc_key, rc_value in (
    ('figure.dpi', 150),
    ('axes.spines.top', False),
    ('axes.spines.right', False),
    ('font.size', 11),
    ('axes.titlesize', 13),
    ('axes.labelsize', 11),
):
    plt.rcParams[rc_key] = rc_value

# Output folder for saved figures; created up front so savefig never fails
# on a missing directory.
import os

FIGURE_DIR = '../Figures'
os.makedirs(FIGURE_DIR, exist_ok=True)
print('Setup complete.')

In [None]:
# =============================================================================
# Section 2: Load Data
# =============================================================================
# Primary: US Monthly Retail Sales (RSXFS) from FRED
# Fallback: statsmodels macrodata real GDP (quarterly — used as-is, it is
# NOT resampled). NOTE: the later sections assume monthly data (12 lags,
# month-of-year dummies, seasonal period 12), so results obtained on the
# fallback series are illustrative only.

try:
    import pandas_datareader.data as web
    raw = web.DataReader('RSXFS', 'fred',
                         start='1992-01-01',
                         end='2024-12-01')
    y_all = raw['RSXFS'].dropna()
    # Convert the timestamp index to monthly periods
    y_all.index = pd.to_datetime(y_all.index).to_period('M')
    data_source = 'FRED RSXFS'
    plot_title, plot_units = 'RSXFS: US Retail Sales', 'Millions USD'
    print(f'Loaded FRED RSXFS: {len(y_all)} monthly observations')
except Exception as exc:
    # Deliberate best-effort fallback (missing package, no network, ...),
    # but report the reason instead of swallowing it silently.
    print(f'FRED download unavailable ({type(exc).__name__}: {exc}); '
          'using fallback data.')
    import statsmodels.api as sm
    macro = sm.datasets.macrodata.load_pandas().data
    macro.index = pd.period_range('1959Q1', periods=len(macro), freq='Q')
    y_all = macro['realgdp']
    data_source = 'Macrodata (realgdp, quarterly fallback)'
    plot_title, plot_units = ('Real GDP (statsmodels macrodata)',
                              'Billions of chained USD')
    print(f'Loaded statsmodels macrodata: {len(y_all)} quarters (fallback)')

print(f'Source: {data_source}')
print(f'Range:  {y_all.index[0]} to {y_all.index[-1]}')
# Label the plot according to the series that was actually loaded.
y_all.plot(title=plot_title, color=UNO['blue'],
           ylabel=plot_units, figsize=(10, 3))
plt.tight_layout()
plt.show()

In [None]:
# =============================================================================
# Section 3: Feature Engineering
# =============================================================================
# Leakage prevention: rolling statistics are computed on the series shifted
# by one period (.shift(1)), so the window for row t ends at y_{t-1} and never sees y_t.

def make_features(y, n_lags=12, roll_windows=(3, 6, 12), add_calendar=True):
    """
    Turn a univariate series into a supervised-learning design matrix.

    Columns are added in this order: ``lag_1 .. lag_{n_lags}``, then for
    each window ``w`` in ``roll_windows`` the pair ``roll_mean_w`` /
    ``roll_std_w``, then (optionally) ``month_2 .. month_12`` dummies.

    Leakage prevention:
    - ``lag_k`` is ``y.shift(k)``, i.e. y_{t-k}
    - rolling statistics are taken over ``y.shift(1)``, so the window for
      row t only covers y_{t-1}, y_{t-2}, ...

    Parameters
    ----------
    y : pd.Series  Series to build features from
    n_lags : int  Number of lag columns
    roll_windows : iterable of int  Rolling-window lengths
    add_calendar : bool  Add month-of-year dummies (January is the
        baseline) when the index exposes ``month`` or ``to_timestamp``

    Returns
    -------
    X : pd.DataFrame  Feature matrix (aligned, NaN rows dropped)
    y_aligned : pd.Series  Target aligned with X
    """
    frame = pd.DataFrame({'y': y})

    # Pure lags of the target
    for lag in range(1, n_lags + 1):
        frame[f'lag_{lag}'] = y.shift(lag)

    # Rolling mean / std over the one-step-lagged series
    previous = y.shift(1)
    for window in roll_windows:
        roller = previous.rolling(window)
        frame[f'roll_mean_{window}'] = roller.mean()
        frame[f'roll_std_{window}'] = roller.std()

    # Month-of-year dummies, skipped when the index carries no month info
    if add_calendar:
        idx = y.index
        months = None
        if hasattr(idx, 'month'):
            months = idx.month
        elif hasattr(idx, 'to_timestamp'):
            months = idx.to_timestamp().month
        if months is not None:
            for m in range(2, 13):
                frame[f'month_{m}'] = (months == m).astype(int)

    # The warm-up rows contain NaNs from shifting/rolling; discard them
    frame = frame.dropna()
    return frame.drop(columns=['y']), frame['y']


# Build the design matrix from the loaded series: 12 lags plus rolling
# mean/std over 3-, 6- and 12-period windows (calendar dummies are on by
# default). `y` is the target re-aligned to the rows kept in `X`.
X, y = make_features(y_all, n_lags=12, roll_windows=(3, 6, 12))
print(f'Feature matrix shape: {X.shape}')
print(f'Features: {list(X.columns)}')
# Keep this as the final expression so the notebook renders the preview.
X.head(3)

In [None]:
# =============================================================================
# Section 4: Three-Way Chronological Split
# =============================================================================
# Split: 70% train | 15% validation | 15% test
# No random shuffling — time ordering must be preserved.

# Segment sizes: validation and test each take 15% (rounded down);
# the training segment absorbs the remainder.
n = len(y)
n_val = n_test = int(0.15 * n)
n_train = n - n_val - n_test
val_end = n_train + n_val   # first row of the test segment

# Contiguous, ordered slices — no shuffling
X_train, y_train = X.iloc[:n_train], y.iloc[:n_train]
X_val, y_val = X.iloc[n_train:val_end], y.iloc[n_train:val_end]
X_test, y_test = X.iloc[val_end:], y.iloc[val_end:]

# Train+val combined for final refit after CV
X_trainval, y_trainval = X.iloc[:val_end], y.iloc[:val_end]

print(f'Train:    {X_train.shape[0]} obs  ({y_train.index[0]} – {y_train.index[-1]})')
print(f'Val:      {X_val.shape[0]} obs  ({y_val.index[0]} – {y_val.index[-1]})')
print(f'Test:     {X_test.shape[0]} obs  ({y_test.index[0]} – {y_test.index[-1]})')

# Visualise the split: one coloured line per segment
fig, ax = plt.subplots(figsize=(10, 3))
for segment, colour, segment_label in (
    (y_train, UNO['blue'], 'Train'),
    (y_val, UNO['green'], 'Validation'),
    (y_test, UNO['red'], 'Test'),
):
    ax.plot(segment.index.astype(str), segment.values,
            color=colour, lw=1.5, label=segment_label)
ax.set_title('Three-Way Chronological Split')
ax.set_xlabel('Period')
ax.legend()
ax.xaxis.set_major_locator(mticker.MaxNLocator(8))
plt.xticks(rotation=30)
plt.tight_layout()
plt.show()

In [None]:
# =============================================================================
# Section 5: Ridge and LASSO — Validation Curve
# =============================================================================
# Use GridSearchCV with TimeSeriesSplit to select optimal alpha (= lambda).
# StandardScaler is INSIDE the Pipeline — no leakage.

# Expanding-window splitter shared by every grid search below.
tscv = TimeSeriesSplit(gap=0, n_splits=5)
# 80 candidate penalties, log-spaced from 1e-3 to 1e4.
alpha_grid = np.logspace(start=-3, stop=4, num=80)

def fit_regularised(model_class, X_tr, y_tr, alpha_grid, tscv):
    """
    Grid-search the penalty strength of a scaled linear model.

    A ``StandardScaler`` followed by ``model_class()`` (default
    constructor arguments) is wrapped in a Pipeline, so the scaler is
    re-fitted inside each CV training fold. Candidates are scored with
    negative RMSE and the best one is refitted on all of ``X_tr``.

    Parameters
    ----------
    model_class : estimator class exposing an ``alpha`` parameter
    X_tr, y_tr : training features and target
    alpha_grid : candidate values for ``alpha``
    tscv : cross-validation splitter passed to ``GridSearchCV``

    Returns
    -------
    The fitted ``GridSearchCV`` object.
    """
    steps = [
        ('scaler', StandardScaler()),
        ('model', model_class()),
    ]
    search = GridSearchCV(
        estimator=Pipeline(steps),
        param_grid={'model__alpha': alpha_grid},
        scoring='neg_root_mean_squared_error',
        cv=tscv,
        refit=True,
        n_jobs=-1,
    )
    search.fit(X_tr, y_tr)
    return search

# Tune each penalty on train+val with the shared splitter and alpha grid
print('Fitting Ridge via TimeSeriesSplit CV...')
gs_ridge = fit_regularised(Ridge, X_trainval, y_trainval, alpha_grid, tscv)
print(f'  Ridge best alpha: {gs_ridge.best_params_["model__alpha"]:.4f}')

print('Fitting LASSO via TimeSeriesSplit CV...')
gs_lasso = fit_regularised(Lasso, X_trainval, y_trainval, alpha_grid, tscv)
print(f'  LASSO best alpha: {gs_lasso.best_params_["model__alpha"]:.4f}')

# Plot validation curves: mean CV RMSE against alpha, one panel per model
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
panels = (
    (axes[0], gs_ridge, 'Ridge', UNO['blue']),
    (axes[1], gs_lasso, 'LASSO', UNO['red']),
)

for ax, gs, title, color in panels:
    cv_table = gs.cv_results_
    grid_alphas = cv_table['param_model__alpha'].astype(float)
    # Scores are negated RMSE, so flip the sign back
    cv_rmse = -cv_table['mean_test_score']
    best_alpha = gs.best_params_['model__alpha']

    ax.semilogx(grid_alphas, cv_rmse, color=color, lw=2, label='Val RMSE')
    ax.axvline(best_alpha, color='black', ls='--', lw=1.2,
               label=f'λ* = {best_alpha:.3f}')
    ax.set_title(f'{title}: Validation Curve')
    ax.set_xlabel('α (= λ)')
    ax.set_ylabel('RMSE')
    ax.legend()

plt.tight_layout()
plt.savefig(f'{FIGURE_DIR}/lecture08_validation_curve.png', dpi=150, bbox_inches='tight')
plt.show()
print('Saved lecture08_validation_curve.png')

In [None]:
# =============================================================================
# Section 6: Coefficient Paths
# =============================================================================
# Ridge path: refit at each alpha on train+val (scaled once for illustration)
# LASSO path: use sklearn's lasso_path (coordinate descent, warm starts)

# Standardise train+val once; both paths are traced on this matrix
scaler = StandardScaler()
X_tv_sc = scaler.fit_transform(X_trainval)

alpha_path = np.logspace(-2, 4, 60)

# Ridge coefficient path: one row of coefficients per alpha
ridge_coefs = np.vstack([
    Ridge(alpha=a).fit(X_tv_sc, y_trainval).coef_
    for a in alpha_path
])

# LASSO coefficient path (lasso_path returns alphas, coefs, gaps);
# coefs come back with one row per feature
lasso_alphas, lasso_coefs, _ = lasso_path(
    X_tv_sc, y_trainval,
    alphas=alpha_path[::-1],  # lasso_path expects decreasing alphas
    max_iter=5000
)

feat_names = X_trainval.columns.tolist()
n_show = min(8, len(feat_names))   # show first 8 features for clarity
colors = plt.cm.tab10(np.linspace(0, 1, n_show))

fig, axes = plt.subplots(1, 2, figsize=(13, 4.5))

# Each panel: (axis, title, x values, coefficients by feature row, search)
path_panels = (
    (axes[0], 'Ridge: Coefficient Path', alpha_path, ridge_coefs.T, gs_ridge),
    (axes[1], 'LASSO: Coefficient Path', lasso_alphas, lasso_coefs, gs_lasso),
)
for ax, panel_title, path_alphas, coef_rows, search in path_panels:
    for i in range(n_show):
        ax.semilogx(path_alphas, coef_rows[i],
                    color=colors[i], lw=1.5, label=feat_names[i])
    # Mark the CV-selected penalty and the zero line
    ax.axvline(search.best_params_['model__alpha'],
               color='black', ls='--', lw=1.2, label='λ*')
    ax.axhline(0, color='gray', lw=0.8, ls=':')
    ax.set_title(panel_title)
    ax.set_xlabel('α (= λ)')
    ax.set_ylabel('Standardised coefficient')
    ax.legend(fontsize=7, ncol=2, loc='upper right')

plt.tight_layout()
# The same two-panel figure is written under both file names
plt.savefig(f'{FIGURE_DIR}/lecture08_ridge_path.png', dpi=150, bbox_inches='tight')
plt.savefig(f'{FIGURE_DIR}/lecture08_lasso_path.png', dpi=150, bbox_inches='tight')
plt.show()
print('Saved lecture08_ridge_path.png and lecture08_lasso_path.png')

In [None]:
# =============================================================================
# Section 7: Elastic Net
# =============================================================================
# Search three L1/L2 mix ratios jointly with the penalty strength and
# select the best pair via TimeSeriesSplit CV.
#
# Notation: sklearn's `l1_ratio` is the lecture's mixing weight (our alpha),
# and sklearn's `alpha` is the lecture's penalty strength (our lambda).
en_param_grid = {
    'model__alpha':    np.logspace(-3, 4, 40),
    'model__l1_ratio': [0.1, 0.5, 0.9],
}

# Scaler inside the Pipeline so it is re-fitted on each CV training fold
en_pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('model',  ElasticNet(max_iter=5000))
])

print('Fitting Elastic Net via GridSearchCV (alpha x l1_ratio grid)...')
gs_en = GridSearchCV(
    en_pipe, en_param_grid,
    cv=tscv,
    scoring='neg_root_mean_squared_error',
    refit=True,
    n_jobs=-1
)
gs_en.fit(X_trainval, y_trainval)

best = gs_en.best_params_
print(f'  Best lambda (alpha):   {best["model__alpha"]:.4f}')
print(f'  Best mix   (l1_ratio): {best["model__l1_ratio"]}')

# Count non-zero coefficients at best settings
en_coef = gs_en.best_estimator_.named_steps['model'].coef_
n_active = np.sum(en_coef != 0)
print(f'  Active features: {n_active} / {X_trainval.shape[1]}')

# Bar chart of coefficients. Index by the training columns directly so this
# cell does not depend on `feat_names` from the coefficient-path cell.
coef_df = pd.Series(en_coef, index=X_trainval.columns).sort_values()
active_coefs = coef_df[coef_df != 0]
fig, ax = plt.subplots(figsize=(8, max(4, n_active * 0.3 + 1)))
if active_coefs.empty:
    # Plotting an empty Series as bars raises; show a note instead.
    ax.text(0.5, 0.5, 'All coefficients shrunk to zero',
            transform=ax.transAxes, ha='center', va='center')
else:
    active_coefs.plot.barh(
        ax=ax, color=UNO['blue'], edgecolor='white'
    )
ax.axvline(0, color='black', lw=0.8)
ax.set_title(f'Elastic Net Coefficients (λ={best["model__alpha"]:.3f}, '
             f'l1_ratio={best["model__l1_ratio"]})')
ax.set_xlabel('Standardised coefficient')
plt.tight_layout()
plt.show()

In [None]:
# =============================================================================
# Section 8: SARIMA Baseline
# =============================================================================
import statsmodels.api as sm

# Fit SARIMA(1,1,1)(1,1,1)_12 on train+val and forecast the test period.
# Train+val is the same sample the regularised models are tuned and
# refitted on, so the comparison on the test set is like-for-like.
try:
    from statsmodels.tsa.statespace.sarimax import SARIMAX
    sarima_mod = SARIMAX(
        y_trainval,
        order=(1, 1, 1),
        seasonal_order=(1, 1, 1, 12),
        enforce_stationarity=False,
        enforce_invertibility=False
    )
    sarima_res = sarima_mod.fit(disp=False)
    # Multi-step forecast covering the whole test window
    y_pred_sarima = sarima_res.forecast(len(y_test))
    sarima_ok = True
    print('SARIMA fit complete.')
except Exception as e:
    print(f'SARIMA failed ({e}); using seasonal naive baseline.')
    sarima_ok = False
    # Seasonal naive: y_{t+h} = y_{t+h-12}. Take the last observed cycle and
    # repeat it cyclically over the test horizon. (The previous slice
    # values[-12:k] with a positive stop k was empty for any series longer
    # than 24 points, so building the Series raised a length mismatch.)
    last_cycle = y_trainval.values[-12:]
    y_pred_sarima = pd.Series(
        np.resize(last_cycle, len(y_test)),
        index=y_test.index
    )

# Predictions from regularised models
# Score the test features with each refitted best pipeline and wrap the
# raw arrays in Series that share the test index.
y_pred_ridge, y_pred_lasso, y_pred_en = (
    pd.Series(search.best_estimator_.predict(X_test), index=y_test.index)
    for search in (gs_ridge, gs_lasso, gs_en)
)

# Evaluation helper
def rmse(actual, predicted):
    """Root mean squared error over the overlapping leading observations.

    ``predicted`` is cut to at most ``len(actual)`` values and ``actual`` is
    then cut to the same length, so a shorter forecast is scored only on
    the periods it covers.
    """
    forecast = np.asarray(predicted)[:len(actual)]
    observed = actual[:len(forecast)]
    mse = mean_squared_error(observed, forecast)
    return np.sqrt(mse)

def mae(actual, predicted):
    """Mean absolute error over the overlapping leading observations.

    ``predicted`` is cut to at most ``len(actual)`` values and ``actual`` is
    then cut to the same length before the errors are averaged.
    """
    forecast = np.asarray(predicted)[:len(actual)]
    observed = np.asarray(actual[:len(forecast)])
    abs_errors = np.abs(observed - forecast)
    return np.mean(abs_errors)

# The baseline row is only scored when SARIMA itself fitted; otherwise it
# is reported as NaN.
nan = float('nan')
score_rows = [(
    'SARIMA(1,1,1)(1,1,1)_12',
    rmse(y_test, y_pred_sarima) if sarima_ok else nan,
    mae(y_test, y_pred_sarima) if sarima_ok else nan,
)]
for model_label, forecast in (
    ('Ridge', y_pred_ridge),
    ('LASSO', y_pred_lasso),
    ('Elastic Net', y_pred_en),
):
    score_rows.append(
        (model_label, rmse(y_test, forecast), mae(y_test, forecast))
    )

# One row per model, both metrics rounded to one decimal
results = pd.DataFrame(score_rows, columns=['Model', 'RMSE', 'MAE'])
results = results.round({'RMSE': 1, 'MAE': 1})
print('\nTest-set evaluation:')
print(results.to_string(index=False))

In [None]:
# =============================================================================
# Section 9: Forecast Comparison Plot
# =============================================================================

fig, ax = plt.subplots(figsize=(11, 4))
test_idx = y_test.index.astype(str)

# Last 24 train+val obs for context
context = y_trainval.iloc[-24:]
ax.plot(context.index.astype(str), context.values,
        color=UNO['lightgray'], lw=1.5, label='History')

# Actuals, drawn on top of the forecast lines
ax.plot(test_idx, y_test.values,
        color='black', lw=2, label='Actual', zorder=5)

# Baseline forecast, only when SARIMA itself fitted
if sarima_ok:
    sarima_vals = np.asarray(y_pred_sarima)[:len(y_test)]
    ax.plot(test_idx[:len(sarima_vals)], sarima_vals,
            color=UNO['gray'],  lw=1.5, ls='--', label='SARIMA')

# Regularised forecasts: (legend label, predictions, line styling)
model_lines = (
    ('Ridge',       y_pred_ridge, dict(color=UNO['blue'],  lw=1.5, ls='-.')),
    ('LASSO',       y_pred_lasso, dict(color=UNO['red'],   lw=1.5, ls=':')),
    ('Elastic Net', y_pred_en,    dict(color=UNO['green'], lw=2.0)),
)
for line_label, pred, line_kw in model_lines:
    ax.plot(test_idx, pred.values, label=line_label, **line_kw)

ax.set_title('Forecast Comparison: SARIMA vs. Regularised Models (Test Set)')
ax.set_xlabel('Period')
ax.set_ylabel('Retail Sales (Millions USD)')
ax.legend(loc='upper left', fontsize=9)
ax.xaxis.set_major_locator(mticker.MaxNLocator(8))
plt.xticks(rotation=30)

# Annotate RMSE: one line per model, stacked down from the top-right corner
for i, (model_name, pred, line_kw) in enumerate(model_lines):
    r = rmse(y_test, pred)
    ax.annotate(f'{model_name}: RMSE={r:,.0f}',
                xy=(0.99, 0.97 - i * 0.09),
                xycoords='axes fraction',
                ha='right', va='top',
                fontsize=9, color=line_kw['color'])

plt.tight_layout()
plt.savefig(f'{FIGURE_DIR}/lecture08_forecast_comparison.png',
            dpi=150, bbox_inches='tight')
plt.show()
print('Saved lecture08_forecast_comparison.png')

print('\n=== Final Results ===')
print(results.to_string(index=False))