In [16]:
import numpy as np
import pickle
from tqdm import tqdm_notebook as tqdm

config = {}
config['layer_specs'] = [784, 100, 100, 10]  # The length of list denotes number of hidden layers; each element denotes number of neurons in that layer; first element is the size of input layer, last element is the size of output layer.
config['activation'] = 'sigmoid' # Takes values 'sigmoid', 'tanh' or 'ReLU'; denotes activation function for hidden layers
config['batch_size'] = 200  # Number of training samples per batch to be passed to network
config['epochs'] = 50  # Number of epochs to train the model
config['early_stop'] = True  # Implement early stopping or not
config['early_stop_epoch'] = 5  # Number of epochs for which validation loss increases to be counted as overfitting
config['L2_penalty'] = 0  # Regularization constant
config['momentum'] = True  # Denotes if momentum is to be applied or not
config['momentum_gamma'] = 0.9  # Denotes the constant 'gamma' in momentum expression
config['learning_rate'] = 0.007 # Learning rate of gradient descent algorithm

def softmax(x):
  """
  Write the code for softmax activation function that takes in a numpy array and returns a numpy array.
  """
  out_exp = np.exp(x)
  sum_out_exp = np.exp(x).sum(axis=1)  
  output = out_exp/sum_out_exp[:,None]  
  return output

def sigmoid(x):
  """
  General Sigmoid function
  """
  return 1./(1. + np.exp(-x))

def load_data(fname):
  """
  Write code to read the data and return it as 2 numpy arrays.
  Make sure to convert labels to one hot encoded format.
  """
  f = open(fname, 'rb')
  data = pickle.load(f)
  f.close()
  images, labels = data[:, :-1], data[:, -1]

  labels = labels.astype(np.int)
  onehotlabels = np.zeros((len(labels), labels.max()+1))
  onehotlabels[np.arange(len(labels)), labels] = 1
  return images, onehotlabels


class Activation:
  def __init__(self, activation_type = "sigmoid"):
    self.activation_type = activation_type
    self.x = None # Save the input 'x' for sigmoid or tanh or ReLU to this variable since it will be used later for computing gradients.
  
  def forward_pass(self, a):
    if self.activation_type == "sigmoid":
      return self.sigmoid(a)
    
    elif self.activation_type == "tanh":
      return self.tanh(a)
    
    elif self.activation_type == "ReLU":
      return self.ReLU(a)
  
  def backward_pass(self, delta):
    if self.activation_type == "sigmoid":
      grad = self.grad_sigmoid()
    
    elif self.activation_type == "tanh":
      grad = self.grad_tanh()
    
    elif self.activation_type == "ReLU":
      grad = self.grad_ReLU()
    
    return grad * delta
      
  def sigmoid(self, x):
    """
    Write the code for sigmoid activation function that takes in a numpy array and returns a numpy array.
    """
    self.x = x
    output = sigmoid(x)
    return output

  def tanh(self, x):
    """
    Write the code for tanh activation function that takes in a numpy array and returns a numpy array.
    """
    self.x = x
    output = np.tanh(x)
    return output

  def ReLU(self, x):
    """
    Write the code for ReLU activation function that takes in a numpy array and returns a numpy array.
    """
    self.x = x
    output = np.maximum(x, 0)
    return output

  def grad_sigmoid(self):
    """
    Write the code for gradient through sigmoid activation function that takes in a numpy array and returns a numpy array.
    """
    sigmoid_x = sigmoid(self.x)
    grad = sigmoid_x * (1-sigmoid_x)
    return grad

  def grad_tanh(self):
    """
    Write the code for gradient through tanh activation function that takes in a numpy array and returns a numpy array.
    """
    tanh_x = np.tanh(self.x)
    grad = 1 - (tanh_x * tanh_x)
    return grad

  def grad_ReLU(self):
    """
    Write the code for gradient through ReLU activation function that takes in a numpy array and returns a numpy array.
    """
    grad = np.where(self.x <= 0, 0, 1)
    return grad


class Layer():
  def __init__(self, in_units, out_units):
    np.random.seed(42)
    self.w = np.random.randn(in_units, out_units)  # Weight matrix
    self.b = np.zeros((1, out_units)).astype(np.float32)  # Bias
    self.x = None  # Save the input to forward_pass in this
    self.a = None  # Save the output of forward pass in this (without activation)
    self.d_x = None  # Save the gradient w.r.t x in this (AKA Delta to pass to previous layer = dE/dx)
    self.d_w = None  # Save the gradient w.r.t w in this (AKA dE/dw = Delta received . dx/dw)
    self.d_b = None  # Save the gradient w.r.t b in this (AKA dE/db = Delta received . 1)

  def forward_pass(self, x):
    """
    Write the code for forward pass through a layer. Do not apply activation function here.
    """
    self.x = x
    self.a = x @ self.w + self.b
    return self.a
  
  def backward_pass(self, delta):
    """
    Write the code for backward pass. This takes in gradient from its next layer as input,
    computes gradient for its weights and the delta to pass to its previous layers.
    """

    # This is dE/dw = delta received . dx/dw
    self.d_w = (np.swapaxes(self.x.T[:,:,np.newaxis],1,2) * np.swapaxes(delta[:,:,np.newaxis], 0, 2)).sum(axis=2)

    # This is dE/db = delta received . 1
    self.d_b = delta.sum(axis=0)
                
    # This is delta to be passed to previous layer
    self.d_x = delta @ self.w.T
    return self.d_x

      
class Neuralnetwork():
  def __init__(self, config):
    self.layers = []
    self.x = None  # Save the input to forward_pass in this
    self.y = None  # Save the output vector of model in this
    self.targets = None  # Save the targets in forward_pass in this variable
    for i in range(len(config['layer_specs']) - 1):
      self.layers.append( Layer(config['layer_specs'][i], config['layer_specs'][i+1]) )
      
      # Unless it's output unit, add Activation layer on top
      if i < len(config['layer_specs']) - 2:
        self.layers.append(Activation(config['activation']))  
    
  def forward_pass(self, x, targets=None):
    """
    Write the code for forward pass through all layers of the model and return loss and predictions.
    If targets == None, loss should be None. If not, then return the loss computed.
    """
    self.x = x
    self.targets = targets
    
    # Input layer
    out = self.layers[0].forward_pass(x)
    
    # Forward...
    for layer in self.layers[1:]:
      out = layer.forward_pass(out)
      
    # Softmax
    self.y = softmax(out)
        
    # Cross-entropy loss
    if targets is not None:
      return self.loss_func(self.y, targets), self.y
    else:    
      return None, self.y

  def loss_func(self, logits, targets):
    '''
    find cross entropy loss between logits and targets
    '''
    output = -(targets * np.log(logits)).sum()/len(targets)
    return output
    
  def backward_pass(self):
    '''
    implement the backward pass for the whole network. 
    hint - use previously built functions.
    '''
    delta = self.targets - self.y
    
    for layer in reversed(self.layers):
      delta = layer.backward_pass(delta)
    return delta

def trainer(model, X_train, y_train, X_valid, y_valid, config):
  """
  Write the code to train the network. Use values from config to set parameters
  such as L2 penalty, number of epochs, momentum, etc.
  """
  
  BATCH_SIZE = config['batch_size']
  N_EPOCHS = config['epochs']
  LEARNING_RATE = config['learning_rate']
  
  N_BATCHES = len(X_train) // BATCH_SIZE

  EPOCHS_THRESHOLD = config['early_stop_epoch']

  GAMMA = 1
  if config['momentum']:
    GAMMA = config['momentum_gamma']
  
  print("N Batches: ",N_BATCHES, "Batch size", BATCH_SIZE)

  best_weight_layers = []
  min_loss = float('inf')
  prev_loss = float('inf')
  consecutive_epochs = 0
  
  for i_epoch in tqdm(range(N_EPOCHS)):
    for i_minibatch in range(0, len(X_train), BATCH_SIZE):
      X_batch = X_train[i_minibatch:i_minibatch + BATCH_SIZE]
      y_batch = y_train[i_minibatch:i_minibatch + BATCH_SIZE]
      
      loss, y = model.forward_pass(X_batch, y_batch)
      delta = model.backward_pass()
            
      # Weight updates
      for l in model.layers:
        if type(l) is Layer:
          l.w += GAMMA*LEARNING_RATE * l.d_w
          l.b += GAMMA*LEARNING_RATE * l.d_b

    print('Epoch:', i_epoch, 'loss:', loss)
    
    if config['early_stop']:
      loss_valid, _ = model.forward_pass(X_valid, y_valid)
      if loss_valid < min_loss:
        min_loss = loss_valid
        best_weight_layers = model.layers
      
      if loss_valid > prev_loss:
        consecutive_epochs += 1
        if consecutive_epochs == EPOCHS_THRESHOLD:
          model.layers = best_weight_layers
          print('Stop training as validation loss increases for {} epochs'.format(EPOCHS_THRESHOLD))
          return
      else: 
        consecutive_epochs = 0
      
      prev_loss = loss_valid
        
      
  
def test(model, X_test, y_test, config):
  """
  Write code to run the model on the data passed as input and return accuracy.
  """
  _, logits = model.forward_pass(X_test)
  predictions = np.argmax(logits, axis=1) # inds that we made predictions (0,1,2,3, ...)
  
  # convert y_test from one-hot to actual target inds
  targets = y_test.nonzero()[1]
  accuracy = (predictions == targets).sum()/len(targets)
  return accuracy

if __name__ == "__main__":
  train_data_fname = 'data/MNIST_train.pkl'
  valid_data_fname = 'data/MNIST_valid.pkl'
  test_data_fname = 'data/MNIST_test.pkl'
  
  ### Train the network ###
  model = Neuralnetwork(config)
  X_train, y_train = load_data(train_data_fname)
  X_valid, y_valid = load_data(valid_data_fname)
  X_test, y_test = load_data(test_data_fname)
  trainer(model, X_train, y_train, X_valid, y_valid, config)
  test_acc = test(model, X_test, y_test, config)

N Batches:  250 Batch size 200


HBox(children=(IntProgress(value=0, max=50), HTML(value='')))

Epoch: 0 loss: 0.6553940811135778
Epoch: 1 loss: 0.5601473517941171
Epoch: 2 loss: 0.4862528791589618
Epoch: 3 loss: 0.4452262810945276
Epoch: 4 loss: 0.42111847408171
Epoch: 5 loss: 0.41542858381066183
Epoch: 6 loss: 0.3858189624478935
Epoch: 7 loss: 0.34932670438611163
Epoch: 8 loss: 0.31427097354997374
Epoch: 9 loss: 0.29282778464482057
Epoch: 10 loss: 0.2707385761536318
Epoch: 11 loss: 0.25947934031121705
Epoch: 12 loss: 0.24750761272356314
Epoch: 13 loss: 0.23617834719924566
Epoch: 14 loss: 0.23574686315535828
Epoch: 15 loss: 0.2364457681458223
Epoch: 16 loss: 0.2391321238810243
Epoch: 17 loss: 0.23064789175290315
Epoch: 18 loss: 0.22140325615770487
Epoch: 19 loss: 0.21574885869364505
Epoch: 20 loss: 0.21202004932558474
Epoch: 21 loss: 0.20447383909091033
Epoch: 22 loss: 0.19332376835154044
Epoch: 23 loss: 0.1856987126102387
Epoch: 24 loss: 0.180413119625561
Epoch: 25 loss: 0.17239145695746624
Epoch: 26 loss: 0.16590773335400438
Epoch: 27 loss: 0.15717924787377793
Epoch: 28 loss: 

In [14]:
test_acc

0.9126

## b) Check with gradient approximation

### Bias

In [187]:
X_test, y_test = load_data(test_data_fname)

EPSILON = 1e-9

def test_approximation_bias(model, layer_ind, unit_ind, epsilon=1e-2):
  sample_test = np.random.randint(len(X_test))
  X_sample = X_test[0,:].reshape((1,-1))
  y_sample = y_test[0,:].reshape((1,-1))

  original_w = model.layers[layer_ind].b[0,unit_ind]
  
  # Approximation
  model.layers[layer_ind].b[0,unit_ind] = original_w + epsilon
  w_plus_epsilon_loss, _ = model.forward_pass(X_sample, y_sample)
  model.layers[layer_ind].b[0,unit_ind] = original_w - epsilon
  w_minus_epsilon_loss, _ = model.forward_pass(X_sample, y_sample)
  
  estimated_dE_dw = (w_plus_epsilon_loss - w_minus_epsilon_loss)/(2*epsilon)
  
  # Restore
  model.layers[layer_ind].b[0,unit_ind] = original_w
  
  # Actual BPP gradient
  model.forward_pass(X_sample, y_sample)
  model.backward_pass()
  bpp_dE_dw = -model.layers[layer_ind].d_b[unit_ind]/len(X_sample)
  
  return abs(estimated_dE_dw - bpp_dE_dw), abs(estimated_dE_dw - bpp_dE_dw) <= 9*epsilon**2
  
# BIAS CHECK
checks = []
model = Neuralnetwork(config)
for layer_i in [0, 2, 4]:
  for unit_i in range(model.layers[layer_i].b.shape[1]):
    raw_diff, is_close_enough = test_approximation_bias(model, layer_i, unit_i, 1e-4)
    checks.append(is_close_enough)
    
print(sum(checks)/len(checks))

0.8285714285714286


### Weights

In [171]:
X_test, y_test = load_data(test_data_fname)

def test_approximation_weight(model, layer_ind, unit_in, unit_out, epsilon=1e-2):
  sample_test = np.random.randint(len(X_test))
  X_sample = X_test[sample_test,:].reshape((1,-1))
  y_sample = y_test[sample_test,:].reshape((1,-1))

  original_w = model.layers[layer_ind].w[unit_in, unit_out]
  
  # Approximation
  model.layers[layer_ind].w[unit_in, unit_out] = original_w + epsilon
  w_plus_epsilon_loss, _ = model.forward_pass(X_sample, y_sample)
  model.layers[layer_ind].w[unit_in, unit_out] = original_w - epsilon
  w_minus_epsilon_loss, _ = model.forward_pass(X_sample, y_sample)
  
  estimated_dE_dw = (w_plus_epsilon_loss - w_minus_epsilon_loss)/(2*epsilon)
  
  # Restore
  model.layers[layer_ind].w[unit_in, unit_out] = original_w
  
  # Actual BPP gradient
  model.forward_pass(X_sample, y_sample)
  model.backward_pass()
  bpp_dE_dw = -model.layers[layer_ind].d_w[unit_in, unit_out]/len(X_sample)
  
  return abs(estimated_dE_dw - bpp_dE_dw), abs(estimated_dE_dw - bpp_dE_dw) <= 9*epsilon**2
  
# BIAS CHECK
checks = []
model = Neuralnetwork(config)
for layer_i in tqdm([0, 2, 4]):
  for unit_in in tqdm(range(min(model.layers[layer_i].w.shape[0], 100))):
    for unit_out in range(min(model.layers[layer_i].w.shape[1], 100)):
      raw_diff, is_close_enough = test_approximation_weight(model, layer_i, unit_in, unit_out, 1e-6)
      checks.append(is_close_enough)
    
print(sum(checks)/len(checks))

HBox(children=(IntProgress(value=0, max=3), HTML(value='')))

HBox(children=(IntProgress(value=0), HTML(value='')))

HBox(children=(IntProgress(value=0), HTML(value='')))

HBox(children=(IntProgress(value=0), HTML(value='')))

0.3242857142857143
