# Colab Notebook for Neuromorphic Project

## Execution functions
- Includes main function, import statements and additional functions required for the program execution (such as accuracy calculator, testing/validation functions, etc)

In [None]:
import pickle as cPickle, gzip, numpy
import numpy as np
import torch
import torch.utils.data as td
import torch.nn.functional as F
import time, random
import matplotlib.pyplot as plt
from typing import Tuple
from torch import nn
from torch.utils.data.dataset import TensorDataset
from torchvision import datasets, transforms
import logging
logging.basicConfig(filename='std.log', filemode='w', level=logging.DEBUG)

# Setting random seed for reproducability
manualSeed = 5000
device = 'cuda' if torch.cuda.is_available() else 'cpu'
def set_seed():
  if device == 'cuda':
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.enabled = False 
  torch.manual_seed(manualSeed)
  np.random.seed(manualSeed)
  random.seed(manualSeed)

sparsity = 0

In [None]:
# Loading the datasets
def data_processing(cnn):
  f = gzip.open('mnist.pkl.gz', 'rb')
  train_set, valid_set, test_set = cPickle.load(f, encoding='latin1') 
  f.close()
  # Converting dataset from numpy to Tensor
  train_t = torch.from_numpy(train_set[0])
  valid_t = torch.from_numpy(valid_set[0])
  test_t  = torch.from_numpy(test_set[0])
  # Converting labels to Tensor
  train_label= torch.from_numpy(train_set[1])
  valid_label= torch.from_numpy(valid_set[1])
  test_label = torch.from_numpy(test_set[1])
  if cnn:
    # resize and normalize
    train_t = train_t.reshape((train_t.shape[0], 1, 28, 28))
    valid_t = valid_t.reshape((valid_t.shape[0], 1, 28, 28))
    test_t = test_t.reshape((test_t.shape[0], 1, 28, 28))
    input_shape = (1, 28, 28)
  # Wrapping it
  trainset = TensorDataset(train_t, train_label)
  validset = TensorDataset(valid_t, valid_label)
  testset = TensorDataset(test_t, test_label)

  return trainset, validset, testset


In [None]:
# Loss Curve grapher
def loss_curve(train_loss, val_loss, y_range, name):
  plt.figure(figsize=(10,5))
  plt.xlabel("Epochs")
  plt.ylabel("Loss")
  plt.title(name)
  plt.plot(train_loss, label="Training Loss")
  plt.plot(val_loss, label="Validation Loss")
  plt.legend()
  plt.savefig(name+".png", bbox_inches='tight', pad_inches=0.1)
  plt.show()


In [None]:
# Accuracy Calculator
def acc_calc(pred, actual):
  best = pred.argmax(1)
  comp = best.eq(actual.view_as(best)).float().sum()
  return comp


In [None]:
# Training function
def train_loop(data_loader, NN_model, loss_fn, optimizer):
  size = len(data_loader.dataset)
  loss_val = 0
  loss, acc = 0,0
  NN_model.train()
  for batch, (X, y) in enumerate(data_loader):
    if torch.cuda.is_available():
      X = X.cuda()
      y = y.cuda()
    # Compute prediction and loss
    pred = NN_model(X)
    loss = loss_fn(pred, y)
    
    # Backpropagation
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # Accuracy
    acc = acc+acc_calc(nn.Softmax(dim=1)(pred), y)
    loss_val = loss_val+loss.item()
  return loss_val/size, (acc/size)


In [None]:
# Function for validation and testing
def eval_loop(data_loader, model, loss_fn):
  size = len(data_loader.dataset)
  loss, acc, loss_val = 0, 0, 0
  model.eval()
  torch.no_grad() 
  for batch, (X, y) in enumerate(data_loader):
    if torch.cuda.is_available():
      X = X.cuda()
      y = y.cuda()
    
    target = model(X)
    loss = loss_fn(target, y).item()
    # Accuracy
    acc = acc+acc_calc(nn.Softmax(dim=1)(target), y)
    loss_val = loss_val+loss

  return loss_val/size, (acc/size)


In [None]:
# Primary function for execution of the neural networks
def nn_run(nn_model, trainset, validset, testset, batch, learn_rate, epoch_max, mom, model, decay):
  
  # Loading datasets
  train_loader = td.DataLoader(trainset, batch_size=batch, shuffle=True, worker_init_fn=np.random.seed(manualSeed),num_workers=0,pin_memory=True)
  valid_loader = td.DataLoader(validset, batch_size=batch, shuffle=True, worker_init_fn=np.random.seed(manualSeed),num_workers=0,pin_memory=True)
  test_loader  = td.DataLoader(testset, batch_size=batch, shuffle=True, worker_init_fn=np.random.seed(manualSeed),num_workers=0,pin_memory=True)
  print(test_loader)

  # Loading neural network model to device
  loss_fn = nn.CrossEntropyLoss()
  optimizer = torch.optim.SGD(nn_model.parameters(), lr=learn_rate, momentum=mom, weight_decay=decay)
  # optimizer = torch.optim.SGD(nn_model.parameters(), lr=learn_rate, weight_decay=decay)
  scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=60, gamma=0.1)

  print(f'\n=============={model}==============')
  print(f'Starting training using {device} device...')
  timer = time.perf_counter()
  valid_loss = 1
  valid_loss_list = []
  train_loss_list = []
  valid_acc_list = []
  train_acc_list = []
  for epoch in range(epoch_max):
    train_loss, train_acc = train_loop(train_loader, nn_model, loss_fn, optimizer)
    loss_temp, acc_temp = eval_loop(valid_loader, nn_model, loss_fn)

    valid_loss_list.append(loss_temp)
    train_loss_list.append(train_loss)
    valid_acc_list.append(acc_temp.cpu())
    train_acc_list.append(train_acc.cpu())
    
    if loss_temp < valid_loss:
      valid_acc = acc_temp
      valid_loss = loss_temp
    print("", end=f"\rEpoch {epoch+1} ({round((epoch+1)/epoch_max * 100, 2)}%) complete. Runtime: {round(time.perf_counter()-timer, 2)}s      ")


  timer = round(time.perf_counter()-timer, 2)

  print('\n\n--------Training complete.--------')
  print(f'Model                    : {model}')
  print(f'Training loss            : {train_loss:>7f}')
  print(f'Batch Size               : {batch}')
  print(f'Training accuracy        : {np.round(train_acc.cpu().detach().numpy()*100, 3)}%') if device == 'cuda' else print(f'Training accuracy        : {np.round(train_acc.detach().numpy()*100, 3)}%')
  print(f'Training time            : {int(timer/60)}m {int(timer%60)}s')
  print(f'Best validation loss     : {valid_loss:>7f}')
  print(f'Best validation accuracy : {np.round(valid_acc.cpu().detach().numpy()*100, 3)}%') if device == 'cuda' else print(f'Best validation accuracy : {np.round(valid_acc.detach().numpy()*100, 3)}%')
  print(f'Number of epochs         : {epoch_max}')
  print(f'Learning rate            : {learn_rate}')
  print(f'Training dataset size    : {len(train_loader.dataset)}')
  print(f'Device                   : {torch.cuda.get_device_name(device)}')  if device=='cuda' else print(f'Device                   : {device}')
  print('----------------------------------')
  
  #=========TESTING=========
  print(f'\nTesting using {device} device.')
  timer = time.perf_counter()
  test_loss, test_acc = eval_loop(test_loader, nn_model, loss_fn)
  timer = round(time.perf_counter()-timer, 2)

  print('--------Test complete.--------')
  print(f'Model       : {model}')
  print(f'Device      : {device}')
  print(f'Loss        : {test_loss:>7f}')
  print(f'Accuracy    : {np.round(test_acc.cpu().detach().numpy()*100, 3)}%') if device == 'cuda' else print(f'Accuracy    : {np.round(test_acc.detach().numpy()*100, 3)}%')
  print(f'Runtime     : {timer}s')
  print(f'Dataset size: {len(test_loader.dataset)}')
  print('------------------------------\n')
  
  loss_curve(train_loss_list, valid_loss_list, epoch_max, model+" Loss Curve")
  loss_curve(train_acc_list, valid_acc_list, epoch_max, model+" Accuracy Curve")

  # Returning test accuracy to main function
  return np.round(test_acc.cpu().detach().numpy()*100, 3) if device == 'cuda' else np.round(test_acc.detach().numpy()*100, 3)


### Neural Network Definition

In [None]:
# Defining the convolutional neural network (CNN)
class CNN(nn.Module):
  def __init__(self):
    super(CNN, self).__init__()
    first_layer_features = 16
    second_layer_features = 32
    first_layer_sparsity = 0
    second_layer_sparsity = 0.5
    wbit = 4
    abit = 4
    self.cnn_layers = nn.Sequential(
      # Defining first custom 2D convolution layer
      QConv2d(1, first_layer_features, kernel_size=3, stride=1, padding=1, wbit=wbit, abit=abit, sparsity_fraction=first_layer_sparsity),
      nn.ReLU(inplace=True),
      nn.MaxPool2d(kernel_size=2, stride=2),
      # Defining second custom 2D convolution layer
      QConv2d(first_layer_features, second_layer_features, kernel_size=3, stride=1, padding=1, wbit=wbit, abit=abit, sparsity_fraction=second_layer_sparsity),
      nn.ReLU(inplace=True),
      nn.MaxPool2d(kernel_size=2, stride=2),
    )
    # Defining fully-connected linear layer
    self.linear_layers = nn.Sequential(
      QLinear(second_layer_features * 7 * 7, 10)
    )

  # Defining the forward pass    
  def forward(self, x):
    x = self.cnn_layers(x)
    x = x.view(x.size(0), -1)
    x = self.linear_layers(x)
    return x

## Quantized Convolutional Layer


In [None]:
# Defining quantization nodes
class QSTE(torch.autograd.Function):
  @staticmethod
  def forward(ctx, x, scale):
    # Quantization
    x = x / scale
    x = torch.round(x)
    # Dequantization
    xdeq = x * scale
    return xdeq

  @staticmethod
  def backward(ctx, grad_output):
    return grad_output, None


# Define your convolutional layer
class QConv2d(nn.Conv2d):
  def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=False, padding_mode='zeros', device=None, dtype=None, wbit=4, abit=4, sparsity_fraction=0.333):
      super().__init__(in_channels, out_channels, kernel_size, stride=stride, padding=padding, dilation=dilation, groups=groups, bias=bias, padding_mode=padding_mode, device=device, dtype=dtype)

      self.register_buffer('mask', torch.ones(self.weight.size()))
      self.wbit = wbit
      self.abit = abit
      self.sparsity_fraction = sparsity_fraction
      global sparsity

  # Weight Quantization
  def wquant(self):
    # Defining quantization boundaries
    self.alpha_w = 2*self.weight.abs().mean()
    wc = self.weight.clamp(-self.alpha_w, self.alpha_w)
    # Calculating scaling factor
    scaling_factor = (self.alpha_w) / (2**(self.wbit-1)-1)
    # step 2: quantization
    wq = QSTE.apply(wc, scaling_factor)
    return wq
  
  # Input Quantization
  def xquant(self, x):
    # Defining quantization boundaries: 
    self.alpha_w_upper = 6
    self.alpha_w_lower = 0
    xc = x.clamp(self.alpha_w_lower, self.alpha_w_upper)
    # Calculating scaling factor
    scaling_factor = (self.alpha_w_upper - self.alpha_w_lower) / (2**self.abit-1)
    # step 2: quantization
    xq = QSTE.apply(xc, scaling_factor)
    return xq

  def forward(self, x):
    # Quantizing weights
    wq = self.wquant()
    # Quantizing inputs
    xq = self.xquant(x)
    # Generating mask for pruning weights
    mask = prune_net(wq, sparsity=self.sparsity_fraction)
    reset = torch.ones_like(wq)
    mask = mask[:, None, None, None].mul(reset)
    wq_masked = wq.mul(mask)
    self.mask = mask
    # Running PyTorch's convolution function with quantized values
    Y = F.conv2d(xq, wq.mul(self.mask), self.bias, self.stride, self.padding, self.dilation, self.groups)
    return Y


## Quantized Linear Layer

In [None]:
# Defining the linear layer
class QLinear(nn.Linear):
  def __init__(self, in_channels, out_channels, bias=False, wbit=4, abit=4):
      super().__init__(in_channels, out_channels, bias=bias)
      self.wbit = wbit
      self.abit = abit

  # Weight quantization
  def wquant(self):
    # Setting quantization boundary: 
    self.alpha_w = 2*self.weight.abs().mean()
    wc = self.weight.clamp(-self.alpha_w, self.alpha_w)
    # Calculating scaling factor
    scaling_factor = (self.alpha_w) / (2**(self.wbit-1)-1)
    # Weight quantization
    wq = QSTE.apply(wc, scaling_factor)
    return wq

  # Input quantization
  def xquant(self, x):
    # Defining quantization boundaries
    self.alpha_w_upper = 6
    self.alpha_w_lower = 0
    xc = x.clamp(self.alpha_w_lower, self.alpha_w_upper)
    # Calculating scaling factor
    scaling_factor = (self.alpha_w_upper - self.alpha_w_lower) / (2**self.abit-1)
    # Input quantization
    xq = QSTE.apply(xc, scaling_factor)
    return xq

  def forward(self, x):
    # Quantizing weights
    wq = self.wquant()
    # Quantizing inputs
    xq = self.xquant(x)
    # Running PyTorch's linear function with quantized values
    Y = F.linear(xq, wq, self.bias)
    return Y


## Pruning

In [None]:
# Code for pruning
def prune_net(W, sparsity):
  W1 = W.clone()
  wg = W1.contiguous().view(W1.size(0), W1.size(1)*W1.size(2)*W1.size(3))
  # Step2: Compute the norm of each group
  wnorm = wg.norm(dim=1)
  # Step 3: Sort the score and remove lowest
  wn_sorted, indx = torch.sort(wnorm)
  # Step 4: Given the sparsity, remove the group
  thres_index = int(sparsity * len(wn_sorted)) # Finding threshold index, given sparsity
  threshold = wn_sorted[(thres_index-1)] if thres_index != 0 else wn_sorted[(thres_index)]  # Finding threshold
  mask = wnorm.gt(threshold).float() # Generating mask
  return mask

# Use this for a variable sparsity variation scheduler
def get_sparsity(t, n):
  si = 0
  sf = 0.5
  t0 = 0
  del_t = 1
  st = sf + (si-sf)*((1-((t-t0)/(n*del_t)))*(1-((t-t0)/(n*del_t)))*(1-((t-t0)/(n*del_t))))
  global sparsity
  sparsity = st


# Main Function

In [None]:
# Main Function
if __name__ == '__main__':
  batch = 128
  epoch = 100
  learn_rate = 0.005
  momentum = 0.9
  # Obtaining dataset
  trainset, validset, testset = data_processing(cnn=True)

  #========= Convolutional Neural Network =========
  set_seed()
  model = CNN()
  if torch.cuda.is_available():
    model = model.cuda()
  nn_run(model, trainset, validset, testset, batch, learn_rate, epoch, momentum, "CNN", decay=1e-4)
  # Saving model to memory
  torch.save(model.state_dict(), "trained_network.pth.tar")
    