In [1]:
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
from sklearn.linear_model import Lasso, Ridge, LassoCV, RidgeCV, LassoLarsIC
from sklearn.metrics import mean_squared_error
from tqdm import tqdm
from joblib import Parallel, delayed

In [2]:
# Part (a): Function to calculate required sigma^2 for given R^2

def calculate_sigma2(X_cov, beta_star, r_square):
    """
    Calculate sigma^2 to achieve desired R^2
    Args:
        X_cov: design matrix
        beta_star: true coefficients
        r: desired R^2 value
    Returns:
        sigma2: required noise variance
    """

    return (1 - r_square) * beta_star.T @ X_cov @ beta_star / r_square

# Generate data function

def generate_data(n, p, rho, beta_type='sparse', seed=42):
    """
    Generate synthetic data
    Args:
        n: sample size
        p: dimension
        rho: correlation parameter
        beta_type: 'sparse' or 'dense'
    Returns:
        X: design matrix
        y: response vector
        beta_star: true coefficients
    """
    if seed is not None:
        np.random.seed(seed)

    Sigma = np.zeros((p, p))
    for i in range(p):
        for j in range(p):
            Sigma[i, j] = rho ** abs(i-j)

    X = np.random.multivariate_normal(np.zeros(p), Sigma, size=n)

    if beta_type == 'sparse':
        beta_star = np.array(
            [2 / np.sqrt(n) if j < np.sqrt(p) else 0 for j in range(1, p + 1)])
    elif beta_type == 'dense':  # dense
        beta_star = np.array([5 / j * np.sqrt(n) for j in range(1, p + 1)])
    else:
        raise ValueError(f"Invalid beta_type: {beta_type}")

    return X, Sigma, beta_star

def ridge_with_ic(X, y, criterion='aic'):
    """
    Ridge regression with AIC or BIC criterion
    Args:
        X: design matrix (n × p)
        y: response vector (n × 1)
        criterion: 'aic' or 'bic'
    Returns:
        fitted Ridge model with best alpha
    """
    n, _ = X.shape
    alphas = np.logspace(-6, 6, 10)
    best_ic = np.inf
    best_alpha = None

    for alpha in alphas:
        ridge = Ridge(alpha=alpha)
        ridge.fit(X, y)

        y_pred = ridge.predict(X)
        rss = np.sum((y - y_pred) ** 2)

        svd = np.linalg.svd(X, compute_uv=False)
        df = np.sum(svd**2 / (svd**2 + alpha))

        if criterion.lower() == 'aic':
            ic = n * np.log(rss/n) + 2 * df
        elif criterion.lower() == 'bic':  # BIC
            ic = n * np.log(rss/n) + np.log(n) * df
        else:
            raise ValueError(f"Invalid criterion: {criterion}")

        if ic < best_ic:
            best_ic = ic
            best_alpha = alpha

    best_model = Ridge(alpha=best_alpha)
    best_model.fit(X, y)

    best_model.criterion_value = best_ic
    best_model.best_alpha = best_alpha

    return best_model


def fit_model(X, y, model_type, tuning_method):
    """
    Fit model with specified tuning method
    Args:
        X: design matrix
        y: response vector
        model_type: 'Lasso', 'Ridge', 'Adaptive Lasso', or 'Adaptive Ridge'
        tuning_method: 'AIC', 'BIC', or 'LOO-CV'
    Returns:
        fitted model
    """
    if model_type == 'Lasso':
        if tuning_method == 'LOO-CV':
            model = LassoCV(cv=len(X))
        elif tuning_method in ['AIC', 'BIC']:
            model = LassoLarsIC(criterion=tuning_method.lower())
        model.fit(X, y)

    elif model_type == 'Ridge':
        alphas = np.logspace(-6, 6, 10)
        if tuning_method == 'LOO-CV':
            model = RidgeCV(alphas=alphas, cv=len(X), scoring='neg_mean_squared_error')
        elif tuning_method in ['AIC', 'BIC']:
            model = ridge_with_ic(X, y, criterion=tuning_method)
        model.fit(X, y)

    elif model_type == 'Adaptive Lasso':
        lasso = Lasso(alpha=1.0)
        lasso.fit(X, y)
        weights = 1 / np.abs(lasso.coef_ + 1e-6)
        weights = weights / np.mean(weights)
        X_weighted = X * weights

        if tuning_method == 'LOO-CV':
            model = LassoCV(cv=len(X))
        elif tuning_method in ['AIC', 'BIC']:
            model = LassoLarsIC(criterion=tuning_method.lower())

        model.fit(X_weighted, y)
        model.weights = weights

    elif model_type == 'Adaptive Ridge':
        ridge = Ridge(alpha=1.0)
        ridge.fit(X, y)
        weights = 1 / np.abs(ridge.coef_ + 1e-6)
        weights = weights / np.mean(weights)
        X_weighted = X * weights

        alphas = np.logspace(-6, 6, 10)
        if tuning_method == 'LOO-CV':
            model = RidgeCV(alphas=alphas, cv=len(X), scoring='neg_mean_squared_error')
        elif tuning_method in ['AIC', 'BIC']:
            model = ridge_with_ic(X, y, criterion=tuning_method)

        model.fit(X_weighted, y)
        # Store weights for prediction
        model.weights = weights

    else:
        raise ValueError(f"Invalid model_type: {model_type}")

    if model_type in ['Adaptive Lasso', 'Adaptive Ridge']:
        original_predict = model.predict
        def new_predict(self, X_new):
            return original_predict(X_new * model.weights)
        model.predict = new_predict.__get__(model)

    return model

def run_simulation(n, p, rho, beta_type, n_datasets=1000):
    def single_simulation(sim_id):
        X, X_cov, beta_star = generate_data(n, p, rho, beta_type, seed=sim_id)

        sigma2 = calculate_sigma2(X_cov, beta_star, 0.8)

        epsilon = np.random.normal(0, np.sqrt(sigma2), n)
        y = X @ beta_star + epsilon

        results = []
        models = [
            (i, j)
            for i in ['Lasso', 'Ridge', 'Adaptive Lasso', 'Adaptive Ridge']
            for j in ['AIC', 'BIC', 'LOO-CV']
        ]

        for model_type, tuning in models:
            try:
                model = fit_model(X, y, model_type, tuning)
                mse = mean_squared_error(y, model.predict(X))
                results.append({
                    'model': model_type,
                    'tuning': tuning,
                    'mse': mse,
                    'n': n,
                    'p': p,
                    'rho': rho,
                    'beta_type': beta_type
                })
            except Exception as e:
                print(f"Error in simulation {sim_id} with {model_type}, {tuning}: {str(e)}")

        return results

    results = Parallel(n_jobs=-1)(
        delayed(single_simulation)(i)
        for i in tqdm(range(n_datasets))
    )

    flat_results = [item for sublist in results for item in sublist]
    return pd.DataFrame(flat_results)

In [21]:
parameter_settings = [
    {'n': 100, 'p': p, 'rho': rho, 'beta_type': beta_type}
    for p in [10, 25, 50]
    for rho in [0, 0.25, 0.5]
    for beta_type in ['sparse', 'dense']
]

all_results = []
for params in parameter_settings:
    results = run_simulation(**params)
    all_results.append(results)

final_results = pd.concat(all_results, axis=0).reset_index(drop=True)
print(final_results)

average_results = (final_results
    .groupby(['model', 'tuning', 'p', 'rho', 'beta_type'])['mse']
    .mean()
    .reset_index()
)

table = average_results.pivot_table(
    index=['p', 'rho', 'beta_type'],
    columns=['model', 'tuning'],
    values='mse'
)

print("\nAverage MSE across 1000 simulations:")
print(table)

 94%|█████████▍| 944/1000 [01:54<00:07,  7.69it/s]