# AI6104 Mathematics for AI Assignment

In [None]:
from torchvision import datasets
import PIL.Image as Image
import math
import numpy as np

## MNIST Dataset

In [None]:
train_set = datasets.MNIST('./data', train=True, download=True)
test_set = datasets.MNIST('./data', train=False, download=True)

X_train = train_set.data.numpy().astype(np.float64)/255.0
y_train = train_set.targets.numpy()
X_test = test_set.data.numpy().astype(np.float64)/255.0
y_test = test_set.targets.numpy()
mean = X_train.mean()
std = X_train.std()

## Main Functions

In [None]:
def make_training_indices(data):
  train_indices = np.arange(data.shape[0])
  np.random.shuffle(train_indices)
  return train_indices

def conv2d(x, w, b, stride, padding):
  h_f, w_f, c_in, c_out = w.shape
  n, h_in, w_in, c_in = x.shape
  out_size = int((h_in - h_f + 2 * padding)/stride + 1)
  x_pad = np.pad(x, ((0,0), (1,1), (1,1), (0,0)))
  output = np.zeros((n, out_size,out_size, c_out))

  for i in range(out_size):
    for j in range(out_size):
      h_start = i * stride
      h_end = h_start + h_f
      w_start = j * stride
      w_end = w_start + w_f

      output[:, i, j, :] = np.sum(x_pad[:, h_start:h_end, w_start:w_end, :, np.newaxis] * w[np.newaxis, :, :, :], axis=(1, 2, 3))
  return x, output + b

def conv2d_backward(diff, x, w, b, stride, padding):
  h_f, w_f, c_in, c_out = w.shape
  n, h_in, w_in, c_in = x.shape
  x_pad = np.pad(x, ((0,0), (1,1), (1,1), (0,0)))
  diff_new = np.zeros(x_pad.shape)
  dw = np.zeros_like(w)
  db = diff.sum(axis = (0, 1, 2)) / diff.shape[0]

  for i in range(diff.shape[1]):
    for j in range(diff.shape[2]):
      h_start = i * stride
      h_end = h_start + h_f
      w_start = j * stride
      w_end = w_start + w_f
      diff_new[:, h_start:h_end, w_start:w_end, :] += np.sum(
        w[np.newaxis, :, :, :, :] *
        diff[:, i:i+1, j:j+1, np.newaxis, :],
        axis=4
      )
      dw += np.sum(
        x_pad[:, h_start:h_end, w_start:w_end, :, np.newaxis] *
        diff[:, i:i+1, j:j+1, np.newaxis, :],
        axis=0
      )

  dw /= diff.shape[0]
  w = w - lr * dw
  b = b - lr * db
  return diff_new[:, padding:padding+h_in, padding:padding+w_in, :], w, b

def relu(x):
  out = np.maximum(0, x)
  return out, out

def relu_backward(diff, mask):
  diff[mask <= 0] = 0
  return diff

def pool2d(x, pool_size, stride):
  n, h_in, w_in, c_in = x.shape
  out_size = int((h_in - pool_size)/stride + 1)
  out = np.zeros((n, out_size, out_size, c_in))
  masks = {}

  for i in range(out_size):
    for j in range(out_size):
      h_start = i * stride
      h_end = h_start + pool_size
      w_start = j * stride
      w_end = w_start + pool_size
      x_slice = x[:, h_start:h_end, w_start:w_end, :]

      mask = np.zeros_like(x_slice)
      idx = np.argmax(x_slice.reshape(n, pool_size * pool_size, c_in), axis = 1)
      n_idx, c_idx = np.indices((n, c_in))
      mask.reshape(n, pool_size * pool_size, c_in)[n_idx, idx, c_idx] = 1
      masks[(i,j)] = mask

      out[:, i, j, :] = np.max(x_slice, axis = (1, 2))
  return masks, out

def pool2d_backward(diff, mask, pool_size, stride, out_size):
  diff_next = np.zeros(out_size)

  for i in range(diff.shape[1]):
    for j in range(diff.shape[2]):
      h_start = i * stride
      h_end = h_start + pool_size
      w_start = j * stride
      w_end = w_start + pool_size
      diff_next[:, h_start:h_end, w_start:w_end, :] += diff[:, i:i+1, j:j+1, :] * mask[(i, j)]
    return diff_next

def linear(x, w, b):
  return x, np.dot(x, w.T) + b

def linear_backward(diff, x, w, b):
  diff_next = np.dot(diff, w)
  n = x.shape[0]
  dw = np.dot(diff.T, x) / n
  db = np.sum(diff, axis = 0, keepdims = True) / n
  w = w - lr * dw
  b = b - lr * db
  return diff_next, w, b

def softmax(x):
  e = np.exp(x - x.max(axis = 1, keepdims = True)) # Subtract maximum to prevent overflow/underflow
  return e / np.sum(e, axis = 1, keepdims = True)

def onehot(labels):
  one_hot = np.zeros((labels.shape[0], 10))
  one_hot[np.arange(labels.shape[0]), labels] = 1
  return one_hot

def forward_pass(X_batch):
  global conv1_x, relu1_mask, pool1_mask, conv2_x, relu2_mask, pool2_mask, ll_x
  conv1_x, x = conv2d(X_batch, conv1_w, conv1_b, conv_stride, conv_padding)
  relu1_mask, x = relu(x)
  pool1_mask, x = pool2d(x, pool_size, pool_stride)
  conv2_x, x = conv2d(x, conv2_w, conv2_b, conv_stride, conv_padding)
  relu2_mask, x = relu(x)
  pool2_mask, x = pool2d(x, pool_size, pool_stride)
  x = np.ravel(x).reshape(x.shape[0], -1)
  ll_x, x = linear(x, ll_w, ll_b)
  x = softmax(x)
  return x

def backward_pass(grad, n):
  global conv1_w, conv1_b, conv2_w, conv2_b, ll_w, ll_b
  grad, ll_w, ll_b = linear_backward(grad, ll_x, ll_w, ll_b)
  grad = grad.reshape(n, 7, 7, 100)
  grad = pool2d_backward(grad, pool2_mask, pool_size, pool_stride, (n, 14, 14, 100))
  grad = relu_backward(grad, relu2_mask)
  grad, conv2_w, conv2_b = conv2d_backward(grad, conv2_x, conv2_w, conv2_b, conv_stride, conv_padding)
  grad = pool2d_backward(grad, pool1_mask, pool_size, pool_stride, (n, 28, 28, 50))
  grad = relu_backward(grad, relu1_mask)
  grad, conv1_w, conv1_b = conv2d_backward(grad, conv1_x, conv1_w, conv1_b, conv_stride, conv_padding)

def crossentropyloss(y_hat, y):
  return - np.sum(y * np.log(np.clip(y_hat, 1e-20, 1.))) / y.shape[0]

def accuracy(y_hat, y):
  return (y_hat.astype(int) == y.astype(int)).all(axis = 1).mean()

## Network Parameters

In [None]:
conv_kernel_size = 3
conv_stride = 1
conv_padding = 1

pool_size = 2
pool_stride = 2

lr = 0.25
bs = 128
start_epoch = 1
total_epoch = 10

## Conv1

In [None]:
conv1_in_channels = 1
conv1_out_channels = 50

conv1_w = np.random.randn(conv_kernel_size, conv_kernel_size, conv1_in_channels, conv1_out_channels) * 0.1
conv1_b = np.random.randn(conv1_out_channels) * 0.1
conv1_x = None
relu1_mask = None
pool1_mask = None

## Conv2

In [None]:
conv2_in_channels = 50
conv2_out_channels = 100

conv2_w = np.random.randn(conv_kernel_size, conv_kernel_size, conv2_in_channels, conv2_out_channels) * 0.1
conv2_b = np.random.randn(conv2_out_channels) * 0.1
conv2_x = None
relu2_mask = None
pool2_mask = None

## Linear Layer

In [None]:
ll_in_channels = 4900
ll_out_channels = 10

ll_w = np.random.randn(ll_out_channels, ll_in_channels) * 0.1
ll_b = np.random.randn(1, ll_out_channels) * 0.1
ll_x = None

##  Load Weights

In [None]:
checkpoint  = np.load('epoch_latest.npz')
conv1_w     = checkpoint['conv1_w']
conv1_b     = checkpoint['conv1_b']
conv2_w     = checkpoint['conv2_w']
conv2_b     = checkpoint['conv2_b']
ll_w        = checkpoint['ll_w']
ll_b        = checkpoint['ll_b']
start_epoch = checkpoint['start_epoch'].item()
bs          = checkpoint['bs'].item()

## Training Loop

In [None]:
for epoch in range(start_epoch, total_epoch + 1):
  train_indices = make_training_indices(X_train)
  train_loss = []
  train_acc = []
  
  test_indices = np.arange(X_test.shape[0])
  test_loss = []
  test_acc = []

  for i in range(math.ceil(train_indices.shape[0]/bs)):
    print("Epoch: " + str(epoch) + "/" + str(total_epoch) + " Train Iter: " + str(i + 1) + "/" + str(math.ceil(train_indices.shape[0]/bs)))
    X_batch = (X_train[:,:,:,np.newaxis][train_indices[i*bs:i*bs+bs]] - mean) / std
    y_batch = onehot(y_train[train_indices[i*bs:i*bs+bs]])
    y_hat = forward_pass(X_batch)
    grad = y_hat - y_batch
    backward_pass(grad, X_batch.shape[0])
    train_loss.append(crossentropyloss(y_hat, y_batch))
    train_acc.append(accuracy(onehot(y_hat.argmax(axis = 1)), y_batch))
    print("loss: " + str(train_loss[i]) + ' acc: ' + str(train_acc[i]))
  print("avg train loss: " + str(np.average(train_loss)) + ' avg train acc: ' + str(np.average(train_acc)))

  for i in range(math.ceil(test_indices.shape[0]/bs)):
    print("Epoch: " + str(epoch) + "/" + str(total_epoch) + " Test Iter: " + str(i + 1) + "/" + str(math.ceil(test_indices.shape[0]/bs)))
    X_batch = (X_test[:,:,:,np.newaxis][test_indices[i*bs:i*bs+bs]] - mean) / std
    y_batch = onehot(y_test[test_indices[i*bs:i*bs+bs]])
    y_hat = forward_pass(X_batch)
    test_loss.append(crossentropyloss(y_hat, y_batch))
    test_acc.append(accuracy(onehot(y_hat.argmax(axis = 1)), y_batch))
    print("loss: " + str(test_loss[i]) + ' acc: ' + str(test_acc[i]))
  print("avg test loss: " + str(np.average(test_loss)) + ' avg test acc: ' + str(np.average(test_acc)))

  np.savez('epoch_' + str(epoch) + '.npz',
           conv1_w = conv1_w,
           conv1_b = conv1_b,
           conv2_w = conv2_w,
           conv2_b = conv2_b,
           ll_w    = ll_w,
           ll_b    = ll_b,
           train_loss = train_loss,
           train_acc  = train_acc,
           test_loss  = test_loss,
           test_acc   = test_acc,
           start_epoch = epoch + 1,
           bs = bs
          )

  np.savez('epoch_latest.npz',
           conv1_w = conv1_w,
           conv1_b = conv1_b,
           conv2_w = conv2_w,
           conv2_b = conv2_b,
           ll_w    = ll_w,
           ll_b    = ll_b,
           train_loss = train_loss,
           train_acc  = train_acc,
           test_loss  = test_loss,
           test_acc   = test_acc,
           start_epoch = epoch + 1,
           bs = bs
          )

## Single Test/Validation

In [None]:
test_indices = np.arange(X_test.shape[0])
test_loss = []
test_acc = []

for i in range(math.ceil(test_indices.shape[0]/bs)):
  print("Test Iter: " + str(i + 1) + "/" + str(math.ceil(test_indices.shape[0]/bs)))
  X_batch = (X_test[:,:,:,np.newaxis][test_indices[i*bs:i*bs+bs]] - mean) / std
  y_batch = onehot(y_test[test_indices[i*bs:i*bs+bs]])
  y_hat = forward_pass(X_batch)
  test_loss.append(crossentropyloss(y_hat, y_batch))
  test_acc.append(accuracy(onehot(y_hat.argmax(axis = 1)), y_batch))
  print("loss: " + str(test_loss[i]) + ' acc: ' + str(test_acc[i]))
print("avg test loss: " + str(np.average(test_loss)) + ' avg test acc: ' + str(np.average(test_acc)))

## Random Test

In [None]:
test_size = 20
dataset = X_test
test_indices = np.random.randint(dataset.shape[0], size = test_size)

imArr = (dataset[test_indices] * 255).astype(np.uint8)
dst = np.zeros((28,0), dtype = np.uint8)
for im in imArr:
  dst = np.concatenate((dst,im), axis = 1)
display(Image.fromarray(dst))


X_batch = (dataset[:,:,:,np.newaxis][test_indices] - mean) / std
y_hat = forward_pass(X_batch)
for pred in y_hat.argmax(axis = 1):
  print(pred, end = '   ')