<a href="https://colab.research.google.com/github/aegisen/DATA441/blob/main/Homework3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1

## Imports and supporting functions

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim

In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler, PolynomialFeatures
from sklearn.model_selection import train_test_split as tts, KFold
from sklearn.linear_model import Ridge, Lasso, ElasticNet, LinearRegression
from sklearn.datasets import make_spd_matrix
from scipy.optimize import minimize
from scipy.linalg import toeplitz
from sklearn.metrics import mean_absolute_error as mae, mean_squared_error as mse, r2_score as R2

In [5]:
class StandardScaler:
    def __init__(self):
        self.mean = None
        self.std = None

    def fit(self, data):
        """
        Compute the minimum and maximum value of the data for scaling.

        Args:
        - data (torch.Tensor): Input data tensor.
        """
        self.mean = torch.mean(data, dim=0, keepdim=True)
        self.std = torch.std(data, dim=0, keepdim=True)+1e-10

    def transform(self, data):
        """
        Scale the data based on the computed minimum and maximum values.

        Args:
        - data (torch.Tensor): Input data tensor.

        Returns:
        - torch.Tensor: Scaled data tensor.
        """
        if self.mean is None or self.std is None:
            raise ValueError("Scaler has not been fitted yet. Please call 'fit' with appropriate data.")

        scaled_data = (data - self.mean) / (self.std)
        return scaled_data

    def fit_transform(self, data):
        """
        Fit to data, then transform it.

        Args:
        - data (torch.Tensor): Input data tensor.

        Returns:
        - torch.Tensor: Scaled data tensor.
        """
        self.fit(data)
        return self.transform(data)


## SCAD Model

In [63]:
def scad_penalty(beta_hat, lambda_val, a_val):
    abs_beta_hat = torch.abs(beta_hat)
    is_linear = (abs_beta_hat <= lambda_val)
    is_quadratic = (lambda_val < abs_beta_hat) & (abs_beta_hat <= a_val * lambda_val)
    is_constant = (a_val * lambda_val) < abs_beta_hat

    linear_part = lambda_val * abs_beta_hat * is_linear
    quadratic_part = (a_val * lambda_val * abs_beta_hat - beta_hat**2 - lambda_val**2) / (2 * (a_val - 1)) * is_quadratic
    constant_part = (lambda_val**2 * (a_val + 1)) / 2 * is_constant

    # Print penalty components
    #print("Linear Part:", linear_part)
    #print("Quadratic Part:", quadratic_part)
    #print("Constant Part:", constant_part)

    penalty = linear_part + quadratic_part + constant_part
    return linear_part + quadratic_part + constant_part

In [64]:
class SCAD(nn.Module):
    def __init__(self, input_size, lamb = 1.0, a = 3.7):

        super(SCAD, self).__init__()
        self.input_size = input_size
        self.lamb = lamb
        self.a = a

        # Define the linear regression layer
        self.linear = nn.Linear(input_size, 1, bias = False, device = "cpu", dtype = torch.float64)

    def forward(self, x):
        """
        Forward pass of the SCAD model.

        Args:
            x (Tensor): Input data with shape (batch_size, input_size).

        Returns:
            Tensor: Predicted values with shape (batch_size, 1).

        """
        return self.linear(x)

    def loss(self, y_pred, y_true):
        """
        Compute the SCAD loss function with SCAD penalty.

        Args:
            y_pred (Tensor): Predicted values with shape (batch_size, 1).
            y_true (Tensor): True target values with shape (batch_size, 1).

        Returns:
            Tensor: The SCAD loss.

        """

        mse_loss = nn.functional.mse_loss(y_pred, y_true)
        scad = scad_penalty(self.linear.weight, self.lamb, self.a).sum()

        loss = mse_loss + scad
        return loss

    def fit(self, X, y, num_epochs=100, learning_rate=1e-6):
        """
        Fit the SCAD model to the training data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).
            y (Tensor): Target values with shape (num_samples, 1).
            num_epochs (int): Number of training epochs.
            learning_rate (float): Learning rate for optimization.

        """
        optimizer = optim.SGD(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            optimizer.zero_grad()

            y_pred = self.forward(X)

            loss = self.loss(y_pred, y.reshape(-1, 1))
            loss.backward()

            # Print out gradients
       #     for name, param in self.named_parameters():
       #         if param.grad is not None:
       #             print(f"Parameter: {name}, Mean Gradient: {param.grad.mean()}, Max Gradient: {param.grad.max()}, Min Gradient: {param.grad.min()}")


            optimizer.step()

            if (epoch + 1) % 10 == 0:
                print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}")

    def predict(self, X):
        """
        Predict target values for input data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).

        Returns:
            Tensor: Predicted values with shape (num_samples, 1).

        """
        self.eval()
        with torch.no_grad():
            y_pred = self.forward(X)
        return y_pred

    def get_coefficients(self):
        """
        Get the coefficients (weights) of the linear regression layer.

        Returns:
            Tensor: Coefficients with shape (output_size, input_size).

        """
        return self.linear.weight.data

## Setup Data

In [65]:
data = pd.read_csv('drive/MyDrive/Adv. Appl. Machine Learning/data/cars.csv')

In [98]:
#get target and features
features = ["CYL", "ENG", "WGT"]

x = data.loc[:,'CYL':'WGT'].values
y = data['MPG'].values

In [99]:
#scale data
from sklearn.preprocessing import StandardScaler, MinMaxScaler

scale = StandardScaler()

xscaled = scale.fit_transform(x)

In [100]:
#put data in torch tensors
x_tensor = torch.tensor(x,device=torch.device("cpu"), dtype=torch.float64)
y_tensor  = torch.tensor(y,device=torch.device("cpu"),dtype=torch.float64)

In [95]:
x.dtype

dtype('float64')

In [96]:
y.shape


(392,)

## Using SCAD Model

In [101]:
# create model for data
model = SCAD(input_size=x.shape[1], lamb=0.5, a = 3.7)
model.double()

SCAD(
  (linear): Linear(in_features=3, out_features=1, bias=False)
)

In [102]:
#fit model on data
model.fit(x_tensor, y_tensor)

Epoch [10/100], Loss: 1.2575668790971418e+29
Epoch [20/100], Loss: 2.1421095449066236e+54
Epoch [30/100], Loss: 3.648818507111473e+79
Epoch [40/100], Loss: 6.215310757330823e+104
Epoch [50/100], Loss: 1.0587012682297839e+130
Epoch [60/100], Loss: 1.8033665879527259e+155
Epoch [70/100], Loss: 3.0718118020034332e+180
Epoch [80/100], Loss: 5.232451244225304e+205
Epoch [90/100], Loss: 8.912833138195066e+230
Epoch [100/100], Loss: 1.518190821882555e+256


In [87]:
model.get_coefficients()[0].cpu().detach().numpy()


array([4.39907321e+122, 1.65495783e+124, 2.39705576e+125])

In [103]:
#get important features

weights = np.abs(model.get_coefficients()[0].cpu().detach().numpy())

significant_feats = np.where(weights > 0)[0]

#Print features that contribute to prediction (weights are non-zero)
for index in significant_feats:
  print(features[index])


CYL
ENG
WGT


All features are involved in the prediction of Gas Mileage for this dataset/model.

# 2

## Imports

In [134]:
import torch # we are going to use pytorch instead of numpy because it's much faster.
import torch.nn as nn
#from ignite.contrib.metrics.regression import R2Score
import torch.optim as optim
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
import seaborn as sns
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge, Lasso, ElasticNet, LinearRegression
from sklearn.datasets import make_spd_matrix
from scipy.optimize import minimize
from scipy.linalg import toeplitz
from sklearn.model_selection import train_test_split

from sklearn.metrics import mean_absolute_error as mae, mean_squared_error as mse, r2_score as R2


## Define Models

In [155]:
class ElasticNet(nn.Module):
    def __init__(self, input_size, alpha=1.0, l1_ratio=0.5):
        """
        Initialize the ElasticNet regression model.

        Args:
            input_size (int): Number of input features.
            alpha (float): Regularization strength. Higher values of alpha
                emphasize L1 regularization, while lower values emphasize L2 regularization.
            l1_ratio (float): The ratio of L1 regularization to the total
                regularization (L1 + L2). It should be between 0 and 1.

        """
        super(ElasticNet, self).__init__()
        self.input_size = input_size
        self.alpha = alpha
        self.l1_ratio = l1_ratio

        # Define the linear regression layer
        self.linear = nn.Linear(input_size, 1)

    def forward(self, x):
        """
        Forward pass of the ElasticNet model.

        Args:
            x (Tensor): Input data with shape (batch_size, input_size).

        Returns:
            Tensor: Predicted values with shape (batch_size, 1).

        """
        return self.linear(x)

    def loss(self, y_pred, y_true):
        """
        Compute the ElasticNet loss function.

        Args:
            y_pred (Tensor): Predicted values with shape (batch_size, 1).
            y_true (Tensor): True target values with shape (batch_size, 1).

        Returns:
            Tensor: The ElasticNet loss.

        """
        mse_loss = nn.MSELoss()(y_pred, y_true)
        l1_reg = torch.norm(self.linear.weight, p=1)
        l2_reg = torch.norm(self.linear.weight, p=2)

        loss = mse_loss + self.alpha * (
            self.l1_ratio * l1_reg + (1 - self.l1_ratio) * l2_reg
        )

        return loss

    def fit(self, X, y, num_epochs = 100, learning_rate=0.01):
        """
        Fit the ElasticNet model to the training data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).
            y (Tensor): Target values with shape (num_samples, 1).
            num_epochs (int): Number of training epochs.
            learning_rate (float): Learning rate for optimization.

        """
        optimizer = optim.SGD(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            self.train()
            optimizer.zero_grad()
            y_pred = self(X)
            loss = self.loss(y_pred, y)
            loss.backward()
            optimizer.step()

          #  if (epoch + 1) % 10 == 0:
          #      print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}")

    def predict(self, X):
        """
        Predict target values for input data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).

        Returns:
            Tensor: Predicted values with shape (num_samples, 1).

        """
        self.eval()
        with torch.no_grad():
            y_pred = self(X)
        return y_pred
    def get_coefficients(self):
        """
        Get the coefficients (weights) of the linear regression layer.

        Returns:
            Tensor: Coefficients with shape (output_size, input_size).

        """
        return self.linear.weight


In [230]:
class sqrtLasso(nn.Module):
    def __init__(self, input_size, alpha=0.1):
        """
        Initialize the  regression model.
        """
        super(sqrtLasso, self).__init__()
        self.input_size = input_size
        self.alpha = alpha


        # Define the linear regression layer
        self.linear = nn.Linear(input_size, 1).float()

    def forward(self, x):
        """
        Forward pass of the model.

        Args:
            x (Tensor): Input data with shape (batch_size, input_size).

        Returns:
            Tensor: Predicted values with shape (batch_size, 1).

        """
        return self.linear(x)

    def loss(self, y_pred, y_true):
        """
        Compute the loss function.

        Args:
            y_pred (Tensor): Predicted values with shape (batch_size, 1).
            y_true (Tensor): True target values with shape (batch_size, 1).

        Returns:
            Tensor: The loss.

        """
        mse_loss = nn.MSELoss()(y_pred, y_true)
        l1_reg = torch.norm(self.linear.weight, p=1,dtype=torch.float32)
        # l2_reg = torch.norm(self.linear.weight, p=2,dtype=torch.float32)

        loss = (len(y_true)*mse_loss)**(1/2) + self.alpha * (l1_reg)

        return loss

    def fit(self, X, y, num_epochs = 100, learning_rate=0.01):
        """
        Fit the model to the training data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).
            y (Tensor): Target values with shape (num_samples, 1).
            num_epochs (int): Number of training epochs.
            learning_rate (float): Learning rate for optimization.

        """
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            self.train()
            optimizer.zero_grad()
            y_pred = self(X)
            loss = self.loss(y_pred, y)
            loss.backward()
            optimizer.step()

            #if (epoch + 1) % 100 == 0:
            #    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}")

    def predict(self, X):
        """
        Predict target values for input data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).

        Returns:
            Tensor: Predicted values with shape (num_samples, 1).

        """
        self.eval()
        with torch.no_grad():
            y_pred = self(X)
        return y_pred
    def get_coefficients(self):
        """
        Get the coefficients (weights) of the linear regression layer.

        Returns:
            Tensor: Coefficients with shape (output_size, input_size).

        """
        return self.linear.weight

In [231]:
def scad_penalty(beta_hat, lambda_val, a_val):
    abs_beta_hat = torch.abs(beta_hat)
    is_linear = (abs_beta_hat <= lambda_val)
    is_quadratic = (lambda_val < abs_beta_hat) & (abs_beta_hat <= a_val * lambda_val)
    is_constant = (a_val * lambda_val) < abs_beta_hat

    linear_part = lambda_val * abs_beta_hat * is_linear
    quadratic_part = (a_val * lambda_val * abs_beta_hat - beta_hat**2 - lambda_val**2) / (2 * (a_val - 1)) * is_quadratic
    constant_part = (lambda_val**2 * (a_val + 1)) / 2 * is_constant

    # Print penalty components
    #print("Linear Part:", linear_part)
    #print("Quadratic Part:", quadratic_part)
    #print("Constant Part:", constant_part)

    penalty = linear_part + quadratic_part + constant_part
    return linear_part + quadratic_part + constant_part


class SCAD(nn.Module):
    def __init__(self, input_size, lamb = 1.0, a = 3.7):

        super(SCAD, self).__init__()
        self.input_size = input_size
        self.lamb = lamb
        self.a = a

        # Define the linear regression layer
        self.linear = nn.Linear(input_size, 1, bias = False, device = "cpu", dtype = torch.float32)

    def forward(self, x):
        """
        Forward pass of the SCAD model.

        Args:
            x (Tensor): Input data with shape (batch_size, input_size).

        Returns:
            Tensor: Predicted values with shape (batch_size, 1).

        """
        return self.linear(x)

    def loss(self, y_pred, y_true):
        """
        Compute the SCAD loss function with SCAD penalty.

        Args:
            y_pred (Tensor): Predicted values with shape (batch_size, 1).
            y_true (Tensor): True target values with shape (batch_size, 1).

        Returns:
            Tensor: The SCAD loss.

        """

        mse_loss = nn.functional.mse_loss(y_pred, y_true)
        scad = scad_penalty(self.linear.weight, self.lamb, self.a).sum()

        loss = mse_loss + scad
        return loss

    def fit(self, X, y, num_epochs=100, learning_rate=0.01):
        """
        Fit the SCAD model to the training data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).
            y (Tensor): Target values with shape (num_samples, 1).
            num_epochs (int): Number of training epochs.
            learning_rate (float): Learning rate for optimization.

        """
        optimizer = optim.SGD(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            optimizer.zero_grad()

            y_pred = self.forward(X)

            loss = self.loss(y_pred, y.reshape(-1, 1))
            loss.backward()

            # Print out gradients
       #     for name, param in self.named_parameters():
       #         if param.grad is not None:
       #             print(f"Parameter: {name}, Mean Gradient: {param.grad.mean()}, Max Gradient: {param.grad.max()}, Min Gradient: {param.grad.min()}")


            optimizer.step()

         #   if (epoch + 1) % 10 == 0:
         #       print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}")

    def predict(self, X):
        """
        Predict target values for input data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).

        Returns:
            Tensor: Predicted values with shape (num_samples, 1).

        """
        self.eval()
        with torch.no_grad():
            y_pred = self.forward(X)
        return y_pred

    def get_coefficients(self):
        """
        Get the coefficients (weights) of the linear regression layer.

        Returns:
            Tensor: Coefficients with shape (output_size, input_size).

        """
        return self.linear.weight.data

## Generate datasets

In [233]:
def make_datasets(num_samples = 100, num_feats = 8, rho = 0.9):
  vcor = []
  for i in range(num_feats):
    vcor.append(rho**i)
  r = toeplitz(vcor)
  mu = np.repeat(0, num_feats)

  # create x
  x = np.random.multivariate_normal(mu, r, size=num_samples)

  # create y

  betastar = np.zeros(num_feats)
  betastar = np.random.uniform(1, 5, num_feats)

  noise = 1.5 * np.random.normal(0, 0.4, num_samples)
  y = x.dot(betastar) + noise

  # return x and y
  return x, y


## Run tests

In [234]:
# fit/train/test model on data

def eval_model(model, x_train, x_test, y_train, y_test):
  x_train = torch.tensor(x_train, dtype = torch.float32)
  #print(x_train.size())

  x_test = torch.tensor(x_test, dtype = torch.float32)
  #print(x_test.size())

  y_train = torch.tensor(y_train, dtype = torch.float32)
  y_train =  y_train.unsqueeze(1)
  #print(y_train.size())

  y_test = torch.tensor(y_test, dtype = torch.float32)
  y_test =  y_test.unsqueeze(1)
  #print(y_test.size())

  # fit model on training data
  model.fit(x_train, y_train, num_epochs = 100)

  # grade on test data and note score
  y_pred = model.predict(x_test)
  mse = torch.nn.functional.mse_loss(y_pred, y_test).item()

  return mse

In [237]:
# params for datasets
num_samples = 100
num_features = 10
num_datasets = 500
rho = 0.9   # correlation strength

elasticScores = []
sqrtLassoScores = []
SCADScores = []

for set in range(num_datasets):
  x, y = make_datasets(num_samples, num_features, rho)
  scale = StandardScaler()
  x = scale.fit_transform(x)

  x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state = 442)

  # ElasticNet
  elastic = ElasticNet(num_features)
  elasticScores.append(eval_model(elastic, x_train, x_test, y_train, y_test))

  # sqrtLasso
  sqrt = sqrtLasso(num_features)
  sqrtLassoScores.append(eval_model(sqrt, x_train, x_test, y_train, y_test))

  # SCAD
  scad = SCAD(num_features)
  SCADScores.append(eval_model(scad, x_train, x_test, y_train, y_test))

print("Avg MSE for ElasticNet: ", np.mean(elasticScores))
print("Avg MSE for sqrtLasso: ", np.mean(sqrtLassoScores))
print("Avg MSE for SCAD: ", np.mean(SCADScores))

Avg MSE for ElasticNet:  1.5729286212921143
Avg MSE for sqrtLasso:  310.97240816497805
Avg MSE for SCAD:  8.234107576847077


The best performing model is **ElasticNet**.