In [None]:
%matplotlib inline  

Load data

In [1]:
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
# Load the Boston housing regression dataset bundled with scikit-learn.
# NOTE(review): load_boston was deprecated and, to my knowledge, removed in
# scikit-learn 1.2 — confirm the pinned scikit-learn version before re-running.
data = load_boston()
# Rescale every feature column with a default MinMaxScaler.
# NOTE(review): the scaler is fitted on the full dataset *before* the split
# below, so test-set statistics leak into the training features.
x = MinMaxScaler().fit_transform(data.data)
# The target is reshaped to a single column before scaling, so y has shape
# (n_samples, 1) from here on.
y = MinMaxScaler().fit_transform(data.target.reshape(-1, 1))
# Hold out 20% of the rows for testing; random_state=0 fixes the split.
train_x, test_x, train_y, test_y = train_test_split(x, y, test_size=0.2, random_state=0)

Train the PLNN (piecewise-linear neural network, i.e. a ReLU network)

In [3]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from typing import List, Tuple


class ModelDataset(Dataset):
    """In-memory dataset that pairs a feature array with a label array row by row."""

    def __init__(self, data: np.ndarray, labels: np.ndarray):
        """Constructor to initialize data and labels.
        Parameters
        ----------
        data : np.ndarray
            A numpy ndarray of shape (n_samples, n_features)
        labels : np.ndarray
            A numpy ndarray with one entry (row) per sample, i.e. first dimension n_samples
        Raises
        ------
        ValueError
            If data and labels do not contain the same number of samples
        """
        super(ModelDataset, self).__init__()
        # Fail fast: a length mismatch would otherwise only surface as an
        # IndexError in the middle of an epoch.
        if len(data) != len(labels):
            raise ValueError(
                "data and labels must contain the same number of samples, got "
                "{} and {}".format(len(data), len(labels))
            )
        self.data = data
        self.labels = labels

    def __getitem__(self, idx) -> Tuple[np.ndarray, np.ndarray]:
        """Support indexing such that dataset[idx] returns the idx-th (sample, label) pair
        Parameters
        ----------
        idx :
            Index/key used to select the sample from both data and labels
        Returns
        -------
        tuple
            (data[idx], labels[idx]) exactly as stored; no tensor conversion is done here
            (the DataLoader's collate step is what produces tensors)
        """
        return self.data[idx], self.labels[idx]

    def __len__(self) -> int:
        """To return the size of the dataset using len(dataset)
        Returns
        -------
        int
            number of samples in the dataset
        """
        return len(self.data)

    def get_dataloader(self, batch_size: int, num_workers: int = 0, shuffle: bool = False, batch_first: bool = True, pin_memory: bool = False) -> DataLoader:
        """Wraps this dataset in a DataLoader that yields (data, labels) batches.
        Parameters
        ----------
        batch_size : int
            how many samples per batch to load
        num_workers : int, optional
            how many subprocesses to use for data loading, by default 0
        shuffle : bool, optional
            set to True to have the data reshuffled at every epoch, by default False
        batch_first : bool, optional
            Accepted for backward compatibility only; it is not forwarded to the DataLoader
            and has no effect, by default True
        pin_memory : bool, optional
            If True, the data loader will copy Tensors into CUDA pinned memory before returning them, by default False
        Returns
        -------
        DataLoader
            A DataLoader object to generate batch sized data
        """
        return DataLoader(self, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, pin_memory=pin_memory)


class ReLU_DNN(nn.Module):
    """Feed-forward stack of Linear+ReLU pairs closed by a plain Linear output layer."""

    def __init__(self, net_size: List[Tuple]):
        """Builds the layer stack and wraps it in an nn.Sequential (self.net).
        Parameters
        ----------
        net_size : list[tuple], len(list) is n_layers
            One (in_features, out_features) tuple per linear layer, in order.
            Every layer except the last one is followed by a ReLU.
        """
        super().__init__()
        output_dims = net_size[-1]
        stack = []
        for dims in net_size[:-1]:
            # Hidden block: affine map followed by its activation.
            stack.extend((nn.Linear(dims[0], dims[1]), nn.ReLU()))
        # The output layer stays linear (no activation).
        stack.append(nn.Linear(output_dims[0], output_dims[1]))
        self.layers = stack
        self.net = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Runs the input through the sequential stack
        Parameters
        ----------
        x : torch.Tensor
            Input data of size (batch_size, n_features)
        Returns
        -------
        torch.Tensor
            Output of the final linear layer.
        """
        return self.net(x)


class Trainer():
    """Shared machinery for the ReLU network estimators: model construction,
    weight initialization, L1/L2 regularized loss and the training loop."""

    def __init__(self, hidden_layer_sizes: list, epochs: int, learning_rate: float, device: str, loss_fn: nn.Module, l1_reg: float, l2_reg: float, batch_size: int):
        """Constructor for Trainer object.
        Parameters
        ----------
        hidden_layer_sizes : list
            A list of hidden layer sizes
        epochs : int
            Number of training epochs
        learning_rate : float
            learning rate for model training
        device : str
            Computational device: cuda or cpu
        loss_fn : nn.Module
            Loss function used by optimizer
            MSELoss() for regression problem
            BCEWithLogitsLoss() for classification problem
        l1_reg : float
            lambda parameter for L1 Regularization
        l2_reg : float
            lambda parameter for L2 Regularization
        batch_size : int
            Batch size for training
        """
        self.hidden_layer_sizes = hidden_layer_sizes
        self.epochs = epochs
        self.lr = learning_rate
        self.loss_fn = loss_fn
        self.l1_lambda = l1_reg
        self.l2_lambda = l2_reg
        self.batch_size = batch_size
        self.device = device

    def validate_input(self, labels: np.ndarray) -> np.ndarray:
        """Reshapes non-2D labels to a single column and converts entries to float data type
        Parameters
        ----------
        labels : np.ndarray
            Data labels; anything that is not already 2-dimensional is reshaped to (-1, 1)
        Returns
        -------
        np.ndarray
            float ndarray; shape (n_samples, 1) unless the input was already 2-dimensional,
            in which case its shape is kept
        """
        if labels.ndim != 2:
            labels = np.reshape(labels, (-1, 1))
        return labels.astype(float)

    def get_data_loader(self, x: np.ndarray, y: np.ndarray, shuffle: bool = False) -> object:
        """Initializes the dataset (data, labels) and return a batch sized data loader.
        Parameters
        ----------
        x : np.ndarray
            Data features of shape (n_samples, n_features)
        y : np.ndarray
            Data labels, one row per sample
        shuffle : bool
            set to True to have the data reshuffled at every epoch, by default False
        Returns
        -------
        object
            An iterator object to iterate over the data in batches of self.batch_size.
        """
        dataset = ModelDataset(x, y)
        return dataset.get_dataloader(batch_size=self.batch_size, shuffle=shuffle)

    def init_weights(self, layer):
        """Performs weight initialization for the network using Kaiming Normal initializer
        Parameters
        ----------
        layer : model layer
            Linear layers get Kaiming Normal weights and zero biases; every other
            module type is left untouched
        """
        if isinstance(layer, nn.Linear):
            nn.init.kaiming_normal_(layer.weight, mode='fan_in', nonlinearity='relu')
            nn.init.zeros_(layer.bias)

    def build_model(self, data_x: np.ndarray):
        """Builds a double-precision ReLU DNN model with a single output unit and
        stores its sequential stack in self.model
        Parameters
        ----------
        data_x : np.ndarray
            Data features of shape (n_samples, n_features)
        """
        input_size = data_x.shape[1]
        output_size = 1
        layer_widths = [input_size] + list(self.hidden_layer_sizes) + [output_size]
        # Consecutive widths give the (in_features, out_features) pair of each layer.
        layer_shapes = list(zip(layer_widths[:-1], layer_widths[1:]))
        model = ReLU_DNN(layer_shapes)
        self.model = model.net.double()
        self.model.apply(self.init_weights)
        # Always honour the requested device. Gating this on CUDA availability
        # left the model on the CPU for any non-CUDA accelerator while train()
        # still moved every batch to self.device.
        self.model = self.model.to(device=self.device)

    def get_params(self):
        """Retrieves the parameters i.e, weights (List[torch.Tensor]) and biases (List[torch.Tensor]) of the model
        Attributes
        ----------
        weights : list of shape (n_layers - 1,).
            The ith element in the list represents the weight matrix corresponding to layer i,
            transposed to (in_features, out_features) and detached from the graph.
        biases : list of shape (n_layers - 1,).
            The ith element in the list represents the bias vector corresponding to layer i + 1.
        """
        self.weights = []
        self.biases = []
        for name, params in self.model.named_parameters():
            if 'weight' in name:
                self.weights.append(torch.transpose(params, 0, 1).detach())
            elif 'bias' in name:
                self.biases.append(params.detach())

    def calc_l1reg_loss(self, loss: torch.Tensor) -> torch.Tensor:
        """Calculates L1 Regularized loss
        Parameters
        ----------
        loss : torch.Tensor
            A tensor containing loss value

        Returns
        -------
        torch.Tensor
            loss + l1_lambda * (sum of absolute values of all weight entries); biases are not penalized
        """
        l1_reg = torch.tensor(0.0, dtype=torch.double, device=self.device)
        for name, params in self.model.named_parameters():
            if "weight" in name:
                # Same value as the entry-wise 1-norm, without the deprecated torch.norm.
                l1_reg = l1_reg + params.abs().sum()
        return loss + self.l1_lambda * l1_reg

    def calc_l2reg_loss(self, loss: torch.Tensor) -> torch.Tensor:
        """Calculates L2 Regularized loss
        Parameters
        ----------
        loss : torch.Tensor
            A tensor containing loss value
        Returns
        -------
        torch.Tensor
            loss + l2_lambda * (sum of squares of all weight entries); biases are not penalized
        """
        l2_reg = torch.tensor(0.0, dtype=torch.double, device=self.device)
        for name, params in self.model.named_parameters():
            if "weight" in name:
                # Squared 2-norm computed directly, avoiding a sqrt followed by a square.
                l2_reg = l2_reg + params.pow(2).sum()
        return loss + self.l2_lambda * l2_reg

    def train(self, data_loader: object):
        """Trains a ReLU DNN model for a given no of epochs with Adam.
        After training, self.loss holds one entry per epoch (the sum of the batch
        losses, regularization terms included) and self.weights / self.biases are refreshed.
        Parameters
        ----------
        data_loader : object
            An iterator object to iterate over the dataset (batch_size).
        """
        self.model.train()
        self.loss = []
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)
        for _ in range(self.epochs):
            epoch_loss = 0.0
            for data, labels in data_loader:
                # The model is double precision, so batches are cast to match.
                data = data.to(device=self.device).double()
                labels = labels.to(device=self.device).double()
                prediction = self.model(data)
                loss = self.loss_fn(prediction, labels)
                if self.l1_lambda > 0.0:
                    loss = self.calc_l1reg_loss(loss)
                if self.l2_lambda > 0.0:
                    loss = self.calc_l2reg_loss(loss)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()
            self.loss.append(epoch_loss)
        self.get_params()


class ReluNetRegressor(Trainer):
    """ReLU network regressor trained with mean squared error."""

    def __init__(self, hidden_layer_sizes: list, epochs: int, device: str, learning_rate: float = 0.01, l1_reg: float = 0, l2_reg: float = 0, batch_size: int = 100):
        """Regressor object inherits Trainer object.
           This function is the constructor for both Trainer and Regressor objects.
        Parameters
        ----------
        hidden_layer_sizes : list
            A list of hidden layer sizes
        epochs : int
            Number of training epochs
        device : str
            Computational device: cuda or cpu
        learning_rate : float, optional
            learning rate for model training, by default 0.01
        l1_reg : float, optional
            lambda parameter for L1 Regularization, by default 0
        l2_reg : float, optional
            lambda parameter for L2 Regularization, by default 0
        batch_size : int, optional
            Batch size for training, by default 100
        """
        super(ReluNetRegressor, self).__init__(hidden_layer_sizes=hidden_layer_sizes, epochs=epochs, learning_rate=learning_rate, device=device, loss_fn=nn.MSELoss(), l1_reg=l1_reg, l2_reg=l2_reg, batch_size=batch_size)

    def fit(self, data_x: np.ndarray, labels: np.ndarray):
        """Builds a ReLU DNN model for given data and calls train function to train the model
        Parameters
        ----------
        data_x : np.ndarray
            Training data features of shape (n_samples, n_features)
        labels : np.ndarray
            Training targets, one per sample
        """
        labels = self.validate_input(labels)
        self.build_model(data_x)
        data_loader = self.get_data_loader(x=data_x, y=labels, shuffle=True)
        self.train(data_loader)

    def perform_eval(self, data_x: np.ndarray, labels: np.ndarray) -> float:
        """Performs evaluation on the test data using trained model and returns r2 score of the model.
        The whole test set is scored in a single forward pass; the training batch size
        configured on this object is left unchanged.
        Parameters
        ----------
        data_x : np.ndarray
            Test data features of shape (n_samples, n_features)
        labels : np.ndarray
            Test targets, one per sample
        Returns
        -------
        float
            r2 score of the model on test data
        Raises
        ------
        ZeroDivisionError
            If all targets are identical (zero total sum of squares)
        """
        self.model.eval()
        labels = self.validate_input(labels)
        # Evaluate on the full arrays directly. The previous implementation
        # overwrote self.batch_size with the test-set size to obtain a single
        # batch, which silently turned any later fit() into full-batch training.
        data = torch.from_numpy(data_x).to(device=self.device).double()
        targets = torch.from_numpy(labels).to(device=self.device).double()
        with torch.no_grad():
            prediction = self.model(data)
        error = torch.sum(torch.pow(prediction - targets, 2)).item()
        y_sum = torch.sum(torch.pow(targets - torch.mean(targets), 2)).item()
        r2_score = 1 - error / (y_sum)
        return r2_score

    def predict(self, data_x: np.ndarray) -> np.ndarray:
        """Returns numpy array of predicted values for test data
        Parameters
        ----------
        data_x : np.ndarray
            Test data features of shape (n_samples, n_features)
        Returns
        -------
        np.ndarray
            numpy array of predicted values
        """
        self.model.eval()
        data = torch.from_numpy(data_x).to(device=self.device)
        with torch.no_grad():
            prediction = self.model(data.double())
        return prediction.cpu().numpy()


class ReluNetClassifier(Trainer):
    """ReLU network binary classifier trained with BCEWithLogitsLoss (the model outputs logits)."""

    def __init__(self, hidden_layer_sizes: list, epochs: int, device: str, learning_rate: float = 0.01, l1_reg: float = 0, l2_reg: float = 0, batch_size: int = 100):
        """Classifier object inherits Trainer object.
           This function is the constructor for both Trainer and Classifier objects.
        Parameters
        ----------
        hidden_layer_sizes : list
            A list of hidden layer sizes
        epochs : int
            Number of training epochs
        device : str
            Computational device: cuda or cpu
        learning_rate : float, optional
            learning rate for model training, by default 0.01
        l1_reg : float, optional
            lambda parameter for L1 Regularization, by default 0
        l2_reg : float, optional
            lambda parameter for L2 Regularization, by default 0
        batch_size : int, optional
            Batch size for training, by default 100
        """
        super(ReluNetClassifier, self).__init__(hidden_layer_sizes=hidden_layer_sizes, epochs=epochs, learning_rate=learning_rate, device=device, loss_fn=nn.BCEWithLogitsLoss(), l1_reg=l1_reg, l2_reg=l2_reg, batch_size=batch_size)

    def fit(self, data_x: np.ndarray, labels: np.ndarray):
        """Builds a ReLU DNN model for given data and calls train function to train the model
        Parameters
        ----------
        data_x : np.ndarray
            Training data features of shape (n_samples, n_features)
        labels : np.ndarray
            Training labels, one per sample
        """
        labels = self.validate_input(labels)
        self.build_model(data_x)
        data_loader = self.get_data_loader(x=data_x, y=labels, shuffle=True)
        self.train(data_loader)

    def perform_eval(self, data_x: np.ndarray, labels: np.ndarray) -> float:
        """Performs evaluation on the test data using trained model and returns accuracy of the model.
        The whole test set is scored in a single forward pass; the training batch size
        configured on this object is left unchanged.
        Parameters
        ----------
        data_x : np.ndarray
            Test data features of shape (n_samples, n_features)
        labels : np.ndarray
            Test labels, one per sample
        Returns
        -------
        float
            Accuracy of the model on test data, as a percentage rounded to a whole number
        """
        self.model.eval()
        labels = self.validate_input(labels)
        # Evaluate on the full arrays directly. The previous implementation
        # overwrote self.batch_size with the test-set size to obtain a single
        # batch, which silently turned any later fit() into full-batch training.
        data = torch.from_numpy(data_x).to(device=self.device).double()
        targets = torch.from_numpy(labels).to(device=self.device).double()
        with torch.no_grad():
            prediction = self.model(data)
            # Logits -> probabilities -> hard 0/1 labels.
            pred_labels = torch.round(torch.sigmoid(prediction))
            correct_results_sum = (pred_labels == targets).sum().float()
            acc = correct_results_sum / targets.shape[0]
            acc = torch.round(acc * 100)
        return acc.item()

    def predict_proba(self, data_x: np.ndarray) -> np.ndarray:
        """Returns numpy array of predicted probabilities of being classified as class 1 on test data
        Parameters
        ----------
        data_x : np.ndarray
            Test data features of shape (n_samples, n_features)
        Returns
        -------
        np.ndarray
            numpy array of predicted probabilities
        """
        self.model.eval()
        data = torch.from_numpy(data_x).to(device=self.device)
        with torch.no_grad():
            prediction = self.model(data.double())
            proba = torch.sigmoid(prediction)
        return proba.cpu().numpy()

In [10]:
#Reproducibility seed
def set_seed(seed):
    """Seed the NumPy and PyTorch random generators with the same value.

    When CUDA is available, cuDNN auto-tuning is switched off, deterministic
    cuDNN kernels are requested and the CUDA generator is seeded as well.
    """
    if torch.cuda.is_available():
        cudnn = torch.backends.cudnn
        cudnn.benchmark, cudnn.deterministic = False, True
        torch.cuda.manual_seed(seed)
    for seeder in (np.random.seed, torch.manual_seed):
        seeder(seed)

In [13]:
import aletheia
# L1 penalty weight passed to the regressor below.
# NOTE(review): the name suggests it came from a hyper-parameter search that
# is not part of this notebook — confirm where the value comes from.
best_reg = 0.001
# Seed NumPy/PyTorch before the model is built so that weight initialization
# and batch shuffling are repeatable.
set_seed(0)
# Four hidden layers of 40 units each, trained on the CPU for 2000 epochs with
# L1 regularization only (l2_reg keeps its default of 0).
mlp = ReluNetRegressor(hidden_layer_sizes=[40] * 4, epochs=2000, device="cpu",
                       learning_rate=0.001, l1_reg=best_reg, batch_size=100)
mlp.fit(train_x, train_y)
# mlp.weights / mlp.biases are populated at the end of training; each weight
# matrix is stored transposed, i.e. as (in_features, out_features).
coefs = [item.numpy() for item in mlp.weights]
intercepts = [item.numpy() for item in mlp.biases]
# NOTE(review): presumably this unwraps the trained ReLU network into its
# local linear models and summarizes them per region — confirm against the
# aletheia documentation.
clf = aletheia.UnwrapperRegressor(coefs, intercepts)
clf.fit(train_x, train_y)
clf.summary()

Unnamed: 0,Count,Response Mean,Response Std,Local MSE,Global MSE
0,136.0,0.278137,0.082941,0.002789,0.028025
1,98.0,0.598912,0.178224,0.006719,0.091237
2,77.0,0.415382,0.101684,0.003404,0.043052
3,35.0,0.149206,0.076133,0.003979,0.044379
4,31.0,0.408244,0.171462,0.008261,0.049419
5,12.0,0.363333,0.216921,0.013247,0.068777
6,8.0,0.829722,0.162723,0.002904,0.045385
7,7.0,0.105079,0.092839,0.008752,0.109257
