# Linear Model Selection and Regularization

## Initial Setup

In [None]:
# Reload modules automatically
%load_ext autoreload
%autoreload 2

In [None]:
# Builtin imports
import sys
import warnings

from functools import partial
# from typing import Tuple, List, Dict

# Data science imports
import numpy as np
import pandas as pd

# Natsort for natural sorting
# from natsort import natsorted

# Optimization imports
from l0bnb import fit_path

# Visualization imports
from matplotlib.pyplot import subplots
from matplotlib import pyplot as plt

# Linear regression from statsmodels
from statsmodels.api import OLS

# Main scikit-learn imports
import sklearn.linear_model as skl
import sklearn.model_selection as skm
from sklearn.cross_decomposition import PLSRegression
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
# from sklearn.cross_decomposition import PLSRegression
# from sklearn.linear_model import LassoCV, Ridge
# from sklearn.model_selection import train_test_split, GridSearchCV
# from sklearn.metrics import mean_squared_error

# Introduction to Statistical Learning in Python (ISLP) imports
from ISLP import load_data
from ISLP.models import ModelSpec as MS
from ISLP.models import Stepwise, sklearn_selected, sklearn_selection_path 

warnings.filterwarnings("ignore")

In [None]:
# Add project root to sys.path for module imports
if "/statapp/islp" not in sys.path:
    sys.path.append("/statapp/islp")

In [None]:
# Ancillar module imports
import ancillar as aux

## Laboratory: Linear Models and Regularization Methods

### Subset Selection Methods

#### Forward Selection

In [None]:
Hitters = load_data("Hitters")

print(f">>> Count missing values in Hitters dataset: {Hitters.isnull().sum().sum()}")

In [None]:
Hitters: pd.DataFrame = Hitters.dropna()

# Show it.
Hitters

In [None]:
def nCp(sigma2, estimator, X, Y):
    """
    Negative Cp statistic.
    """
    
    n, p = X.shape
    Yhat = estimator.predict(X)
    RSS = np.sum((Y - Yhat)**2)
    
    return -(RSS + 2 * p * sigma2) / n

In [None]:
design = MS(Hitters.columns.drop("Salary")).fit(Hitters)

Y: np.array = np.array(Hitters["Salary"])
X: pd.DataFrame = design.transform(Hitters)

sigma2: float = float(OLS(Y,X).fit().scale)

sigma2

In [None]:
neg_Cp = partial(nCp, sigma2)
neg_Cp?

In [None]:
strategy = Stepwise.first_peak(
    model_spec=design,
    direction="forward",
    max_terms=len(design.terms)
)

In [None]:
%%time

hitters_MSE = sklearn_selected(OLS, strategy)
hitters_MSE.fit(Hitters, Y)
hitters_MSE.selected_state_

In [None]:
%%time

hitters_Cp = sklearn_selected(OLS, strategy, scoring=neg_Cp)
hitters_Cp.fit(Hitters, Y)
hitters_Cp.selected_state_

#### Choosing Among Models Using the Validation Set Approach and Cross-Validation

In [None]:
strategy = Stepwise.fixed_steps(
    design,
    len(design.terms),
    direction='forward'
)

full_path = sklearn_selection_path(OLS, strategy)
full_path

In [None]:
%%time

full_path.fit(Hitters, Y)
Yhat_in: np.ndarray = full_path.predict(Hitters)
Yhat_in.shape

In [None]:
mse_fig, ax = subplots(figsize=(12, 3))

insample_mse: np.ndarray = ((Yhat_in - Y[:, None]) ** 2).mean(0)
n_steps: int = insample_mse.shape[0]

ax.plot(
    np.arange(n_steps),
    insample_mse,
    marker="o",
    color="k",  # color black
    label="In-sample"
)
ax.set_ylabel("MSE", fontsize=20)
ax.set_xlabel("# steps of forward stepwise", fontsize=20)
ax.set_xticks(np.arange(n_steps))
ax.set_xticks(np.arange(n_steps)[::2])
ax.legend()
ax.grid()
ax.set_ylim([50000, 250000]);

In [None]:
%%time

K: int = 5
kfold = skm.KFold(K, random_state=0, shuffle=True)
Yhat_cv: np.ndarray = skm.cross_val_predict (full_path, Hitters, Y, cv=kfold)

Yhat_cv.shape

In [None]:
cv_mse = []
for train_idx, test_idx in kfold.split(Y):

    errors = (Yhat_cv[test_idx] - Y[test_idx, None])**2
    
    # Column means.
    cv_mse.append(errors.mean(0))  

cv_mse = np.array(cv_mse).T

cv_mse.shape

In [None]:
ax.errorbar(
    np.arange(n_steps),
    cv_mse.mean(axis=1),
    cv_mse.std(axis=1) / np.sqrt(K),
    label="Cross-validated",
    color="red",
    marker="o"
)

ax.set_ylim([50000, 250000])
ax.legend()
mse_fig

In [None]:
validation = skm.ShuffleSplit(
    n_splits=1, test_size=0.2, random_state=0
)

for train_idx, test_idx in validation.split(Y):
    
    full_path.fit(Hitters.iloc[train_idx], Y[train_idx])
    Yhat_val = full_path.predict(Hitters.iloc[test_idx])
    errors = (Yhat_val - Y[test_idx, None])**2
    
    validation_mse = errors.mean(0)

In [None]:
ax.plot(
    np.arange(n_steps), 
    validation_mse, 
    marker="o",
    color="blue",
    linestyle="--", # color blue, broken line
    label="Validation")

ax.set_xticks(np.arange(n_steps)[::2])
ax.set_ylim([50000, 250000])
ax.legend()
mse_fig

#### Best Subset Selection

In [None]:
D = design.fit_transform(Hitters)
D = D.drop("intercept", axis=1)
X = np.asarray(D)

X.shape

In [None]:
%%time

# TODO: error in this cell:
# AttributeError: `np.Inf` was removed in the NumPy 2.0 release. Use `np.inf` instead.
# path = fit_path(X, Y, max_nonzeros=X.shape[1])
# path[3]

### Ridge Regression and the Lasso

#### Ridge Regression

In [None]:
Xs = X - X.mean(axis=0)[None, :]
X_scale = X.std(axis=0)
Xs = Xs / X_scale[None,:]

print(f">>> Scaled data shape: {Xs.shape}")

lambdas = 10 ** np.linspace(8, -2, 100) / Y.std() 
soln_array = skl.ElasticNet.path(X=Xs, y=Y, l1_ratio=0., alphas=lambdas)[1]

print(f">>> Number of lambdas: {lambdas.shape[0]}")
print(f">>> Solution array shape: {soln_array.shape}")

In [None]:
plt.figure(figsize=(10, 2.5))
plt.plot(lambdas, marker="o")
plt.ylabel("Lambda", weight="bold")
plt.title("Ridge Regression: Lambda Path", weight="bold")
plt.yscale("log");

In [None]:
soln_path: pd.DataFrame = pd.DataFrame(soln_array.T, columns=D.columns, index=-np.log(lambdas))
soln_path.index.name = "negative log(lambda)"

# Show the solution path.
soln_path

In [None]:
path_fig, ax = subplots(figsize=(9, 6)) 
soln_path.plot(ax=ax, legend=False, marker=".")
ax.set_xlabel("$-\log(\lambda)$")
ax.set_ylabel("Standardized coefficients") 

# Set legend ouside plot.
ax.legend(loc="upper left", bbox_to_anchor=(1, 1))
ax.grid(alpha=0.10);

In [None]:
soln_path2 = soln_path.copy()
soln_path2.index = lambdas
soln_path2.index.name = "lambda"
soln_path2

In [None]:
path_fig, ax = subplots(figsize=(9, 6)) 
soln_path2.plot(ax=ax, legend=False, marker=".")
ax.set_xlabel("$\\lambda$")
ax.set_ylabel("Standardized coefficients") 

# Set legend ouside plot.
ax.legend(loc="upper left", bbox_to_anchor=(1, 1))
ax.grid(alpha=0.10);
ax.set_xscale("log")

In [None]:
ridge = skl.ElasticNet(alpha=lambdas[59], l1_ratio=0)
scaler = StandardScaler(with_mean=True, with_std=True)

pipe = Pipeline(
    steps=[
        ("scaler", scaler), 
        ("ridge", ridge)
    ]
) 

pipe.fit(X, Y)

#### Estimating Test Error of Ridge Regression

In [None]:
validation = skm.ShuffleSplit(
    n_splits=1, 
    test_size=0.5,
    random_state=0
)

ridge.alpha = 0.01

results = skm.cross_validate(
    estimator=ridge,
    X=X,
    y=Y,
    scoring="neg_mean_squared_error",
    cv=validation,
)

# Show it.
results

In [None]:
ridge.alpha = 1e10

results = skm.cross_validate(
    estimator=ridge,
    X=X,
    y=Y,
    scoring="neg_mean_squared_error",
    cv=validation,
)

# Show it.
results

In [None]:
for train_idx, test_idx in validation.split(X, Y):
    prediction = Y[train_idx].mean()
    print(">>> Mean of Y in training set: {prediction}", )

# Calculate the MSE of this prediction in test split.
mse = ((prediction - Y[test_idx])**2).mean()
print(f">>> MSE of this prediction: {mse}")

In [None]:
# Percentual error.
np.abs((- results["test_score"] - mse) / (- results["test_score"]) * 100)[0]

In [None]:
param_grid = {
    "ridge__alpha": lambdas
}

grid = skm.GridSearchCV(
    estimator=pipe,
    param_grid=param_grid,
    scoring="neg_mean_squared_error",
    cv=validation,
)

grid.fit(X, Y)

# Show best parameters.
grid.best_params_

In [None]:
# Show best estimator.
grid.best_estimator_

In [None]:
%%time

grid = skm.GridSearchCV(pipe, param_grid, cv=kfold, scoring='neg_mean_squared_error')

grid.fit(X, Y) 

In [None]:
ridge_fig, ax = subplots(figsize=(7, 4)) 

ax.errorbar(
    lambdas,
    -grid.cv_results_["mean_test_score"],
    yerr=grid.cv_results_["std_test_score"] / np.sqrt(K)
)

ax.set_xscale("log")
ax.set_xlabel("$\lambda$", fontsize=10)
ax.set_ylabel("Cross-validated MSE", fontsize=10);

In [None]:
ridge_fig, ax = subplots(figsize=(7, 4)) 

ax.errorbar(
    -np.log(lambdas),
    -grid.cv_results_["mean_test_score"],
    yerr=grid.cv_results_["std_test_score"] / np.sqrt(K)
)

ax.set_ylim([50000,250000])
ax.set_xlabel("$-\log(\lambda)$", fontsize=10)
ax.set_ylabel("Cross-validated MSE", fontsize=10);

In [None]:
grid_r2 = skm.GridSearchCV(pipe, param_grid, cv=kfold, scoring="r2")

grid_r2.fit(X, Y);

In [None]:
ridge_fig, ax = subplots(figsize=(7, 4)) 

ax.errorbar(
    -np.log(lambdas),
    grid_r2.cv_results_["mean_test_score"], 
    yerr=grid_r2.cv_results_["std_test_score"] / np.sqrt(K)
)

ax.set_xlabel("$-\log(\lambda)$", fontsize=10)
ax.set_ylabel("Cross-validated $R^2$", fontsize=10);

#### Fast Cross-Validation for Solution Paths

In [None]:
ridgeCV = skl.ElasticNetCV(
    alphas=lambdas, 
    l1_ratio=0,
    cv=kfold
)

pipeCV = Pipeline(steps=[
    ("scaler", scaler),
    ("ridge", ridgeCV)
])

pipeCV.fit(X, Y)

In [None]:
tuned_ridge = pipeCV.named_steps["ridge"]

ridgeCV_fig, ax = subplots(figsize=(7, 4))

ax.errorbar(
    -np.log(lambdas),
    tuned_ridge.mse_path_.mean(1),
    yerr=tuned_ridge.mse_path_.std(1) / np.sqrt(K)
)

ax.axvline(-np.log(tuned_ridge.alpha_), c="k", ls="--")
ax.set_ylim([50000, 250000])
ax.set_xlabel("$-\log(\lambda)$", fontsize=10)
ax.set_ylabel("Cross-validated MSE", fontsize=10);

In [None]:
np.min(tuned_ridge.mse_path_.mean(1))

In [None]:
tuned_ridge.coef_

#### Evaluating Test Error of Cross-Validated Ridge

In [None]:
outer_valid = skm.ShuffleSplit(n_splits=1, test_size=0.25, random_state=1)

inner_cv = skm.KFold(n_splits=5, shuffle=True, random_state=2)
ridgeCV = skl.ElasticNetCV(alphas=lambdas, l1_ratio=0, cv=inner_cv)

pipeCV = Pipeline(steps=[
    ("scaler", scaler), 
    ("ridge", ridgeCV)
]);

In [None]:
results = skm.cross_validate(pipeCV, X, Y, cv=outer_valid, scoring='neg_mean_squared_error')

-results['test_score'][0]

#### The Lasso

In [None]:
lassoCV = skl.ElasticNetCV(n_alphas=100, l1_ratio=1, cv=kfold)

pipeCV = Pipeline(steps=[
    ("scaler", scaler), 
    ("lasso", lassoCV)
])

pipeCV.fit(X, Y)
tuned_lasso = pipeCV.named_steps["lasso"]

tuned_lasso.alpha_

In [None]:
lambdas, soln_array = skl.Lasso.path(Xs, Y, l1_ratio=1, n_alphas=100)[:2]
soln_path = pd.DataFrame(soln_array.T, columns=D.columns, index=-np.log(lambdas))

soln_path

In [None]:
path_fig, ax = subplots(figsize=(9, 5))

soln_path.plot(ax=ax, marker='.', legend=False)

ax.legend(loc="upper left", bbox_to_anchor=(1, 1))
ax.set_xlabel("$-\log(\lambda)$", fontsize=10)
ax.set_ylabel("Standardized coefficiients", fontsize=10);

In [None]:
lassoCV_fig, ax = subplots(figsize=(7, 4))

ax.errorbar(
    -np.log(tuned_lasso.alphas_),
    tuned_lasso.mse_path_.mean(1),
    yerr=tuned_lasso.mse_path_.std(1) / np.sqrt(K)
)

ax.axvline(-np.log(tuned_lasso.alpha_), c="k", ls="--")
ax.set_ylim([50000,250000])
ax.set_xlabel("$-\log(\lambda)$", fontsize=10)
ax.set_ylabel("Cross-validated MSE", fontsize=10);

In [None]:
tuned_lasso.coef_

### PCR and PLS Regression

#### Principal Components Regression

In [None]:
pca = PCA(n_components=2)
linreg = skl.LinearRegression()

pipe = Pipeline([("pca", pca), ("linreg", linreg)])
pipe.fit(X, Y)

pipe.named_steps["linreg"].coef_

In [None]:
pipe = Pipeline([("scaler", scaler), ("pca", pca), ("linreg", linreg)])

pipe.fit(X, Y)

pipe.named_steps["linreg"].coef_

In [None]:
param_grid = {"pca__n_components": range(1, 20)}

grid = skm.GridSearchCV(
    estimator=pipe,
    param_grid=param_grid,
    cv=kfold,
    scoring="neg_mean_squared_error"
)

grid.fit(X, Y)

In [None]:
pcr_fig, ax = subplots(figsize=(7, 4))
n_comp = param_grid["pca__n_components"]

ax.errorbar(
    n_comp,
    -grid.cv_results_["mean_test_score"],
    grid.cv_results_["std_test_score"] / np.sqrt(K)
)

ax.set_ylabel("Cross-validated MSE", fontsize=10)
ax.set_xlabel("# principal components", fontsize=10)
ax.set_xticks(n_comp[::2])
ax.set_ylim([50000,250000]);

print(f">>> Best number of principal components: {grid.best_params_['pca__n_components']}")

# Plot vertical orange dashed line at best number of components
ax.axvline(grid.best_params_['pca__n_components'], color='orange', linestyle='--');

In [None]:
Xn: np.ndarray = np.zeros((X.shape[0], 1))

cv_null = skm.cross_validate(
    estimator=linreg,
    X=Xn,
    y=Y,
    cv=kfold,
    scoring="neg_mean_squared_error"
)

float(-cv_null["test_score"].mean())

In [None]:
pipe.named_steps["pca"]. explained_variance_ratio_

#### Partial Least Squares

In [None]:
pls = PLSRegression(n_components=2, scale=True)

pls.fit(X, Y)

In [None]:
param_grid = {"n_components":range(1, 20)}

grid = skm.GridSearchCV(
    estimator=pls,
    param_grid=param_grid,
    cv=kfold,
    scoring="neg_mean_squared_error"
)

grid.fit(X, Y)

In [None]:
pls_fig, ax = subplots(figsize=(7, 4))

n_comp = param_grid["n_components"]

ax.errorbar(
    n_comp,
    -grid.cv_results_["mean_test_score"],
    grid.cv_results_["std_test_score"] / np.sqrt(K)
)

ax.set_ylabel("Cross-validated MSE", fontsize=10)
ax.set_xlabel("# principal components", fontsize=10)
ax.set_xticks(n_comp[::2])
ax.set_ylim([50000,250000]);

## Exercises

### Conceptual

#### 1

(a) Best subset selection has the smallest training RSS bacause it searchs for all possible combinations that has exactly $k$ predictors, while forward and backward stepwise selection don't. 

(b) We cannot state for sure which of thre three models with $k$ predictors will have the smallest test RSS;

(c) 
- i. True;
- ii. True;
- iii. False;
- iv. False;
- v. False; 

#### 2

(a)False; False; True; False;

(b)False; False; True; False;

(c) False; True; False; False;

### Applied

#### 8

In [None]:
# a)

# Random predictors and noise.
X: np.ndarray = np.random.normal(loc=0.0, scale=1.0, size=(100, 1))
eps: np.ndarray = np.random.normal(loc=0.0, scale=1.0, size=(100, 1))

In [None]:
# b)

# Real parameters that generate data.
B0: float = 5
B1: float = -10
B2: float = 3
B3: float = 14

# Response.
Y: np.ndarray = B0 + B1 * X + B2 * (X ** 2) + B3 * (X ** 3) + eps

# To dataframes.
dfY: pd.DataFrame = pd.DataFrame(Y, columns=["Y"])
dfY

In [None]:
# c)

# As dataframe with intercept.
Features: np.ndarray = np.hstack([X ** i for i in range(0, 11)])
dfX = pd.DataFrame(Features, columns=[f"X{i}" for i in range(0, 11)])
dfX

In [None]:
# Adjust a linear model with all predictors.
linear_model = OLS(endog=dfY, exog=dfX).fit()
linear_model.summary()

In [None]:
# For model selection criterion scorer.
sigma2 = linear_model.scale

In [None]:
# Run forward stepwise selection.
n_features, rss_list, r2_list, bic_list, aic_list, cp_list, best_models = aux.forward_stepwise_selection(dfX, df, sigma2)

In [None]:
# Make figures.
fig, axs = subplots(5, 1, figsize=(6, 8), sharex=True)

# Plot metrics.
axs[0].plot(n_features, rss_list, marker="o")
axs[0].set_ylabel("RSS")
axs[1].plot(n_features, r2_list, marker="o")
axs[1].set_ylabel("R2")
axs[2].plot(n_features, cp_list, marker="o")
axs[2].set_ylabel("Cp")
axs[3].plot(n_features, bic_list, marker="o")
axs[3].set_ylabel("BIC")
axs[4].plot(n_features, aic_list, marker="o")
axs[4].set_ylabel("AIC")
axs[4].set_xlabel("Number of features")

# Plot minimum RSS as a red cross.
min_rss_idx = np.argmin(rss_list)
axs[0].plot(n_features[min_rss_idx], rss_list[min_rss_idx], marker="x", color="red", markersize=15)

# Plot maximum R2 as a red cross.
max_r2_idx = np.argmax(r2_list)
axs[1].plot(n_features[max_r2_idx], r2_list[max_r2_idx], marker="x", color="red", markersize=15)

# Plot maximum cp as a red cross.
max_cp_idx = np.argmax(cp_list)
axs[2].plot(n_features[max_cp_idx], cp_list[max_cp_idx], marker="x", color="red", markersize=15)

# Plot minimum BIC as a red cross.
min_bic_idx = np.argmin(bic_list)
axs[3].plot(n_features[min_bic_idx], bic_list[min_bic_idx], marker="x", color="red", markersize=15)

# Plot minimum AIC as a red cross.
min_aic_idx = np.argmin(aic_list)
axs[4].plot(n_features[min_aic_idx], aic_list[min_aic_idx], marker="x", color="red", markersize=15)

# X labels.
axs[4].set_xticks(n_features)

# Set grid for all axes.
for ax in axs:
    ax.grid(alpha=0.15)

# Title.
axs[0].set_title("Forward Stepwise Selection Metrics", fontsize=16)

plt.tight_layout()

In [None]:
# Expected coefficients.
df_compare_forward: pd.DataFrame = pd.DataFrame({
    "true": [B0, B1, B2, B3] + [0.0]*7
}, index=[f"X{i}" for i in range(0, 11)])

# Adjusted coefficients of the best model according to Cp, BIC, AIC, and R2.
df_cp: pd.DataFrame = best_models[max_cp_idx].params.to_frame(name="forward_cp")
df_bic: pd.DataFrame = best_models[min_bic_idx].params.to_frame(name="forward_bic")
df_aic: pd.DataFrame = best_models[min_aic_idx].params.to_frame(name="forward_aic")
df_r2: pd.DataFrame = best_models[max_r2_idx].params.to_frame(name="forward_r2")

# Join all dataframes.
df_compare_forward = df_compare_forward.join(df_cp, how="outer")
df_compare_forward = df_compare_forward.join(df_bic, how="outer")
df_compare_forward = df_compare_forward.join(df_aic, how="outer")  
df_compare_forward = df_compare_forward.join(df_r2, how="outer")

# Reduce float precision.
df_compare_forward = df_compare_forward.round(3)

# Better sorting of the index.
df_compare_forward = df_compare_forward.reindex(natsorted(df_compare_forward.index))

df_compare_forward

In [None]:
# c)

# Run backward stepwise selection.
n_features, rss_list, r2_list, bic_list, aic_list, cp_list, best_models = backward_stepwise_selection(dfX, dfY)

In [None]:
# Make figures.
fig, axs = subplots(5, 1, figsize=(6, 8), sharex=True)

# Plot metrics.
axs[0].plot(n_features, rss_list, marker="o")
axs[0].set_ylabel("RSS")
axs[1].plot(n_features, r2_list, marker="o")
axs[1].set_ylabel("R2")
axs[2].plot(n_features, cp_list, marker="o")
axs[2].set_ylabel("Cp")
axs[3].plot(n_features, bic_list, marker="o")
axs[3].set_ylabel("BIC")
axs[4].plot(n_features, aic_list, marker="o")
axs[4].set_ylabel("AIC")
axs[4].set_xlabel("Number of features")

# Plot minimum RSS as a red cross.
min_rss_idx = np.argmin(rss_list)
axs[0].plot(n_features[min_rss_idx], rss_list[min_rss_idx], marker="x", color="red", markersize=15)

# Plot maximum R2 as a red cross.
max_r2_idx = np.argmax(r2_list)
axs[1].plot(n_features[max_r2_idx], r2_list[max_r2_idx], marker="x", color="red", markersize=15)

# Plot maximum cp as a red cross.
max_cp_idx = np.argmax(cp_list)
axs[2].plot(n_features[max_cp_idx], cp_list[max_cp_idx], marker="x", color="red", markersize=15)

# Plot minimum BIC as a red cross.
min_bic_idx = np.argmin(bic_list)
axs[3].plot(n_features[min_bic_idx], bic_list[min_bic_idx], marker="x", color="red", markersize=15)

# Plot minimum AIC as a red cross.
min_aic_idx = np.argmin(aic_list)
axs[4].plot(n_features[min_aic_idx], aic_list[min_aic_idx], marker="x", color="red", markersize=15)

# X labels.
axs[4].set_xticks(n_features)

# Set grid for all axes.
for ax in axs:
    ax.grid(alpha=0.15)

# Title.
axs[0].set_title("Backward Stepwise Selection Metrics", fontsize=16)

plt.tight_layout()

In [None]:
# Initialize.
df_compare_backward: pd.DataFrame = pd.DataFrame({
    "true": [B0, B1, B2, B3] + [0.0]*7
}, index=[f"X{i}" for i in range(0, 11)])

# Adjusted coefficients of the best model according to Cp, BIC, AIC, and R2.
df_cp: pd.DataFrame = best_models[max_cp_idx].params.to_frame(name="backward_cp")
df_bic: pd.DataFrame = best_models[min_bic_idx].params.to_frame(name="backward_bic")
df_aic: pd.DataFrame = best_models[min_aic_idx].params.to_frame(name="backward_aic")
df_r2: pd.DataFrame = best_models[max_r2_idx].params.to_frame(name="backward_r2")

# Join all dataframes.
df_compare_backward = df_compare_backward.join(df_cp, how="outer")
df_compare_backward = df_compare_backward.join(df_bic, how="outer")
df_compare_backward = df_compare_backward.join(df_aic, how="outer")  
df_compare_backward = df_compare_backward.join(df_r2, how="outer")

# Reduce float precision.
df_compare_backward = df_compare_backward.round(3)

# Better sorting of the index.
df_compare_backward = df_compare_backward.reindex(natsorted(df_compare_backward.index))

# Show it.
df_compare_backward

In [None]:
# Join results dataframes.
df_results: pd.DataFrame = pd.concat([
        df_compare_forward, 
        df_compare_backward.drop(columns=["true"])
    ], 
    axis=1
).T

# Show it.
df_results

In [None]:
%%time

# e)

# Exclude constant term.
X: np.array = np.asarray(dfX.drop(columns=["X0"]))
y: np.array = dfY["Y"].values

# Hyperparameters.
lambdas: np.array = 10 ** np.linspace(6, -5, 100) / y.std() 
K: int = 10
kfold = skm.KFold(K, random_state=0, shuffle=True)

# Lasso with cross-validation.
lassocv = skl.ElasticNetCV(l1_ratio=1.0, alphas=lambdas, cv=kfold)
scaler = StandardScaler(with_mean=True, with_std=True)
pipelinecv = Pipeline(steps=[("scaler", scaler), ("lassocv", lassocv)])

# Fit the models.
pipelinecv.fit(X, y)

# Show results.
tunned_lasso = pipelinecv.named_steps["lassocv"]
tunned_lasso

In [None]:
# Mean and error of the MSE path.
cv_mse: np.array = tunned_lasso.mse_path_.mean(axis=1)
cv_mse_err: np.array = tunned_lasso.mse_path_.std(1) / np.sqrt(K)

# Find better results.
best_idx: int = np.argmin(cv_mse)

print(f">>> Best lambda: {lambdas[best_idx]:.5e}")
print(f">>> Best CV MSE: {cv_mse[best_idx]:.4f} +/- {cv_mse_err[best_idx]:.4f}")

# Create figure.
path_fig, ax = subplots(figsize=(9, 4.5)) 

# Plot it.
ax.errorbar(
    lambdas, 
    cv_mse, 
    yerr=cv_mse_err,
    marker="."
)

# Plot better lambda as a red cross.
ax.plot(
    lambdas[best_idx],
    cv_mse[best_idx],
    marker="x",
    color="red",
    markersize=10,
    markeredgewidth=2
)

# Labels.
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlabel("$\\lambda$")
ax.set_ylabel("Mean Squared Error (MSE)")
ax.set_title("Lasso: Cross-validated MSE", weight="bold");

In [None]:
# Lasso results.
new_row = pd.DataFrame(
    np.hstack([tunned_lasso.intercept_, tunned_lasso.coef_])[None, :], 
    columns=dfX.columns, 
    index=["lasso_cv"]
)

# Add lasso results to the results dataframe.
df_results = pd.concat([df_results, new_row], axis=0)

# Show it.
df_results

In [None]:
# f)

# Random predictors and noise.
X: np.ndarray = np.random.normal(loc=0.0, scale=1.0, size=(100, 1))
eps: np.ndarray = np.random.normal(loc=0.0, scale=1.0, size=(100, 1))

# Real parameters that generate data.
B0: float = 10
B7: float = 2.0

# Response.
Y: np.ndarray = B0 + B7 * (X ** 7) + eps

# To dataframes.
dfY: pd.DataFrame = pd.DataFrame(Y, columns=["Y"])

# As dataframe with intercept.
Features: np.ndarray = np.hstack([X ** i for i in range(0, 11)])
dfX = pd.DataFrame(Features, columns=[f"X{i}" for i in range(0, 11)])
dfX

# Adjust a linear model with all predictors.
linear_model = OLS(endog=dfY, exog=dfX).fit()

# Model selection criterion scorer.
sigma2 = linear_model.scale
neg_Cp = partial(nCp , sigma2)

# Run forward stepwise selection.
n_features, rss_list, r2_list, bic_list, aic_list, cp_list, best_models = forward_stepwise_selection(dfX, dfY)

In [None]:
# Make figures.
fig, axs = subplots(5, 1, figsize=(6, 8), sharex=True)

# Plot metrics.
axs[0].plot(n_features, rss_list, marker="o")
axs[0].set_ylabel("RSS")
axs[1].plot(n_features, r2_list, marker="o")
axs[1].set_ylabel("R2")
axs[2].plot(n_features, cp_list, marker="o")
axs[2].set_ylabel("Cp")
axs[3].plot(n_features, bic_list, marker="o")
axs[3].set_ylabel("BIC")
axs[4].plot(n_features, aic_list, marker="o")
axs[4].set_ylabel("AIC")
axs[4].set_xlabel("Number of features")

# Plot minimum RSS as a red cross.
min_rss_idx = np.argmin(rss_list)
axs[0].plot(n_features[min_rss_idx], rss_list[min_rss_idx], marker="x", color="red", markersize=15)

# Plot maximum R2 as a red cross.
max_r2_idx = np.argmax(r2_list)
axs[1].plot(n_features[max_r2_idx], r2_list[max_r2_idx], marker="x", color="red", markersize=15)

# Plot maximum cp as a red cross.
max_cp_idx = np.argmax(cp_list)
axs[2].plot(n_features[max_cp_idx], cp_list[max_cp_idx], marker="x", color="red", markersize=15)

# Plot minimum BIC as a red cross.
min_bic_idx = np.argmin(bic_list)
axs[3].plot(n_features[min_bic_idx], bic_list[min_bic_idx], marker="x", color="red", markersize=15)

# Plot minimum AIC as a red cross.
min_aic_idx = np.argmin(aic_list)
axs[4].plot(n_features[min_aic_idx], aic_list[min_aic_idx], marker="x", color="red", markersize=15)

# X labels.
axs[4].set_xticks(n_features)

# Set grid for all axes.
for ax in axs:
    ax.grid(alpha=0.15)

# Title.
axs[0].set_title("Forward Stepwise Selection Metrics", fontsize=16)

plt.tight_layout()

In [None]:
%%time

# Exclude constant term.
X: np.array = np.asarray(dfX.drop(columns=["X0"]))
y: np.array = dfY["Y"].values

# Hyperparameters.
lambdas: np.array = 10 ** np.linspace(10, -12, 200) / y.std() 
K: int = 10
kfold = skm.KFold(K, random_state=0, shuffle=True)

# Lasso with cross-validation.
lassocv = skl.ElasticNetCV(l1_ratio=1.0, alphas=lambdas, cv=kfold)
scaler = StandardScaler()
pipelinecv = Pipeline(steps=[("scaler", scaler), ("lassocv", lassocv)])

# Fit the models.
pipelinecv.fit(X, y)

# Show results.
tunned_lasso = pipelinecv.named_steps["lassocv"]
tunned_lasso

In [None]:
# Mean and error of the MSE path.
cv_mse: np.array = tunned_lasso.mse_path_.mean(axis=1)
cv_mse_err: np.array = tunned_lasso.mse_path_.std(1) / np.sqrt(K)

# Find better results.
best_idx: int = np.argmin(cv_mse)

print(f">>> Best lambda: {lambdas[best_idx]:.5e}")
print(f">>> Best CV MSE: {cv_mse[best_idx]:.4f} +/- {cv_mse_err[best_idx]:.4f}")

# Create figure.
path_fig, ax = subplots(figsize=(9, 4.5)) 

# Plot it.
ax.errorbar(
    lambdas, 
    cv_mse, 
    yerr=cv_mse_err,
    marker="."
)

# Plot better lambda as a red cross.
ax.plot(
    lambdas[best_idx],
    cv_mse[best_idx],
    marker="x",
    color="red",
    markersize=10,
    markeredgewidth=2
)

# Labels.
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlabel("$\\lambda$")
ax.set_ylabel("Mean Squared Error (MSE)")
ax.set_title("Lasso: Cross-validated MSE", weight="bold");

In [None]:
# Expected coefficients.
df_compare_forward: pd.DataFrame = pd.DataFrame({
    "true": [B0, 0, 0, 0, 0, 0, 0, B7, 0, 0, 0]
}, index=[f"X{i}" for i in range(0, 11)])

# Adjusted coefficients of the best model according to Cp, BIC, AIC, and R2.
df_cp: pd.DataFrame = best_models[max_cp_idx].params.to_frame(name="forward_cp")
df_bic: pd.DataFrame = best_models[min_bic_idx].params.to_frame(name="forward_bic")
df_aic: pd.DataFrame = best_models[min_aic_idx].params.to_frame(name="forward_aic")
df_r2: pd.DataFrame = best_models[max_r2_idx].params.to_frame(name="forward_r2")

# Join all dataframes.
df_compare_forward = df_compare_forward.join(df_cp, how="outer")
df_compare_forward = df_compare_forward.join(df_bic, how="outer")
df_compare_forward = df_compare_forward.join(df_aic, how="outer")  
df_compare_forward = df_compare_forward.join(df_r2, how="outer")

# Reduce float precision.
df_compare_forward = df_compare_forward.round(3)

# Better sorting of the index.
df_compare_forward = df_compare_forward.reindex(natsorted(df_compare_forward.index))

# Traspose it.
df_results = df_compare_forward.T

# Lasso results.
new_row = pd.DataFrame(
    np.hstack([tunned_lasso.intercept_, tunned_lasso.coef_])[None, :], 
    columns=dfX.columns, 
    index=["lasso_cv"]
)

# Add lasso results to the results dataframe.
df_results = pd.concat([df_results, new_row], axis=0)

# Show it.
df_results

#### 9

In [None]:
# Load into memory.
college_df: pd.DataFrame = pd.read_csv("/statapp/islp/data/College.csv")

# Show it.
college_df

In [None]:
# Histogram target variable.
fig, ax = subplots(figsize=(8, 4))
college_df["Apps"].hist(bins=50, ax=ax)
ax.set_xlabel("Number of applications received")
ax.set_ylabel("Frequency")

# Mean and standard deviation.
print(f">>> Mean: {college_df['Apps'].mean():.2f}")
print(f">>> Median: {college_df['Apps'].median():.2f}")
print(f">>> Standard deviation: {college_df['Apps'].std():.2f}")

In [None]:
# (a)

# Features. Drop categorial and identifier columns
dfX: pd.DataFrame = college_df.drop(
    columns=["Unnamed: 0", "Private", "Apps",]
)

# Add a first column of ones for the intercept.
dfX.insert(0, "Intercept", 1.0)

# Target
sy: pd.Series = college_df["Apps"]

# Data split.
dfX_train, dfX_test, sy_train, sy_test = train_test_split(dfX, sy, train_size=0.75, random_state=56)

print(f">>> Training set shape: {dfX_train.shape}")
print(f">>> Test set shape: {dfX_test.shape}")

In [None]:
# b) 

# Fit a linear model using least squares with all predictors
linear_model = OLS(endog=sy_train, exog=dfX_train).fit()

# Predictions in training sample
sy_train_hat = linear_model.predict(dfX_train)

# Predictions in test sample
sy_test_hat = linear_model.predict(dfX_test)

# Calculate mean squared error
mse_train: float = ((sy_train_hat - sy_train) ** 2).mean()
mse_test: float = ((sy_test_hat - sy_test) ** 2).mean()

# Show it
print(f">>> Training MSE: {mse_train:.2e}")
print(f">>> Test MSE: {mse_test:.2e}")

In [None]:
%%time

# c)

# Training data as numpy arrays. Ignore first column (intercept)
X_train: np.ndarray = dfX_train.iloc[:, 1:].values
y_train: np.ndarray = sy_train.values

# Test data as numpy arrays
X_test: np.ndarray = dfX_test.iloc[:, 1:].values
y_test: np.ndarray = sy_test.values

# Define lambdas based on complete training target standard deviation
lambdas: np.ndarray = 10 ** np.linspace(11, -3, 100) / y_train.std() 

print(f">>> Number of lambdas: {lambdas.shape[0]}")
print(f">>> Lambda max: {lambdas.max():.2e}")
print(f">>> Lambda min: {lambdas.min():.2e}\n")

# The pipeline first scales the features and then applies Ridge regression
pipeline = Pipeline([("scaler", StandardScaler()),("ridge_regression", Ridge())])

# Alpha is the parameter for the Ridge model
param_grid: Dict[str, np.ndarray] = {"ridge_regression__alpha": lambdas}

# GridSearchCV will perform an exhaustive search over the defined alpha values
grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    cv=5, 
    scoring='neg_mean_squared_error', # or 'r2'
    verbose=1,
    n_jobs=-1 # Use all available cores
)

# This step runs the cross-validation for all alpha values within the pipeline
grid_search.fit(X_train, y_train)

# Evaluate the best model on the test set using MSE
best_model_ridge = grid_search.best_estimator_
y_test_pred: np.ndarray = best_model_ridge.predict(X_test)
test_mse: float = mean_squared_error(y_test, y_test_pred)

# Retrieve the best alpha and model score
print(f">>> Best alpha found by `GridSearchCV`: {grid_search.best_params_['ridge_regression__alpha']:.2e}")
print(f">>> Best cross-validation score MSE: {(-1.0) * grid_search.best_score_:.2e}")
print(f">>> Test set MSE of the best model: {test_mse:.2e}\n")

In [None]:
# Create figure
path_fig, ax = subplots(figsize=(9, 4.5)) 

# Plot it
ax.errorbar(
    grid_search.cv_results_["param_ridge_regression__alpha"].data, 
    (-1.0) * grid_search.cv_results_["mean_test_score"], 
    yerr=grid_search.cv_results_["std_test_score"],
    marker=".",
    label="Cross-validated MSE (Training Set)"
)

# Plot better lambda as a red cross
ax.plot(
    grid_search.best_params_['ridge_regression__alpha'],
    (-1.0) * grid_search.best_score_,
    marker="x",
    color="red",
    markersize=10,
    markeredgewidth=2
)

# Plot Linear Regression MSE as a horizontal green dashed line
ax.axhline(y=mse_test, color="green", linestyle="--", label="Linear Regression MSE on Test Set")

# Plot Ridge Regression MSE on Test Set as a horizontal orange dashed line
ax.axhline(y=test_mse, color="orange", linestyle="--", label="Ridge Regression MSE on Test Set")

# Labels
ax.legend()
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlabel("$\\lambda$")
ax.set_ylabel("Mean Squared Error (MSE)")
ax.set_title("Ridge Regression\nCross-validated MSE", weight="bold");

In [None]:
%%time

# d)

# The pipeline ensures scaling happens inside the cross-validation loops
lasso_pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("lasso_cv", LassoCV(cv=5, random_state=0, alphas=lambdas)) 
])

# The fit method performs the cross-validation to find the optimal alpha
lasso_pipeline.fit(X_train, y_train)

# Evaluate the best model on the test set using MSE
y_test_pred_lasso: np.ndarray = lasso_pipeline.predict(X_test)
test_mse_lasso: float = mean_squared_error(y_test, y_test_pred_lasso)

# Retrieve the best alpha and model score
best_model_lasso = lasso_pipeline.named_steps["lasso_cv"]
print(f">>> Best alpha found by `LassoCV`: {best_model_lasso.alpha_:.2e}")
print(f">>> Best cross-validation score MSE: {best_model_lasso.mse_path_.mean(axis=1).min():.2e}")
print(f">>> Test set MSE of the best model: {test_mse:.2e}\n")

In [None]:
# Create figure
path_fig, ax = subplots(figsize=(9, 4.5)) 

# Plot it
ax.errorbar(
    lasso_pipeline.named_steps['lasso_cv'].alphas, 
    lasso_pipeline.named_steps['lasso_cv'].mse_path_.mean(axis=1), 
    yerr=lasso_pipeline.named_steps['lasso_cv'].mse_path_.std(axis=1),
    marker=".",
    label="Cross-validated MSE (Training Set)"
)

# Plot better lambda as a red cross
ax.plot(
    best_model_lasso.alpha_,
    best_model_lasso.mse_path_.mean(axis=1).min(),
    marker="x",
    color="red",
    markersize=10,
    markeredgewidth=2
)

# Plot Linear Regression MSE as a horizontal green dashed line
ax.axhline(y=mse_test, color="green", linestyle="--", label="Linear Regression MSE on Test Set")

# Plot Lasso MSE on Test Set as a horizontal orange dashed line
ax.axhline(y=test_mse_lasso, color="orange", linestyle="--", label="Lasso Regression MSE on Test Set")

# Labels
ax.legend()
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlabel("$\\lambda$")
ax.set_ylabel("Mean Squared Error (MSE)")
ax.set_title("Lasso Regression\nCross-validated MSE", weight="bold");

In [None]:
# Columns for coefficients DataFrame
columns: List[str] = dfX_train.columns.to_list()

# Grab coefficients of least squares model
ols_coefs: np.ndarray = linear_model.params.values[None, :]

# Grab coefficients of best Ridge model. Scale parameters back to original data scale
scaler = best_model_ridge.named_steps["scaler"]
mu = scaler.mean_
sigma = np.sqrt(scaler.var_)
ridge_coefs: np.ndarray = best_model_ridge.named_steps["ridge_regression"].coef_ / sigma
ridge_intercept: float = best_model_ridge.named_steps["ridge_regression"].intercept_ - np.sum((mu / sigma) * best_model_ridge.named_steps["ridge_regression"].coef_)

# Grab coefficients of best Lasso model in original data scale
scaler = lasso_pipeline.named_steps["scaler"]
mu = scaler.mean_
sigma = np.sqrt(scaler.var_)
lasso_coefs: np.ndarray = best_model_lasso.coef_ / sigma
lasso_intercept: float = best_model_lasso.intercept_ - np.sum((mu / sigma) * best_model_lasso.coef_)

# Create a DataFrame to compare coefficients
df_coefs: pd.DataFrame = pd.DataFrame(
    ols_coefs, 
    columns=columns, 
    index=["least_squares"]
)

# Add Ridge coefficients
df_coefs = pd.concat([
    df_coefs,
    pd.DataFrame(
        np.hstack([ridge_intercept, ridge_coefs])[None, :], 
        columns=columns, 
        index=["ridge_cv"]
    )
], axis=0)

# Add Lasso coefficients
df_coefs = pd.concat([
    df_coefs,
    pd.DataFrame(
        np.hstack([lasso_intercept, lasso_coefs])[None, :], 
        columns=columns, 
        index=["lasso_cv"]
    )
], axis=0)


# Show it
df_coefs