In [38]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
from keras.datasets import mnist
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import SubsetRandomSampler
from tqdm import tqdm

# Previous

In [39]:
class GradientDescent():
    """Vanilla gradient-descent optimizer with a movement-based stopping rule.

    Every call to :meth:`optimize` adds ``lr * ||grad||`` for each gradient to
    ``self.delta``.  :meth:`stop` compares that accumulated step length with
    ``eps`` and then clears it, so each check only covers the updates made
    since the previous check.
    """

    def __init__(self, lr = 1e-3, eps = 1e-4):
        """
        Args:
            lr: Learning rate that scales every gradient step.
            eps: Accumulated step length below which :meth:`stop` reports
                convergence.
        """
        self.lr = lr
        self.eps = eps
        self.delta = 0

    def optimize(self, target, gradients):
        """Apply one descent step to every parameter.

        Args:
            target: Iterable of parameter arrays.
            gradients: Iterable of gradients, paired with ``target`` by position.

        Returns:
            List with ``t - lr * grad`` for every ``(t, grad)`` pair.
        """
        optimized = []
        for t, grad in zip(target, gradients):
            optimized.append(t - self.lr * grad)
            # Track the length of the step so stop() can judge convergence.
            self.delta += self.lr * np.linalg.norm(grad)
        return optimized

    def stop(self):
        """Tell whether the updates since the last check were small enough.

        The previous condition ``not (delta > 1e-9 or delta < eps)`` could
        never be true, so training was never stopped early.  The rule is now
        ``1e-9 < delta < eps``: some movement happened (the lower bound keeps
        a freshly created optimizer from reporting convergence) but less than
        ``eps`` in total.

        Calling this method resets the accumulated step length.

        Returns:
            bool: True when optimisation should stop.
        """
        moved = self.delta
        self.delta = 0
        return bool(1e-9 < moved < self.eps)

In [40]:
class Node:
    """Base class for layers: stores the declared shapes and reshapes arrays.

    Attributes:
        n_input / n_output: Declared input / output shapes as tuples.
        input_dim / output_dim: Number of axes in those shapes.
        inner_dim: Number of axes the layer works with internally.
    """

    def __init__(self, input_dim, output_dim, inner_ndim):
        """
        Args:
            input_dim: Input shape, either an integer or an iterable of integers.
            output_dim: Output shape, either an integer or an iterable of integers.
            inner_ndim: Number of axes the layer uses for its own computations.
        """
        scalar_types = (int, np.integer)

        self.n_input = (input_dim, ) if isinstance(input_dim, scalar_types) else tuple(input_dim)
        self.input_dim = len(self.n_input)
        self.n_output = (output_dim, ) if isinstance(output_dim, scalar_types) else tuple(output_dim)
        # The rank of the output must come from the output shape; it used to be
        # decided by the type of ``input_dim``.
        self.output_dim = len(self.n_output)
        self.inner_dim = inner_ndim
        self.input = None
        self.labels = None

    def change_dims(self, x, dim):
        """Return ``x`` viewed with exactly ``dim`` axes.

        * more axes than ``dim``: keep the trailing ``dim`` axes (``np.reshape``
          raises ``ValueError`` unless the dropped leading axes have length 1);
        * fewer axes than ``dim``: prepend axes of length 1;
        * ``None`` is passed through, so optional arrays need no special casing.
        """
        if x is None:
            return None
        if x.ndim > dim:
            return np.reshape(x, x.shape[-dim:])
        if x.ndim == dim:
            return x
        return np.expand_dims(x, tuple(range(dim - x.ndim)))

In [41]:
class Softmax(Node):
    """Softmax over inputs that are first divided by their largest magnitude.

    For every row ``x`` the layer computes ``softmax(x / max(|x|))``.  The
    scaling keeps the exponentials bounded, and :meth:`jacobian` differentiates
    through it as well.
    """

    def __init__(self, n_input):
        super().__init__(n_input, n_input, inner_ndim=2)

    def softmax_call(self, x):
        """Evaluate the normalised softmax row by row.

        Caches ``sm_max_index`` (column of the largest magnitude per row) and
        ``softmax_x_norm`` (the scaled input) for :meth:`jacobian`.

        Args:
            x: Array of shape ``(rows, classes)``.

        Returns:
            Array of the same shape whose rows sum to 1.
        """
        magnitude = np.abs(x)
        self.sm_max_index = np.argmax(magnitude, axis=1).reshape(-1, 1)
        scale = np.max(magnitude, axis=1).reshape(-1, 1)
        # An all-zero row would divide by zero; leave such rows unscaled.
        scale = np.where(scale == 0, 1.0, scale)
        self.softmax_x_norm = x / scale
        exp = np.exp(self.softmax_x_norm)
        return exp / exp.sum(axis=1).reshape(-1, 1)

    def jacobian(self, x):
        """Jacobian of the output with respect to the raw input ``x``.

        Must be called after :meth:`softmax_call` on the same ``x`` because it
        reads the cached ``softmax_x_norm`` and ``sm_max_index``.

        Args:
            x: The array given to :meth:`softmax_call`, shape ``(rows, classes)``.

        Returns:
            Array of shape ``(rows, classes, classes)`` with
            ``J[r, i, j] = d out[r, i] / d x[r, j]``.
        """
        rows, classes = self.softmax_x_norm.shape
        exp_x = np.exp(self.softmax_x_norm)
        probs = exp_x / exp_x.sum(axis=1).reshape(-1, 1)

        softmax_jacobian = np.zeros((rows, classes, classes))

        for row in range(rows):
            p = probs[row]
            # d softmax / d x_norm = diag(p) - p p^T
            d_softmax = np.diag(p) - np.outer(p, p)

            max_index = self.sm_max_index[row][0]
            x_ref = x[row, max_index]
            x_max = np.abs(x_ref)
            if x_max == 0:
                # Row was left unscaled in softmax_call.
                d_norm = np.eye(classes)
            else:
                # x_norm_k = x_k / |x_ref|
                d_norm = np.eye(classes) / x_max
                # d(1/|x_ref|)/d x_ref = -sign(x_ref) / x_ref**2; the sign was
                # previously dropped, which broke rows whose largest-magnitude
                # entry is negative.
                d_norm[:, max_index] = -np.sign(x_ref) * x[row] / x_max ** 2
                # The reference entry normalises to +/-1, a constant.
                d_norm[max_index] = 0.0

            softmax_jacobian[row] = d_softmax @ d_norm

        return softmax_jacobian

    def forward(self, input, labels = None):
        """Run the layer.

        Args:
            input: Scores with ``classes`` entries (any leading axes of length 1).
            labels: Optional array that is only stored on the instance.

        Returns:
            Softmax output reshaped to the declared output rank.
        """
        self.input = self.change_dims(input, self.inner_dim)
        # ``labels`` is optional, so only reshape it when it was supplied.
        self.labels = None if labels is None else self.change_dims(labels, self.inner_dim)
        return self.change_dims(self.softmax_call(self.input), self.output_dim)

    def backward(self, input_pd = None):
        """Back-propagate through the layer.

        Args:
            input_pd: Gradient of the loss with respect to this layer's
                output.  When omitted, the raw Jacobian is returned instead.

        Returns:
            With ``input_pd``: gradient with respect to the layer input,
            reshaped to the declared input rank.  Without it: the
            ``(rows, classes, classes)`` Jacobian.

        Raises:
            RuntimeError: If :meth:`forward` has not been called since the
                last backward pass.
        """
        if self.input is None:
            raise RuntimeError("Softmax.backward() called before forward()")

        jac = self.jacobian(self.input)
        self.input = None
        self.labels = None

        if input_pd is None:
            return jac

        upstream = self.change_dims(input_pd, self.inner_dim)
        # Chain rule: dL/dx_j = sum_i dL/dy_i * dy_i/dx_j  ->  J^T @ g
        backprop_pd = np.array([jac[i].T @ upstream[i] for i in range(jac.shape[0])])
        return self.change_dims(backprop_pd, self.input_dim)

    def optimize_weights(self, optimizer):
        """Softmax has no trainable parameters; present for a uniform layer API."""
        pass

In [42]:
class ReLU(Node):
    """Element-wise rectified linear unit, ``max(x, 0)``."""

    def __init__(self, n_input):
        super().__init__(n_input, n_input, 2)

    def jacobian(self, x):
        """Dense Jacobian of ReLU for every row of ``x``.

        Args:
            x: Array of shape ``(rows, features)``.

        Returns:
            Integer array of shape ``(rows, features, features)``; each slice
            is diagonal with 1 where the input is positive and 0 elsewhere.
        """
        return np.array([np.diag((np.asarray(x_row) > 0).astype(int)) for x_row in x])

    def forward(self, input, labels = None):
        """Apply ReLU; ``labels`` is accepted for API symmetry and ignored."""
        self.input = self.change_dims(input, self.inner_dim)
        return self.change_dims(np.maximum(self.input, 0), self.output_dim)

    def backward(self, input_pd):
        """Propagate the upstream gradient through the activation.

        The Jacobian is diagonal, so multiplying by it is the same as masking
        the gradient where the forward input was not positive.  Using the mask
        directly avoids building a ``features x features`` matrix per row.

        Args:
            input_pd: Gradient of the loss with respect to this layer's output.

        Returns:
            Gradient with respect to the layer input, in the declared rank.
        """
        input_pd = self.change_dims(input_pd, self.inner_dim)
        backprop_pd = input_pd * (self.input > 0)
        return self.change_dims(backprop_pd, self.output_dim)

    def optimize_weights(self, optimizer):
        """ReLU has no trainable parameters."""
        pass

In [43]:
class Convolution(Node):
    """Single-filter "valid" cross-correlation layer.

    The kernel ``W`` has one slice per input channel; the forward pass sums
    the element-wise products over all channels, producing one output map.
    """

    def __init__(self, input_dim, conv_dim, W = None):
        """
        Args:
            input_dim: Input shape; the last two entries are (rows, cols).
            conv_dim: Kernel shape; the last two entries are (rows, cols).
            W: Optional initial kernel.  Drawn uniformly from [0.4, 0.6)
                with shape ``conv_dim`` when omitted.
        """
        # Rows and columns are sized independently; the width used to be
        # computed from the height entries, which is wrong for non-square
        # inputs or kernels.
        out_rows = input_dim[-2] - conv_dim[-2] + 1
        out_cols = input_dim[-1] - conv_dim[-1] + 1
        super().__init__(input_dim, (1, out_rows, out_cols), 3)
        self.W = np.random.uniform(0.4, 0.6, conv_dim) if W is None else W
        self.input_values = None
        self.output_values = None
        self.labels = None
        self.W_pd = None

    def convolve(self, T, W, add_padding = False):
        """Slide ``W`` over ``T`` and sum the products at every offset.

        Args:
            T: Array of shape ``(channels, rows, cols)`` or ``(rows, cols)``.
            W: Kernel of shape ``(channels, k_rows, k_cols)`` or
                ``(k_rows, k_cols)``.
            add_padding: Zero-pad ``T`` by ``k - 1`` on every spatial side so
                that the result is a "full" correlation.

        Returns:
            Array of shape ``(1, out_rows, out_cols)``.
        """
        T = np.expand_dims(T, axis=0) if T.ndim == 2 else T
        W = np.expand_dims(W, axis=0) if W.ndim == 2 else W
        k_rows, k_cols = W.shape[1], W.shape[2]

        if add_padding:
            T = np.pad(T, pad_width=[(0, 0), (k_rows - 1, k_rows - 1), (k_cols - 1, k_cols - 1)])

        out_rows = T.shape[1] - k_rows + 1
        out_cols = T.shape[2] - k_cols + 1

        convolution = np.zeros((1, out_rows, out_cols))
        for row in range(out_rows):
            for col in range(out_cols):
                window = T[:, row: row + k_rows, col: col + k_cols]
                convolution[0, row, col] = np.sum(window * W)
        return convolution

    def forward(self, input, labels = None):
        """Correlate the input with the kernel; ``labels`` is ignored."""
        self.input_values = self.change_dims(input, self.inner_dim)
        self.output_values = self.convolve(self.input_values, self.W)
        return self.change_dims(self.output_values, self.output_dim)

    def backward(self, input_pd):
        """Compute the kernel gradient and return the input gradient.

        Args:
            input_pd: Gradient of the loss with respect to the layer output.

        Returns:
            Gradient with respect to the layer input, one map per channel.
        """
        upstream = self.change_dims(input_pd, self.inner_dim)
        channels = range(self.n_input[0])

        # dL/dW[c]: correlate each input channel with the upstream gradient.
        self.W_pd = np.concatenate(
            [self.convolve(self.input_values[c], upstream) for c in channels], axis=0)

        # dL/dx[c]: full correlation of the upstream gradient with the
        # spatially flipped kernel slice.
        input_grad = np.concatenate(
            [self.convolve(upstream, self.W[c, ::-1, ::-1], True) for c in channels], axis=0)
        return self.change_dims(input_grad, self.output_dim)

    def optimize_weights(self, gd):
        """Let ``gd`` update the kernel with the gradient from :meth:`backward`."""
        if self.W_pd is None:
            # No backward pass has produced a gradient yet.
            return
        self.W = gd.optimize([self.W], [self.W_pd])[0]

In [44]:
class FullyConnectedLayer():
    """Affine layer ``y = x @ W + b``.

    Two update styles are supported:

    * ``backward(grad, learning_rate)`` applies a plain SGD step immediately;
    * ``backward(grad)`` only stores the gradients, and a later
      ``optimize_weights(gd)`` lets an optimizer object apply them.
    """

    def __init__(self, n_input, n_output):
        """
        Args:
            n_input: Number of input features.
            n_output: Number of output features.
        """
        self.n_input = n_input
        self.n_output = n_output
        self.W = np.random.randn(n_input, n_output)
        self.b = np.zeros(n_output)
        self.input = None
        self.W_pd = None
        self.b_pd = None

    def forward(self, input):
        """Return ``input @ W + b`` and remember ``input`` for the backward pass."""
        self.input = input
        return np.dot(input, self.W) + self.b

    def backward(self, grad_output, learning_rate = None):
        """Back-propagate ``grad_output``.

        Args:
            grad_output: Gradient of the loss with respect to the layer
                output, shaped like the value returned by :meth:`forward`.
            learning_rate: When given, ``W`` and ``b`` are updated in place
                right away.  When omitted, the gradients are kept in
                ``W_pd`` / ``b_pd`` for :meth:`optimize_weights`.

        Returns:
            Gradient of the loss with respect to the layer input.
        """
        # Treat a single 1-D sample as a batch of one so the outer product
        # below has the shape of W.
        grad_2d = np.atleast_2d(grad_output)
        input_2d = np.atleast_2d(self.input)

        # Uses the weights from the forward pass, i.e. before any update.
        grad_input = np.dot(grad_output, self.W.T)
        grad_weights = np.dot(input_2d.T, grad_2d)
        grad_biases = np.sum(grad_2d, axis=0)

        if learning_rate is None:
            self.W_pd, self.b_pd = grad_weights, grad_biases
        else:
            self.W -= learning_rate * grad_weights
            self.b -= learning_rate * grad_biases
            self.W_pd = self.b_pd = None

        return grad_input

    def optimize_weights(self, gd):
        """Apply gradients stored by ``backward(grad)`` through ``gd.optimize``.

        Does nothing when no gradients are pending.
        """
        if self.W_pd is None:
            return
        self.W, self.b = gd.optimize([self.W, self.b], [self.W_pd, self.b_pd])
        self.W_pd = self.b_pd = None

In [45]:
class MeanSquaredError():
    """Mean squared error averaged over every element of the prediction."""

    def __init__(self):
        pass

    def forward(self, y_pred, y_true):
        """Return ``mean((y_pred - y_true) ** 2)`` and cache both arguments.

        Args:
            y_pred: Predicted values.
            y_true: Target values, broadcastable against ``y_pred``.
        """
        self.y_pred = y_pred
        self.y_true = y_true
        loss = np.mean((y_pred - y_true) ** 2)
        return loss

    def backward(self):
        """Gradient of the loss from the last :meth:`forward` w.r.t. ``y_pred``.

        ``forward`` averages over all elements, so the derivative divides by
        the element count.  Dividing by ``shape[0]`` (as before) only agrees
        with that for 1-D inputs and over-scales the gradient otherwise.
        """
        diff = np.asarray(self.y_pred) - np.asarray(self.y_true)
        grad_input = 2 * diff / diff.size
        return grad_input

In [46]:
class _Flatten:
    """Reshape a feature map into one row vector, and gradients back again."""

    def __init__(self):
        self.input_shape = None

    def forward(self, input):
        input = np.asarray(input)
        self.input_shape = input.shape
        return input.reshape(1, -1)

    def backward(self, input_pd):
        return np.reshape(input_pd, self.input_shape)


class ConvolutionalNeuralNetwork:
    """Small CNN trained one sample at a time with mean squared error.

    Layer stack (for a 28x28 input: 576 -> 128 -> ``n_output``, the same
    sizes the PyTorch ``TorchNetwork`` in this notebook uses):

        conv 3x3 -> ReLU -> conv 3x3 -> ReLU -> flatten -> FC(128) -> FC -> softmax

    Every layer is expected to provide ``forward(x)`` and
    ``backward(upstream)``; layers that also define ``optimize_weights(gd)``
    are updated right after their backward step.
    """

    KERNEL_SIZE = 3
    HIDDEN_UNITS = 128

    def __init__(self, n_input, n_output, lr):
        """
        Args:
            n_input: Number of pixels of one flattened, square, single-channel
                image (e.g. 784 for 28x28).
            n_output: Number of classes.
            lr: Learning rate handed to :class:`GradientDescent`.

        Raises:
            ValueError: If ``n_input`` is not a perfect square or the image is
                too small for two valid convolutions.
        """
        side = int(round(np.sqrt(n_input)))
        if side * side != n_input:
            raise ValueError(f"n_input must be a perfect square, got {n_input}")

        kernel = self.KERNEL_SIZE
        side_1 = side - kernel + 1      # spatial size after the first convolution
        side_2 = side_1 - kernel + 1    # ... and after the second
        if side_2 < 1:
            raise ValueError(f"a {side}x{side} image is too small for two {kernel}x{kernel} convolutions")

        self.image_shape = (1, side, side)
        self.layers = [
            Convolution((1, side, side), (1, kernel, kernel)),
            ReLU((1, side_1, side_1)),
            Convolution((1, side_1, side_1), (1, kernel, kernel)),
            ReLU((1, side_2, side_2)),
            _Flatten(),
            FullyConnectedLayer(side_2 * side_2, self.HIDDEN_UNITS),
            FullyConnectedLayer(self.HIDDEN_UNITS, n_output),
            Softmax(n_output),
        ]
        self.loss = MeanSquaredError()
        self.gd = GradientDescent(lr)

    def fit(self, X, y, n_epochs):
        """Train with per-sample gradient descent.

        At least one epoch always runs; training then continues until
        ``n_epochs`` epochs are done or the optimizer's ``stop()`` is true.

        Args:
            X: Array whose rows are flattened images.
            y: Array whose rows are the target vectors for ``X``.
            n_epochs: Maximum number of passes over ``X``.
        """
        n = 0
        while True:
            loss = 0
            for index in range(X.shape[0]):
                prediction = self.predict(X[index])
                loss += self.loss.forward(prediction, y[index])

                upstream = self.loss.backward()
                for layer in reversed(self.layers):
                    upstream = layer.backward(upstream)
                    # Parameter-free layers may not define optimize_weights.
                    update = getattr(layer, "optimize_weights", None)
                    if update is not None:
                        update(self.gd)
            n += 1
            print(f"Epoch {n}, Loss: {loss}")

            if n >= n_epochs or self.gd.stop():
                break

    def predict(self, x):
        """Run one flattened image through every layer and return the output."""
        # np.array copies, so the caller's data is never touched.
        state = np.array(x, dtype=float).reshape(self.image_shape)
        for layer in self.layers:
            state = layer.forward(state)
        return state

## Part 1: NumPy CNN on MNIST

In [47]:
def label_vec_func(labels, n_classes=10):
    """One-hot encode integer class labels.

    Args:
        labels: Array-like of integer class indices; flattened before encoding.
        n_classes: Width of the encoding (defaults to 10).

    Returns:
        Float array of shape ``(len(labels), n_classes)`` with a single 1 per row.

    Raises:
        IndexError: If a label is ``>= n_classes`` (raised by NumPy indexing).
    """
    labels = np.asarray(labels, dtype=int).reshape(-1)
    labels_matrix = np.zeros([len(labels), n_classes])
    labels_matrix[np.arange(len(labels)), labels] = 1
    return labels_matrix

In [48]:
# Load the MNIST train/test splits through Keras.
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Sizes handed to the NumPy network: flattened image length and class count.
n_input = 784
n_output = 10

# One row per training image; labels become one-hot rows.
# NOTE(review): pixel values are used as loaded (no rescaling here) — confirm
# this is the intended input range.
X_train = X_train.reshape(len(X_train), -1)
y_train_one_hot = label_vec_func(y_train).reshape((len(y_train), 10))

In [None]:
# Build the NumPy network with learning rate 0.5 and train it for up to two epochs.
network = ConvolutionalNeuralNetwork(n_input, n_output, lr=0.5)
network.fit(X_train, y_train_one_hot, n_epochs=2)

In [None]:
def compute_accuracy(X_test, y_test, model):
    """Fraction of samples whose arg-max prediction equals the label.

    Works both when iterating yields single samples (1-D prediction, scalar
    label) and when it yields batches (2-D prediction, 1-D labels).

    Args:
        X_test: Iterable of inputs accepted by ``model.predict``.
        y_test: Iterable of integer labels (or label batches) aligned with
            ``X_test``.
        model: Object exposing ``predict(x)`` that returns class scores.

    Returns:
        Accuracy in ``[0, 1]``; ``0.0`` when there is nothing to evaluate.
    """
    correct_predictions = 0
    total = 0

    for sample, label in zip(X_test, y_test):
        # Promote a single prediction/label to a batch of one so that the
        # arg-max over axis 1 and the count below are always valid.
        scores = np.atleast_2d(model.predict(sample))
        expected = np.atleast_1d(label)
        correct_predictions += int((np.argmax(scores, axis=1) == expected).sum())
        total += expected.size

    return correct_predictions / total if total else 0.0

In [None]:
# Flatten every test image into one row and make the labels a flat vector.
n_test = X_test.shape[0]
X_batches = X_test.reshape(n_test, -1)
y_batches = y_test.reshape((len(y_test),))

In [None]:
# Evaluate the trained NumPy network on the prepared test arrays.
accuracy = compute_accuracy(X_batches, y_batches, network)
print(f"Accuracy: {accuracy}")

## Part 2: PyTorch CNN on MNIST

In [None]:
class TorchNetwork(nn.Module):
    """Two 3x3 convolutions followed by two linear layers and a softmax.

    The first linear layer expects ``576 * in_channels`` features, i.e. a
    24x24 map per channel after the two unpadded 3x3 convolutions (a 28x28
    input image).

    Note:
        ``forward`` returns probabilities, not logits.  Losses that expect
        raw logits (such as ``nn.CrossEntropyLoss``) should not be applied to
        this output directly.
    """

    def __init__(self, in_channels=1, out_channels=10):
        """
        Args:
            in_channels: Number of image channels; the convolutions keep this
                channel count.
            out_channels: Number of classes produced by the last layer.
        """
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, in_channels, 3)
        self.conv2 = nn.Conv2d(in_channels, in_channels, 3)
        # The flattened size scales with the channel count; a fixed 576 only
        # worked for in_channels == 1.
        self.fc1 = nn.Linear(576 * in_channels, 128)
        self.fc2 = nn.Linear(128, out_channels)
        self.softmax = nn.Softmax(dim=1)
        self.relu1 = nn.ReLU()
        self.relu2 = nn.ReLU()

    def forward(self, x):
        """Map a ``(batch, in_channels, H, W)`` tensor to class probabilities."""
        x = self.relu1(self.conv1(x))
        x = self.relu2(self.conv2(x))
        x = x.view(x.size(0), -1)   # flatten everything but the batch axis
        x = self.fc2(self.fc1(x))
        return self.softmax(x)

In [None]:
N_EPOCHS = 2
DEVICE = 'cpu'

transform = transforms.Compose([transforms.ToTensor()])

trainset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True)

testset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=1, shuffle=False)

network = TorchNetwork().to(DEVICE)
# TorchNetwork.forward already ends with a softmax, so its output is a
# probability vector.  nn.CrossEntropyLoss expects raw logits and would apply
# a second softmax; the equivalent loss for probabilities is the negative
# log-likelihood of their logarithm.
loss_function = nn.NLLLoss().to(DEVICE)
optimizer = torch.optim.SGD(network.parameters(), lr=0.01)

for epoch in range(N_EPOCHS):
    bar = tqdm(trainloader)
    total_loss = 0
    for i, (inputs, targets) in enumerate(bar):
        inputs = inputs.to(DEVICE)
        targets = targets.to(DEVICE)

        outputs = network(inputs)
        # Clamp before the log so a probability of exactly 0 cannot yield -inf.
        loss = loss_function(torch.log(torch.clamp(outputs, min=1e-12)), targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        bar.set_description('Epoch: %d/%d | Loss: %.4f' % (epoch + 1, N_EPOCHS, total_loss / (i + 1)))

In [None]:
# Accuracy of the PyTorch network on the test loader.  Counting is done per
# sample, so the result is correct for any loader batch size (the previous
# version only worked with batch_size=1).
correct = 0
total = 0
with torch.no_grad():
    bar = tqdm(testloader)
    for inputs, targets in bar:
        outputs = network(inputs)
        _, predicted = torch.max(outputs, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()
        bar.set_description('Accuracy: %.2f %%' % (100 * correct / total))