## Multilayer Perceptron (MLP) based on Pytorch (2)

Multi-class classification problem - using a MLP with configurable number of hidden neurons - with a configurable number of classes (up to 10). It selects them from the (Fashion-)MNIST dataset, splits it up into a train and test part, does normalisation and then trains a classifier using softmax.

Both datasets consist of images with 28x28 = 784 pixel each. The features refer to these pixel values of the images.

You can choose MNIST or Fashion-MNIST data in cell [2]

We use the PyTorch nn-library providing all required layer types and in particular the Sequential Container to set up a MLP [torch.nn](https://pytorch.org/docs/stable/nn.html). In addition we use the DataLoader and the Datset classes for access to the data [Datasets&DataLoaders](https://pytorch.org/tutorials/beginner/basics/data_tutorial.html).

In [None]:
import torch
import torchvision
import numpy as np
import matplotlib.pyplot as plt
import time

from utils import plot_img, plot_tiles, plot_error, plot_cost, plot_parameter_hist

### Preparation for DataLoader below 
We will use a torch DataLoader below, which can do plenty of things - in particular the data normaliation using a transform function

In [None]:
#import shortcut (just for here)
from torchvision.transforms import v2

my_transform = v2.Compose([
    v2.ToImage(),  # Convert to tensor, data are PIL images
    v2.ToDtype(torch.float32, scale=False),  # convert to float; optionally normalize data 
                                            # (if True choose mean=[0.5], std=[0.5] below (why?)
    v2.Normalize(mean=[128.], std=[128.]),
])

In [None]:
#only at first execution data is downloaded, because it is saved in subfolder ../week1/data; 
#note the relative path to the 01.learning-optimization to avoid multiple downloads
data_set = 'FashionMNIST'
    
if data_set == 'MNIST':
    training_data = torchvision.datasets.MNIST(
        root="../week1/data",
        train=True,
        download=True,
        transform=my_transform
    )

    test_data = torchvision.datasets.MNIST(
        root="../week1/data",
        train=False,
        download=True,
        transform=my_transform
    )    

    #labels for MNIST (just for compatibility reasons)
    labels_map = {
        0: "Zero",
        1: "One",
        2: "Two",
        3: "Three",
        4: "Four",
        5: "Five",
        6: "Six",
        7: "Seven",
        8: "Eight",
        9: "Nine",
    }
else:
    training_data = torchvision.datasets.FashionMNIST(
        root="../week1/data",
        train=True,
        download=True,
        transform=my_transform
    )

    test_data = torchvision.datasets.FashionMNIST(
        root="../week1/data",
        train=False,
        download=True,
        transform=my_transform
    )

    #labels for FashionMNIST
    labels_map = {
        0: "T-Shirt",
        1: "Trouser",
        2: "Pullover",
        3: "Dress",
        4: "Coat",
        5: "Sandal",
        6: "Shirt",
        7: "Sneaker",
        8: "Bag",
        9: "Ankle Boot",
    }

### Verify Dataset type

We check that the training data instance is of type Dataset

In [None]:
from torch.utils.data import Dataset

print(isinstance(training_data, Dataset))
print(hasattr(training_data, '__len__'))
print(hasattr(training_data, '__getitem__'))

### Verify transform property

We check that the training data instance has the transform method

In [None]:
print(hasattr(training_data, 'transform'))

### Illustration of DataLoder concept
It is integrated into the processing below

In [None]:
# dataloaders (we use original set (training/test_data); own data has to realize the abstract class representing 'Dataset'
train_loader = torch.utils.data.DataLoader(training_data, batch_size=256, shuffle=True)


In [None]:
#setup iterator
data_iterator = iter(train_loader)

#fetch first batch of images (and corresponding labels)
images, labels = next(data_iterator)

#note that the images are a 4-dim tensor; first dimension is image index in batch, second is number of channels (colors)
print(images.shape)
print(labels.shape)
#print first ten labels -> change 'shuffle=True' to 'shuffle=False' to check that order will be always the same
print(labels[:10])
#plot first 10 image
plot_tiles(images, 1,10)

### Check the application of the transformation

In [None]:
#original data
print('tpye of original data: %r' % training_data.data.dtype)
print('min value of original data: %r' % torch.min(training_data.data).item())
print('max value of original data: %r' % torch.max(training_data.data).item())

#loaded data
print('\ntpye of loaded data: %r' % images.dtype)
print('min value of loaded data: %r' % torch.min(images).item())
print('max value of loaded data: %r' % torch.max(images).item())
#why is upper value not 1?

In [None]:
#setup loop over all batchs
data_iterator = iter(train_loader)

start=time.time()
#note shape of last batch
print('shape of image batches:')
for batch_iter in data_iterator:
    print(batch_iter[0].numpy().shape)
    
end=time.time()
print('time for transforming entire batch: %2.1f s' % (end-start))

### Dataloader inefficiency

Here we only do trivial things within the data loader like type conversion and normalisation. To do this in each epoch is a considerable overhead as we will show below. We will create our own dataset - doing the transformation only once during the setup - and speed up the dataloader iteration considerably.

### Create custom dataset

The following dataset, which realises the interface of the PyTorch Dataset class, does the type conversion and normalisation only once in the constructor and then only gives back the prepared images. It uses our previous method `prepare_data`.

In [None]:
class MyDataset(Dataset):
    """owns dataset."""

    def __init__(self, dataset, classes = torch.tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), min_max_normalise=1, flatten=0):
        """
        Arguments:
        dataset -- a tuple with the [images, labels] of the original dataset
        classes -- list of classes to use for training (at least two classes must be given)
        min_max_normalise -- whether to do min-max-normalisation (1) or rescaling (0)
        flatten -- whether to flatten the 28x28 image to single row (=1); otherwise a new dimension is added at axis=1 (to be compatible with cnn)

        """
        self.prepare_data(dataset, classes, min_max_normalise, flatten)
        
        
    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        #add the missing map-dimension
        return self.x_sel[idx], self.y_sel[idx]


    def prepare_data(self, dataset, classes, min_max_normalise, flatten):
        x = dataset[0]
        y = dataset[1]
    
        if len(classes) < len(labels_map):
            for label in classes:
                print('labels chosen are: %r' % labels_map[label.item()])
    
        ind_sel = torch.isin(y, classes)
    
        x_sel = torch.zeros(x[ind_sel,:].shape, dtype=torch.float)
        x_sel.copy_(x[ind_sel,:])
        y_sel = torch.zeros(y[ind_sel].shape, dtype=y.dtype)
        y_sel.copy_(y[ind_sel])
    
        #replace the labels such that they are in successive order
        for i0 in range(0,len(classes)):
            if i0 != classes[i0]:
                y_sel[y_sel == classes[i0]] = i0
    
        #we give y back as simple vector -> simplifies handling below
        #y_sel = np.reshape(y_sel, (-1,1))
        
        #do train and test split
        self.num_samples = x_sel.shape[0]
            
        #perform normalisation, take care of converting data type to float!
        xmax, xmin = torch.max(x_sel), torch.min(x_sel)
        
        if min_max_normalise:
            x_sel = 2*(x_sel - xmin) / (xmax - xmin) - 1
        else:
            x_sel = x_sel / xmax 
    
        if flatten:
            m = x_sel.shape[0]
            x_sel = x_sel.reshape([m,-1])
        
        self.x_sel = torch.unsqueeze(x_sel,1)
        self.y_sel = y_sel

In [None]:
#now create a custom dataset based on the training dataset (we do the same later for the validation and testset)
my_dataset = MyDataset([training_data.data, training_data.targets])

train_loader = torch.utils.data.DataLoader(my_dataset, batch_size=256, shuffle=True)

#setup iterator
data_iterator = iter(train_loader)

#fetch first batch of images (and corresponding labels)
images, labels = next(data_iterator)

#note that the images are a 4-dim tensor; first dimension is image index in batch, second is number of channels (colors)
print(images.shape)
print(labels.shape)
#print first ten labels -> change 'shuffle=True' to 'shuffle=False' to check that order will be always the same
print(labels[:10])
#plot first 10 image
plot_tiles(images, 1,10)

In [None]:
#setup iterator
data_iterator = iter(train_loader)

#repeat the test to check the required time; it takes only a fraction of the previous data_loader
start=time.time()
#note shape of last batch
print('shape of image batches:')
for batch_iter in data_iterator:
    print(batch_iter[0].numpy().shape)
    
end=time.time()
print('time for transforming entire batch: %2.1f s' % (end-start))

In [None]:
#append rows x cols tiles of images
rows = 4
cols = 11
#figure size can be set
fig_size = [8,8]

#reset iterator (was at the end) and fetch a batch of images
data_iterator = iter(train_loader)
images, labels = next(data_iterator)

#plot a few images
plot_tiles(images, rows, cols, fig_size)

In [None]:
#choose a given class 0..9
digit  = 0

#fetch each time a new batch of images
images, labels = next(data_iterator)

select_images = images[labels==digit]

plot_tiles(select_images, rows, select_images.shape[0] // rows, [6,6])
print(labels_map[digit])

### Class NeuralNetwork

This class constructs a Multilayer Perceptron with a configurable number of hidden layers. Cost function is CE. The method $propagate()$ returns the prediction $$ \hat{y}^{(i)}=h_\theta(\mathbf{x}^{(i)}) $$ on the input data (can be a n x 784 matrix of n images) and $back\_propagate()$ determines the gradients of the cost function with respect to the parameters (weights and bias for all layers) $$ \nabla_{\mathbf{\theta}} J(\mathbf{\theta}) $$
The method $gradient\_descend()$ finally does the correction of the parameters with a step in the negative gradient direction, weighted with the learning rate $$\alpha$$ for all layers.

In [None]:
class NeuralNetwork:
    """
    MLP class handling the layers and doing all propagation and back propagation steps
    all hidden layers are dense (with ReLU activation) and the last layer is softmax
    """
    def __init__(self, list_num_neurons):
        """
        constructor

        Arguments:
        list_num_neurons -- list of layer sizes including in- and output layer
        
        """
        self.model = torch.nn.Sequential()
        #now we require a flatten tensor
        self.model.add_module('flatten', torch.nn.Flatten(start_dim=1, end_dim=-1))
        #first construct dense layers
        for i0 in range(len(list_num_neurons)-2):
            self.model.add_module('dense' + str(i0), torch.nn.Linear(list_num_neurons[i0], list_num_neurons[i0+1]))
            self.model.add_module('act' + str(i0), torch.nn.Sigmoid())
            
        #finally add softmax layer
        #we don't require activation function because it is included (for numerical reasons) in the cross 
        #entropy cost below; alternative is logSoftmax together with NLLLoss cost function
        self.model.add_module('dense' + str(i0+1), torch.nn.Linear(list_num_neurons[-2], list_num_neurons[-1]))                    
        
        self.cost_fn = torch.nn.CrossEntropyLoss(reduction='mean')
        #self.optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

        #used to save results
        self.result_data = torch.tensor([])
        
        #we keep a global step counter, thus that optimise can be called 
        #several times with different settings
        self.epoch_counter = 0 
        
    def propagate(self, x):
        """
        calculates the function estimation based on current parameters
        """    
        y_pred = self.model(x)

        return y_pred
           
     
    def back_propagate(self, cost):
        """
        calculates the backpropagation results based on expected output y
        this function must be performed AFTER the corresponding propagte step
        """    
        #set gradient values to zero
        self.model.zero_grad()
        
        cost.backward()
 

    def cost_funct(self, y_pred, y):
        """
        calculates the MSE loss function
        """
        cost = self.cost_fn(y_pred, y)
        
        return cost
    
         
    def gradient_descend(self, alpha):
        """
        does the gradient descend based on results from last back_prop step with learning rate alpha
        """
        with torch.no_grad():
            for param in self.model.parameters():
                param -= alpha * param.grad
            
         
    def calc_error(self, y_pred, y):
        """
        get error information
        """
        m = y.shape[0]

        y_pred_argmax = torch.argmax(y_pred, dim=1)
        error = torch.sum(y != y_pred_argmax) / m

        return error


    def save_images(self):
        #we save the training and test images for quick access during evaluation
        train_loader = torch.utils.data.DataLoader(self.data['train'], batch_size=len(self.data['train']), shuffle=False)
        train_iterator = iter(train_loader)
        self.train_images, self.train_labels = next(train_iterator)

        valid_loader = torch.utils.data.DataLoader(self.data['valid'], batch_size=len(self.data['valid']), shuffle=False)
        valid_iterator = iter(valid_loader)
        self.valid_images, self.valid_labels = next(valid_iterator)

    
    def append_result(self):
        """
        append cost and error data to output array
        """     
        # determine cost and error functions for train and validation data
        y_pred_train = self.propagate(self.train_images)
        y_pred_val = self.propagate(self.valid_images)

        res_data = torch.tensor([[self.cost_funct(y_pred_train, self.train_labels), 
                                  self.calc_error(y_pred_train, self.train_labels),
                                  self.cost_funct(y_pred_val, self.valid_labels), 
                                  self.calc_error(y_pred_val, self.valid_labels)]])
        
        self.result_data = torch.cat((self.result_data, res_data), 0)

        #increase epoch counter here (used for plot routines below)
        self.epoch_counter += 1 
        
        return res_data

        
    def optimise(self, data, epochs, alpha, batch_size=0, debug=0):
        """
        performs epochs number of gradient descend steps and appends result to output array

        Arguments:
        data -- dictionary with NORMALISED data
        epochs -- number of epochs
        alpha -- learning rate
        batch_size -- size of batches (1 = SGD, 1 < .. < n = mini-batch)
        debug -- integer value; get info on gradient descend step every debug-step (0 -> no output)
        """
        #access to data from other methods
        self.data = data
        #save images
        self.save_images()

        # dataloader for training image
        if batch_size == 0:
            batch_size = len(data['train'])
        train_loader = torch.utils.data.DataLoader(data['train'], batch_size=batch_size, shuffle=True)
        
        # save results before 1st step
        if self.epoch_counter == 0:
            res_data = self.append_result()

        for i0 in range(0, epochs):    
            #measure time for one epoch
            start=time.time()
            #set model to training mode
            self.model.train()
            #setup loop over all batchs
            data_iterator = iter(train_loader)
            for batch_iter in data_iterator:
                #do prediction
                y_pred = self.propagate(batch_iter[0])
                #determine the loss 
                cost = self.cost_funct(y_pred, batch_iter[1])
                #determine the error
                self.back_propagate(cost)
                #do the correction step
                self.gradient_descend(alpha)

            #save result
            self.model.eval()
            res_data = self.append_result()

            #end of time measurement
            end=time.time()
            
            if debug and np.mod(i0, debug) == 0:
                print('result after %d epochs (dt=%1.2f s), train: cost %.5f, error %.5f ; validation: cost %.5f, error %.5f'
                    % (self.epoch_counter-1, end-start, res_data[0, 0].item(), res_data[0, 1].item(), \
                                                                res_data[0, 2].item(), res_data[0, 3].item()))

        if debug:
            print('result after %d epochs, train: cost %.5f, error %.5f ; validation: cost %.5f, error %.5f'
                  % (self.epoch_counter-1, res_data[0, 0].item(), res_data[0, 1].item(), \
                                                                res_data[0, 2].item(), res_data[0, 3].item()))
                        
            

### Sample execution of Neural Network

The cell below shows how to use the class NeuralNetwork and how to perform the optimisation. The training and test data is given as dictionary in the call to the method $optimise()$. The classes (from 2 to 10) can be chosen via the `classes` list. This method can be called several times in a row with different arguments.

In [None]:
#choose the categories
classes = torch.tensor([0,1,2,3,4,5,6,7,8,9])

#split data in train and validation
validation_size = 0.2

#further split in train and validation data
validation_size = 0.2
valid_ind = int(len(training_data)*(1-validation_size))

#create custom training and validation data set
train_dataset = MyDataset([training_data.data[:valid_ind,:], training_data.targets[:valid_ind]], classes=classes)
valid_dataset = MyDataset([training_data.data[valid_ind:,:], training_data.targets[valid_ind:]], classes=classes)


#data is arranged as dictionary with quick access through respective keys
data = {'train' : train_dataset, 'valid' : valid_dataset}

#choose the hyperparameters you want to use for the initialisation
size_in = train_dataset[0][0].flatten().shape[0] #access to first image in torch.Subset train_data 
size_out = 10
list_num_neurons = [size_in, 100, size_out]; 
NNet = NeuralNetwork(list_num_neurons)

#choose the hyperparameters you want to use for training
epochs = 40
batchsize = 16
learning_rate = 0.05
NNet.optimise(data, epochs, learning_rate, batchsize, debug=5)


plot_error(NNet)
plot_cost(NNet)

plot_parameter_hist(NNet)

#also prepare the test dataset
test_dataset = MyDataset([test_data.data, test_data.targets], classes=classes)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)
test_iterator = iter(test_loader)
test_images, test_labels = next(test_iterator)

y_pred = torch.argmax(NNet.propagate(test_images), axis=1)
false_classifications = test_images[(y_pred != test_labels)]

print('test error rate: %.2f %% out of %d' % (100*false_classifications.shape[0]/y_pred.shape[0], y_pred.shape[0]))
print(false_classifications.shape)


In [None]:
#analyse false classified training on test images
#also prepare the test dataset
test_dataset = MyDataset([test_data.data, test_data.targets], classes=classes)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)
test_iterator = iter(test_loader)
test_images, test_labels = next(test_iterator)

y_pred = torch.argmax(NNet.propagate(test_images), axis=1)
false_classifications = test_images[(y_pred != test_labels)]

print('test error rate: %.2f %% out of %d' % (100*false_classifications.shape[0]/y_pred.shape[0], y_pred.shape[0]))
print(false_classifications.shape)

#append rows x cols tiles of digits
rows = 7
cols = 8
#figure size can be set
fig_size = [8,8]

plot_tiles(false_classifications.reshape([-1,28,28]), rows, cols, fig_size)

#print the correct labels (for FashionMNIST)
if rows*cols < false_classifications.shape[0]:
    false_classifications_y = test_labels[y_pred != test_labels][:rows*cols]
else:
    false_classifications_y = np.append(test_labels[y_pred != test_labels], np.ones(rows*cols - false_classifications.shape[0])*-1)
print(false_classifications_y.reshape([cols,rows]).T.to(torch.int32))

In [None]:
#visualise weights of the first layer

print('we have %r weight vectors in layer [0]' % NNet.model[1].weight.shape[1])
print('choose a suitable combination of rows and cols below to plot them')

rows = 5
cols = 20
#figure size can be set
fig_size = [14,6]

plot_tiles(NNet.model[1].weight.detach().reshape([-1,28,28]), rows, cols, fig_size)