In [1]:
import numpy as np
import pickle

In [2]:
config = {}

config['layer_specs'] = [784, 50, 10]  # The length of list denotes number of hidden layers; each element denotes number of neurons in that layer; first element is the size of input layer, last element is the size of output layer.
config['activation'] = 'tanh' # Takes values 'sigmoid', 'tanh' or 'ReLU'; denotes activation function for hidden layers
config['batch_size'] = 1000  # Number of training samples per batch to be passed to network
config['epochs'] = 20  # Number of epochs to train the model
config['early_stop'] = False  # Implement early stopping or not
config['early_stop_epoch'] = 5  # Number of epochs for which validation loss increases to be counted as overfitting
config['L2_penalty'] = 0.001  # Regularization constant
config['momentum'] = True  # Denotes if momentum is to be applied or not
config['momentum_gamma'] = 0.9  # Denotes the constant 'gamma' in momentum expression
config['learning_rate'] = 0.0001 # Learning rate of gradient descent algorithm


In [3]:
def softmax(x):
    return np.exp(x - max(x)) / float(sum(np.exp(x - max(x))))

In [4]:
def load_data(fname):
    data = pickle.load(open(fname, "rb"))
    
    images = data[:, :784]
    labels = data[:, 784]
    
    one_hot = np.zeros((labels.shape[0], 10))
    
    # Encode labels (0-9) to a one-hot encoding
    for i in range(one_hot.shape[0]):
        one_hot[i][int(labels[i])] = 1
        
    labels = one_hot
    
    return images, labels

In [12]:
class Activation:
    def __init__(self, activation_type = "sigmoid"):
        self.activation_type = activation_type
        self.x = None # Save the input 'x' for sigmoid or tanh or ReLU to this variable since it will be used later for computing gradients.
        
    def update(self, config):
        pass
    
    def save_weights(self):
        pass
        
    def restore_best_weights(self):
        pass
    
    def forward_pass(self, a):
        if self.activation_type == "sigmoid":
            return self.sigmoid(a)
    
        elif self.activation_type == "tanh":
            return self.tanh(a)
    
        elif self.activation_type == "ReLU":
            return self.ReLU(a)
        
    def backward_pass(self, delta):
        if self.activation_type == "sigmoid":
            grad = self.grad_sigmoid()
    
        elif self.activation_type == "tanh":
            grad = self.grad_tanh()
    
        elif self.activation_type == "ReLU":
            grad = self.grad_ReLU()
    
        return grad * delta
      
    def sigmoid(self, x):
        self.x = x
        
        # prevent overflow
        self.x = np.clip(self.x, -100, 100)

        return 1.0 / (1.0 + np.exp(-self.x)) 

    def tanh(self, x):
        self.x = x
        return np.tanh(x)

    def ReLU(self, x):
        """
        Write the code for ReLU activation function that takes in a numpy array and returns a numpy array.
        """
        self.x = x
        return x * (x > 0)

    def grad_sigmoid(self):
        return self.sigmoid(self.x) * (1 - self.sigmoid(self.x))

    def grad_tanh(self):
        return 1 - np.power(self.tanh(self.x), 2)

    def grad_ReLU(self):
        """
        Write the code for gradient through ReLU activation function that takes in a numpy array and returns a numpy array.
        """
        return self.x > 0


class Layer():
    def __init__(self, in_units, out_units):
        np.random.seed(42)
        self.w = np.random.randn(in_units, out_units)         # Weight matrix
        self.b = np.zeros((1, out_units)).astype(np.float32)  # Bias
        self.x = None    # Save the input to forward_pass in this
        self.a = None    # Save the output of forward pass in this (without activation)
        self.d_x = None  # Save the gradient w.r.t x in this
        self.d_w = np.zeros((in_units, out_units)).astype(np.float32)  # Save the gradient w.r.t w in this
        self.d_b = np.zeros((1, out_units)).astype(np.float32)  # Save the gradient w.r.t b in this
        
        self.best_w = None
        self.best_b = None
        
        self.v_w = np.zeros((in_units, out_units)).astype(np.float32)
        self.v_b = np.zeros((1, out_units)).astype(np.float32)
        
    def update(self, config):
        if config['momentum']:
            self.v_w = config['learning_rate'] * self.d_w + config['momentum_gamma'] * self.v_w
            self.w += self.v_w
            
            self.v_b = config['learning_rate'] * self.d_b + config['momentum_gamma'] * self.v_b
            self.b += self.v_b
        else:
            self.w += config['learning_rate'] * self.d_w
            self.b += config['learning_rate'] * self.d_b
        
    def save_weights(self):
        self.best_w = self.w
        self.best_b = self.b
        
    def restore_best_weights(self):
        self.w = self.best_w
        self.b = self.best_b

    def forward_pass(self, x):
        self.x = x
        self.a = np.matmul(self.w.T, self.x)
        self.a += self.b[0,:]
        
        return self.a
  
    def backward_pass(self, delta):  
        self.d_w = delta * np.array([self.x]).T - config['L2_penalty'] * self.d_w
        self.d_x = np.matmul(delta, self.w.T)
        self.d_b = delta - config['L2_penalty'] * self.d_b 
                
        return self.d_x

      
class Neuralnetwork():
    def __init__(self, config):
        self.layers = []
        self.x = None        # Save the input to forward_pass in this
        self.y = None        # Save the output vector of model in this
        self.targets = None  # Save the targets in forward_pass in this variable
        
        for i in range(len(config['layer_specs']) - 1):
            self.layers.append( Layer(config['layer_specs'][i], config['layer_specs'][i+1]) )
            
            if i < len(config['layer_specs']) - 2:
                self.layers.append(Activation(config['activation']))  
    
    def loss_func(self, logits, targets):
        total = 0           # total accumulated loss
    
        return - np.log(np.dot(logits,targets))

    def update(self, config):
        for layer in self.layers:
            layer.update(config)
            
    def save_weights(self):
        for layer in self.layers:
            layer.save_weights()
            
    def restore_best_weights(self):
        for layer in self.layers:
            layer.restore_best_weights()
    
    
    def forward_pass(self, x, targets=None):
        x = x[0]
        
        self.x = x
        self.targets = targets
        
        # Propagate the data
        for layer in self.layers:
            x = layer.forward_pass(x)
        self.y = softmax(x)
        
        # Calculate the loss
        if self.targets is None:
            loss = None
        else:
            loss = self.loss_func(self.y, self.targets)
        
        return loss, self.y

    def backward_pass(self):
        delta = np.array([self.targets - self.y])
        
        for layer in reversed(self.layers):
            delta = layer.backward_pass(delta)

In [13]:
def get_random_samples(x, y, percentage):
    number_of_indices = len(x) * percentage / 100
    indices = np.random.permutation(len(x))[: int(number_of_indices)]
    
    randomX = [x[i] for i in indices]
    randomY = [y[i] for i in indices]
    
    randomX = np.array(randomX)
    randomY = np.array(randomY)
    
    return randomX, randomY

def accuracy(predicts, actuals):
    predicts = np.round_(predicts)
    correct = np.equal(actuals, predicts)
    
    return np.sum(correct) * 1.0 / correct.size

def trainer(model, X_train, y_train, X_valid, y_valid, config):   
    min_loss = 99999999
    best_epoch = 0
    early_stop_count = 0
    
    for i in range(1, config['epochs'] + 1):        
        train_predictions = []
        train_loss = 0
        
        valid_predictions = []
        valid_loss = 0
        
        for x, y in zip(X_train, y_train):
            l, p = model.forward_pass([x], y)
            model.backward_pass()
            model.update(config)
            
            train_predictions.append(p)
            train_loss += l
                    
        for x, y in zip(X_valid, y_valid):
            l, p = model.forward_pass([x], y)
            
            valid_predictions.append(p)
            valid_loss += l
            
        if valid_loss > min_loss:
            early_stop_count += 1
            
            if config['early_stop'] and early_stop_count == 5:
                break
            pass
        else:
            early_stop_count = 0
            min_loss = valid_loss
            model.save_weights()
            best_epoch = i
            
        train_accuracy = accuracy(train_predictions, y_train)
        valid_accuracy = accuracy(valid_predictions, y_valid)
            
        print("Epoch " + str(i) + "(Train): Acc = " + str(train_accuracy) + ", Loss = " + str(train_loss))
        print("Epoch " + str(i) + "(Valid): Acc = " + str(valid_accuracy) + ", Loss = " + str(valid_loss))
        
    model.restore_best_weights()
    return best_epoch, min_loss, train_accuracy

def test(model, X_test, y_test, config):
    predictions = []
    loss = 0
    
    for x, y in zip(X_test, y_test):
        l, p = model.forward_pass([x], y)
        
        predictions.append(p)
        loss += l

    return accuracy(predictions, y_test), loss

In [14]:
def est_gradient_given_b(eps, model, X_train, y_train, l_index, b_index):
    og_params = model.layers[l_index].b[0,b_index]
    
    model.layers[l_index].b[0,b_index] += eps
    loss1 = model.forward_pass(X_train, y_train)[0]   
    model.layers[l_index].b[0,b_index] -= (2.0 * eps)
    loss2 = model.forward_pass(X_train, y_train)[0]
    
    model.layers[l_index].b[0,b_index] = og_params
    
    return -(loss1 - loss2) / (2.0 * eps)

def est_gradient_given_w(eps, model, X_train, y_train, l_index, w_index):
    og_params = model.layers[l_index].w[w_index]
    
    model.layers[l_index].w[w_index] += eps
    loss1 = model.forward_pass(X_train, y_train)[0]   
    model.layers[l_index].w[w_index] -= (2.0 * eps)
    loss2 = model.forward_pass(X_train, y_train)[0]
    
    model.layers[l_index].w[w_index] = og_params
    
    return -(loss1 - loss2) / (2.0 * eps)

In [15]:
def calc_gradient_given_b(model, X_train, y_train, l_index, b_index):
    model.forward_pass(X_train, y_train)[0]
    model.backward_pass()
    
    return model.layers[l_index].d_b[0,b_index]

def calc_gradient_given_w(model, X_train, y_train, l_index, w_index):
    model.forward_pass(X_train, y_train)[0]
    model.backward_pass()
    
    return model.layers[l_index].d_w[w_index]

In [16]:
def compare_gradients(eps, model, X_train, y_train, weights, biases):
    est_grads = []
    calc_grads = []
    
    for i in range(len(weights)):
        layer = weights[i][0]
        param = weights[i][1]
        
        est_grads.append(est_gradient_given_w(eps, model, X_train, y_train, layer, param))
        calc_grads.append(calc_gradient_given_w(model, X_train, y_train, layer, param))
    
    for i in range(len(biases)):
        layer = biases[i][0]
        param = biases[i][1]
        
        est_grads.append(est_gradient_given_b(eps, model, X_train, y_train, layer, param))
        calc_grads.append(calc_gradient_given_b(model, X_train, y_train, layer, param))
    
    return est_grads, calc_grads

In [17]:
'''
### To compare approximated gradient to backprop gradient ###

1) Uncomment the below code block
2) Change config['layer_specs'] to [784, 100, 100, 10]
3) Comment-out 'main' 
4) Rerun file
'''

'''
train_data_fname = 'MNIST_train.pkl'
valid_data_fname = 'MNIST_valid.pkl'
test_data_fname = 'MNIST_test.pkl'
    
X_train, y_train = load_data(train_data_fname)
X_valid, y_valid = load_data(valid_data_fname)
X_test, y_test = load_data(test_data_fname)

toy_model = Neuralnetwork(config)

eps = 0.01         # epsilon used to approximate gradient
    
out_b = (4, 3)     # output bias weight 
hid_b = (2, 82)    # hidden bias weight
    
# 2 hidden to output weights
hid_w1 = (2, (62,4))
hid_w2 = (2, (36,7))
    
# 2 input to hidden weights
in_w1 = (0, (539,21))
in_w2 = (0, (420,99))
    
weights = [hid_w1, hid_w2, in_w1, in_w2]
biases  = [out_b, hid_b]
    
grads = compare_gradients(eps, toy_model, X_train, y_train[0], weights, biases)

est_grads = grads[0]
calc_grads = grads[1]

   
### Finish comparisons ###
''';

In [None]:
if __name__ == "__main__":
    train_data_fname = 'MNIST_train.pkl'
    valid_data_fname = 'MNIST_valid.pkl'
    test_data_fname = 'MNIST_test.pkl'
    
    X_train, y_train = load_data(train_data_fname)
    X_valid, y_valid = load_data(valid_data_fname)
    X_test, y_test = load_data(test_data_fname)
        
    epochs = []
    losses = []
    
    train_accs = []
    test_accs = []
    
    ### Train the network ###
    for i in range(10):
        model = Neuralnetwork(config)
        X_t, y_t = get_random_samples(X_train, y_train, 90)

        print("*** Training model ***")
        e, l, accuracy = trainer(model, X_t, y_t, X_valid, y_valid, config)
        epochs.append(e)
        losses.append(l)

        print("*** Testing model ***")
        accuracy, loss = test(model, X_test, y_test, config)
        print("Test: Acc = " + str(accuracy) + ", Loss = " + str(loss))
    
    print(epochs)
    print(np.mean(epochs))  

*** Training model ***
Epoch 1(Train): Acc = 0.9042933333333333, Loss = 96558.67457886791
Epoch 1(Valid): Acc = 0.92564, Loss = 13306.3104379611
Epoch 2(Train): Acc = 0.9322577777777777, Loss = 52644.11345219278
Epoch 2(Valid): Acc = 0.9396, Loss = 10126.30663351192
Epoch 3(Train): Acc = 0.9429888888888889, Loss = 42675.19698394359
Epoch 3(Valid): Acc = 0.9478, Loss = 8721.374934080079
