In [1]:
# 3. Utilize batched operations for enhanced operations
# Asta inseamna sa folosesti mini-batch-uri si sa folosesti
# pe cat se poate operatii de numpy, adica spre ex: sa nu
# faci inmultirile de mana cand poti face dot product

# Training-ul ar trebui sa ia cam 5-6min

# As zice ca implementarea dropout suna cel mai bine
# E posibil ca unii neuroni sa puna monopol pe output, vedem cum se
# descurca reteaua fara ei si dam update la retea

## Initialize MNIST dataset

In [2]:
import math
import random
from random import randint

import numpy as np
from torchvision.datasets import MNIST
import ssl

ssl._create_default_https_context = ssl._create_unverified_context

def download_mnist(is_train: bool):
    dataset = MNIST(root="./data",
                    transform=lambda x: np.array(x).flatten(),
                    download=True,
                    train=is_train)

    mnist_data = []
    mnist_labels = []
    for image, label in dataset:
        mnist_data.append(image)
        mnist_labels.append(label)

    return mnist_data, mnist_labels

train_data, train_labels = download_mnist(True)
test_data, test_labels = download_mnist(False)

train_data = np.array(train_data)
train_labels = np.array(train_labels)

test_data = np.array(test_data)
test_labels = np.array(test_labels)

## Normalize data and Convert labels to One-Hot Encoding

In [3]:
def normalize(data, min_value, max_value):
    return (data - min_value) / (max_value - min_value)

def one_hot_encode(labels, num_categories):
    length = len(labels)
    converted_labels = np.full((length, num_categories), 0, dtype=np.int32)

    for i in range(length):
        converted_labels[i][labels[i]] = 1

    return converted_labels

classes = np.max(train_labels) + 1

min_value = min(np.min(train_data), np.min(test_data))
max_value = max(np.max(train_data), np.max(test_data))

train_data = normalize(train_data, min_value, max_value)
test_data = normalize(test_data, min_value, max_value)

train_labels = one_hot_encode(train_labels, 10)
test_labels = one_hot_encode(test_labels, 10)

num_inputs = len(train_data[0])

## NN Training

In [44]:
import time
start_time = time.time()

hidden_layers = 3
hidden_neurons_per_layer = 16
num_epochs = 1000
batch_size = 128
learning_rate = 0.002
slow_learning_rate = 0.0005
dropout_neurons = 4
weight_decay = 0.0005
momentum = 0.9

tester = None
cost_function = None

class Function:
    def apply(self, *values):
        pass
    
    def apply_derivative(self, *values):
        pass
    
class Relu(Function):
    def apply(self, *values):
        if values[0] > 0:
            return values[0]
        
        return 0.01 * values[0]
    
    def apply_derivative(self, *values):
        if values[0] > 0:
            return 1
        
        return 0.01
    
class Softmax(Function):
    def __init__(self, values):
        self.base_values = values.copy()
        self.values = np.exp(values)
        self.values /= np.max(self.values)
        
        self.cached_sum = np.sum(self.values)
    
    def apply(self, *values): 
        index = values[0]
        
        return self.values[index] / self.cached_sum
    
    def apply_derivative(self, *values):
        index = values[0]
        
        if index == len(self.base_values) - 1:
            return self.base_values[index] * (1 - self.base_values[index])
        
        return -self.base_values[index] * self.base_values[-1]
    
class CrossEntropy(Function):
    def apply(self, *values):
        value = values[0]
        target = values[1]
        
        return value - target

class Neuron:
    def __init__(self, num_inputs, neuron_index, activation_function):
        std = math.sqrt(2.0 / num_inputs) # He init
        self.weights = np.random.randn(num_inputs) * std
        self.bias = np.random.randn() * std
        self.num_inputs = num_inputs
        self.neuron_index = neuron_index
        self.value = 0
        self.net_sum = 0
        self.error = 0
        self.activation_function = activation_function
        self.dropout = False
        self.weight_momentum = np.zeros(num_inputs)
        self.bias_momentum = 0
        
    def set_dropout(self, value):
        self.dropout = value
    
    def compute(self, data):
        if self.dropout:
            self.net_sum = 0
            self.value = 0
            return
        
        self.net_sum = np.dot(self.weights, data) + self.bias
        self.value = self.activation_function.apply(self.net_sum)
    
    def compute_error(self, influenced_neurons):
        self.error = 0
        for i in range(len(influenced_neurons)):
            self.error += influenced_neurons[i].error * influenced_neurons[i].weights[self.neuron_index]
        self.error *= self.activation_function.apply_derivative(self.net_sum)
            
    def adjust_weights(self, influencer_neurons):
        for i in range(len(self.weights)):
            #self.weight_momentum[i] = self.weight_momentum[i] * momentum + (self.error * influencer_neurons[i].value + weight_decay * self.weights[i])
            #self.weights[i] -= learning_rate * self.weight_momentum[i]
    
            self.weights[i] -= learning_rate * (self.error * influencer_neurons[i].value + weight_decay * self.weights[i])
            
        #self.bias_momentum = self.bias_momentum * momentum * self.error
        #self.bias -= learning_rate * self.bias_momentum
        
        self.bias -= learning_rate * self.error
    
class InputNeuron(Neuron):
    def compute(self, data):
        self.value = data[self.neuron_index] 

class OutputNeuron(Neuron):
    def __init__(self, num_inputs, neuron_index, activation_function):
        super().__init__(num_inputs, neuron_index, activation_function)
        self.target = 0
        
    def compute(self, data):
        self.net_sum = np.dot(self.weights, data) + self.bias
        
    def compute_probability(self):
        self.value = self.activation_function.apply(self.neuron_index)
        
    def compute_error(self, target):
        self.error = self.value - target
        

class Layer:
    def __init__(self, num_neurons, neuron_inputs, neuron_type):
        self.activation_function = Relu()
        self.neurons = [neuron_type(neuron_inputs, i, self.activation_function) for i in range(num_neurons)]
        
    def compute(self, data):
        for neuron in self.neurons:
            neuron.compute(data)
            
    def get_values(self):
        return np.array([neuron.value for neuron in self.neurons])
    
    def get_net_sums(self):
        return np.array([neuron.net_sum for neuron in self.neurons])
    
    def compute_error(self, next_layer):
        next_layer_neurons = next_layer.neurons
        for neuron in self.neurons:
            neuron.compute_error(next_layer_neurons)
            
    def adjust_weights(self, prev_layer):
        prev_layer_neurons = prev_layer.neurons
        for neuron in self.neurons:
            neuron.adjust_weights(prev_layer_neurons)
    
class OutputLayer(Layer):
    def __init__(self, num_neurons, neuron_inputs):
        super().__init__(num_neurons, neuron_inputs, OutputNeuron)
        
    def compute(self, data):
        super().compute(data)
        
        self.activation_function = Softmax(self.get_net_sums())
        
        for neuron in self.neurons:
            neuron.activation_function = self.activation_function
            neuron.compute_probability()
            
    def compute_error(self, targets):
        for i in range(len(self.neurons)):
            self.neurons[i].compute_error(targets[i])

class NeuralNetwork:
    def __init__(self):
        self.layers = []
        self.layers.append(Layer(num_inputs, 1, InputNeuron))
        for i in range(hidden_layers):
            self.layers.append(Layer(hidden_neurons_per_layer, len(self.layers[i].neurons), Neuron))
            
        self.layers.append(OutputLayer(classes, len(self.layers[-1].neurons)))
        
    def forward_pass(self, data):
        self.layers[0].compute(data)
        
        for i in range(1, len(self.layers)):
            self.layers[i].compute(self.layers[i-1].get_values())
            
        return self.layers[-1].get_values()
    
    def backprop(self, target):
        self.layers[-1].compute_error(target)
        self.layers[-1].adjust_weights(self.layers[-2])
        for i in range(len(self.layers) - 2, 0, -1):
            self.layers[i].compute_error(self.layers[i+1])
            self.layers[i].adjust_weights(self.layers[i-1])

class TrainingSample:
    def __init__(self, data, output):
        self.input = data
        self.output = output

class Trainer:
    def __init__(self, nn: NeuralNetwork):
        self.neural_network = nn
        self.training_samples = []
        for i in range(len(train_data)):
            self.training_samples.append(TrainingSample(train_data[i], train_labels[i]))
    
    def get_mini_batches(self):
        random.shuffle(self.training_samples)
        
        num_batches = len(self.training_samples) // batch_size + 1
    
        batches = []
    
        for i in range(num_batches - 1):
            batches.append(self.training_samples[i * batch_size:(i + 1) * batch_size])
    
        batches.append(self.training_samples[(num_batches - 1) * batch_size:])
    
        return batches
    
    def start(self):
        for epoch in range(num_epochs):
            batches = self.get_mini_batches()
            
            for batch_index in range(len(batches)):
                training_sample = self.training_samples[batch_index]
                
                neurons = []
                
                for i in range(dropout_neurons):
                    layer = random.randint(1, len(self.neural_network.layers) - 2)
                    neuron = random.randint(0, len(self.neural_network.layers[layer].neurons) - 1)
                    
                    neurons.append(self.neural_network.layers[layer].neurons[neuron])
                    
                    neurons[i].set_dropout(True)
        
                self.neural_network.forward_pass(training_sample.input)
                
                for i in range(dropout_neurons):
                    neurons[i].set_dropout(False)
                
                self.neural_network.backprop(training_sample.output)
                
            accuracy = tester.get_accuracy() * 100
                
            if accuracy > 88:
                global learning_rate
                learning_rate = slow_learning_rate
                
            print("Epoch " + str(epoch + 1) + "/" + str(num_epochs) + ": " + str(accuracy) + "% - " + str(time.time() - start_time) + "s")

class Tester:
    def __init__(self, nn: NeuralNetwork):
        self.neural_network = nn
        self.test_samples = []
        for i in range(len(test_data)):
            self.test_samples.append(TrainingSample(test_data[i], test_labels[i]))
        
    def get_accuracy(self):
        length = len(self.test_samples)
        success_cases = 0
        for i in range(length):
            outputs = self.neural_network.forward_pass(test_data[i])
            
            prediction = np.argmax(outputs)
            
            if test_labels[i][prediction] == 1:
                success_cases += 1
                
        return success_cases / length

inner_activation_function = Relu()
neural_network = NeuralNetwork()
cost_function = CrossEntropy()
tester = Tester(neural_network)
trainer = Trainer(neural_network)
trainer.start()

Epoch 1/1000: 19.040000000000003% - 2.755295991897583s
Epoch 2/1000: 24.03% - 5.3779778480529785s
Epoch 3/1000: 33.48% - 7.94980525970459s
Epoch 4/1000: 34.23% - 10.54573392868042s
Epoch 5/1000: 39.290000000000006% - 13.118313074111938s
Epoch 6/1000: 42.84% - 15.761070966720581s
Epoch 7/1000: 57.74% - 18.337092876434326s
Epoch 8/1000: 63.51% - 20.916375160217285s
Epoch 9/1000: 62.51% - 23.49451208114624s
Epoch 10/1000: 64.62% - 26.16215991973877s
Epoch 11/1000: 70.82000000000001% - 29.067703247070312s
Epoch 12/1000: 67.86% - 31.69726014137268s
Epoch 13/1000: 70.62% - 34.291430950164795s
Epoch 14/1000: 73.37% - 36.94242215156555s
Epoch 15/1000: 73.91% - 39.54590916633606s
Epoch 16/1000: 77.21000000000001% - 42.1254301071167s
Epoch 17/1000: 78.22% - 44.70270919799805s
Epoch 18/1000: 80.74% - 47.33926701545715s
Epoch 19/1000: 78.4% - 49.926454067230225s
Epoch 20/1000: 79.71000000000001% - 52.50599718093872s
Epoch 21/1000: 80.81% - 55.09620785713196s
Epoch 22/1000: 81.10000000000001% - 57.

KeyboardInterrupt: 