# Lecture 11: Neural Nets, a practical approach

This notebook presents a general OO implementation of feed forward neural networks. As of now, it supports fully connected layers with different activation functions (i.e., logits, ReLU, and linear). It also supports the softmax operation for calssification purposes. Two loss functions are currently implemented:
- MSE
- Cross Entropy with logits (Multiclass)

Backpropagation is implemented using automatic differentiation. This is achieved by registering in each leayer the corresponding derivative.

In [1]:
import numpy as np
import utils
import matplotlib.pyplot as plt

%matplotlib inline

### Vectorized ReLU and the corresponding gradient

In [2]:
def ReLU(x):
    return np.maximum(x, 0)

def ReLU_d(x):
    return np.array(x > 0, dtype=float)

### Vectorized logits and the corresponding gradient

In [3]:
def logits(x):
    return  1 / (1 + np.exp(-x))

def logits_d(x):
    return np.exp(x) / np.square((1 + np.exp(x)))

### Vectorized logits and the corresponding gradient

In [4]:
def linear(x):
    return x

def linear_d(x):
    return np.ones(x.shape)

### Vectorized softmax

In [5]:
def softmax(x):
    return np.exp(x - np.max(x)) / np.sum(np.exp(x - np.max(x)))

### Squared Cost, and corresponding error in the last layer (delta_L)

In [6]:
def MSE(predictions, targets):
    return np.square(predictions-targets)

def MSE_dL(predictions, targets, z=None):
    return -2 * (targets - predictions)

### Cross Entropy, and corresponding error in the last layer (delta_L)
Refer to the lecture notes to see how delta_L is computed for cross entropy loss with logits and softmax

In [7]:
def cross_entropy(predictions, targets):
    predictions = softmax(predictions)
    return -np.sum(targets * np.log(predictions))

def cross_entropy_dL_with_logits(predictions, targets, z):
    classes = len(targets)
    error = np.zeros(classes)
    norm_factor = np.sum(np.exp(predictions))
    for j in range(classes):
        tmp = 0
        for k in range(classes):
            if k == j:
                tmp -= targets[k] * (1 - np.exp(predictions[k]) / norm_factor)
            else:
                tmp -= targets[k] * (-np.exp(predictions[k]) / norm_factor)
        tmp *= logits(z[j]) * (1 - logits(z[j]))
        error[j] = tmp
    return error
    

### Class implementing a fully connected layer. 
This implementation is agnostic of the specific activation function, and uses the concept of higher order functions to provide a general framework to define layers. Note that in this way you can implement new activation functions without having to change the class implementing the Layers

In [8]:
class Layer(object):
    def __init__(self, input_size, layer_size, activation=linear, activation_d=linear_d):
        
        # here the constructor takes as parameters the activation function and the corresponding gradient,
        # this keeps the implementation general, and, in turn, allows to implement a generic version of
        # the backpropagation algorithm
        
        self.activation_function = activation
        self.activation_function_d = activation_d
        
        # Size of the weight matrix and bias vector
        self.input_size = input_size # K
        self.layer_size = layer_size # J
        
        # Weigth Matrix has input_size columns and layer_size rows (J x K)
        # I initialize the weights using He initialization
        self.W = np.random.randn(self.layer_size, self.input_size) * np.sqrt(2 / self.input_size)
        
        # Bias Vector, I initialize it to zero
        self.b = np.zeros(self.layer_size)
        
        # Input to activation function, I initialize it to zero
        self.z = np.zeros(self.layer_size)
        
        # Activations 
        self.a = np.zeros(self.layer_size)
        
        # Gradients
        self.W_grad = np.zeros((self.layer_size, self.input_size))
        self.b_grad = np.zeros(self.layer_size)
    
    # reset the gradients to zero (to be called between batches)
    def reset(self):
        self.W_grad = np.zeros((self.layer_size, self.input_size))
        self.b_grad = np.zeros(self.layer_size)
    
    # perform a forward pass
    def forward(self, x):
        self.z = np.matmul(self.W, x) + self.b
        self.a = self.activation_function(self.z)
        return self.a

### Class implementing a trainable feed forward neural network

In [9]:
class Network(object):
    def __init__(self, trainFlag):
        self.trainFlag = trainFlag
        self.layers = []
        self.deltas = []
        self.input = None
    
    # to be called to define
    def add_layer(self, l):
        self.layers.append(l)
        
    def forward_pass(self, x):
        self.deltas = []
        a = x.copy()
        self.input = a
        for layer_dict in self.layers:
            layer = layer_dict['layer']
            a = layer.forward(a)
        return a
    
    # Reset the gradients between mini batches
    def reset(self):
        for layer_dict in self.layers:
            layer_dict['layer'].reset()
    
    def aggregate(self, batch_size):
        for layer_dict in self.layers:
            layer = layer_dict['layer']
            layer.W_grad /= batch_size
            layer.b_grad /= batch_size
    
    # compute all the gradients of the cost function
    def backward_pass(self, deltaL):
        self.deltas = [deltaL]
        size = len(self.layers) - 1
        for i,j in zip(reversed(range(size)), [i for i in range(size)]):
            W_lp1 = self.layers[i+1]['layer'].W
            d_lp1 = self.deltas[j]
            z_l = self.layers[i]['layer'].z
            derivative = self.layers[i]['layer'].activation_function_d
            
            d_l = np.matmul(d_lp1, W_lp1) * derivative(z_l)
            self.deltas.append(d_l)
            
        for layer_idx, i in zip(range(len(self.layers)), reversed(range(len(self.deltas)))):
            prev_activations = self.layers[layer_idx - 1]['layer'].a if layer_idx > 0 else self.input
            layer = self.layers[layer_idx]['layer']
            layer.b_grad += self.deltas[i]
            layer.W_grad += np.array([prev_activations * d for d in self.deltas[i]])

### Vanilla stochastic gradient descent

In [10]:
def optimize(model, batch_size, learning_rate):
    model.aggregate(batch_size)
    layers = model.layers
    for layer_dict in layers:
        layer = layer_dict['layer']
        layer.W -= learning_rate * layer.W_grad
        layer.b -= learning_rate * layer.b_grad
    model.reset()
    
    

In [18]:
train_X, train_y, test_X, test_y = utils.load_data_class(fname='dataset.csv')

BATCH_SIZE = 128
LEARNING_RATE = 0.1

NORM_FACTOR = 1000

EPOCHS = 10

loss = []
accuracy_train = []
accuracy_test = []

### Network definition

In [19]:
net = Network(True)
l_1 = {
    'layer': Layer(2, 64, ReLU, ReLU_d),
    'input_size': 2,
    'layer_size': 64,
    'activation': 'relu'
}

l_2 = {
    'layer': Layer(64, 64, ReLU, ReLU_d),
    'input_size': 64,
    'layer_size': 64,
    'activation': 'relu'
}

l_3 = {
    'layer': Layer(64, 2, logits, logits_d),
    'input_size': 64,
    'layer_size': 2,
    'activation': 'logits'
}

net.add_layer(l_1)
net.add_layer(l_2)
net.add_layer(l_3)


accuracy = 0
for sample, target in zip(test_X, test_y):
        prediction = (np.argmax(softmax(net.forward_pass(sample))))
        true_class = int(np.argmax(target))
        accuracy += (prediction == true_class)

accuracy /= test_X.shape[0]

print('*' * 10 + ' ACCURACY ON TEST {:.3f}% [untrained network]'.format(accuracy * 100))
accuracy_test.append(accuracy)

********** ACCURACY ON TEST 49.586% [untrained network]


### Training Loop, and accuracy checking

In [20]:
for e in range(EPOCHS):
    for i in range(train_X.shape[0] // BATCH_SIZE):
        batch_samples = train_X[i * BATCH_SIZE:(i+1) * BATCH_SIZE]
        batch_targets = train_y[i * BATCH_SIZE:(i+1) * BATCH_SIZE]
        tmp_loss = []
        for sample, target in zip(batch_samples, batch_targets):
            prediction = net.forward_pass(sample / NORM_FACTOR)
            tmp_loss.append(cross_entropy(prediction, target))
            deltaL = cross_entropy_dL_with_logits(predictions=prediction, 
                                                  targets=target, 
                                                  z=net.layers[-1]['layer'].z)
            net.backward_pass(deltaL=deltaL)
            
        loss.append(np.mean(tmp_loss))
        
        net.aggregate(BATCH_SIZE)
        
        optimize(model=net, 
                 batch_size=BATCH_SIZE, 
                 learning_rate=LEARNING_RATE)
        #print(net.layers[0]['layer'].W_grad)
        
    print('DONE EPOCH {}, LOSS {:.3f}'.format(e + 1, loss[-1]))
    
    accuracy = 0
    for sample, target in zip(test_X, test_y):
        prediction = (np.argmax(softmax(net.forward_pass(sample / NORM_FACTOR))))
        true_class = int(np.argmax(target))
        accuracy += (prediction == true_class)

    accuracy /= test_X.shape[0]

    print('*' * 10 + ' ACCURACY ON TEST {:.3f}%'.format(accuracy * 100))
    accuracy_test.append(accuracy)

DONE EPOCH 1, LOSS 0.679
********** ACCURACY ON TEST 49.633%
DONE EPOCH 2, LOSS 0.633
********** ACCURACY ON TEST 73.227%
DONE EPOCH 3, LOSS 0.588
********** ACCURACY ON TEST 94.742%
DONE EPOCH 4, LOSS 0.558
********** ACCURACY ON TEST 99.219%
DONE EPOCH 5, LOSS 0.534
********** ACCURACY ON TEST 98.727%
DONE EPOCH 6, LOSS 0.513
********** ACCURACY ON TEST 99.000%
DONE EPOCH 7, LOSS 0.495
********** ACCURACY ON TEST 99.320%
DONE EPOCH 8, LOSS 0.480
********** ACCURACY ON TEST 99.594%
DONE EPOCH 9, LOSS 0.467
********** ACCURACY ON TEST 99.578%
DONE EPOCH 10, LOSS 0.457
********** ACCURACY ON TEST 99.555%
