In [15]:
import numpy as np
from sklearn.utils import shuffle

In [109]:
class ConvLayer:
    """2-D convolution layer over channels-last volumes (n_ex, h, w, c).

    Parameters
    ----------
    num_filters : int
        Number of output channels.
    padding : str
        'same' keeps the spatial size of the input; any other value means
        no padding ("valid" convolution).
    stride : int
        Step between neighbouring receptive fields.
    filter_size : int
        Side length of the square kernel.
    """
    def __init__(self, num_filters, padding, stride, filter_size):
        self.num_filters = num_filters
        self.padding = padding
        self.stride = stride
        self.filter_size = filter_size

    def _initialize_vars(self, input_size):
        """Allocate kernels/bias and derive the output shape.

        input_size is (n_examples, h_prev, w_prev, c_prev); n_examples may
        be None because it is only copied into op_size.
        """
        n_examples, h_prev, w_prev, c_prev = input_size
        if self.padding == 'same':
            n_h = h_prev
            n_w = w_prev
            self.pad = True
            # Total padding needed so that exactly n_h (n_w) windows fit.
            # An odd total puts the extra row/column on the right/bottom.
            total_h = (n_h - 1)*self.stride + self.filter_size - n_h
            total_w = (n_w - 1)*self.stride + self.filter_size - n_w
            self.pad_val_h_l = total_h//2
            self.pad_val_h_r = total_h - self.pad_val_h_l
            self.pad_val_w_l = total_w//2
            self.pad_val_w_r = total_w - self.pad_val_w_l
        else:
            n_h = (h_prev - self.filter_size) // self.stride + 1
            n_w = (w_prev - self.filter_size) // self.stride + 1
            self.pad = False
        n_c = self.num_filters
        # One flattened (filter_size*filter_size*c_prev) kernel per filter.
        self.kernels = np.random.randn(self.num_filters, self.filter_size*self.filter_size*c_prev)
        self.op_size = (n_examples, n_h, n_w, n_c)
        self.bias = np.zeros(self.num_filters)
        self.update_vars = [self.kernels, self.bias]

    def forward(self, input_vol):
        """Convolve input_vol and return an (n_ex, n_h, n_w, num_filters) array.

        The im2col patches of every example are cached in
        self.im2col_cache for use by backward().
        """
        n_ex = input_vol.shape[0]
        self.n_ex = n_ex
        self.n_c_prev = input_vol.shape[3]
        _, n_h, n_w, n_c = self.op_size
        self.n_h_prev = input_vol.shape[1]
        self.n_w_prev = input_vol.shape[2]
        if self.pad:
            input_vol = np.pad(
                input_vol,
                ((0, 0),
                 (self.pad_val_h_l, self.pad_val_h_r),
                 (self.pad_val_w_l, self.pad_val_w_r),
                 (0, 0)),
                'constant', constant_values=0)
        n_h_prev = input_vol.shape[1]
        n_w_prev = input_vol.shape[2]
        self.n_h_prev_pad = n_h_prev
        self.n_w_prev_pad = n_w_prev
        num_patches_w = (n_w_prev - self.filter_size) // self.stride + 1
        num_patches_h = (n_h_prev - self.filter_size) // self.stride + 1
        self.im2col_cache = np.zeros((n_ex, num_patches_h*num_patches_w, self.filter_size*self.filter_size*input_vol.shape[3]))
        self.op_vol = np.zeros((n_ex, n_h, n_w, n_c))
        for example_num in range(n_ex):
            patches = im2col(input_vol[example_num, :, :, :], self.filter_size, self.stride)
            self.im2col_cache[example_num] = patches
            # Patch rows are ordered row-major (h*n_w + w), so one matrix
            # product followed by a reshape yields the whole output map.
            responses = np.dot(patches, self.kernels.T) + self.bias
            self.op_vol[example_num] = responses.reshape(n_h, n_w, n_c)
        return self.op_vol

    def backward(self, da_next):
        """Back-propagate da_next (same shape as the forward output).

        Returns [da_prev, d_kernel, d_bias] where da_prev has the shape of
        the un-padded forward input.
        """
        da_prev = np.zeros((self.n_ex, self.n_h_prev_pad, self.n_w_prev_pad, self.n_c_prev))
        d_kernel = np.zeros(self.kernels.shape)
        n_h = da_next.shape[1]
        n_w = da_next.shape[2]
        kernel_vols = self.kernels.reshape(
            (self.kernels.shape[0], self.filter_size, self.filter_size, self.n_c_prev))
        for h in range(n_h):
            rows = slice(h*self.stride, h*self.stride + self.filter_size)
            for w in range(n_w):
                cols = slice(w*self.stride, w*self.stride + self.filter_size)
                grads = da_next[:, h, w, :]                   # (n_ex, n_c)
                # Row-major patch index: the row stride is the output WIDTH.
                patches = self.im2col_cache[:, h*n_w + w, :]  # (n_ex, f*f*c_prev)
                d_kernel += np.dot(grads.T, patches)
                da_prev[:, rows, cols, :] += np.tensordot(grads, kernel_vols, axes=([1], [0]))
        d_bias = np.sum(da_next, axis=(0, 1, 2)).reshape(self.bias.shape)
        if self.pad:
            # Crop with explicit end indices: a negative-stop slice such as
            # l:-r collapses to an empty array when r == 0.
            da_prev = da_prev[:,
                              self.pad_val_h_l:self.pad_val_h_l + self.n_h_prev,
                              self.pad_val_w_l:self.pad_val_w_l + self.n_w_prev,
                              :]
        return [da_prev, d_kernel, d_bias]

    def update(self, update_list):
        """Subtract [kernel_step, bias_step] from the parameters in place.

        Raises ValueError unless exactly two steps are supplied, so a short
        list can no longer update the kernels and then fail on the bias.
        """
        if len(update_list) != 2:
            raise ValueError("Check number of parameters")
        self.kernels -= update_list[0]
        self.bias -= update_list[1]

In [72]:
def im2col(array, filter_size, stride):
    """Unroll every receptive field of one (h, w, c) volume into a row.

    Parameters
    ----------
    array : ndarray of shape (h, w, c)
    filter_size : int
        Side length of the square window.
    stride : int
        Step between neighbouring windows.

    Returns
    -------
    ndarray of shape (num_patches_h*num_patches_w, filter_size*filter_size*c)
        Row ``i*num_patches_w + j`` holds the flattened window whose
        top-left corner is at (i*stride, j*stride).
    """
    h, w, c = array.shape
    num_patches_w = (w - filter_size) // stride + 1
    num_patches_h = (h - filter_size) // stride + 1
    col = np.zeros((num_patches_h*num_patches_w, filter_size*filter_size*c))
    for i in range(num_patches_h):
        row_start = i*stride
        for j in range(num_patches_w):
            col_start = j*stride
            patch = array[row_start:row_start+filter_size, col_start:col_start+filter_size, :]
            # Row-major layout: the row stride is the number of patches per
            # row (width), not the number of patch rows (height).
            col[num_patches_w*i + j] = patch.reshape(-1)
    return col

In [15]:
# Demo layer: 10 filters of size 3x3, stride 1, 'same' padding.
C1 = ConvLayer(num_filters=10, padding='same', stride=1, filter_size=3)

In [16]:
# Allocate kernels/bias for 5x5x3 inputs; the batch dimension is left as None.
C1._initialize_vars((None, 5, 5, 3))

In [18]:
# Random batch of 4 volumes of shape 5x5x3, reused by the demo cells below.
ip_vol = np.random.randn(4,5,5,3)

In [19]:
# Forward pass: 'same' padding keeps the 5x5 spatial size; 10 filters give 10 channels.
C1.forward(ip_vol).shape

(4, 5, 5, 10)

In [25]:
# Random array with the same shape as C1's forward output (4, 5, 5, 10).
# NOTE(review): not used by any cell visible here - presumably meant for C1.backward; confirm.
da = np.random.randn(4,5,5,10)

In [196]:
class Activation:
    """Element-wise activation layer.

    Parameters
    ----------
    name : str
        One of 'sigmoid', 'relu', 'leaky_relu' or 'tanh'.
    leaky_relu_rate : float or None
        Slope used for non-positive inputs by 'leaky_relu'. When None,
        DEFAULT_LEAK is used.
    """
    DEFAULT_LEAK = 0.01

    def __init__(self, name = 'sigmoid', leaky_relu_rate = None):
        self.act = name
        self.leaky_relu_rate = leaky_relu_rate

    def _initialize_vars(self, input_size):
        """The activation keeps the input shape and has no parameters."""
        self.op_size = input_size
        self.update_vars = []

    def _leak(self):
        """Slope applied to non-positive inputs by leaky ReLU."""
        if self.leaky_relu_rate is None:
            return self.DEFAULT_LEAK
        return self.leaky_relu_rate

    def forward(self, prev_layer):
        """Apply the activation and cache what backward() needs.

        Raises ValueError for an unknown activation name.
        """
        if self.act == 'sigmoid':
            self.op = 1.0 / (1.0 + np.exp(-prev_layer))
        elif self.act == 'relu':
            self.op = np.maximum(0, prev_layer)
            self.op_mask = prev_layer>0
        elif self.act == 'leaky_relu':
            rate = self._leak()
            positive = prev_layer > 0
            self.op = np.where(positive, prev_layer, prev_layer*rate)
            # Local derivative: 1 where the input is positive, `rate` elsewhere.
            self.op_mask = np.where(positive, 1.0, rate)
        elif self.act == 'tanh':
            self.op = np.tanh(prev_layer)
        else:
            raise ValueError("Invalid Activation String")
        return self.op

    def backward(self, da_next):
        """Return [da_prev]: da_next multiplied by the local derivative.

        Raises ValueError for an unknown activation name.
        """
        if self.act == 'sigmoid':
            self.da_prev = self.op * (1-self.op) * da_next
        elif self.act == 'relu':
            self.da_prev = self.op_mask * da_next
        elif self.act == 'leaky_relu':
            self.da_prev = self.op_mask * da_next
        elif self.act == 'tanh':
            # d/dx tanh(x) = 1 - tanh(x)^2, chained with the upstream gradient.
            self.da_prev = (1 - np.square(self.op)) * da_next
        else:
            raise ValueError("Invalid Activation String")
        return [self.da_prev]

In [195]:
class MaxPool:
    """Max-pooling layer over channels-last volumes (n_ex, h, w, c).

    Parameters
    ----------
    padding : str
        'same' keeps the spatial size of the input; any other value means
        no padding.
    stride : int
        Step between neighbouring pooling windows.
    pool_size : int
        Side length of the square pooling window.
    """
    def __init__(self, padding, stride, pool_size):
        self.padding = padding
        self.stride = stride
        self.pool_size = pool_size

    def _initialize_vars(self, input_size):
        """Derive the output shape and padding amounts from input_size."""
        n_ex, n_h_prev, n_w_prev, n_c = input_size
        if self.padding == 'same':
            n_h = n_h_prev
            n_w = n_w_prev
            self.pad = True
            total_h = (n_h - 1)*self.stride + self.pool_size - n_h
            total_w = (n_w - 1)*self.stride + self.pool_size - n_w
            # pad_val_h / pad_val_w are the top/left amounts; an odd total
            # puts the extra row/column at the bottom/right so every window
            # is complete.
            self.pad_val_h = total_h//2
            self.pad_val_h_r = total_h - self.pad_val_h
            self.pad_val_w = total_w//2
            self.pad_val_w_r = total_w - self.pad_val_w
        else:
            n_h = (n_h_prev - self.pool_size)//self.stride + 1
            n_w = (n_w_prev - self.pool_size)//self.stride + 1
            self.pad = False
        self.op_size = (n_ex, n_h, n_w, n_c)
        self.update_vars = []

    def _window(self, h, w):
        """Row and column slices of the pooling window for output cell (h, w)."""
        rows = slice(h*self.stride, h*self.stride + self.pool_size)
        cols = slice(w*self.stride, w*self.stride + self.pool_size)
        return rows, cols

    def forward(self, input_vol):
        """Return the (n_ex, n_h, n_w, n_c) map of window maxima.

        self.ip_mask counts, for every (padded) input position, in how many
        windows that position attains the maximum.
        """
        n_ex, n_h_prev, n_w_prev, n_c = input_vol.shape
        self.n_ex = n_ex
        self.n_c = n_c
        self.ip_shape = input_vol.shape
        _, n_h, n_w, _ = self.op_size
        if self.pad:
            # Pad with -inf so padded cells can never win the max, even when
            # every real value in the window is negative.
            input_vol = np.pad(
                np.asarray(input_vol, dtype=float),
                ((0, 0),
                 (self.pad_val_h, self.pad_val_h_r),
                 (self.pad_val_w, self.pad_val_w_r),
                 (0, 0)),
                'constant', constant_values=-np.inf)
        self.padded_ip = input_vol
        self.op_vol = np.zeros((n_ex, n_h, n_w, n_c))
        self.ip_mask = np.zeros(input_vol.shape)
        for h in range(n_h):
            for w in range(n_w):
                rows, cols = self._window(h, w)
                window = input_vol[:, rows, cols, :]
                best = window.max(axis=(1, 2))
                self.op_vol[:, h, w, :] = best
                self.ip_mask[:, rows, cols, :] += (window == best[:, None, None, :])
        return self.op_vol

    def backward(self, da_next):
        """Route each output gradient to the arg-max cell(s) of its window.

        da_next is expected to have the shape of the forward output; only
        the entries matching the forward output's spatial grid are read.
        Contributions of overlapping windows are summed, and the cached
        forward state is left untouched so repeated calls agree.
        Returns [da_prev] with the shape of the un-padded forward input.
        """
        da_prev = np.zeros(self.padded_ip.shape)
        for h in range(self.op_vol.shape[1]):
            for w in range(self.op_vol.shape[2]):
                rows, cols = self._window(h, w)
                window = self.padded_ip[:, rows, cols, :]
                winners = window == self.op_vol[:, h, w, :][:, None, None, :]
                da_prev[:, rows, cols, :] += winners * da_next[:, h, w, :][:, None, None, :]
        if self.pad:
            # Explicit end indices: a slice like l:-r is empty when r == 0.
            da_prev = da_prev[:,
                              self.pad_val_h:self.pad_val_h + self.ip_shape[1],
                              self.pad_val_w:self.pad_val_w + self.ip_shape[2],
                              :]
        return [da_prev]

In [32]:
# Demo pooling layer: 2x2 windows, stride 2, no padding.
M1 = MaxPool(padding='valid', stride=2, pool_size=2)

In [33]:
# Configure M1 for 5x5x3 inputs; with 2x2 windows and stride 2 the output grid is 2x2.
M1._initialize_vars((None,5,5,3))

In [28]:
# Pool the 4x5x5x3 batch; the fifth row/column is not covered by any window.
M1.forward(ip_vol).shape

(4, 2, 2, 3)

In [34]:
# Demo activation layer using ReLU.
A1 = Activation(name='relu')

In [35]:
# Activations keep the input shape, so op_size becomes (None, 5, 5, 3).
A1._initialize_vars((None, 5,5,3))

In [37]:
# ReLU is element-wise, so the output shape equals the input shape.
A1.forward(ip_vol).shape

(4, 5, 5, 3)

In [49]:
# backward() returns a one-element list [da_prev]; index it to display the array.
# NOTE(review): ip_vol is (4, 5, 5, 3) while M1's forward output is (4, 2, 2, 3);
# a gradient shaped like the forward output is presumably intended - confirm.
M1.backward(ip_vol)[0]

array([[[[0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
         [0.00000000e+00, 5.27213575e-02, 0.00000000e+00],
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
         [7.95585519e-01, 7.18981675e-01, 0.00000000e+00],
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00]],

        [[8.62311303e-04, 0.00000000e+00, 0.00000000e+00],
         [0.00000000e+00, 0.00000000e+00, 1.09911683e+00],
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
         [0.00000000e+00, 0.00000000e+00, 1.04639377e-01],
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00]],

        [[0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
         [3.73687427e-01, 5.12400596e-01, 1.28319781e-02],
         [0.00000000e+00, 0.00000000e+00, 1.26615232e+00],
         [1.03785935e-04, 2.88910079e-02, 0.00000000e+00],
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00]],

        [[0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
         [0.00000000e+00, 0.00000000e+00, 0.000000

In [33]:
# backward() returns a one-element list [da_prev]; index it before reading .shape.
A1.backward(ip_vol)[0].shape

(4, 5, 5, 3)

In [259]:
class Flatten:
    """Collapse (n_ex, h, w, c) volumes into (n_ex, h*w*c) rows and back."""

    def __init__(self):
        pass

    def _initialize_vars(self, input_size):
        """Record the flattened output shape; the layer has no parameters."""
        batch, height, width, channels = input_size
        self.update_vars = []
        self.op_size = (batch, height*width*channels)

    def forward(self, input_vol):
        """Flatten every example, remembering the original shape for backward()."""
        self.prev_shape = input_vol.shape
        batch, height, width, channels = self.prev_shape
        return np.reshape(input_vol, (batch, height*width*channels))

    def backward(self, da_next):
        """Return [da_prev]: da_next restored to the shape seen by forward()."""
        return [da_next.reshape(self.prev_shape)]

In [40]:
# Demo flatten layer (takes no constructor arguments).
F1 = Flatten()

In [41]:
# ip_vol.shape is (4, 5, 5, 3), so op_size becomes (4, 75).
F1._initialize_vars(ip_vol.shape)

In [155]:
# Round trip: backward() returns a one-element list, so index it before reading .shape.
F1.backward(F1.forward(ip_vol))[0].shape

(4, 5, 5, 3)

In [41]:
class AvgPool:
    """Average-pooling layer over channels-last volumes (n_ex, h, w, c).

    Parameters
    ----------
    padding : str
        'same' keeps the spatial size of the input (zero padding, and the
        padded zeros are included in the average); any other value means
        no padding.
    stride : int
        Step between neighbouring pooling windows.
    pool_size : int
        Side length of the square pooling window.
    """
    def __init__(self, padding, stride, pool_size):
        self.padding = padding
        self.stride = stride
        self.pool_size = pool_size

    def _initialize_vars(self, input_size):
        """Derive the output shape and padding amounts from input_size."""
        n_ex, n_h_prev, n_w_prev, n_c = input_size
        if self.padding == 'same':
            n_h = n_h_prev
            n_w = n_w_prev
            self.pad = True
            total_h = (n_h - 1)*self.stride + self.pool_size - n_h
            total_w = (n_w - 1)*self.stride + self.pool_size - n_w
            # pad_val_h / pad_val_w are the top/left amounts; an odd total
            # puts the extra row/column at the bottom/right so every window
            # is complete.
            self.pad_val_h = total_h//2
            self.pad_val_h_r = total_h - self.pad_val_h
            self.pad_val_w = total_w//2
            self.pad_val_w_r = total_w - self.pad_val_w
        else:
            n_h = (n_h_prev - self.pool_size)//self.stride + 1
            n_w = (n_w_prev - self.pool_size)//self.stride + 1
            self.pad = False
        self.op_size = (n_ex, n_h, n_w, n_c)
        self.update_vars = []

    def _window(self, h, w):
        """Row and column slices of the pooling window for output cell (h, w)."""
        rows = slice(h*self.stride, h*self.stride + self.pool_size)
        cols = slice(w*self.stride, w*self.stride + self.pool_size)
        return rows, cols

    def forward(self, input_vol):
        """Return the (n_ex, n_h, n_w, n_c) map of window means.

        self.ip_mask holds, for every (padded) input position, the summed
        weight 1/pool_size**2 of all windows that cover it.
        """
        n_ex, n_h_prev, n_w_prev, n_c = input_vol.shape
        self.n_ex = n_ex
        self.n_c = n_c
        self.ip_shape = input_vol.shape
        _, n_h, n_w, _ = self.op_size
        if self.pad:
            input_vol = np.pad(
                input_vol,
                ((0, 0),
                 (self.pad_val_h, self.pad_val_h_r),
                 (self.pad_val_w, self.pad_val_w_r),
                 (0, 0)),
                'constant', constant_values=0)
        self.padded_shape = input_vol.shape
        share = 1.0/(self.pool_size*self.pool_size)
        self.op_vol = np.zeros((n_ex, n_h, n_w, n_c))
        self.ip_mask = np.zeros(input_vol.shape)
        for h in range(n_h):
            for w in range(n_w):
                rows, cols = self._window(h, w)
                self.op_vol[:, h, w, :] = input_vol[:, rows, cols, :].mean(axis=(1, 2))
                self.ip_mask[:, rows, cols, :] += share
        return self.op_vol

    def backward(self, da_next):
        """Spread each output gradient evenly over its pooling window.

        da_next is expected to have the shape of the forward output; only
        the entries matching the forward output's spatial grid are read.
        Contributions of overlapping windows are summed, and the cached
        forward state is left untouched so repeated calls agree.
        Returns [da_prev] with the shape of the un-padded forward input.
        """
        share = 1.0/(self.pool_size*self.pool_size)
        da_prev = np.zeros(self.padded_shape)
        for h in range(self.op_vol.shape[1]):
            for w in range(self.op_vol.shape[2]):
                rows, cols = self._window(h, w)
                da_prev[:, rows, cols, :] += da_next[:, h, w, :][:, None, None, :]*share
        if self.pad:
            # Explicit end indices: a slice like l:-r is empty when r == 0.
            da_prev = da_prev[:,
                              self.pad_val_h:self.pad_val_h + self.ip_shape[1],
                              self.pad_val_w:self.pad_val_w + self.ip_shape[2],
                              :]
        return [da_prev]

In [193]:
class FullyConnected:
    """Dense layer computing ``input @ weights + biases``.

    Parameters
    ----------
    num_units : int
        Number of output features.
    """
    def __init__(self, num_units):
        self.num_units = num_units

    def _initialize_vars(self, input_shape):
        """Allocate parameters for an (n_ex, n_prev) input; n_ex may be None."""
        self.ip_shape = input_shape
        n_ex, n_prev = input_shape
        self.weights = np.random.randn(n_prev, self.num_units)
        self.biases = np.zeros((1, self.num_units))
        self.op_size = (n_ex, self.num_units)
        self.update_vars = [self.weights, self.biases]

    def forward(self, input_vol):
        """Return the (n_ex, num_units) affine output; caches the input for backward()."""
        self.input_vol = input_vol
        op = np.dot(self.input_vol, self.weights) + self.biases
        return op

    def backward(self, da_next):
        """Return [da_prev, d_weights, d_biases] for the upstream gradient da_next."""
        d_weights = np.dot(self.input_vol.T, da_next)
        d_biases = np.sum(da_next, axis = 0, keepdims=True)
        da_prev = np.dot(da_next, self.weights.T)
        return [da_prev, d_weights, d_biases]

    def update(self, update_list):
        """Subtract [weight_step, bias_step] from the parameters in place.

        Raises ValueError unless exactly two steps are supplied, so a short
        list can no longer update the weights and then fail on the biases.
        """
        if len(update_list) != 2:
            raise ValueError("Check number of parameters")
        self.weights -= update_list[0]
        self.biases -= update_list[1]

In [49]:
# Demo dense layer with 45 output units.
FC1 = FullyConnected(num_units=45)

In [50]:
# 20 input features -> weights of shape (20, 45) and biases of shape (1, 45).
FC1._initialize_vars((None, 20))

In [51]:
# Forward a random batch of 15 examples; the output has one column per unit.
FC1.forward(np.random.rand(15,20)).shape

(15, 45)

In [52]:
# backward() returns [da_prev, d_weights, d_biases]; index 2 is the bias gradient.
FC1.backward(np.random.rand(15, 45))[2].shape

(1, 45)

In [143]:
class Model:
    """Sequential stack of layers trained with mini-batch gradient descent.

    Every layer must provide ``_initialize_vars(input_shape)`` (setting
    ``op_size`` and ``update_vars``), ``forward(x)`` and ``backward(grad)``
    returning ``[grad_wrt_input, *param_grads]``. Layers with parameters
    must also provide ``update(list_of_steps)``.

    Parameters
    ----------
    input_shape : tuple
        Shape of one input batch; the batch dimension may be None.
    """
    def __init__(self, input_shape):
        self.layers = []
        self.updates = []
        self.update_vals = []
        self.update_layers = []
        self.velocities = []
        self.last_shape = input_shape

    def add(self, layer):
        """Append a layer and initialise it for the current output shape."""
        self.layers.append(layer)
        layer._initialize_vars(self.last_shape)
        self.last_shape = layer.op_size
        self.update_layers.append(len(layer.update_vars))

    def summary(self):
        """Print one line per layer.

        Uses the layer's own summary() when it defines one, otherwise the
        class name and output shape.
        """
        for layer in self.layers:
            if hasattr(layer, 'summary'):
                layer.summary()
            else:
                print("{}: output shape {}".format(type(layer).__name__, layer.op_size))

    def _forward(self, data_feed):
        """Run data_feed through every layer in order and return the result."""
        for layer in self.layers:
            data_feed = layer.forward(data_feed)
        return data_feed

    def train(self, x_data, y_data, batch_size = 16, num_epochs = 100, loss = "mse", optimizer = "SGD", learning_rate = 0.000001, shuffle = False, momentum = 0.9, print_period = 1):
        """Fit the layers on (x_data, y_data).

        Parameters
        ----------
        x_data, y_data : ndarray
            Inputs and targets, indexed along axis 0.
        batch_size : int
            Examples per step; a final smaller batch is used when the data
            set size is not a multiple of batch_size.
        num_epochs : int
        loss : {'mse', 'crossentropy'}
        optimizer : {'SGD', 'SGD with Momentum'}
        learning_rate : float
        shuffle : bool
            Re-order the examples at the start of every epoch.
        momentum : float
            Velocity decay, used only by 'SGD with Momentum'.
        print_period : int
            Print error/accuracy every print_period epochs (0 disables it).

        Raises
        ------
        ValueError
            For an unknown loss or optimizer name.
        """
        if loss not in ('mse', 'crossentropy'):
            raise ValueError("Unknown loss: {}".format(loss))
        if optimizer not in ('SGD', 'SGD with Momentum'):
            raise ValueError("Unknown optimizer: {}".format(optimizer))
        num_examples = y_data.shape[0]
        for epoch in range(num_epochs):
            if shuffle:
                # The boolean `shuffle` argument shadows any imported
                # shuffle() helper, so permute the indices with NumPy.
                order = np.random.permutation(num_examples)
                x_data, y_data = x_data[order], y_data[order]
            for start in range(0, num_examples, batch_size):
                X = x_data[start:start+batch_size]
                Y = y_data[start:start+batch_size]
                data_feed = self._forward(X)
                if loss == 'mse':
                    self.last_cost = np.mean(np.square((data_feed - Y)), axis = 0)/2
                    d_cost = (data_feed - Y)/ Y.shape[0]
                else:
                    # Keep predictions away from 0 and 1 so the logarithms
                    # and divisions stay finite.
                    probs = np.clip(data_feed, 1e-12, 1 - 1e-12)
                    self.last_cost = -np.sum(Y*np.log(probs) + (1-Y)*np.log(1-probs))
                    d_cost = -Y/probs + (1-Y)/(1-probs)
                # Back-propagate through every layer BEFORE touching any
                # parameter. The gradient list is rebuilt for each batch so
                # stale gradients from earlier batches are never re-applied.
                self.update_vals = []
                per_layer = []
                current_deriv = d_cost
                for layer in reversed(self.layers):
                    items = layer.backward(current_deriv)
                    current_deriv = items[0]
                    grads = list(items[1:])
                    per_layer.append((layer, grads))
                    self.update_vals.extend(grads)
                if optimizer == 'SGD':
                    self.updates = [learning_rate*u for u in self.update_vals]
                else:
                    if len(self.velocities) != len(self.update_vals):
                        self.velocities = [np.zeros(np.shape(u)) for u in self.update_vals]
                    self.velocities = [momentum*v + learning_rate*u
                                       for v, u in zip(self.velocities, self.update_vals)]
                    self.updates = list(self.velocities)
                param_count = 0
                for layer, grads in per_layer:
                    num_params_for_layer = len(grads)
                    if num_params_for_layer > 0:
                        layer.update(self.updates[param_count:param_count+num_params_for_layer])
                    param_count += num_params_for_layer
            if print_period and epoch % print_period == 0:
                data_feed = self._forward(x_data)
                error = np.mean(np.square(data_feed - y_data))
                accuracy = np.sum(np.equal(data_feed,y_data))/y_data.shape[0]
                print("Epoch {}: Error: {} Accuracy: {}".format(epoch, error, accuracy))

In [144]:
# Random data: 20 inputs of shape 10x10x3 with one scalar target each.
# NOTE(review): not referenced by the cells visible below, and m2 is built for
# 5x5x3 inputs - confirm whether these are still needed.
x = np.random.rand(20, 10, 10, 3)
y = np.random.rand(20, 1)

In [145]:
# Empty model expecting 5x5x3 inputs (batch dimension left as None).
m2 = Model((None, 5,5,3))

In [146]:
# First layer: 5 filters of size 2x2, stride 1, 'same' padding.
m2.add(ConvLayer(num_filters=5, padding='same', stride=1, filter_size=2))

In [147]:
# Second layer: a single 5x5 filter, stride 1, 'same' padding.
m2.add(ConvLayer(num_filters=1, padding='same', stride=1, filter_size=5))

In [148]:
# Output shape of the last layer added: 'same' padding keeps 5x5, one filter gives one channel.
m2.last_shape

(None, 5, 5, 1)

In [149]:
# Smoke-test training on random data: 10 examples with batch_size=6 give one
# full batch and one partial batch per epoch; all other arguments use defaults.
m2.train(np.random.rand(10,5,5,3),np.random.rand(10,5,5,1),batch_size=6)

Epoch 0: Error: 179.68235558616266 Accuracy: 0.0
Epoch 1: Error: 177.62121920328306 Accuracy: 0.0
Epoch 2: Error: 175.58294768737576 Accuracy: 0.0
Epoch 3: Error: 173.56744764481346 Accuracy: 0.0
Epoch 4: Error: 171.57462583206643 Accuracy: 0.0
Epoch 5: Error: 169.60438915570268 Accuracy: 0.0
Epoch 6: Error: 167.65664467238764 Accuracy: 0.0
Epoch 7: Error: 165.73129958888453 Accuracy: 0.0
Epoch 8: Error: 163.82826126205416 Accuracy: 0.0
Epoch 9: Error: 161.9474371988549 Accuracy: 0.0
Epoch 10: Error: 160.08873505634278 Accuracy: 0.0
Epoch 11: Error: 158.25206264167153 Accuracy: 0.0
Epoch 12: Error: 156.43732791209231 Accuracy: 0.0
Epoch 13: Error: 154.64443897495417 Accuracy: 0.0
Epoch 14: Error: 152.8733040877035 Accuracy: 0.0
Epoch 15: Error: 151.1238316578845 Accuracy: 0.0
Epoch 16: Error: 149.39593024313893 Accuracy: 0.0
Epoch 17: Error: 147.68950855120622 Accuracy: 0.0
Epoch 18: Error: 146.00447543992337 Accuracy: 0.0
Epoch 19: Error: 144.34073991722497 Accuracy: 0.0
Epoch 20: Err