In [81]:
from typing import Tuple, List
import gzip
import pickle
from numpy import ndarray
import numpy as np
from scipy.special import softmax as s_softmax

In [82]:


def load_data() -> Tuple[Tuple[ndarray, ndarray], Tuple[ndarray, ndarray], Tuple[ndarray, ndarray]]:
    with gzip.open("mnist.pkl.gz", "rb") as fd:
        train_set, valid_set, test_set = pickle.load(fd, encoding="latin")
    return train_set, valid_set, test_set


g_train, g_valid, g_test = load_data()

In [83]:
def get_one_hot_labels(labels: ndarray) -> ndarray:
    one_hot_labels = np.full((labels.shape[0], 10), 0)
    for i in range(labels.shape[0]):
        one_hot_labels[i, labels[i]] = 1
    return one_hot_labels


get_one_hot_labels(g_train[1])

array([[0, 0, 0, ..., 0, 0, 0],
       [1, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 0, 1, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 1, 0]])

In [84]:
def sigmoid(x: ndarray) -> ndarray:
    return 1.0 / (1.0 + np.exp(-x))


def sigmoid_prime(x: ndarray) -> ndarray:
    return sigmoid(x) * (1 - sigmoid(x))

def softmax(x: ndarray) -> ndarray:
    return s_softmax(x, axis=1)

In [85]:
def unison_shuffle(a: ndarray, b: ndarray) -> Tuple[ndarray, ndarray]:
    """

    :return: unison shuffle using numpy advanced array indexing
    """
    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]

unison_shuffle(g_train[0], g_train[1])

(array([[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]], dtype=float32),
 array([1, 4, 6, ..., 0, 2, 3], dtype=int64))

In [86]:
def blur_image(image: ndarray) -> ndarray:
    """

    :param image: ndarray(28, 28)
    :return: box-blurred image
    """
    copy = np.array(image, copy=True)
    dx = [-1, -1, -1, 0, 1, 1, 1, 0]
    dy = [-1, 0, 1, 1, 1, 0, -1, -1]
    for i in range(copy.shape[0]):
        for j in range(copy.shape[1]):
            s = copy[i, j] * 8
            for ii, jj in zip(dx, dy):
                x = i + ii
                y = j + jj
                if 0 <= x < copy.shape[0] and 0 <= y < copy.shape[1]:
                    s += copy[x, y]
            s /= 16
            copy[i, j] = s
    return copy

In [87]:
def augment_data(data: Tuple[ndarray, ndarray]) -> Tuple[ndarray, ndarray]:
    labels = data[1]
    data = data[0]
    blurred_set = np.array([blur_image(image.reshape([28, 28])).reshape((784, )) for image in data])
    return np.concatenate((data, blurred_set), axis=0), np.concatenate((labels, labels), axis=0)

# TODO: use some library to rotate or flip the images
# tf.image.flip_left_right
# tf.image.random_brightness
# data_augmentation = tf.keras.Sequential([
#      layers.experimental.preprocessing.RandomFlip("horizontal_and_vertical"),
#      layers.experimental.preprocessing.RandomRotation(0.2)])


In [None]:
class Network(object):
    def __init__(self, layers: List[int]):
        self.layers = layers
        self.weights = self.init_weights()

        self.iterations = 10
        self.learning_rate = 0.005
        self.mini_batch_size = 200
        self.dropout = False
        self.dropout_rate = 0.5
        self.augment = False
        self.activations = [sigmoid, softmax]

    def init_weights(self):
        """
        initialization is done by generating random numbers with the mean=loc=0, and sd=scale=sqrt(weights
        that enter the neuron), and size is a matrix having all weights for each neuron on the same layer

        :return:a list of matrices of weights for each intermediary layer
        """
        return [
            np.random.normal(loc=0, scale=(1 / np.sqrt(self.layers[i])), size=(self.layers[i] + 1, self.layers[i + 1]))
            for i in range(len(self.layers) - 1)
        ]

    def init_args(self, **kwargs):
        if 'iterations' in kwargs:
            self.iterations = kwargs['iterations']
        if 'learning_rate' in kwargs:
            self.learning_rate = kwargs['learning_rate']
        if 'mini_batch_size' in kwargs:
            self.mini_batch_size = kwargs['mini_batch_size']
        if 'dropout' in kwargs:
            self.dropout = kwargs['dropout']
        if 'dropout_rate' in kwargs:
            self.dropout_rate = kwargs['dropout_rate']
        if 'augment' in kwargs:
            self.augment = kwargs['augment']
        if 'activations' in kwargs:
            self.activations = kwargs['activations']

        # TODO: also add L2 and momentum
        # TODO: try a version with RMSProp

    def train(self, training_set: Tuple[ndarray, ndarray], validation_set: ndarray = None, **kwargs):
        self.init_args(**kwargs)
        if validation_set is None:
            validation_set = training_set
        if self.augment:
            training_set = augment_data(training_set)
        data = training_set[0]
        labels = get_one_hot_labels(training_set[1])
        for i in range(self.iterations):
            print(f"Iteration {i}: {round(self.evaluate(validation_set), 2)}% on validation")
            data, labels = unison_shuffle(data, labels)
            mini_batches = [
                (data[mini_batch:mini_batch + self.mini_batch_size],
                 labels[mini_batch: mini_batch + self.mini_batch_size])
                for mini_batch in range(0, labels.shape[0], self.mini_batch_size)
            ]
            for data, labels in mini_batches:
                self.train_mini_batch(data, labels)


    def train_mini_batch(self, data: ndarray, labels: ndarray) -> None:
        """
        feeds forward and saves activations and raw values
        then back propagates the error and adjusts weights

        :param data: images
        :param labels: labels
        """
        raw_values, activated_values = self.feed_forward(data)
        delta_weights = self.back_prop(labels, raw_values, activated_values)
        for i in range(len(self.weights)):
            self.weights[i] -= self.learning_rate * delta_weights[i]

    def feed_forward(self, data: ndarray, evaluate: bool = False) -> Tuple[List[ndarray], List[ndarray]]:
        """

        :param data: images
        :param evaluate: if true than no fancy operations are done
        :return: raw and activated values
        """
        raw_values = []
        activated_values = []

        for i in range(len(self.layers) - 1):
            bias = np.ones((data.shape[0], 1))
            data = np.c_[bias, data]
            activated_values.append(data)

            z = np.dot(data, self.weights[i])

            data = self.activations[i](z)

            if not evaluate and self.dropout and i != len(self.layers) - 2:
                dropout_arr = np.random.choice([0, 1], z.shape, p=[self.dropout_rate, 1 - self.dropout_rate])
                z = np.multiply(dropout_arr * z, 1 / (1 - self.dropout_rate))

            raw_values.append(z)


        activated_values.append(data)
        return raw_values, activated_values

    def back_prop(self, labels: ndarray, raw_values: List[ndarray], activated_values: List[ndarray]) -> List[ndarray]:
        """

        :param labels: the correct labels
        :param raw_values: the raw values predicted by the model on each layer
        :param activated_values: the activation of the said raw values
        :return: a list of modifications to be done to the weights
        """
        delta_weights = [0 for _ in self.weights]
        delta = activated_values[-1] - labels
        delta_weights[-1] = np.dot(activated_values[-2].T, delta)
        for layer in range(2, len(self.layers)):
            z = sigmoid_prime(raw_values[- layer]).T
            delta = np.delete(np.dot(self.weights[- layer + 1], delta.T), 0, 0) * z  # do we need to eliminate row or column??
            delta_weights[- layer] = np.dot(activated_values[-layer - 1].T, delta.T)
        return delta_weights


    def evaluate(self, data: Tuple[ndarray, ndarray]) -> float:
        predictions_ok = 0
        for i in range(data[0].shape[0]):
            raw_values, activated_values = self.feed_forward(np.array([data[0][i]]), evaluate = True)
            # raw_values = raw_values[-1]
            activated_values = activated_values[-1]
            if np.argmax(activated_values) == data[1][i]:
                predictions_ok += 1
        return predictions_ok / data[0].shape[0] * 100

x = Network([784, 100, 10])
x.train(g_train, iterations=50, dropout=True, augment=False)
x.evaluate(g_test)

Iteration 0: 9.1% on validation
Iteration 1: 90.94% on validation
Iteration 2: 90.91% on validation
Iteration 3: 90.86% on validation
Iteration 4: 90.84% on validation
Iteration 5: 90.83% on validation
Iteration 6: 90.75% on validation
