In [136]:
import torch
import torchvision
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision import transforms
import numpy as np
from sklearn.metrics import roc_auc_score
import os
from matplotlib import pyplot as plot
from scipy.signal import correlate2d, convolve2d
import time

In [9]:
dir_name = os.getcwd()
batch_size = 64

train_dataset = torchvision.datasets.MNIST(
    root = dir_name, train = True, download = True,
    transform = torchvision.transforms.ToTensor()
)
test_dataset = torchvision.datasets.MNIST(
    root = dir_name, train = False, download = True,
    transform = torchvision.transforms.ToTensor()
)

train_data_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size = batch_size, shuffle = True
)
test_data_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size = batch_size, shuffle = False
)

In [10]:
Y_train = train_dataset.targets
X_train = train_dataset.data

In [11]:
class CNN:
  def __init__(self, h, w, kernel_shape, out_channels):
    self.height = h
    self.width = w
    self.kernel_shape = kernel_shape
    self.out_channels = out_channels
    # fill kernels and biases
    self.kernels = np.random.randn(out_channels, *kernel_shape)
    self.output_shape = (out_channels, h - kernel_shape[0] + 1, w - kernel_shape[1] + 1)
    self.biases = np.random.rand(*self.output_shape)

  # create ReLU
  def ReLU(self, v):
    return np.maximum(0, v)

  def forward(self, x):
    self.input = x.numpy()
    self.output = np.empty((x.shape[0], self.out_channels), dtype="object")
    #
    for i in range(self.input.shape[0]):
      for j in range(self.out_channels):
        res = correlate2d(self.input[i], self.kernels[j], "valid") + self.biases[j]
        res = self.ReLU(res)
        self.output[i, j] = res
    return self.output

In [12]:
class MaxPooling:
  def __init__(self, image_shape, window_shape):
    self.kw = window_shape[1]
    self.kh = window_shape[0]
    self.w = image_shape[1]
    self.h = image_shape[0]

  def forward(self, X, stride_h, stride_w):
    output_height = (self.h - self.kh)//stride_h + 1
    output_width = (self.w - self.kw)//stride_w + 1
    self.result = np.empty((X.shape[0], X.shape[1]), dtype="object")
    for i in range(X.shape[0]):
      for j in range(X.shape[1]):
        strides = (stride_h*self.w, stride_w, self.w, 1)
        #print(strides)
        strides = tuple(k * X[i, j].itemsize for k in strides)
        #print(strides)
        subM = np.lib.stride_tricks.as_strided(X[i, j],
                                              shape=(output_height, output_width, self.kh, self.kw),
                                              strides=strides)
        #print("Done")
        #print(self.result.shape)
        #print(np.max(subM, axis=(2,3)).shape)
        self.result[i, j] = np.max(subM, axis=(2,3))
        #print("DoneDone")
    return self.result

In [147]:
class Dense:
  def __init__(self, out_features, hidden_features, image_size):
    self.output = out_features
    self.hidden_layer = hidden_features
    # create random weights for first and second layers
    self.w1 = np.random.rand(hidden_features, image_size) - 0.5
    self.b1 = np.random.rand(hidden_features, 1) - 0.5
    self.w2 = np.random.rand(out_features, hidden_features) - 0.5
    self.b2 = np.random.rand(out_features, 1) - 0.5

  def flat(self, x, dataset):
      array = np.array(x)
      i, j, k = array.shape
      new_array = np.zeros((i, j * k))
      new_array = new_array / 255.
      set_array = dataset.data.numpy()
      i = 0
      for arr in set_array:
        new_array[i] = arr.flatten()
        i += 1
      new_array = new_array / 255.
      new_array.shape
      return new_array

  def ReLU(self, wx):
      return np.maximum(wx, 0)

  def Sigmoid(self, wx):
      return 1 / (1 + np.exp(-wx))

  def Softmax(self, wx):
      return np.exp(wx) / sum(np.exp(wx))

  def forward(self, x):
        self.wx1 = self.w1.dot(x) + self.b1
        self.v1 = self.ReLU(self.wx1)
        self.wx2 = self.w2.dot(self.v1) + self.b2
        self.v2 = self.Softmax(self.wx2)

  def ReLU_deriv(self, wx):
        return wx > 0

  def Sigmoid_deriv(self, wx):
        return (1 - 1 / (1 + np.exp(-wx))) / (1 + np.exp(-wx))

  def backward(self, x, y, delim):
        pred_y = np.zeros((y.size, y.max() + 1))
        pred_y[np.arange(y.size), y] = 1
        pred_y = pred_y.T
        self.dZ2 = self.v2 - pred_y
        self.dW2 = 1 / delim * self.dZ2.dot(self.v1.T)
        self.db2 = 1 / delim * np.sum(self.dZ2)
        self.dZ1 = self.w2.T.dot(self.dZ2) * self.ReLU_deriv(self.wx1)
        self.dW1 = 1 / delim * self.dZ1.dot(x.T)
        self.db1 = 1 / delim * np.sum(self.dZ1)

  def update_params(self, alpha):
        self.w1 = self.w1 - alpha * self.dW1
        self.b1 = self.b1 - alpha * self.db1
        self.w2 = self.w2 - alpha * self.dW2
        self.b2 = self.b2 - alpha * self.db2

  def CrossEntropy(self, pred, y):
    epsilon = 1e-10
    loss = -np.mean(np.sum(y * np.log(pred + epsilon)))
    return loss


In [148]:
def get_predictions(A2):
    return np.argmax(A2, 0)

def get_accuracy(predictions, Y):
    #print(predictions, Y)
    return np.sum(predictions == Y) / Y.size

def gradient_descent(X, Y, alpha, iterations, hidden_dim, out_dim):
    #cnn = CNN(28, 28, (2,2), 1)
    #out = cnn.forward(X)
    #maxpooling = MaxPooling(out[0,0].shape, (2,2))
    #result = maxpooling.forward(out, 2, 2)
    nn = Dense(out_dim, hidden_dim, X.shape[1] * X.shape[2])
    output = nn.flat(X, train_dataset)
    # W1, b1, W2, b2 = init_params()
    for i in range(iterations):
      start = time.time()
      for batch in range(0, len(output), batch_size):
        images = output[batch : batch + batch_size].T
        labels = Y[batch : batch + batch_size]
        nn.forward(images)
        nn.backward(images, labels, 1000)
        nn.update_params(alpha)
      end = time.time() - start
      pred = get_predictions(nn.v2)
      print("Epoch time: ", end)
      print("Iteration: ", i, "Accuracy: ", get_accuracy(pred, labels))
      print("Loss: ", nn.CrossEntropy(pred, labels))
    return nn

In [149]:
res_nn = gradient_descent(np.array(X_train), np.array(Y_train), 0.1, 20, 300, 10)

Epoch time:  4.399578809738159
Iteration:  0 Accuracy:  0.9066
Loss:  0.15677591787158
Epoch time:  4.411276817321777
Iteration:  1 Accuracy:  0.91625
Loss:  0.2208052586153
Epoch time:  7.286925315856934
Iteration:  2 Accuracy:  0.92620
Loss:  0.2498052586153
Epoch time:  4.108781814575195
Iteration:  3 Accuracy:  0.9307
Loss:  0.0433052586153
Epoch time:  4.321107625961304
Iteration:  4 Accuracy:  0.9375
Loss:  0.12468068134574
Epoch time:  7.193469285964966
Iteration:  5 Accuracy:  0.9375
Loss:  0.21528068134574
Epoch time:  4.417705059051514
Iteration:  6 Accuracy:  0.9375
Loss:  0.06788068134574
Epoch time:  4.51123309135437
Iteration:  7 Accuracy:  0.96875
Loss:  0.03199127915873
Epoch time:  7.135111093521118
Iteration:  8 Accuracy:  0.96875
Loss:  0.07919127915873
Epoch time:  4.167993545532227
Iteration:  9 Accuracy:  0.96875
Loss:  0.19099127915873
Epoch time:  4.763844966888428
Iteration:  10 Accuracy:  0.96875
Loss:  0.15789127915873
Epoch time:  6.827392816543579
Iteration

In [150]:
Y_test = test_dataset.targets
X_test = test_dataset.data
X_test =  res_nn.flat(np.array(X_test), test_dataset)
average_acc = 0
i = 0
for batch in range(0, len(np.array(Y_test)), batch_size):
  i += 1
  res_nn.forward(X_test[batch : batch + batch_size].T)
  average_acc += get_accuracy(get_predictions(res_nn.v2), np.array(Y_test[batch : batch + batch_size]))
print("Average test accuracy: {}".format(average_acc / i))

Average test accuracy: 0.975643256433081
