In [1]:
import random
import os
import shutil
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split

import torch
from torch import nn, Tensor
from torch.utils.data import Dataset, DataLoader, Subset
from torch.utils.tensorboard import SummaryWriter  # log writer to visualize the loss functions




---

In [2]:
base_dir = "NN_outputs/"
model_dir = base_dir + "models/"
runs_dir = base_dir + "runs/"

shutil.rmtree(model_dir, ignore_errors=True)
shutil.rmtree(runs_dir, ignore_errors=True)
os.makedirs(model_dir, exist_ok=True)
os.makedirs(runs_dir, exist_ok=True)

#### Set seeds

In [3]:
# For reproducibility, fix all the seeds


def fix_random(seed: int) -> None:
    """Fix all the possible sources of randomness.

    Args:
        seed: the seed to use.
    """
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)

    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True  # slower

#### Data Layer

In [4]:
# Data Layer class
# Extend the abstract class "Dataset"


class MyDataset(Dataset):
    # Save X and y as Tensors, accordingly to the type of the data
    # https://pytorch.org/docs/stable/tensors.html
    def __init__(self, X, y):
        self.X = torch.FloatTensor(X)
        self.y = torch.LongTensor(y)

        # Useful attributes
        self.num_features = X.shape[1]
        self.num_classes = len(np.unique(y))

    # Dataset size
    def __len__(self):
        return self.X.shape[0]

    # Fetch a data sample (single sample or batch) for a given index/es
    # (if the dataset is not in memory, it can read from file system and return the object)
    def __getitem__(self, idx):
        return self.X[idx, :], self.y[idx]

#### Neural Network class

In [5]:
# Neural Network class
# Extend the abstract class "Module"


class FeedForward(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(FeedForward, self).__init__()

        # Useful attributes
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Definition of Layers
        self.fc1 = nn.Linear(self.input_size, self.hidden_size)  # input to hidden
        self.fc2 = nn.Linear(self.hidden_size, num_classes)  # hidden to output

        # Activation Function
        self.relu = nn.ReLU()

    # How layers are connected between them
    # This even defines the graph of backpropagation
    def forward(self, x):
        h = self.fc1(x)  # first layer
        h = self.relu(h)  # activation function
        output = self.fc2(h)  # second layer
        return output

    def _get_name(self):
        return "FeedForward"

#### Training function

In [6]:
# Function for the training process


def train_model(
    model: nn.Module,  # instance of class to train
    criterion,  # instance of loss function
    optimizer,  # instance of optimizer
    epochs,  # number of
    train_loader: DataLoader,
    val_loader: DataLoader,
    device,  # to train on
    log_writer,
    log_name,
):
    n_iter = 0
    best_valid_loss = float("inf")  # initialized to worst possible value

    # EPOCHS
    for epoch in range(epochs):
        model.train()  # activate training mode (for BatchNorm or Dropout)

        # BATCHES
        for data, targets in train_loader:  # get_item from MyDataset class (single item or batch)
            data, targets = data.to(device), targets.to(device)  # move data and targets to cpu/gpu

            optimizer.zero_grad()  # gradient to zero

            # Forward pass
            y_pred = model(data)

            # Compute Loss
            loss = criterion(y_pred, targets)
            log_writer.add_scalar("Loss/train", loss, n_iter)  # plot the batches

            # Backward pass
            loss.backward()
            optimizer.step()

            n_iter += 1

        # Valuation
        y_test, _, y_pred = test_model(model, val_loader, device)
        loss_val = criterion(y_pred, y_test)
        log_writer.add_scalar("Loss/val", loss_val, epoch)  # plot the epochs

        # Save the model with best loss through the epochs
        if loss_val.item() < best_valid_loss:
            best_valid_loss = loss_val.item()
            torch.save(model.state_dict(), model_dir + log_name)

#### Test function

In [7]:
# Function to evaluate the performance on Validation and Test sets


def test_model(model: nn.Module, data_loader: DataLoader, device) -> tuple[Tensor, Tensor, Tensor]:
    """return:
    - y_test - true lables
    - y_pred_c - has 1 column, where each element is the predicted lable with bigger probability among the "c" predicted
    - y_pred - has "c" columns as the number of classes of the test set
    """
    model.eval()  # activate evaluation mode (for BatchNorm or Dropout)

    y_pred = []
    y_test = []

    for data, targets in data_loader:
        data, targets = data.to(device), targets.to(device)  # move data and targets to cpu/gpu
        y_pred.append(model(data))  # accumulate predictions
        y_test.append(targets)  # accumulate labels

    y_test = torch.stack(y_test).squeeze()  # it's one column (each row is a different sample)
    y_pred = torch.stack(
        y_pred
    ).squeeze()  # there are "c" columns as the number of classes. Each column is the probability (as float number) to that class (each row is a different sample)
    y_pred_c = y_pred.argmax(
        dim=1, keepdim=True
    ).squeeze()  # return max position of prediction array: that is the class I will associate with the sample
    return y_test, y_pred_c, y_pred

#### Device

In [8]:
# look for GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device: {}".format(device))

Device: cpu


---
## Train

#### Hyperparameters

In [9]:
seed = 42

num_epochs = 500
learning_rate = 0.01
batch_size = 32  # the SIZE of one batch, not the total number of batches

#### DataLoaders preparation

In [10]:
# Prepare train/val/test DataLoaders

# Load
data = datasets.load_iris()
X = data["data"]
y = data["target"]
indices = np.arange(X.shape[0])  # useful later to split the data in train/val/test


# Separate indices in train/val/set
# "stratify=y" makes sure to keep the classes proportions on the dataset (useful on imbalanced classes)
train_idx, test_idx = train_test_split(indices, test_size=0.2, stratify=y, random_state=seed)
train_idx, val_idx = train_test_split(train_idx, test_size=0.2, stratify=y[train_idx], random_state=seed)


# Scale data
train_mean = np.mean(X[train_idx, :], axis=0)
train_std = np.std(X[train_idx, :], axis=0)  # use only train
X = (X - train_mean) / train_std  # but apply to all dataset


# DataLoaders
my_dataset = MyDataset(X, y)

train_subset = Subset(my_dataset, train_idx)
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)

val_subset = Subset(my_dataset, val_idx)
val_loader = DataLoader(val_subset, batch_size=1)

test_subset = Subset(my_dataset, test_idx)
test_loader = DataLoader(test_subset, batch_size=1)

#### Model, Criterion, Optimizer

In [11]:
fix_random(seed)

hidden_size = 32
model = FeedForward(my_dataset.num_features, hidden_size, my_dataset.num_classes)
model.to(device)  # move the NN to device

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

log_writer = SummaryWriter(runs_dir + model._get_name())  # Start tensorboard

### Run
Run Tensorboard from the command line:

> tensorboard --logdir nn/runs/

In [12]:
# Test before the training
y_test, y_pred_c, _ = test_model(model, test_loader, device)
acc = (y_test == y_pred_c).float().sum() / y_test.shape[0]
print("Accuracy before training:", acc.cpu().numpy())


# Train
train_model(
    model,
    criterion,
    optimizer,
    num_epochs,
    train_loader,
    val_loader,
    device,
    log_writer,
    model._get_name(),
)


# Load best model
model.load_state_dict(torch.load(model_dir + model._get_name()))
model.to(device)


# Test after the training
y_test, y_pred_c, _ = test_model(model, test_loader, device)
acc = (y_test == y_pred_c).float().sum() / y_test.shape[0]
print("Accuracy after training:", acc.cpu().numpy())

print(model)


# Close tensorboard writer after a training
log_writer.flush()
log_writer.close()

Accuracy before training: 0.46666667
Accuracy after training: 0.93333334
FeedForward(
  (fc1): Linear(in_features=4, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=3, bias=True)
  (relu): ReLU()
)


---
---
---
---

# Other Example

In [13]:
class FeedForwardDeep(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes, depth=1):
        super(FeedForwardDeep, self).__init__()

        model = [
            nn.Linear(input_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
        ]

        # Set of pytorch modules
        block = [
            nn.Linear(hidden_size, hidden_size),
            nn.BatchNorm1d(hidden_size),  # BatchNorm 1 dimention
            nn.ReLU(),
        ]

        for i in range(depth):
            model += block

        # Create sequential graph
        self.model = nn.Sequential(*model)

        # Output layer
        self.output = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h = self.model(x)
        out = self.output(h)
        return out

    def _get_name(self):
        return "FeedForwardDeep"

In [14]:
seed = 42
fix_random(seed)


model = FeedForwardDeep(my_dataset.num_features, hidden_size, my_dataset.num_classes, depth=2)
model.to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

# Start tensorboard
log_writer = SummaryWriter(runs_dir + model._get_name())

In [15]:
# Test before the training
y_test, y_pred_c, _ = test_model(model, test_loader, device)
acc = (y_test == y_pred_c).float().sum() / y_test.shape[0]
print("Accuracy before training:", acc.cpu().numpy())


# Train the model
train_model(
    model,
    criterion,
    optimizer,
    num_epochs,
    train_loader,
    val_loader,
    device,
    log_writer,
    model._get_name(),
)


# Load best model
model.load_state_dict(torch.load(model_dir + model._get_name()))
model.to(device)


# Test after the training
y_test, y_pred_c, _ = test_model(model, test_loader, device)
acc = (y_test == y_pred_c).float().sum() / y_test.shape[0]
print("Accuracy after training:", acc.cpu().numpy())
# from sklearn.metrics import classification_report
# print(classification_report(y_test.cpu(), y_pred_c.cpu()))
print(model)


# Close tensorboard writer after a training
log_writer.flush()
log_writer.close()

Accuracy before training: 0.33333334
Accuracy after training: 1.0
FeedForwardDeep(
  (model): Sequential(
    (0): Linear(in_features=4, out_features=32, bias=True)
    (1): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Linear(in_features=32, out_features=32, bias=True)
    (4): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): Linear(in_features=32, out_features=32, bias=True)
    (7): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU()
  )
  (output): Linear(in_features=32, out_features=3, bias=True)
)


---
---
---
# GridSearch over hyperparameters

In [16]:
import itertools

hidden_sizes = [16, 32]
depths = [2, 4]
num_epochs = 1000
learning_rate = 0.01

hyperparameters = itertools.product(hidden_sizes, depths)

# grid search loop
for hidden_size, depth in hyperparameters:
    fix_random(seed)

    # Define architecture, loss and optimizer
    model = FeedForwardDeep(my_dataset.num_features, hidden_size, my_dataset.num_classes, depth)
    model.to(device)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    log_name = (
        model._get_name()
        + "_"
        + "dim"
        + str(hidden_size)
        + "-dp"
        + str(depth)
        + "-ep"
        + str(num_epochs)
        + "-lr"
        + str(learning_rate)
    )

    # Start tensorboard
    log_writer = SummaryWriter(runs_dir + log_name)

    # Train
    train_model(
        model,
        criterion,
        optimizer,
        num_epochs,
        train_loader,
        val_loader,
        device,
        log_writer,
        log_name,
    )

    log_writer.flush()
log_writer.close()

In [17]:
# Choose and load the best model and evaluate it on the test set

# Re-instantiate the model and read best weights
model = FeedForwardDeep(my_dataset.num_features, 16, my_dataset.num_classes, 4)
model.load_state_dict(torch.load(model_dir + model._get_name() + "_" + "dim16-dp4-ep1000-lr0.01"))
model.to(device)

y_test, y_pred_c, _ = test_model(model, test_loader, device)
acc = (y_test == y_pred_c).float().sum() / y_test.shape[0]

print("Accuracy of the best model on the test set:", acc.cpu().numpy())

Accuracy of the best model on the test set: 0.93333334
