In [5]:
from abc import ABCMeta, abstractmethod
from collections import deque

import numpy as np
import pandas as pd
from sklearn.datasets import fetch_mldata

In [7]:
mnist = fetch_mldata('MNIST original', data_home='../')
mnist.target = np.array(pd.get_dummies(mnist.target))

In [8]:
holdout_set_mask = np.array([i % 10 == 0 for i in range(len(mnist.data))])

X = mnist.data[~holdout_set_mask]
y = mnist.target[~holdout_set_mask]

In [9]:
class BaseTrainingBatchGenerator(metaclass=ABCMeta):
    
    def __init__(self, X, y, n_batches):
        np.random.shuffle(X)
        self.X = X
        self.y = y
        self.n_batches = n_batches
        
    @abstractmethod
    def __iter__(self):
        pass

In [10]:
class BaseLossFunction(metaclass=ABCMeta):
    
    @staticmethod
    @abstractmethod
    def loss_function(y_true, y_predicted):
        pass
    
    @staticmethod
    @abstractmethod
    def derivative_of_loss_function(y_true, y_predicted):
        pass

In [11]:
class BaseActivationFunction(metaclass=ABCMeta):
    
    @staticmethod
    @abstractmethod
    def activation_function(linear_combination):
        pass
    
    @abstractmethod
    def derivative_of_activation_function(linear_combination):
        pass

In [12]:
class BaseOptimizationAlgorithm(metaclass=ABCMeta):
    
    @abstractmethod
    def run(self):
        return 

In [13]:
class HoldoutData:
    
    def __init__(self, X, y):
        self.X = X
        self.y = y

In [14]:
class TrainingBatch:
    
    def __init__(self, X, y):
        self.X = X
        self.y = y

In [15]:
class MeanSquaredError(BaseLossFunction):
    
    @staticmethod
    def loss_function(y_true, y_predicted):
        return .5*(y_true - y_predicted)**2
    
    @staticmethod
    def derivative_of_loss_function(y_true, y_predicted):
        return y_predicted - y_true

In [16]:
class MiniBatchGenerator(BaseTrainingBatchGenerator):
        
    def __iter__(self):
        for batch_index in np.array_split(range(len(self.X)), self.n_batches):
            yield TrainingBatch(X=self.X[batch_index], y=self.y[batch_index])

In [17]:
class FullBatchGenerator(BaseTrainingBatchGenerator):
    
    pass

In [18]:
class SigmoidActivationFunction(BaseActivationFunction):
    
    @staticmethod
    def activation_function(linear_combination):
        return 1/(1 + np.exp(-linear_combination))

    @classmethod
    def derivative_of_activation_function(cls, linear_combination):
        return cls.activation_function(linear_combination) * (1 - cls.activation_function(linear_combination))

In [19]:
class GradientDescent(BaseOptimizationAlgorithm):
    
    def __init__(self, weight_matrices, bias_vectors, linear_combinations, activations, y, 
                 activation_function_class, loss_function_class, learning_rate):
        self.weight_matrices=weight_matrices
        self.bias_vectors=bias_vectors
        self.linear_combinations = linear_combinations
        self.activations=activations
        self.y=y
        self.activation_function_class = activation_function_class
        self.loss_function_class=loss_function_class
        self.learning_rate=learning_rate
        self.batch_size = len(self.y)
        
    def run(self):
        delta_matrices = self._compute_delta_matrices()
        updated_weight_matrices = self._update_weight_matrices(delta_matrices)
        updated_bias_vectors = self._update_bias_vectors(delta_matrices)
        return updated_weight_matrices, updated_bias_vectors
    
    def _compute_delta_matrices(self):
        output_layer_delta_matrix = self._compute_output_layer_delta_matrix()
        delta_matrices = deque([output_layer_delta_matrix])
        for linear_combination, weight_matrix in zip(reversed(self.linear_combinations[:-1]), reversed(self.weight_matrices)):
            delta_matrix = np.dot(delta_matrices[-1], weight_matrix.T) * \
                self.activation_function_class.derivative_of_activation_function(linear_combination)
            delta_matrices.appendleft(delta_matrix)
        return delta_matrices
    
    def _compute_output_layer_delta_matrix(self):
        return self.loss_function_class.derivative_of_loss_function(y_true=self.y, y_predicted=self.activations[-1]) * \
            self.activation_function_class.derivative_of_activation_function(self.linear_combinations[-1])
        
    def _update_weight_matrices(self, delta_matrices):
        weight_gradient_matrices = self._compute_weight_gradient_matrices(delta_matrices)
        return [weight_matrix + (-self.learning_rate*weight_gradient_matrix/self.batch_size) for weight_matrix, \
            weight_gradient_matrix in zip(self.weight_matrices, weight_gradient_matrices)]
    
    def _compute_weight_gradient_matrices(self, delta_matrices):
        weight_gradient_matrices = deque()
        for activation_matrix, delta_matrix in zip(reversed(self.activations[:-1]), reversed(delta_matrices)):
            weight_gradient_matrices.appendleft(np.dot(activation_matrix.T, delta_matrix))
        return weight_gradient_matrices
    
    def _update_bias_vectors(self, delta_matrices):
        bias_gradient_vectors = self._compute_bias_gradient_vectors(delta_matrices)
        return [bias_vector + (-self.learning_rate*bias_gradient_vector/self.batch_size) for bias_vector, \
            bias_gradient_vector in zip(self.bias_vectors, bias_gradient_vectors)]
    
    def _compute_bias_gradient_vectors(self, delta_matrices):
        return [delta_matrix.sum(axis=0) for delta_matrix in delta_matrices]

In [27]:
class VanillaNeuralNet:
    
    def __init__(self, layer_sizes, training_batch_generator_class, loss_function_class, activation_function_class, 
                 optimization_algorithm_class, learning_rate, n_epochs, n_batches_per_epoch, holdout_data):
        self.weight_matrices = self._initialize_weight_matrices(layer_sizes)
        self.bias_vectors = self._initialize_bias_vectors(layer_sizes)
        self.training_batch_generator_class = training_batch_generator_class
        self.loss_function_class = loss_function_class
        self.activation_function_class = activation_function_class
        self.optimization_algorithm_class = optimization_algorithm_class
        self.learning_rate = learning_rate
        self.n_epochs = n_epochs
        self.n_batches_per_epoch = n_batches_per_epoch
        self.holdout_data = holdout_data
        
    def fit(self, X, y):
        for epoch in range(self.n_epochs):
            print('Epoch {}'.format(epoch))
            training_batch_generator = self.training_batch_generator_class(X=X, y=y, n_batches=self.n_batches_per_epoch)
            
            for training_batch in training_batch_generator:
                self._update_weights_and_biases(training_batch)
            self._validate_on_holdout_set()

    def predict(self, X):
        linear_combination_matrices, activations = self._feed_forward(X)
        return activations[-1]
    
    def _update_weights_and_biases(self, training_batch):
        linear_combinations, activations = self._feed_forward(training_batch.X)
        self._back_propagate(linear_combinations=linear_combinations, activations=activations, y=training_batch.y)

    def _feed_forward(self, X):
        activation_matrices = [X]
        linear_combination_matrices = []
        for weight_matrix, bias_vector in zip(self.weight_matrices, self.bias_vectors):
            linear_combination = np.dot(activation_matrices[-1], weight_matrix) + bias_vector
            linear_combination_matrices.append(linear_combination)
            activation_matrix = self.activation_function_class.activation_function(linear_combination)
            activation_matrices.append(activation_matrix)
        return linear_combination_matrices, activation_matrices
        
    def _back_propagate(self, linear_combinations, activations, y):
        self.weight_matrices, self.bias_vectors = self.optimization_algorithm_class(
            weight_matrices=self.weight_matrices,
            bias_vectors=self.bias_vectors,
            linear_combinations=linear_combinations,
            activations=activations,
            y=y,
            activation_function_class=self.activation_function_class,
            loss_function_class=self.loss_function_class,
            learning_rate=self.learning_rate
        ).run()
        
    def _validate_on_holdout_set(self):
        holdout_predictions = self.predict(self.holdout_data.X)
        holdout_error = self.loss_function_class.loss_function(
            y_true=self.holdout_data.y,
            y_predicted=holdout_predictions
        ).mean()
        print('Holdout error: {}'.format(np.round(holdout_error, 5)))
        
    @staticmethod
    def _initialize_weight_matrices(layer_sizes):
        return [np.random.randn(layer_size, next_layer_size) for layer_size, next_layer_size \
                in zip(layer_sizes[:-1], layer_sizes[1:])]
    
    @staticmethod
    def _initialize_bias_vectors(layer_sizes):
        return [np.random.randn(layer_size) for layer_size in layer_sizes[1:]]

In [28]:
HIDDEN_LAYER_SIZE = 100
LEARNING_RATE = .05
N_EPOCHS = 100
N_BATCHES_PER_EPOCH = 20

In [29]:
LAYER_SIZES = [mnist.data.shape[1], HIDDEN_LAYER_SIZE, mnist.target.shape[1]]

In [30]:
vanilla_neural_net = VanillaNeuralNet(
    layer_sizes=LAYER_SIZES,
    training_batch_generator_class=MiniBatchGenerator,
    loss_function_class=MeanSquaredError,
    activation_function_class=SigmoidActivationFunction,
    optimization_algorithm_class=GradientDescent,
    learning_rate=LEARNING_RATE,
    n_epochs=N_EPOCHS,
    n_batches_per_epoch=N_BATCHES_PER_EPOCH,
    holdout_data=HoldoutData(X=mnist.data[holdout_set_mask], y=mnist.target[holdout_set_mask])
)

In [31]:
vanilla_neural_net.fit(X, y)

Epoch 0
Holdout error: 0.17125
Epoch 1
Holdout error: 0.14791
Epoch 2
Holdout error: 0.12846
Epoch 3
Holdout error: 0.11248
Epoch 4
Holdout error: 0.10003
Epoch 5
Holdout error: 0.09062
Epoch 6
Holdout error: 0.08319
Epoch 7
Holdout error: 0.0774
Epoch 8
Holdout error: 0.07299
Epoch 9
Holdout error: 0.06954
Epoch 10
Holdout error: 0.06682
Epoch 11
Holdout error: 0.06467
Epoch 12
Holdout error: 0.06298
Epoch 13
Holdout error: 0.06152
Epoch 14
Holdout error: 0.06033
Epoch 15
Holdout error: 0.05935
Epoch 16
Holdout error: 0.05854
Epoch 17
Holdout error: 0.05783
Epoch 18
Holdout error: 0.05721
Epoch 19
Holdout error: 0.05667
Epoch 20
Holdout error: 0.05618
Epoch 21
Holdout error: 0.05575
Epoch 22
Holdout error: 0.05538
Epoch 23
Holdout error: 0.05507
Epoch 24
Holdout error: 0.05477
Epoch 25
Holdout error: 0.05452
Epoch 26
Holdout error: 0.05428
Epoch 27
Holdout error: 0.05409
Epoch 28
Holdout error: 0.0539
Epoch 29
Holdout error: 0.05373
Epoch 30
Holdout error: 0.05358
Epoch 31
Holdout err

KeyboardInterrupt: 