In [1]:
import pickle
import gzip
import random

import numpy as np

In [2]:
class Layer:
    """Base class for one stage of a sequential network.

    Layers form a doubly linked chain through ``previous`` / ``next``.
    Subclasses implement ``forward``, ``backward`` and ``describe``;
    ``clear_deltas`` and ``update_params`` are no-ops unless overridden.
    """

    def __init__(self):
        # Trainable parameters (empty for layers that have none).
        self.params = []

        # Neighbours in the chain; None marks the first / last layer.
        self.next = None
        self.previous = None

        # Data flowing forward: what comes in and what this layer produces.
        self.input_data = None
        self.output_data = None

        # Error flowing backward: what comes in and what this layer passes on.
        self.input_delta = None
        self.output_delta = None

    def connect(self, layer):
        """Attach this layer directly after ``layer``."""
        layer.next = self
        self.previous = layer

    def forward(self):
        """Propagate data through this layer (implemented by subclasses)."""
        raise NotImplementedError

    def get_forward_input(self):
        """Return the predecessor's output, or ``input_data`` for the first layer."""
        if self.previous is None:
            return self.input_data
        return self.previous.output_data

    def backward(self):
        """Propagate the error back through this layer (implemented by subclasses)."""
        raise NotImplementedError

    def get_backward_input(self):
        """Return the successor's output delta, or ``input_delta`` for the last layer."""
        if self.next is None:
            return self.input_delta
        return self.next.output_delta

    def clear_deltas(self):
        """Reset deltas accumulated over a mini-batch; no-op in the base class."""

    def update_params(self, learning_rate):
        """Apply accumulated deltas to the parameters; no-op in the base class."""

    def describe(self):
        """Print a description of the layer (implemented by subclasses)."""
        raise NotImplementedError

In [3]:
def sigmoid_double(x):
    """Return the logistic function 1 / (1 + e^(-x)) of a single value."""
    denominator = 1.0 + np.exp(-x)
    return 1.0 / denominator

def sigmoid(z):
    """Apply the logistic function 1 / (1 + e^(-z)) element-wise.

    Args:
        z: a scalar or array-like of real numbers.

    Returns:
        A float ndarray with the same shape as ``z`` (0-d for a scalar).

    The expression is evaluated directly on the array instead of through
    ``np.vectorize``, which runs a Python-level loop per element and raises
    ``ValueError`` on size-0 input when ``otypes`` is not set.
    """
    values = np.asarray(z, dtype=float)
    # np.asarray keeps the "always returns an ndarray" contract, since a ufunc
    # applied to a 0-d array yields a NumPy scalar.
    return np.asarray(1.0 / (1.0 + np.exp(-values)))

def sigmoid_prime_double(x):
    """Return the derivative of the logistic function at a single value.

    Uses the identity s'(x) = s(x) * (1 - s(x)).
    """
    activation = sigmoid_double(x)
    return activation * (1 - activation)

def sigmoid_prime(Z):
    """Apply the derivative of the logistic function element-wise.

    Args:
        Z: a scalar or array-like of real numbers.

    Returns:
        A float ndarray with the same shape as ``Z`` (0-d for a scalar)
        holding s(Z) * (1 - s(Z)), where s is the logistic function.

    The expression is evaluated directly on the array instead of through
    ``np.vectorize``, which runs a Python-level loop per element and raises
    ``ValueError`` on size-0 input when ``otypes`` is not set.
    """
    values = np.asarray(Z, dtype=float)
    activation = 1.0 / (1.0 + np.exp(-values))
    # np.asarray keeps the "always returns an ndarray" contract for scalars.
    return np.asarray(activation * (1.0 - activation))

In [4]:
class ActivationLayer(Layer):
    """Layer that applies the sigmoid to its input element-wise.

    The output has the same dimension as the input.
    """

    def __init__(self, input_dim):
        super().__init__()

        # An activation does not change the width of the data.
        self.input_dim = self.output_dim = input_dim

    def forward(self):
        """Store sigmoid(forward input) in ``output_data``."""
        self.output_data = sigmoid(self.get_forward_input())

    def backward(self):
        """Scale the incoming delta by the sigmoid derivative at the forward input."""
        incoming_delta = self.get_backward_input()
        pre_activation = self.get_forward_input()
        self.output_delta = incoming_delta * sigmoid_prime(pre_activation)

    def describe(self):
        """Print the class name and the input/output dimensions."""
        class_name = type(self).__name__
        print("|-- " + class_name)
        print(f" |-- dimensions: {self.input_dim},{self.output_dim}")

In [5]:
class DenseLayer(Layer):
    """Fully connected layer computing ``weight . x + bias``.

    ``weight`` has shape (output_dim, input_dim) and ``bias`` has shape
    (output_dim, 1); both are drawn from a standard normal distribution.
    Gradients are accumulated in ``delta_w`` / ``delta_b`` by ``backward``
    until ``update_params`` applies them and ``clear_deltas`` resets them.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()

        self.input_dim, self.output_dim = input_dim, output_dim

        # Weight is drawn before bias; keep this order for seeded runs.
        self.weight = np.random.randn(output_dim, input_dim)
        self.bias = np.random.randn(output_dim, 1)

        self.params = [self.weight, self.bias]

        # Gradient accumulators, one per parameter.
        self.delta_w = np.zeros_like(self.weight)
        self.delta_b = np.zeros_like(self.bias)

    def forward(self):
        """Store the affine transform of the forward input in ``output_data``."""
        layer_input = self.get_forward_input()
        self.output_data = np.dot(self.weight, layer_input) + self.bias

    def backward(self):
        """Accumulate parameter gradients and pass the delta to the previous layer."""
        layer_input = self.get_forward_input()
        upstream = self.get_backward_input()

        # Accumulate over the samples of the current mini-batch.
        self.delta_b += upstream
        self.delta_w += np.dot(upstream, layer_input.T)

        self.output_delta = np.dot(self.weight.T, upstream)

    def update_params(self, rate):
        """Take one gradient step of size ``rate`` along the accumulated deltas."""
        # In-place updates keep the arrays referenced from ``self.params`` current.
        self.weight -= rate * self.delta_w
        self.bias -= rate * self.delta_b

    def clear_deltas(self):
        """Replace both gradient accumulators with fresh zero arrays."""
        self.delta_w = np.zeros_like(self.weight)
        self.delta_b = np.zeros_like(self.bias)

    def describe(self):
        """Print the class name and the input/output dimensions."""
        class_name = type(self).__name__
        print("|-- " + class_name)
        print(f" |-- dimensions: {self.input_dim},{self.output_dim}")

In [6]:
class MSE:
    """Squared-error loss and its gradient with respect to the predictions."""

    def __init__(self):
        pass

    @staticmethod
    def loss_functions(predictions, labels):
        """Return half the sum of squared differences between predictions and labels.

        Args:
            predictions: array of network outputs.
            labels: array of target values, broadcast-compatible with ``predictions``.

        Returns:
            A non-negative scalar, ``0.5 * sum((predictions - labels) ** 2)``.

        The differences are squared before summing. Summing the raw
        differences lets errors of opposite sign cancel and is inconsistent
        with ``loss_derivative``, which is the gradient of the squared form.
        """
        diff = predictions - labels
        return 0.5 * np.sum(np.square(diff))

    @staticmethod
    def loss_derivative(predictions, labels):
        """Return the gradient of the loss with respect to ``predictions``."""
        return predictions - labels

In [7]:
class SequentialNetwork:
    """A feed-forward network built from a chain of layers, trained with mini-batch SGD."""

    def __init__(self, loss=None):
        """Create an empty network.

        Args:
            loss: object providing ``loss_derivative(predictions, labels)``;
                defaults to ``MSE()`` when omitted.
        """
        print('Initialize network...')
        self.layers = []
        # Always set self.loss: a caller-supplied loss used to be dropped,
        # which left the attribute undefined and broke training later.
        self.loss = MSE() if loss is None else loss

    def add(self, layer):
        """Append ``layer``, print its description and link it to its predecessor."""
        self.layers.append(layer)
        layer.describe()
        if len(self.layers) > 1:
            self.layers[-1].connect(self.layers[-2])

    def train(self, training_data, epochs, mini_batch_size, learning_rate, test_data=None):
        """Train with mini-batch stochastic gradient descent.

        Args:
            training_data: sequence of ``(x, y)`` samples; it is not modified.
            epochs: number of passes over the training data.
            mini_batch_size: number of samples per parameter update.
            learning_rate: step size, divided by the batch length in ``update``.
            test_data: optional sequence of ``(x, y)`` samples; when non-empty,
                the number of correct predictions is printed after each epoch.
        """
        # Shuffle a private copy so the caller's list keeps its order. Each
        # epoch re-shuffles the same copy, so the batch sequence is unchanged.
        samples = list(training_data)
        n = len(samples)
        for epoch in range(epochs):
            random.shuffle(samples)
            for start in range(0, n, mini_batch_size):
                self.train_batch(samples[start:start + mini_batch_size], learning_rate)

            if test_data:
                n_test = len(test_data)
                print(f'Epoch {epoch}: {self.evaluate(test_data)} / {n_test}')
            else:
                print(f'Epoch {epoch} complete')

    def train_batch(self, mini_batch, learning_rate):
        """Accumulate gradients over ``mini_batch`` and apply one update."""
        self.forward_backward(mini_batch)
        self.update(mini_batch, learning_rate)

    def update(self, mini_batch, learning_rate):
        """Apply the accumulated gradients, averaged over the batch, then reset them."""
        batch_size = len(mini_batch)
        if batch_size == 0:
            # Nothing was accumulated; also avoids dividing by zero below.
            return

        learning_rate = learning_rate / batch_size
        for layer in self.layers:
            layer.update_params(learning_rate)

        for layer in self.layers:
            layer.clear_deltas()

    def forward_backward(self, mini_batch):
        """Run a forward and a backward pass for every sample in ``mini_batch``."""
        for x, y in mini_batch:
            self.layers[0].input_data = x
            for layer in self.layers:
                layer.forward()
            # Seed back-propagation with the loss gradient at the network output.
            self.layers[-1].input_delta = self.loss.loss_derivative(self.layers[-1].output_data, y)
            for layer in reversed(self.layers):
                layer.backward()

    def single_forward(self, x):
        """Feed ``x`` through all layers and return the last layer's output."""
        self.layers[0].input_data = x
        for layer in self.layers:
            layer.forward()

        return self.layers[-1].output_data

    def evaluate(self, test_data):
        """Count samples whose output argmax equals the label argmax."""
        return sum(
            int(np.argmax(self.single_forward(x)) == np.argmax(y))
            for x, y in test_data
        )

In [8]:
def encode_label(j, num_classes=10):
    """One-hot encode a class index as a column vector.

    Args:
        j: integer class index with ``0 <= j < num_classes``.
        num_classes: length of the encoding; defaults to 10.

    Returns:
        A ``(num_classes, 1)`` float array of zeros with a 1.0 in row ``j``.

    Raises:
        IndexError: if ``j`` is outside ``[0, num_classes)``. Negative values
            are rejected explicitly because NumPy indexing would otherwise
            wrap around and silently encode a different class.
    """
    if not 0 <= j < num_classes:
        raise IndexError(f'label {j} is out of range for {num_classes} classes')
    e = np.zeros((num_classes, 1))
    e[j] = 1.0
    return e

def shape_data(data):
    """Convert an (inputs, labels) pair into a list of (feature, label) tuples.

    Every entry of ``data[0]`` is reshaped into a 784 x 1 column vector (for
    example a flattened 28 x 28 image) and every entry of ``data[1]`` is
    one-hot encoded with ``encode_label``.
    """
    inputs, targets = data[0], data[1]
    columns = [np.reshape(sample, (784, 1)) for sample in inputs]
    one_hots = list(map(encode_label, targets))

    return list(zip(columns, one_hots))
                                   
def load_data(path='data/mnist.pkl.gz'):
    """Load the gzipped pickle at ``path`` and shape its three splits.

    Args:
        path: location of a gzip-compressed pickle holding a
            ``(train, validation, test)`` triple; defaults to the path the
            notebook has always used.

    Returns:
        A tuple ``(train, validation, test)`` where each element is the
        result of ``shape_data`` on the corresponding split.
    """
    # SECURITY: pickle.load can execute arbitrary code while unpickling.
    # Only point this at a file obtained from a trusted source.
    with gzip.open(path, 'rb') as f:
        train_data, validation_data, test_data = pickle.load(f, encoding='latin1')

    return shape_data(train_data), shape_data(validation_data), shape_data(test_data)

In [9]:
# Load the three splits returned by load_data(); each is a list of
# (feature column vector, one-hot label) tuples produced by shape_data().
train_data, val_data, test_data = load_data()

In [10]:
# Seed both generators before any random draw so that Restart & Run All
# reproduces the same run: DenseLayer draws its initial weights from
# np.random, and SequentialNetwork.train shuffles with the random module.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

net = SequentialNetwork()

# 784 inputs -> 392 -> 196 -> 10 outputs, with a sigmoid after every dense layer.
net.add(DenseLayer(784, 392))
net.add(ActivationLayer(392))
net.add(DenseLayer(392, 196))
net.add(ActivationLayer(196))
net.add(DenseLayer(196, 10))
net.add(ActivationLayer(10))

Initialize network...
|-- DenseLayer
 |-- dimensions: 784,392
|-- ActivationLayer
 |-- dimensions: 392,392
|-- DenseLayer
 |-- dimensions: 392,196
|-- ActivationLayer
 |-- dimensions: 196,196
|-- DenseLayer
 |-- dimensions: 196,10
|-- ActivationLayer
 |-- dimensions: 10,10


In [11]:
# Train for 10 epochs with mini-batches of 10 samples and a learning rate of 3.0.
# Because test_data is passed, the number of correctly classified test samples
# is printed after every epoch.
net.train(train_data, epochs=10, mini_batch_size=10, learning_rate=3.0, test_data=test_data)

Epoch 0: 3245 / 10000
Epoch 1: 5513 / 10000
Epoch 2: 5891 / 10000
Epoch 3: 6065 / 10000
Epoch 4: 5712 / 10000
Epoch 5: 5755 / 10000
Epoch 6: 5790 / 10000
Epoch 7: 5890 / 10000
Epoch 8: 5847 / 10000
Epoch 9: 5967 / 10000
