## Exercise 1

In [3]:
import torch
import torchvision
import numpy as np
import matplotlib.pyplot as plt
import time

from torch.utils.tensorboard import SummaryWriter

In [None]:
# Open both CIFAR10 splits (downloading into "data" when missing); the training
# split is created first, then the test split.
training_data, test_data = (
    torchvision.datasets.CIFAR10(
        root="data",
        train=is_train,
        download=True,
        transform=torchvision.transforms.ToTensor(),
    )
    for is_train in (True, False)
)

# Class index -> display name; the position in the tuple is the class index.
labels_map = dict(enumerate((
    "Airplane",
    "Automobile",
    "Bird",
    "Cat",
    "Deer",
    "Dog",
    "Frog",
    "Horse",
    "Ship",
    "Truck",
)))

### Create custom dataset

The following dataset does the type conversion and normalisation only once in the constructor and then only gives back the prepared images. It uses our previous method `prepare_data`.

In [None]:
from torch.utils.data import Dataset

class MyDataset(Dataset):
    """
    In-memory dataset that selects, relabels and normalises the images once.

    All preparation happens in the constructor (see `prepare_data`); afterwards
    `__getitem__` only indexes the prepared tensors.
    """

    def __init__(self, dataset, classes = torch.tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), min_max_normalise=1, flatten=0):
        """
        Arguments:
        dataset -- a pair [images, labels] of the original dataset; tensors, numpy arrays
                   and plain lists are accepted, the first axis is the sample index
        classes -- classes to use for training (at least two classes must be given);
                   the default tensor is shared between calls but never modified here
        min_max_normalise -- whether to do min-max-normalisation to [-1, 1] (1)
                             or rescaling by the maximum value (0)
        flatten -- whether to flatten each image to a single row (=1)

        """
        self.prepare_data(dataset, classes, min_max_normalise, flatten)


    def __len__(self):
        """number of samples left after the class selection"""
        return self.num_samples

    def __getitem__(self, idx):
        """return the prepared (image, label) pair; the map dimension was already added in prepare_data"""
        return self.x_sel[idx], self.y_sel[idx]


    def prepare_data(self, dataset, classes, min_max_normalise, flatten):
        """
        selects the samples of the requested classes, relabels them to 0..len(classes)-1,
        converts the images to float and normalises them

        Arguments: see constructor

        Sets:
        self.num_samples -- number of selected samples
        self.x_sel -- float tensor of the selected images with an extra dimension at position 1
        self.y_sel -- labels of the selected images; label j stands for classes[j]
        """
        #the source data is not necessarily a tensor (e.g. torchvision's CIFAR10 keeps the
        #images as numpy array and the targets as python list), so convert before indexing
        x = torch.as_tensor(dataset[0])
        y = torch.as_tensor(dataset[1])
        classes = torch.as_tensor(classes)

        if len(classes) < len(labels_map):
            for label in classes:
                print('labels chosen are: %r' % labels_map[label.item()])

        ind_sel = torch.isin(y, classes)

        #boolean indexing already copies, the source data stays untouched
        x_sel = x[ind_sel].to(torch.float)
        y_orig = y[ind_sel]

        #replace the labels such that they are in successive order; the comparison is done
        #on the untouched original labels, otherwise an already replaced label could be
        #replaced a second time if classes is not sorted (e.g. classes = [1, 0])
        y_sel = torch.empty_like(y_orig)
        for new_label, old_label in enumerate(classes):
            y_sel[y_orig == old_label] = new_label

        #we give y back as simple vector -> simplifies handling below
        self.num_samples = x_sel.shape[0]

        #perform normalisation (global minimum / maximum over all selected images)
        xmax, xmin = torch.max(x_sel), torch.min(x_sel)

        if min_max_normalise:
            x_sel = 2*(x_sel - xmin) / (xmax - xmin) - 1
        else:
            x_sel = x_sel / xmax

        if flatten:
            x_sel = x_sel.reshape([x_sel.shape[0], -1])

        #add the map dimension at position 1
        self.x_sel = torch.unsqueeze(x_sel, 1)
        self.y_sel = y_sel

### Set output directory for tensorboard

This folder is relative to the working path on the hard disk

In [None]:
# NOTE(review): the folder name still says "fashion_mnist" although the cells above load CIFAR10
writer = SummaryWriter(log_dir='tensorboard/fashion_mnist_experiment')

### we can send images to tensorboard

In [None]:
# get some random training images (add map dimension at position 1)
# take the first 20 training images; CIFAR10 keeps them as a numpy array in
# (N, H, W, C) layout, whereas make_grid expects a tensor in (N, C, H, W) layout,
# so convert to a tensor and move the channel axis to position 1
my_images = torch.as_tensor(training_data.data[:20]).permute(0, 3, 1, 2)

# create grid of images
img_grid = torchvision.utils.make_grid(my_images)


# write to tensorboard
writer.add_image('a_set_of_fashion_mnist_images', img_grid)

### Class NeuralNetwork

This class constructs a multilayer perceptron with a configurable number of hidden layers. The cost function is cross-entropy (CE). The method `propagate()` returns the prediction $$ \hat{y}^{(i)}=h_\theta(\mathbf{x}^{(i)}) $$ for the input data (a batch of $n$ images, which the model flattens to an $n \times 3072$ matrix for the $32 \times 32 \times 3$ CIFAR-10 images), and `back_propagate()` determines the gradients of the cost function with respect to the parameters (weights and biases of all layers) $$ \nabla_{\mathbf{\theta}} J(\mathbf{\theta}) $$
The method `gradient_descend()` finally corrects the parameters of all layers with a step in the negative gradient direction, scaled by the learning rate $\alpha$.

In [None]:
class NeuralNetwork:
    """
    MLP class handling the layers and doing all propagation and back propagation steps

    All hidden layers are dense (with ReLU activation). The output layer is a plain dense
    layer returning unnormalised class scores (logits): torch.nn.CrossEntropyLoss applies
    the log-softmax itself, so an explicit Softmax layer in front of it would normalise
    twice. The predicted class is the argmax of the output.
    """
    def __init__(self, list_num_neurons):
        """
        constructor

        Arguments:
        list_num_neurons -- list of layer sizes including in- and output layer (at least two entries)

        Raises:
        ValueError -- if fewer than two layer sizes are given
        """
        if len(list_num_neurons) < 2:
            raise ValueError('list_num_neurons must contain at least the input and the output size')

        num_hidden = len(list_num_neurons) - 2

        self.model = torch.nn.Sequential()
        #the dense layers require a flat tensor per sample
        self.model.add_module('flatten', torch.nn.Flatten(start_dim=1, end_dim=-1))
        #first construct the hidden dense layers
        for i0 in range(num_hidden):
            self.model.add_module('dense' + str(i0), torch.nn.Linear(list_num_neurons[i0], list_num_neurons[i0+1]))
            self.model.add_module('act' + str(i0), torch.nn.ReLU())

        #finally add the output layer (logits, no softmax -> see class docstring);
        #the name does not depend on the loop variable, so a net without hidden layers works too
        self.model.add_module('dense' + str(num_hidden), torch.nn.Linear(list_num_neurons[-2], list_num_neurons[-1]))

        self.cost_fn = torch.nn.CrossEntropyLoss(reduction='mean')

        #used to save results (one row per call of append_result)
        self.result_data = torch.tensor([])

        #we keep a global step counter, thus that optimise can be called
        #several times with different settings
        self.epoch_counter = 0

        #data dictionary for which the full train/valid tensors are currently cached
        self._cached_data = None

    def propagate(self, x):
        """
        calculates the function estimation (class logits) based on current parameters
        """
        y_pred = self.model(x)

        return y_pred


    def back_propagate(self, cost):
        """
        calculates the gradients of cost with respect to all model parameters
        this function must be performed AFTER the corresponding propagate step
        """
        #set gradient values to zero
        self.model.zero_grad()

        cost.backward()


    def cost_funct(self, y_pred, y):
        """
        calculates the cross-entropy loss of the predicted logits y_pred for the labels y
        """
        cost = self.cost_fn(y_pred, y)

        return cost


    def gradient_descend(self, alpha):
        """
        does the gradient descend based on results from last back_prop step

        alpha is kept for interface compatibility only; the learning rate actually used
        is the one the optimiser was created with in optimise()
        """
        with torch.no_grad():
            self.optimizer.step()


    def calc_error(self, y_pred, y):
        """
        get error information: fraction of samples whose argmax prediction differs from y
        """
        m = y.shape[0]

        y_pred_argmax = torch.argmax(y_pred, dim=1)
        error = torch.sum(y != y_pred_argmax) / m

        return error


    @staticmethod
    def _load_all(dataset):
        """returns the complete dataset as one single (images, labels) batch"""
        loader = torch.utils.data.DataLoader(dataset, batch_size=len(dataset), shuffle=False)
        return next(iter(loader))


    def append_result(self):
        """
        append cost and error data to output array and send them to tensorboard

        Returns:
        tensor of shape (1, 4): train cost, train error, validation cost, validation error
        """
        #loading the full sets takes some time but is only repeated when optimise was
        #called with another data dictionary; then the images are available for quick
        #execution of the propagation step
        if self._cached_data is not self.data:
            self.train_images, self.train_labels = self._load_all(self.data['train'])
            self.valid_images, self.valid_labels = self._load_all(self.data['valid'])
            self._cached_data = self.data

        #pure evaluation: no autograd graph needed for the full train/validation pass
        with torch.no_grad():
            y_pred_train = self.propagate(self.train_images)
            y_pred_val = self.propagate(self.valid_images)

            res_data = torch.tensor([[self.cost_funct(y_pred_train, self.train_labels).item(),
                                      self.calc_error(y_pred_train, self.train_labels).item(),
                                      self.cost_funct(y_pred_val, self.valid_labels).item(),
                                      self.calc_error(y_pred_val, self.valid_labels).item()]])

        self.result_data = torch.cat((self.result_data, res_data), 0)

        #send data to tensorboard
        writer.add_scalars('loss', {'train': res_data[0, 0].item(),
                                    'validate': res_data[0, 2].item()}, self.epoch_counter)

        writer.add_scalars('error', {'train': res_data[0, 1].item(),
                                     'validate': res_data[0, 3].item()}, self.epoch_counter)

        #increase epoch counter here (used for plot routines below)
        self.epoch_counter += 1

        return res_data


    def optimise(self, data, epochs, alpha, batch_size=0, debug=0):
        """
        performs epochs number of gradient descend steps and appends result to output array

        Arguments:
        data -- dictionary with NORMALISED data (keys 'train' and 'valid')
        epochs -- number of epochs
        alpha -- learning rate
        batch_size -- size of batches (0 = full batch, 1 = SGD, 1 < .. < n = mini-batch)
        debug -- integer value; get info on gradient descend step every debug-step (0 -> no output)
        """
        #access to data from other methods
        self.data = data

        #we define the optimiser (torch.optim.Adam with the same arguments is a drop-in alternative)
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=alpha, momentum=0.)

        #DataLoader rejects batch_size=0, so the default is mapped to one batch with all samples
        if batch_size <= 0:
            batch_size = len(data['train'])

        # dataloader for training image
        train_loader = torch.utils.data.DataLoader(data['train'], batch_size=batch_size, shuffle=True)

        # save results before 1st step
        if self.epoch_counter == 0:
            self.append_result()

        for i0 in range(epochs):
            #measure time for one epoch
            start = time.time()
            #loop over all batches
            for images, labels in train_loader:
                #do prediction
                y_pred = self.propagate(images)
                #determine the loss
                cost = self.cost_funct(y_pred, labels)
                #determine the gradients
                self.back_propagate(cost)
                #do the correction step
                self.gradient_descend(alpha)

            #save result
            self.append_result()

            #end of time measurement
            end = time.time()

            if debug and i0 % debug == 0:
                print('result after %d epochs (dt=%1.2f s)' % (self.epoch_counter-1, end-start))

        if debug:
            #read the last stored row, which also exists when epochs == 0 on a repeated call
            last = self.result_data[-1]
            print('result after %d epochs, train: cost %.5f, error %.5f ; validation: cost %.5f, error %.5f'
                  % (self.epoch_counter-1, last[0].item(), last[1].item(),
                     last[2].item(), last[3].item()))
                        
            

### Sample execution of Neural Network

#### We split the creation and optimisation

The cells below show how to use the class NeuralNetwork and how to perform the optimisation. The training and validation data are passed as a dictionary in the call to the method `optimise()`. The classes (from 2 to 10) can be chosen via the `classes` tensor. `optimise()` can be called several times in a row with different arguments.

In [None]:
#choose the categories
classes = torch.tensor([0,1,2,3,4,5,6,7,8,9])

#split data in train and validation
validation_size = 0.2

#further split in train and validation data
validation_size = 0.2
valid_ind = int(len(training_data)*(1-validation_size))

#create custom training and validation data set
train_dataset = MyDataset([training_data.data[:valid_ind,:], training_data.targets[:valid_ind]], classes=classes)
valid_dataset = MyDataset([training_data.data[valid_ind:,:], training_data.targets[valid_ind:]], classes=classes)


#data is arranged as dictionary with quick access through respective keys
data = {'train' : train_dataset, 'valid' : valid_dataset}

#choose the hyperparameters you want to use for the initialisation
size_in = train_dataset[0][0].flatten().shape[0] #access to first image in torch.Subset train_data 
size_out = 10
list_num_neurons = [size_in, 100, size_out]; 
NNet = NeuralNetwork(list_num_neurons)



### Send the graph to tensorboard

In [None]:
# log the model graph, using the sample images (cast to float) as example input
writer.add_graph(model=NNet.model, input_to_model=my_images.to(torch.float))
writer.close()

### Add data for embedding to tensorboard 

It's more of a gadget, but nice to see.
(You may have to reload the tensorboard page or even restart tensorboard in the console to see the `projector` icon on the task bar.)

In [None]:
#get a larger set of images and labels
num_samples = 200
my_images = training_data.data[:num_samples]
my_labels = training_data.targets[:num_samples]

# log embeddings
writer.add_embedding(my_images.view(-1, 28 * 28),
                     metadata=my_labels,
                     label_img=my_images.unsqueeze(1))
writer.close()

### Now run the training and observe the scalar output on tensorboard

We see that we can keep the code free of any output and rely completely on tensorboard for that.

In [None]:
#choose the hyperparameters you want to use for training
epochs = 100
batchsize = 16
learning_rate = 0.05
NNet.optimise(data, epochs, learning_rate, batchsize, debug=5)

#also prepare the test dataset
test_dataset = MyDataset([test_data.data, test_data.targets], classes=classes)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)
test_iterator = iter(test_loader)
test_images, test_labels = next(test_iterator)

y_pred = torch.argmax(NNet.propagate(test_images), axis=1)
false_classifications = test_images[(y_pred != test_labels)]

print('test error rate: %.2f %% out of %d' % (100*false_classifications.shape[0]/y_pred.shape[0], y_pred.shape[0]))
print(false_classifications.shape)
