In [69]:
import random
import numpy as np
import pickle
import gzip

In [70]:
def load_data():
    """Return the MNIST data as a tuple containing the training data, the validation data, and the test data."""
    
    with gzip.open('./data/mnist.pkl.gz', 'rb') as f:
        training_data, validation_data, test_data = pickle.load(f, encoding='latin1')

    return (training_data, validation_data, test_data)

def change_loaded_data_format():
    """Return a tuple containing ``(training_data, validation_data,
    test_data)``. Based on ``load_data``, but the format is more
    convenient for use in our implementation of neural networks.

    In particular, ``training_data`` is a list containing 50,000
    2-tuples ``(x, y)``.  ``x`` is a 784-dimensional numpy.ndarray
    containing the input image.  ``y`` is a 10-dimensional
    numpy.ndarray representing the unit vector corresponding to the
    correct digit for ``x``.

    ``validation_data`` and ``test_data`` are lists containing 10,000
    2-tuples ``(x, y)``.  In each case, ``x`` is a 784-dimensional
    numpy.ndarry containing the input image, and ``y`` is the
    corresponding classification, i.e., the digit values (integers)
    corresponding to ``x``.

    Obviously, this means we're using slightly different formats for
    the training data and the validation / test data.  These formats
    turn out to be the most convenient for use in our neural network
    code."""
    tr_d, va_d, te_d = load_data()
    training_inputs = [np.reshape(x, (784, 1)) for x in tr_d[0]]
    training_results = [vectorized_result(y) for y in tr_d[1]]
    training_data = list(zip(training_inputs, training_results))
    validation_inputs = [np.reshape(x, (784, 1)) for x in va_d[0]]
    validation_data = list(zip(validation_inputs, va_d[1]))
    test_inputs = [np.reshape(x, (784, 1)) for x in te_d[0]]
    test_data = list(zip(test_inputs, te_d[1]))
    return (training_data, validation_data, test_data)

def vectorized_result(j):
    """Convert the digit to 10 dimensional vector"""
    e = np.zeros((10, 1))
    e[j] = 1.0
    return e

In [71]:
training_data, validation_data, test_data = change_loaded_data_format()

In [72]:
# Class of neural network
class NeuralNetwork:
    def __init__(self, sizes):
        self.sizes = sizes
        self.layer_number = len(sizes)
        # Initialize the weights and biases
        self.weights = [np.random.randn(j, i) for i, j in zip(sizes[:-1], sizes[1:])]
        self.biases = [np.random.randn(j, 1) for j in sizes[1:]]
    
    @staticmethod
    def sigmoid(z):
        """Coumputes activations using sigmoid activation function"""
        return 1/(1+np.exp(-z))
    
    def sigmoid_prime(self, z):
        """Derivative of activation function"""
        return self.sigmoid(z)*(1-self.sigmoid(z))
    
    def feedforward(self, x):
        """Coumputes the output using current weights and biases with given input x"""
        for w, b in zip(self.weights, self.biases):
            x = self.sigmoid(np.dot(w, x) + b)
            
        return x
    
    def MiniBatchGradDesc(self, training_data, epochs, batch_size, eta, test_data=None):
        """Performs Mini-batch Gradient Descent alghorithm using the training data"""
        
        number_of_td = len(training_data)
        
        for epoch in range(epochs):
            random.shuffle(training_data)
            mini_batches = [training_data[k:k+batch_size] for k in range(0, number_of_td, batch_size) if k+batch_size < number_of_td]

            for mini_batch in mini_batches:
                self.update_model_parameters(mini_batch, eta)

            if test_data:
                print(f"Epoch {epoch}: {self.evaluate(test_data)}/{len(test_data)}")
            else:
                print(f"Epoch {epoch}")
    
    def evaluate(self, test_data):
        """Evaluate the model performance usign test data"""
        test_results = [(np.argmax(self.feedforward(x)), y) for x, y in test_data]
        
        return sum(int(j==i) for i, j in test_results)
    
    def update_model_parameters(self, mini_batch, eta):
        """Update model parameters using single mini-batch and learning rate eta"""
        
        # Create a matrix of inputs and outputs to perform fast calculations using vectorizations
        x = np.concatenate(tuple(x for x, y in mini_batch), axis=1)
        y = np.concatenate(tuple(y for x, y in mini_batch), axis=1)
        
        nabla_b, nabla_w = self.backprop(x, y)
        
        # Update the weights and biases of model knowing the partial derivatives(nabla) of Cost function
        self.weights = [w-eta/len(mini_batch)*nw for w, nw in zip(self.weights, nabla_w)]
        self.biases = [b-eta/len(mini_batch)*nb for b, nb in zip(self.biases, nabla_b)]
    
    def backprop(self, x, y):
        """Perform backpropagation alghorithm for given inputs(x) and outputs(y)"""
        
        nabla_w = [np.zeros(w.shape) for w in self.weights]
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        
        # Performing feedforward to store all activations and zs
        activation = x
        activations = [x]
        zs = []
        for w, b in zip(self.weights, self.biases):
            z = np.dot(w, activation) + b
            zs.append(z)
            activation = self.sigmoid(z)
            activations.append(activation)
        
        # Calculate delta, weights and biases for last layer
        delta = self.cost_derivative(activations[-1], y)*self.sigmoid_prime(zs[-1])
        # Example for batch_size=11, delta.shape=(10,11) activations[-2].shape=(30,11), 
        # then to have weights[-1].shape=(10, 30)
        nabla_w[-1] = np.dot(delta, activations[-2].transpose())
        # delta.shape=(10, 11), nabla_b[-1].shape should be (10, 1)
        # np.sum(delta, axis=1).shape=(10,)
        # np.sum(delta, axis=1)[:, np.newaxis].shape=(10,1)
        nabla_b[-1] = np.sum(delta, axis=1)[:, np.newaxis]
        
        # Backpropagate to compute delta, w, b for all other layers
        for l in range(2, self.layer_number):
            # delta_new.shape=(30,11), delta_old.shape=(10,11), weights[-1].shape(10, 30) 
            delta = np.dot(self.weights[-l+1].transpose(), delta)*self.sigmoid_prime(zs[-l])
            nabla_w[-l] = np.dot(delta, activations[-l-1].transpose())
            # [:, np.newaxis] adds extra dimension, thus make nabla_b a column vector
            nabla_b[-l] = np.sum(delta, axis=1)[:, np.newaxis]
            
        return nabla_b, nabla_w
        
    
    @staticmethod
    def cost_derivative(activations, y):
        """Derivative of mean squared error cost function"""
        
        return activations - y

In [73]:
np.random.seed(42)
net = NeuralNetwork([784, 30, 10])
net.MiniBatchGradDesc(training_data, 30, 10, 3.0, test_data=test_data)

Epoch 0: 9111/10000
Epoch 1: 9283/10000
Epoch 2: 9357/10000
Epoch 3: 9379/10000
Epoch 4: 9409/10000
Epoch 5: 9428/10000
Epoch 6: 9458/10000
Epoch 7: 9495/10000
Epoch 8: 9472/10000
Epoch 9: 9477/10000
Epoch 10: 9470/10000
Epoch 11: 9486/10000
Epoch 12: 9494/10000
Epoch 13: 9499/10000
Epoch 14: 9509/10000
Epoch 15: 9505/10000
Epoch 16: 9513/10000
Epoch 17: 9531/10000
Epoch 18: 9534/10000
Epoch 19: 9501/10000
Epoch 20: 9533/10000
Epoch 21: 9538/10000
Epoch 22: 9513/10000
Epoch 23: 9531/10000
Epoch 24: 9509/10000
Epoch 25: 9538/10000
Epoch 26: 9529/10000
Epoch 27: 9546/10000
Epoch 28: 9535/10000
Epoch 29: 9564/10000
