<a href="https://colab.research.google.com/github/aarontavel/DATA441/blob/main/Tavel_HW3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install skorch

In [None]:
import torch # we are going to use pytorch instead of numpy because it's much faster.
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler, QuantileTransformer
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
import seaborn as sns
import pandas as pd
from sklearn.linear_model import Ridge, Lasso, ElasticNet, LinearRegression
from sklearn.datasets import make_spd_matrix
from scipy.optimize import minimize
from scipy.linalg import toeplitz
from sklearn.metrics import mean_absolute_error as mae, mean_squared_error as mse, r2_score as R2
from sklearn.model_selection import train_test_split as tts, KFold, GridSearchCV
from sklearn.model_selection import GridSearchCV
from scipy.linalg import toeplitz
from sklearn.model_selection import KFold, GridSearchCV


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


##1. (5 points) Create your own PyTorch class that implements the method of SCAD regularization and variable selection (smoothly clipped absolute deviations) for linear models.

In [None]:
class SCAD(nn.Module):

  def __init__(self, input_size, alpha = 0.25, lambda_val = 0.001, epoch = 5000, learning_rate = 0.0001, scaler = None):
    super(SCAD, self).__init__()
    self.input_size = input_size
    self.alpha = alpha
    self.lambda_val = lambda_val
    self.epoch = epoch
    self.learning_rate = learning_rate
    self.scaler = scaler
    self._is_fitted = False

    # Defining Linear Regression
    self.linear = nn.Linear(input_size, 1, bias = False, device = device)

    self.beta_hat = nn.Parameter(torch.randn((input_size,1), dtype = torch.float64, requires_grad=True)) # Initializes weights

  def _scale_data(self, x):
    if self.scaler is not None:
      scaled_data = self.scaler.fit_transform(x)
      return torch.tensor(scaled_data, dtype=torch.float64)
    else:
      return torch.tensor(x, dtype=torch.float64, requires_grad=True)

  def scad_penalty(self):

    is_linear = torch.abs(self.beta_hat) <= self.lambda_val
    is_quadratic = (self.lambda_val < torch.abs(self.beta_hat)) & (torch.abs(self.beta_hat) <= self.alpha * self.lambda_val)
    is_constant = (self.alpha * self.lambda_val) < torch.abs(self.beta_hat)

    linear_part = self.lambda_val * torch.abs(self.beta_hat) * is_linear.float()
    quadratic_part = (2 * self.alpha * self.lambda_val * torch.abs(self.beta_hat) - self.beta_hat**2 - self.lambda_val**2) / (2 * (self.alpha - 1)) * is_quadratic.float()
    constant_part = (self.lambda_val**2 * (self.alpha + 1)) / 2 * is_constant.float()

    return linear_part + quadratic_part + constant_part

  def forward(self, x):
    x_tensor = self._scale_data(x)

    return torch.matmul(x_tensor, self.beta_hat)

  def loss(self, x, y_pred, y_train):

    y_train_tensor = torch.tensor(y_train, dtype = torch.float64).flatten()

    mse_loss = nn.MSELoss()(y_pred.flatten(), y_train_tensor)

    scad_penalty = torch.sum(self.scad_penalty())

    total_loss = mse_loss + scad_penalty

    return total_loss

  def fit(self, x, y_train): #takes y_train

    optimizer = torch.optim.Adam([self.beta_hat], lr = self.learning_rate)

    for epoch in range(self.epoch):

      optimizer.zero_grad()

      predictions = self.forward(x)

      loss = self.loss(x, predictions, y_train)

      loss.backward()

      optimizer.step()

      #if (epoch + 1) % 500 == 0:
        #print(f"Epoch [{epoch + 1}/{self.epoch}], Loss: {loss.item()}")

    self._is_fitted = True

  def predict(self, x):
    x_tensor = self._scale_data(x)

    return self.forward(x_tensor)

  def get_coefficients(self):

    return self.beta_hat.detach().clone()

  def is_fitted(self):
    return self._is_fitted

#### Testing Question 1

In [None]:
device = torch.device("cuda")
dtype = torch.float64

In [None]:
data = pd.read_csv("/content/drive/MyDrive/Sophomore Year/DATA/DATA-310/Module 3/Regression Problems/Data/concrete.csv")

In [None]:
x = data.loc[:,'cement':'age'].values
y = data['strength'].values

In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler
import numpy as np

mse_scad = []

scale = StandardScaler()
kf = KFold(n_splits=10, shuffle=True, random_state=1234)

for idxtrain, idxtest in kf.split(x):
    xtrain = x[idxtrain]
    ytrain = y[idxtrain]
    ytest = y[idxtest]
    xtest = x[idxtest]

    xtrain_scaled = scale.fit_transform(xtrain)
    xtest_scaled = scale.transform(xtest)

    model = SCAD(input_size=x.shape[1], scaler=scale, alpha=0.25, lambda_val=0.001, learning_rate=0.0001)
    model.fit(xtrain_scaled, ytrain)
    yhat = model.predict(xtest_scaled).detach().numpy()

    mse_fold = mean_squared_error(ytest, yhat)
    mse_scad.append(mse_fold)

In [None]:
print('The Cross-validated Mean Squared Error for SCAD is:', np.mean(mse_scad))

The Cross-validated Mean Squared Error for SCAD is: 1540.5411640543618


In [None]:
model.get_coefficients()

tensor([[-0.5772],
        [ 1.7974],
        [-1.0598],
        [-1.9131],
        [-0.9208],
        [-0.2894],
        [-0.4967],
        [ 0.3871]], dtype=torch.float64)

##Based on the simulation design explained in class, generate 500 data sets where the input features have a strong correlation structure (you may consider a 0.9) and apply ElasticNet, SqrtLasso and SCAD to check which method produces the best approximation of an ideal solution, such as a "betastar" you design with a sparsity pattern.

In [None]:
class ElasticNet(nn.Module):
    def __init__(self, input_size, alpha=1.0, l1_ratio=0.5):
        """
        Initialize the ElasticNet regression model.

        Args:
            input_size (int): Number of input features.
            alpha (float): Regularization strength. Higher values of alpha
                emphasize L1 regularization, while lower values emphasize L2 regularization.
            l1_ratio (float): The ratio of L1 regularization to the total
                regularization (L1 + L2). It should be between 0 and 1.

        """
        super(ElasticNet, self).__init__()
        self.input_size = input_size
        self.alpha = alpha
        self.l1_ratio = l1_ratio

        # Define the linear regression layer
        self.linear = nn.Linear(input_size, 1)

    def forward(self, x):
        """
        Forward pass of the ElasticNet model.

        Args:
            x (Tensor): Input data with shape (batch_size, input_size).

        Returns:
            Tensor: Predicted values with shape (batch_size, 1).

        """
        return self.linear(x)

    def loss(self, y_pred, y_true):
        """
        Compute the ElasticNet loss function.

        Args:
            y_pred (Tensor): Predicted values with shape (batch_size, 1).
            y_true (Tensor): True target values with shape (batch_size, 1).

        Returns:
            Tensor: The ElasticNet loss.

        """
        mse_loss = nn.MSELoss()(y_pred, y_true)
        l1_reg = torch.norm(self.linear.weight, p=1)
        l2_reg = torch.norm(self.linear.weight, p=2)

        loss = mse_loss + self.alpha * (
            self.l1_ratio * l1_reg + (1 - self.l1_ratio) * l2_reg
        )

        return loss

    def fit(self, X, y, num_epochs=100, learning_rate=0.01):
        """
        Fit the ElasticNet model to the training data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).
            y (Tensor): Target values with shape (num_samples, 1).
            num_epochs (int): Number of training epochs.
            learning_rate (float): Learning rate for optimization.

        """
        optimizer = optim.SGD(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            self.train()
            optimizer.zero_grad()
            y_pred = self(X)
            loss = self.loss(y_pred, y)
            loss.backward()
            optimizer.step()

    def predict(self, X):
        """
        Predict target values for input data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).

        Returns:
            Tensor: Predicted values with shape (num_samples, 1).

        """
        self.eval()
        with torch.no_grad():
            y_pred = self(X)
        return y_pred
    def get_coefficients(self):
        """
        Get the coefficients (weights) of the linear regression layer.

        Returns:
            Tensor: Coefficients with shape (output_size, input_size).

        """
        return self.linear.weight


In [36]:
# we can call this version PED_Adam because we use the adaptive momentum gradient descent for optimization
class sqrtLasso(nn.Module):
    def __init__(self, input_size, alpha=0.1):
        """
        Initialize the  regression model.


        """
        super(sqrtLasso, self).__init__()
        self.input_size = input_size
        self.alpha = alpha


        # Define the linear regression layer
        self.linear = nn.Linear(input_size, 1).double()

    def forward(self, x):
        """
        Forward pass of the model.

        Args:
            x (Tensor): Input data with shape (batch_size, input_size).

        Returns:
            Tensor: Predicted values with shape (batch_size, 1).

        """
        return self.linear(x)

    def loss(self, y_pred, y_true):
        """
        Compute the loss function.

        Args:
            y_pred (Tensor): Predicted values with shape (batch_size, 1).
            y_true (Tensor): True target values with shape (batch_size, 1).

        Returns:
            Tensor: The loss.

        """
        mse_loss = nn.MSELoss()(y_pred, y_true)
        l1_reg = torch.norm(self.linear.weight, p=1,dtype=torch.float64)
        # l2_reg = torch.norm(self.linear.weight, p=2,dtype=torch.float64)

        loss = (len(y_true)*mse_loss)**(1/2) + self.alpha * (l1_reg)

        return loss

    def fit(self, X, y, num_epochs=200, learning_rate=0.01):
        """
        Fit the model to the training data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).
            y (Tensor): Target values with shape (num_samples, 1).
            num_epochs (int): Number of training epochs.
            learning_rate (float): Learning rate for optimization.

        """
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            self.train()
            optimizer.zero_grad()
            y_pred = self(X)
            loss = self.loss(y_pred, y)
            loss.backward()
            optimizer.step()

    def predict(self, X):
        """
        Predict target values for input data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).

        Returns:
            Tensor: Predicted values with shape (num_samples, 1).

        """
        self.eval()
        with torch.no_grad():
            y_pred = self(X)
        return y_pred
    def get_coefficients(self):
        """
        Get the coefficients (weights) of the linear regression layer.

        Returns:
            Tensor: Coefficients with shape (output_size, input_size).

        """
        return self.linear.weight

In [None]:
import numpy as np

In [None]:
scale = StandardScaler()

In [None]:
def make_correlated_features(num_samples,p,rho):
  vcor = []
  for i in range(p):
    vcor.append(rho**i)
  r = toeplitz(vcor)
  mu = np.repeat(0,p)
  x = np.random.multivariate_normal(mu, r, size=num_samples)
  return x

In [None]:
rho =0.9 # correlation
p = 200 # predictors
n = 150 # number of variables
vcor = []
for i in range(p):
  vcor.append(rho**i)

In [None]:
beta =np.array([-1,2,3,0,0,0,0,2,-1,4])
beta = beta.reshape(-1,1)
betastar = np.concatenate([beta,np.repeat(0,p-len(beta)).reshape(-1,1)],axis=0)

In [38]:
# SCAD -- This code is taking too long to run. It was completing approximately 5 iterations/minute which would take too long and my computer was overheating.
# I will write code below for what i would do in order to complete the question.

all_betas = []

for i in range(5):
    individual_betas = []
    x = make_correlated_features(n, p, rho)
    y = x @ betastar + 0.5 * np.random.normal(size=(150, 1))

    model = SCAD(input_size=x.shape[1], scaler=scale, alpha=0.25, lambda_val=0.001, learning_rate=0.0001)
    model.fit(x, y)
    coef = model.get_coefficients()

    individual_betas.append(coef)
    all_betas.append(individual_betas)

# Convert the list of tensors to a single tensor
all_betas_tensor = torch.stack([torch.stack(betas) for betas in all_betas])

average_betas_SCAD = torch.mean(all_betas_tensor, dim=0)

print("Average betas across iterations for:", average_betas_SCAD)

Average betas across iterations for: tensor([[[ 0.5080],
         [ 0.8076],
         [ 0.5827],
         [ 0.3673],
         [ 0.5758],
         [ 0.4098],
         [ 0.4298],
         [-0.1010],
         [ 0.2603],
         [ 0.1118],
         [ 0.6125],
         [-0.1074],
         [ 0.1397],
         [-0.2210],
         [-0.3646],
         [ 0.5322],
         [-0.1071],
         [ 0.6398],
         [ 0.1063],
         [-0.1868],
         [ 0.9004],
         [-0.3061],
         [ 0.2371],
         [ 0.1242],
         [-0.2953],
         [ 0.2018],
         [ 0.4716],
         [ 0.2560],
         [ 0.1037],
         [-0.1330],
         [-0.8828],
         [ 0.4840],
         [-0.5099],
         [ 0.0033],
         [ 0.0844],
         [ 0.1751],
         [-0.3028],
         [ 0.3269],
         [-0.1063],
         [ 0.1428],
         [-0.0401],
         [-0.7981],
         [ 0.0357],
         [-0.2224],
         [ 0.1382],
         [-0.3540],
         [ 0.1335],
         [ 0.3392],
   

In [None]:
# ElasticNet -- this one runs much faster for some reason -- unsure why my SCAD takes so long

all_betas = []

for i in range(500):
    individual_betas = []
    x = make_correlated_features(n, p, rho)
    y = x @ betastar + 0.5 * np.random.normal(size=(150, 1))

    x_tensor = torch.tensor(x, dtype = torch.float64).float()
    y_tensor = torch.tensor(y, dtype = torch.float64).float()

    model = ElasticNet(input_size = x_tensor.shape[1])
    model.fit(x_tensor, y_tensor)
    coef = model.get_coefficients()

    individual_betas.append(coef)
    all_betas.append(individual_betas)

all_betas_tensor = torch.stack([torch.stack(betas) for betas in all_betas])

average_betas_eN = torch.mean(all_betas_tensor, dim=0)

print("Average betas across iterations for ENet:", average_betas_eN)

In [None]:
# sqrtLASSO

all_betas = []

for i in range(500):
    individual_betas = []
    x = make_correlated_features(n, p, rho)
    y = x @ betastar + 0.5 * np.random.normal(size=(150, 1))

    x_tensor = torch.tensor(x, dtype = torch.float64).double()
    y_tensor = torch.tensor(y, dtype = torch.float64).double()

    model = sqrtLasso(input_size = x_tensor.shape[1])
    model.fit(x_tensor, y_tensor)
    coef = model.get_coefficients()

    individual_betas.append(coef)
    all_betas.append(individual_betas)

# Convert the list of tensors to a single tensor
all_betas_tensor = torch.stack([torch.stack(betas) for betas in all_betas])

average_betas_sL = torch.mean(all_betas_tensor, dim=0)

print("Average betas across iterations for sqrtLasso:", average_betas_sL)