In [1]:
import torch
import torch.nn.functional as F
from torchvision import datasets, transforms
from naive_cifar import NaiveModel
from nni.algorithms.compression.pytorch.quantization import QAT_Quantizer
from nni.compression.pytorch.quantization.settings import set_quant_scheme_dtype
import numpy as np
import pandas as pd

  warn(


In [6]:
# Custom test model function
# Only implements forward pass of neural network - no training done.
def test_custom(model, device, test_loader, num_bits):
    model.eval()
    test_loss = 0
    correct = 0
    i = 1
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            batch = data.size()[0]

            # Custom forward pass function
            non_linear_model = forward_custom(model, data, batch, num_bits, 3, 32, 32, 5, 1, 64, 64, device)
            output = non_linear_model.forward_pass()
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            print(f"ITERATION {i}: Accuracy = {correct/i:.2f} (cumulative = {correct})")
            i = i+1
    test_loss /= len(test_loader.dataset)

    print('Loss: {}  Accuracy: {}%)\n'.format(
        test_loss, 100 * correct / len(test_loader.dataset)))


# Test original/default model forward pass.
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)

    print('Loss: {}  Accuracy: {}%)\n'.format(
        test_loss, 100 * correct / len(test_loader.dataset)))


def main():
    """Load the trained quantized CIFAR-10 model and evaluate it twice.

    Steps:
      1. Seed torch and select the device (CUDA when available).
      2. Rebuild the QAT quantizer wrapper around a fresh `NaiveModel` so the
         module structure matches the checkpoint, then load `cifar_model.pth`.
      3. Report test accuracy with the model's own forward pass (`test`) and
         with the hand-written forward pass (`test_custom`).
    """
    torch.set_default_dtype(torch.float32)
    torch.manual_seed(0)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # INT8 weight, INT8 activations
    num_bits = 8
    # Make sure this matches quantization config from CIFAR10_CNN_Training
    configure_list = [{
        'quant_types': ['weight', 'input'],
        'quant_bits': {'weight': num_bits, 'input': num_bits},
        'quant_start_step': 2,
        'op_names': ['conv1', 'conv2']
    }, {
        'quant_types': ['output'],
        'quant_bits': {'output': num_bits},
        'quant_start_step': 2,
        'op_names': ['relu1', 'relu2', 'relu3']
    }, {
        'quant_types': ['output', 'weight', 'input'],
        'quant_bits': {'output': num_bits, 'weight': num_bits, 'input': num_bits},
        'quant_start_step': 2,
        'op_names': ['fc1', 'fc2'],
    }]

    set_quant_scheme_dtype('weight', 'per_tensor_symmetric', 'int')
    set_quant_scheme_dtype('output', 'per_tensor_symmetric', 'int')
    set_quant_scheme_dtype('input', 'per_tensor_symmetric', 'int')

    # Load the CIFAR-10 test split only: this function performs no training,
    # so the training split is never needed. download=True lives on the test
    # dataset so a fresh checkout still fetches the archive.
    trans = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    test_loader = torch.utils.data.DataLoader(
        datasets.CIFAR10('data', train=False, download=True, transform=trans),
        batch_size=100, shuffle=True)

    # Create a NaiveModel object and apply QAT_Quantizer setup
    model_path = "cifar_model.pth"
    qmodel = NaiveModel().to(device)
    dummy_input = torch.randn(1, 3, 32, 32).to(device)
    # The optimizer is not stepped here; it is only handed to QAT_Quantizer.
    optimizer = torch.optim.SGD(qmodel.parameters(), lr=0.01, momentum=0.5)
    # To enable batch normalization folding in the training process, you should
    # pass dummy_input to the QAT_Quantizer.
    quantizer = QAT_Quantizer(qmodel, configure_list, optimizer, dummy_input=dummy_input)
    quantizer.compress()

    # Load trained model (from CIFAR10_CNN_Training step).
    # NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
    state = torch.load(model_path, map_location=device)
    qmodel.load_state_dict(state, strict=True)
    qmodel.eval()

    # Evaluate test accuracy with imported quantized model (qmodel) from cifar_model.pth.
    test(qmodel, device, test_loader)

    # Evaluate test accuracy of qmodel, with custom forward pass.
    test_custom(qmodel, device, test_loader, num_bits)




class forward_custom:
  """Hand-written forward pass of the quantized CIFAR-10 CNN.

  The pass runs conv1 -> relu1 -> maxpool -> conv2 -> relu2 -> maxpool ->
  fc1 -> relu3 -> fc2 -> log_softmax on one batch. Convolutions are tiled
  into `ncol` x `nrow` weight blocks and every multiply-accumulate is taken
  from a look-up table (`nl_mult`) read from an Excel sheet instead of being
  computed arithmetically.
  """

  # Offset added to a quantized operand to obtain its row/column in `nl_mult`.
  # NOTE(review): an offset of 15 suggests the table covers operands in
  # [-15, 15], while `scale_quant` with num_bits=8 produces codes in
  # [-127, 127]; a negative index would silently wrap around in NumPy.
  # Confirm against the layout of the spreadsheet.
  _LUT_OFFSET = 15

  # Look-up tables already read from disk, keyed by file name. A new object
  # is built for every batch, so this avoids re-parsing the spreadsheet each time.
  _lut_cache = {}

  def __init__(self, model, data, batch, num_bits, ifmap,
               ofmap, idim, knl, stride, ncol, nrow, device):
    """Store the batch, the layer geometry and the hardware look-up table.

    Args:
        model: wrapped network; layers are accessed as `model.<name>.module`.
        data: input batch tensor.
        batch: number of samples in `data`.
        num_bits: quantization bit width.
        ifmap: number of input feature maps of the first convolution.
        ofmap: number of output feature maps of the first convolution.
        idim: spatial size (height == width) of the input image.
        knl: convolution kernel size.
        stride: convolution stride.
        ncol: hardware MVM columns (number of output ADCs).
        nrow: hardware MVM rows (number of input DACs).
        device: torch device results are moved to.
    """
    self.model = model
    self.data = data
    self.batch = batch
    self.num_bits = num_bits
    self.ifmap = ifmap        # Image (colour CIFAR-10)
    self.ofmap = ofmap        # Output maps (from applying ofmap knl x knl weight kernels)
    self.idim = idim          # Input image dimensions
    self.knl = knl            # Kernel dimensions
    self.stride = stride      # Convolutional stride
    self.ncol = ncol          # Hardware matrix-vector-multiplication (MVM) columns (corresponds to number of output ADCs)
    self.nrow = nrow          # Hardware MVM rows (corresponds to number of input DACs)
                              # In-memory computing hardware therefore,
                              # has ncol*nrow weights.
    self.nl_mult = self.read_non_idealities('../HardwareSpec/8x8_Mac_result_final.xlsx')
    self.device = device

  def read_non_idealities(self, file_name):
    """Return the multiplication look-up table stored in `file_name`.

    The sheet is read with its first column as the index and returned as a
    NumPy array. Each file is parsed once and then served from a class-level
    cache; callers must treat the returned array as read-only.
    """
    cache = type(self)._lut_cache
    if file_name not in cache:
        cache[file_name] = pd.read_excel(file_name, index_col=0).values
    return cache[file_name]

  def forward_pass(self):
    """Run the whole network on `self.data` and return log-probabilities."""
    # Spatial sizes of the intermediate feature maps. The formulas assume a
    # unit convolution stride and 2x2 pooling, as in the original code.
    conv1_out = self.idim - self.knl + 1
    pool1_out = conv1_out // 2
    conv2_out = pool1_out - self.knl + 1
    conv2_ofmap = 64          # Output maps of the second convolution

    # Scale input
    s_in, x = self.scale_quant(self.data, self.num_bits)

    # Convolutional layer 1
    x = self.conv2d(x, self.model.conv1.module, s_in, self.idim, self.ifmap, self.ofmap)

    # ReLU 1
    x = self.relu6(x, 0, 6, self.model.relu1.module)

    # Maxpool layer 1
    x = self.maxpool2d(x, conv1_out, self.ofmap, 2)

    # Scale for convolutional layer 2
    sin_conv2 = self.model.conv2.module.input_scale
    x = self.noscale_quant(x, sin_conv2.cpu(), 0, self.num_bits)
    x = self.conv2d(x, self.model.conv2.module, sin_conv2, pool1_out, self.ofmap, conv2_ofmap)

    # ReLU 2
    x = self.relu6(x, 0, 6, self.model.relu2.module)

    # Maxpool layer 2
    x = self.maxpool2d(x, conv2_out, conv2_ofmap, 2)

    # Fully connected layer 1
    x = x.view(-1, x.size()[1:].numel())  # Flatten outputs of out_maxpool2 layer to feed to FC layers.
    x = self.fc(x, self.model.fc1.module, 128) # 128 outputs (64*5*5 = 1600 -> 128)

    # ReLU 3
    x = self.relu6(x, 0, 6, self.model.relu3.module)

    # Fully connected layer 2
    x = self.fc(x.cpu(), self.model.fc2.module, 10)

    # Softmax layer
    return F.log_softmax(x, dim=1)

  def conv2d(self, x, conv, s_in, idim, ifmap, ofmap):
    """Quantized convolution of `x` with the weights and bias of `conv`.

    `x` must already be quantized with scale `s_in`. The weights are
    quantized here, the bias is expressed in accumulator units, and the
    integer result is rescaled to real values with `s_in * sw_conv`.
    """
    # Scale input image, conv weight and bias to bits used
    sw_conv, filters_conv = self.scale_quant(conv.weight.cpu(), self.num_bits)
    bias_conv = conv.bias / (s_in * sw_conv)

    # CONV layer (WSAB dataflow - see convolve2D_wsab function)
    out_conv = self.convolve2D_wsab(x, filters_conv, bias_conv, 0, self.stride, self.batch, ifmap, ofmap, idim, self.knl, self.ncol, self.nrow)
    out_conv = torch.from_numpy(out_conv)
    out_conv = out_conv.to(self.device)

    # Applying scaling normalization s_in*sw_conv (convert back to float)
    return out_conv * s_in * sw_conv

  def convolve2D_wsab(self, image, kernel, bias, padding, strides, batch, ifmap, ofmap, idim, knl, ncol, nrow):
    """Convolve a batch using `ncol` x `nrow` weight tiles and the MAC look-up table.

    The flattened kernels are zero-padded to a whole number of tiles. For
    each output pixel the flattened image patch is zero-padded the same way,
    and every (tile, pixel) pair is evaluated with `non_linear_mult`.
    Partial sums of the row tiles are added together and the bias is added
    last.

    Args:
        image: batch tensor indexed as [sample, channel, row, column].
        kernel: weight tensor reshapeable to (ofmap, ifmap*knl*knl).
        bias: tensor with `ofmap` elements, in accumulator units.
        padding: zero padding applied equally to all four sides.
        strides: convolution stride.
        batch, ifmap, ofmap, idim, knl: layer geometry.
        ncol, nrow: hardware tile size (output columns, input rows).

    Returns:
        NumPy array of shape (batch, ofmap, xOutput, yOutput).
    """
    idim = int(idim)
    padding = int(padding)
    patch_len = knl * knl * ifmap

    # Number of weight block partitions to fit into hardware
    block_col = int(np.ceil(ofmap / ncol))
    block_row = int(np.ceil(patch_len / nrow))

    # Shape of Output Convolution
    xOutput = int(((idim - knl + 2 * padding) / strides) + 1)
    yOutput = int(((idim - knl + 2 * padding) / strides) + 1)
    output = np.zeros((batch, ofmap, xOutput, yOutput))

    # Kernels flattened to rows and zero-padded up to the tile grid.
    kernel_flat = np.zeros((block_col * ncol, block_row * nrow))
    kernel_flat[0:ofmap, 0:patch_len] = torch.reshape(kernel, [ofmap, patch_len]).cpu().numpy()

    # The bias is the same for every sample, so broadcast it once.
    bias_map = (bias.reshape(ofmap, 1, 1).cpu() * torch.ones(xOutput, yOutput)).detach().numpy()

    # Process image by image
    for b in range(batch):
        sample = image[b]
        # Apply Equal Padding to All Sides
        if padding != 0:
            sample = F.pad(sample, (padding, padding, padding, padding))
        sample = sample[0:ifmap].detach().cpu().numpy()

        # Flattened patch for every output pixel (rows past patch_len stay
        # zero). The patches do not depend on the weight tile, so they are
        # extracted once per image rather than once per tile.
        image_block = np.zeros((block_row * nrow, xOutput, yOutput))
        for y in range(yOutput):
            for x in range(xOutput):
                image_block[0:patch_len, x, y] = sample[:,
                                                        strides * x: strides * x + knl,
                                                        strides * y: strides * y + knl].reshape(patch_len)

        otemp = torch.zeros(block_row, block_col * ncol, xOutput, yOutput)
        for bc in range(block_col):
            for br in range(block_row):
                ktemp = torch.from_numpy(kernel_flat[ncol * bc:(bc + 1) * ncol, nrow * br:(br + 1) * nrow])

                for y in range(yOutput):
                    for x in range(xOutput):
                        itemp = torch.from_numpy(image_block[br * nrow:(br + 1) * nrow, x, y])

                        # Replace this line with Arduino SPI function call
                        otemp[br, bc * ncol:(bc + 1) * ncol, x, y] = self.non_linear_mult(ktemp, itemp)

        # Sum the partial results of the row tiles, then add the bias.
        output[b] = torch.sum(otemp[:, 0:ofmap, :, :], dim=0).numpy() + bias_map

    return output

  def relu6(self, x, min_val, max_val, relu_module):
    """Clip `x` and fake-quantize it with the output scale of `relu_module`.

    Values below `min_val` are multiplied by a zero mask, i.e. they become 0
    rather than `min_val`; the two coincide for `min_val == 0`, which is what
    `forward_pass` passes. Values above `max_val` become `max_val`.
    """
    i = (x >= min_val) * x
    out_relu = (i <= max_val) * (i - max_val) + max_val
    so_relu = relu_module.output_scale

    # Apply fake quantization to relu output.
    out_relu = self.noscale_quant(out_relu, so_relu, 0, self.num_bits)
    return self.dequantize(out_relu, so_relu, 0)

  def maxpool2d(self, x, idim, ofmap, knl):
    """Max-pool `x` with a `knl` x `knl` window and return a CPU tensor."""
    # Maxpool layer - downsample by knl x knl with maxpool (no quantization required for max function)
    return torch.from_numpy(self.maxpool2D_wsa(self.batch, x, idim, ofmap, knl))

  # Maxpool layer (apply to all ofmaps simultaneously - faster!)
  def maxpool2D_wsa(self, batch, image, idim, ofmap, knl):
    """Non-overlapping `knl` x `knl` max pooling without padding.

    Args:
        batch: number of samples in `image`.
        image: tensor indexed as [sample, channel, row, column].
        idim: spatial size of the input feature maps.
        ofmap: number of feature maps.
        knl: pooling window size, also used as the stride.

    Returns:
        NumPy array of shape (batch, ofmap, xOutput, yOutput).
    """
    strides = knl
    # Shape of Output Convolution
    xOutput = int(((idim - knl) / strides) + 1)
    yOutput = int(((idim - knl) / strides) + 1)
    output = np.zeros((batch, ofmap, xOutput, yOutput))

    for b in range(batch):
        fmap = image[b]

        # Iterate through image
        for y in range(yOutput):
            for x in range(xOutput):
                window = fmap[:, strides * x: strides * x + knl, strides * y: strides * y + knl]
                output[b, :, x, y] = torch.amax(window, dim=(1, 2)).detach().cpu().numpy()
    return output

  def fc(self, x, module, odim):
    """Quantized fully connected layer followed by output fake quantization.

    The input is quantized with the input scale of `module`, the weights are
    quantized here, the product is rescaled to real values and finally
    quantized and dequantized with the output scale of `module`.
    """
    sin_fc = module.input_scale.cpu()
    in_fcs = self.noscale_quant(x, sin_fc, 0, self.num_bits)
    sw_fc, filters_fc = self.scale_quant(module.weight.cpu(), self.num_bits)
    bias_fc = module.bias.cpu() / (sw_fc * sin_fc)

    # FC layer using (WSA dataflow)
    # N.B. Need to implement WSAB function to implement 'hardware acceleration' of FC layer.
    out_fc = torch.from_numpy(self.fc_custom_wsa(in_fcs, filters_fc, bias_fc, self.batch, odim))

    # FC output scaling
    out_fcs = out_fc * sin_fc * sw_fc

    so_fc = module.output_scale.cpu()
    # FC output fake quantization
    return self.dequantize(self.noscale_quant(out_fcs, so_fc, 0, self.num_bits), so_fc, 0).to(self.device)

  # 'Fake' Quantization function [Jacob et. al]
  def quantize(self, real_value, scale, zero_point, qmin, qmax):
    """Map real values to codes: divide by `scale`, shift, clamp, then round."""
    transformed_val = zero_point + real_value / scale
    clamped_val = torch.clamp(transformed_val, qmin, qmax)
    quantized_val = torch.round(clamped_val)
    return quantized_val

  # 'Fake' Dequantization function [Jacob et. al]
  def dequantize(self, quantized_val, scale, zero_point):
    """Map codes back to real values: `scale * (code - zero_point)`."""
    real_val = scale * (quantized_val - zero_point)
    return real_val

  # Scaling function (Jacob et. al)
  def scale_quant(self, real_value, num_bits):
    """Symmetric quantization with a scale derived from the tensor itself.

    The scale maps the largest absolute value onto `2**(num_bits-1) - 1`.

    Returns:
        (scale, quant): the 0-dim scale tensor and the quantized tensor.
    """
    qmin = -(2 ** (num_bits - 1) - 1)
    qmax = 2 ** (num_bits - 1) - 1
    abs_max = torch.abs(real_value).max()
    scale = abs_max / (float(qmax - qmin) / 2)
    # An all-zero tensor would give scale 0 and turn every code into NaN
    # (0 / 0). Any non-zero scale represents zeros exactly, so use 1.
    scale = torch.where(scale == 0, torch.ones_like(scale), scale)
    zero_point = 0
    quant = self.quantize(real_value, scale, zero_point, qmin, qmax)
    return scale, quant

  # Scaling function (Jacob et. al)
  def noscale_quant(self, real_value, scale, zero_point, num_bits):
    """Symmetric quantization with a caller-supplied `scale`."""
    qmin = -(2 ** (num_bits - 1) - 1)
    qmax = 2 ** (num_bits - 1) - 1
    quant = self.quantize(real_value, scale, zero_point, qmin, qmax)
    return quant

  # Custom FC layer (vector multiplication style - much faster than for loop implementation)
  def fc_custom_wsa(self, fc_input, filters, bias, batch, ofmap):
    """Fully connected layer: `filters @ fc_input[b] + bias` for every sample.

    Returns:
        NumPy array of shape (batch, ofmap).
    """
    output = np.zeros((batch, ofmap))
    for b in range(batch):
        input_batch = fc_input[b]
        # Broadcasting multiplies every weight row by the input vector.
        outputi = torch.sum((input_batch * filters), dim=1) + bias
        output[b, :] = outputi.detach().numpy()
    return output

  # Non-linear jazz
  def non_linear_conv(self, kernel, image):
    """Dot product of two equal-length sequences using the MAC look-up table.

    Each product `kernel[i] * image[i]` is replaced by the table entry at
    `[kernel[i] + offset][image[i] + offset]`, and the entries are summed.
    """
    offset = self._LUT_OFFSET
    total = 0
    for i in range(len(kernel)):
        total = total + self.nl_mult[int(kernel[i] + offset)][int(image[i] + offset)]
    return total

  def non_linear_mult(self, kernel, image):
    """Matrix-vector product of `kernel` (rows) and `image` via the look-up table.

    Returns:
        1-D float64 tensor with one entry per row of `kernel`.
    """
    output = np.empty(len(kernel))
    # The input vector is shared by all rows; convert it only once.
    image_values = image.tolist()

    for i in range(len(kernel)):
        output[i] = self.non_linear_conv(kernel[i, :].tolist(), image_values)

    return torch.from_numpy(output)

# Run the evaluation only when this code is executed directly (script or
# notebook cell), not when it is imported as a module.
if __name__ == '__main__':
    main()

Files already downloaded and verified
Loss: 1.0538166458129883  Accuracy: 62.22%)

ITERATION 1: Accuracy = 54.0 (cumulative = 54)
ITERATION 2: Accuracy = 62.0 (cumulative = 124)
ITERATION 3: Accuracy = 65.66666666666667 (cumulative = 197)
ITERATION 4: Accuracy = 65.5 (cumulative = 262)
ITERATION 5: Accuracy = 63.2 (cumulative = 316)
ITERATION 6: Accuracy = 64.0 (cumulative = 384)
ITERATION 7: Accuracy = 63.57142857142857 (cumulative = 445)
ITERATION 8: Accuracy = 63.25 (cumulative = 506)
ITERATION 9: Accuracy = 62.888888888888886 (cumulative = 566)
ITERATION 10: Accuracy = 62.5 (cumulative = 625)
ITERATION 11: Accuracy = 62.36363636363637 (cumulative = 686)
