In [2]:
import numpy as np
import gzip
import os
from sklearn import preprocessing

In [3]:
class MnistDataLoader:
    def __init__(self, data_folder_path):
        self.data_folder_path = data_folder_path
        self.train_file_name = 'train-images-idx3-ubyte.gz'
        self.train_label_file_name = 'train-labels-idx1-ubyte.gz'
        self.test_file_name = 't10k-images-idx3-ubyte.gz'
        self.test_label_file_name = 't10k-labels-idx1-ubyte.gz'
        self.data = dict()
        self.size = 28
        self.color_channel = 1
        self.data_list = [
            'train_images',
            'train_labels',
            'test_images',
            'test_labels'
        ]

    def load_images(self, data_list_index, file_name):
        images = gzip.open(os.path.join(self.data_folder_path, file_name), 'rb')
        self.data[self.data_list[data_list_index]] = np.frombuffer(images.read(), dtype=np.uint8, offset=16).reshape(-1, self.size, self.size)
        self.data[self.data_list[data_list_index]] = self.data[self.data_list[data_list_index]].reshape(self.data[self.data_list[data_list_index]].shape[0], self.size, self.size, self.color_channel).astype(np.float32)

    def load_labels(self, data_list_index, file_name):
        labels = gzip.open(os.path.join(self.data_folder_path, file_name), 'rb')
        self.data[self.data_list[data_list_index]] = np.frombuffer(labels.read(), dtype=np.uint8, offset=8)
        self.data[self.data_list[data_list_index]].resize(self.data[self.data_list[data_list_index]].shape[0],1)

    def load_mnist(self):
        self.load_images(data_list_index=0, file_name=self.train_file_name)
        self.load_labels(data_list_index=1, file_name=self.train_label_file_name)
        self.load_images(data_list_index=2, file_name=self.test_file_name)
        self.load_labels(data_list_index=3, file_name=self.test_label_file_name)

        self.assert_data_shape()

    def assert_data_shape(self):
        assert self.data[self.data_list[0]].shape == (60000, 28, 28, 1)
        assert self.data[self.data_list[1]].shape == (60000, 1)
        assert self.data[self.data_list[2]].shape == (10000, 28, 28, 1)
        assert self.data[self.data_list[3]].shape == (10000, 1)

    def preprocess_data(self):

        self.data[self.data_list[0]] /= 255
        self.data[self.data_list[2]] /= 255

        self.data[self.data_list[1]] = Utility.one_hot_encode(self.data[self.data_list[1]])

        assert self.data[self.data_list[1]].shape == (60000, 10)


In [4]:
class Cifer10DataLoader:
    pass

In [57]:
class Convolution2D:
    def __init__(self, num_out_channel, filter_size, stride, padding_size):
        self.num_out_channel = num_out_channel
        self.filter_size = filter_size
        self.stride = stride
        self.padding_size = padding_size
        self.h_prev, self.w_prev, self.num_channel_prev = None, None, None
        self.h_new, self.w_new = None, None
        self.W = None
        self.b = None
        self.output_tensor = None
        self.cache = {}
        self.relu_activation = ReLUActivation()

    def initialize_output_dimensions(self, prev_layer_output_dim):
        """
        Initializes output dimensions with the dimension of the previous layers
        :param prev_layer_output_dim: output dimension of the layer immediately before this layer
        :return: None
        """
        self.h_prev, self.w_prev , self.num_channel_prev = prev_layer_output_dim
        self.h_new = (self.h_prev - self.filter_size + 2 * self.padding_size) // self.stride + 1
        self.w_new = (self.w_prev - self.filter_size + 2 * self.padding_size) // self.stride + 1
        self.output_tensor = np.zeros((self.h_new, self.w_new, self.num_out_channel))

    def initialize_weights_biases(self):
        """
        Initializes weights with the proper dimensions
        :return:
        """
        self.W = np.random.randn(self.filter_size, self.filter_size, self.num_channel_prev, self.num_out_channel)
        self.b = np.random.randn(1, 1, 1, self.num_out_channel)

    def forward(self, Z_prev, is_training):
        """
        Performs a forward operation of the convolution layer
        :param Z_prev: The activation of the previous layer
        :param is_training: whether we are in training mode or not
        :return:
        """

        assert Z_prev.shape == (self.h_prev, self.w_prev, self.num_channel_prev)
        Z_prev = np.array(Z_prev, copy=True)

        # create zero padded Z_prev
        Z_prev_padded = Utility.zero_pad(Z_prev, self.padding_size)

        # Apply convolution operation over this zero padded previous activation
        for row in range(self.h_new):

            row_start = row * self.stride

            for col in range(self.w_new):

                col_start = col *  self.stride

                for output_channel_index in range(self.num_out_channel):

                    Z_prev_windowed = Z_prev_padded[
                        row_start : row_start + self.filter_size,
                        col_start : col_start + self.filter_size,
                        :
                    ]

                    conv_step_W = self.W[:, :, :, output_channel_index]
                    conv_step_b = self.b[:, :, :, output_channel_index]

                    self.output_tensor[row, col, output_channel_index] = Utility.convolve_single_step(Z_prev_windowed, conv_step_W, conv_step_b)

        # asserting output shape
        assert(self.output_tensor.shape == (self.h_new, self.w_new, self.num_out_channel))

        if is_training:
            # cache some values
            pass

        # perform activation element wise in this case
        self.output_tensor = self.relu_activation.activation_f(self.output_tensor)

    def backward(self):
        pass

    def update_CNN_parameters(self, dW, db):
        self.W = self.W - dW
        self.b = self.b - db

    def print_layer_dimensions(self):
        print(f'Output Tensor Dimensions: {self.output_tensor.shape}')
        print(f'Weight Dimension: {self.W.shape}')
        print(f'Bias Dimension: {self.b.shape}')

    def get_output_dimension(self):
        return self.output_tensor.shape


In [58]:
class MaxPool:
    def __init__(self, filter_size, stride):
        self.filter_size = filter_size
        self.stride = stride
        self.h_prev, self.w_prev, self.num_channel_prev = None, None, None
        self.h_new, self.w_new, self.num_out_channel = None, None, None
        self.output_tensor = None
        self.cache = {}

    def initialize_max_pool_params(self, prev_layer_output_dim):
        """
        Initializes output dimensions with the dimension of the previous layers
        :param prev_layer_output_dim: output dimension of the layer immediately before this layer
        :return: None
        """
        self.h_prev, self.w_prev , self.num_channel_prev = prev_layer_output_dim
        self.h_new = int((self.h_prev - self.filter_size) / self.stride + 1)
        self.w_new = int((self.w_prev - self.filter_size) / self.stride + 1)
        self.num_out_channel = self.num_channel_prev
        print(self.h_new)
        print(self.w_new)
        print(self.num_out_channel)
        self.output_tensor = np.zeros((self.h_new, self.w_new, self.num_out_channel))

    def forward(self, Z_prev, is_training):

        assert Z_prev.shape == (self.h_prev, self.w_prev, self.num_channel_prev)
        Z_prev = np.array(Z_prev, copy=True)

        # Apply convolution operation over this zero padded previous activation
        for row in range(self.h_new):

            row_start = row * self.stride

            for col in range(self.w_new):

                col_start = col *  self.stride

                for output_channel_index in range(self.num_out_channel):

                    Z_prev_windowed = Z_prev[
                                      row_start : row_start + self.filter_size,
                                      col_start : col_start + self.filter_size,
                                      output_channel_index
                                      ]

                    self.output_tensor[row, col, output_channel_index] = Utility.get_max_pool_window(Z_prev_windowed)

        assert self.output_tensor.shape == (self.h_new, self.w_new, self.num_out_channel)
        if is_training:
            pass

    def print_layer_dimensions(self):
        print(f'Output Tensor Dimensions: {self.output_tensor.shape}')

    def backward(self):
        pass

    def get_output_dimension(self):
        return self.output_tensor.shape

In [None]:
class DenseLayer:
    def __init__(self, num_units, activation_obj):
        self.W = None
        self.b = None
        self.dW = None
        self.db = None
        self.num_units = num_units
        self.cache = {}
        self.output_tensor = None
        self.activation_obj = activation_obj

    def initialize_weights_biases(self, prev_flat_layer_output_dim):
        self.W = np.random.randn(self.num_units, prev_flat_layer_output_dim)
        self.b = np.random.randn(1, self.num_units)

    def forward(self, Z_prev):
        """

        :param Z_prev: tensor of shape (1, prev_flattened_shape)
        :return:
        """
        assert Z_prev.shape[1] == self.W.shape[1]

        Z_prev = np.array(Z_prev, copy=True)
        self.output_tensor = self.activation_obj.activation_f(np.dot(Z_prev, np.transpose(self.W)) + self.b)

        assert self.output_tensor.shape == (1, self.num_units)

    def backward(self):
        pass

    def get_output_dimension(self):
        return self.output_tensor.shape

    def print_layer_dimensions(self):
        print(f'Output Tensor Dimensions: {self.output_tensor.shape}')
        print(f'Weight Dimension: {self.W.shape}')
        print(f'Bias Dimension: {self.b.shape}')


In [48]:
class Utility:

    @staticmethod
    def one_hot_encode(y_true):
        # Define the One-hot Encoder
        ohe = preprocessing.OneHotEncoder()
        ohe.fit(y_true)
        y_true = ohe.transform(y_true).toarray()
        return y_true

    @staticmethod
    def zero_pad(tensor, pad_size):
        """
        :param tensor: tensor of shape (h, w, num_channel)
        :return: padded tensor of shape (h + 2 * pad_size, w + 2 * pad_size, num_channel)
        """
        return np.pad(tensor, ((pad_size, pad_size), (pad_size, pad_size), (0,0)), mode='constant', constant_values=0)

    @staticmethod
    def convolve_single_step(Z_prev_windowed, W, b):
        """
        :param Z_prev_windowed: window of shape (F, F, num_channel_Z_prev)
        :param W: kernel/filter/weight of shape (F, F, num_channel_Z_prev)
        :param b: bias term of shape (1, 1, 1)
        :return: scaler convolved value
        """
        return np.multiply(Z_prev_windowed, W).sum() + float(b)


    @staticmethod
    def get_max_pool_window(Z_prev_windowed):
        return Z_prev_windowed.max()

    def create_mini_batches(self):
        pass

In [56]:
class ReLUActivation:
    @staticmethod
    def activation_f(tensor):
        return np.max(tensor, 0)

    @staticmethod
    def d_relu(tensor):
        return np.where(tensor > 0, 1, 0)


In [50]:
class GradientDescent:
    pass

In [51]:
class Model:
    def __init__(self):
        pass

    def compile(self):
        pass

    def train(self):
        pass

    def predict(self):
        pass

In [52]:
mnist = MnistDataLoader('./dataset/mnist')
mnist.load_mnist()
mnist.preprocess_data()

In [12]:
mnist.data[mnist.data_list[1]][0]

array([0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])

In [20]:
a = np.random.randn(3,3,3,5)
a[:,:,:,1].shape

(3, 3, 3)

In [33]:
np.random.seed(1)
A_prev = np.random.randn(5,7,4) # h, w, c of previous layer
cnn = Convolution2D(num_out_channel=8, filter_size=3, stride=2, padding_size=1)
cnn.initialize_output_dimensions(A_prev.shape)
cnn.initialize_weights_biases()
cnn.print_layer_dimensions()

Output Tensor Dimensions: (3, 4, 8)
Weight Dimension: (3, 3, 4, 8)
Bias Dimension: (1, 1, 1, 8)


In [35]:
cnn.forward(A_prev, is_training=True)

In [36]:
print("Z's mean =\n", np.mean(cnn.output_tensor))
print("Z[3,2,1] =\n", cnn.output_tensor[2,1])

Z's mean =
 0.7671068812283951
Z[3,2,1] =
 [-2.05981919  0.8841739   2.44456907 -4.05359906  5.20914997  1.40909525
  5.19845828  0.4675558 ]


In [54]:
# Case 1: stride of 1
np.random.seed(1)
A_prev = np.random.randn(5, 5, 3)
maxpool = MaxPool(filter_size=3, stride=1)
maxpool.initialize_max_pool_params(A_prev.shape)
maxpool.forward(A_prev, True)
maxpool.print_layer_dimensions()
print(maxpool.output_tensor)

3
3
3
Output Tensor Dimensions: (3, 3, 3)
[[[1.74481176 0.90159072 1.65980218]
  [1.74481176 1.46210794 1.65980218]
  [1.74481176 1.6924546  1.65980218]]

 [[1.14472371 0.90159072 2.10025514]
  [1.14472371 0.90159072 1.65980218]
  [1.14472371 1.6924546  1.65980218]]

 [[1.13162939 1.51981682 2.18557541]
  [1.13162939 1.51981682 2.18557541]
  [1.13162939 1.6924546  2.18557541]]]


In [None]:

hparameters = {"stride" : 1, "f": 3}

A, cache = pool_forward(A_prev, hparameters)
print("mode = max")
print("A.shape = " + str(A.shape))
print("A =\n", A)
print()
A, cache = pool_forward(A_prev, hparameters, mode = "average")
print("mode = average")
print("A.shape = " + str(A.shape))
print("A =\n", A)

In [60]:
a = np.array([1,23])
b = np.array(a, copy=True)
b == a

array([ True,  True])