Convolution neural networks : Densely neural networks learn relationships between data to predict output correctly. In images, we have some additional properties which can help us design neural networks in a different better way. for example, there are spatial relations like a 3X3 patch of adjacent pixels provide more information than 9 randomly selected pixels. 

Usage of 3X3 or 5X5 filters as pattern detectors is a well known idea in computer vision. For example we could use a filter with negative value in middle and positive values in the sides to detect edges and similar ones for corners, horizontal lines etc. by taking these filters and convolving with the input image, we can determine which patch of image is sensitive to these filters and thus has corners, edges etc. the output of this convolution operation is usually called the feature map. 

In deep learning, we let the model determine these filters based on the objective. 

in convolutional neural networks, we have f sets of such features per pixel. So, if we have n pixels as input, if we have 3X3 weight matrix and convolve , we have n feature maps [one per image patch in input]. we also have f such 3x3 [or 5X5] weight matrices called convolutional filters. and number of such filters is usually called channels --> Multichannel convolution

Once we have these feature maps, to finally have the class probabilities for let us say digits, we use the flatten layer to squash the channels together and have a matrix multiplication with out dimension as num_classes and use the softmax cross entropy loss as discussed in previous chapter

Max pooling : we don't the entire image dimension at each layer of CNN. instead , we downsize the feature map dimension and increase the number of filters at each layer. this is done using pooling layers. \
pooling takes a bunch of pixels like 2X2 patch and finds the max or average of the values and puts it in the output. so, the feature map size reduces from nxn to n/2 x n/2 - a 4 times reduction. \
the main advantage of pooling is computational . it reduces the number of training parameters thereby reducing the memory footprint of the model. Also, fewer computations are needed and this is compounded as number of layers increase. \
But, the model loses 1/4 the information at each layer . the performance on benchmarks is reasonably good and thus it was used before in many models due to computational efficiency. 

To maintain the output shape of convolution of input with filter to be the same as input shape, we need to pad the input of size filter_size/2. \
We will take a simple 1D case and code the convolution up and scale that to bigger dimension

In [1]:
import numpy as np

In [2]:
input1D = np.array([1,2,3,4,5])
param1D = np.array([1,1,1])



In [3]:
def _pad_1D(inp : np.ndarray, num : int) -> np.ndarray :
    z = np.array([0])
    z = np.repeat(z,num)
    return np.concatenate((z,inp,z))


In [4]:
inp1D_padded = _pad_1D(input1D, param1D.shape[0]/2)

In [5]:
inp1D_padded

array([0, 1, 2, 3, 4, 5, 0])

In [11]:
def conv1d(inp : np.ndarray, param : np.ndarray) -> np.ndarray :
    assert(len(inp.shape) == 1)
    assert(len(param.shape) == 1) # for 1d conv

    out = np.zeros(shape=inp.shape)

    param_len = param.shape[0]
    param_mid = param_len // 2
    inp_pad = _pad_1D(inp, param_mid)

    for o in range(out.shape[0]):
        for p in range(param_len):
            out[o] += (param[p] * inp_pad[o+p])

    return out

In [12]:
conv1d(input1D, param1D)

array([ 3.,  6.,  9., 12.,  9.])

a basic convolution has been coded up! there is another hyperparamter. Pooling is one way of scaling down the feature map size. it loses 1/4 th the amout of information for 2X2 pooling. there is also another way of reducing the computational load without pooling. it is by increasing the stride. it is the jump in sliding window each time convolution happens. 

backpropagation of gradients is slightly tricky for convolution. we will find it out for 1d case and then scale it up to 2d and more channels case

In [13]:
def conv1d_sum(inp : np.ndarray, param : np.ndarray) -> np.ndarray :
    out = conv1d(inp, param)
    return np.sum(out)



There is a pattern to calculate the gradients. assuming $\frac{\partial{(L)}}{\partial{(o_i)}}$ is available, we see that : 
$\frac{\partial{(L)}}{\partial{(i_i)}} = \frac{\partial{(L)}}{\partial{(o_j)}} \times \frac{\partial{(o_j)}}{\partial{(i_i)}}$ for j = 1...n [in our case 5]

this boils down to case, partial derivate of L with respect to i is partial derivative of L with respect to o multiplied by w, with weight indices increasing and partial derivative of o decreasing. To code this up in loop, we introduce padding to output grad -> this because sometimes there are cases where we need to multiply only two of the three weights especially for $i_1$ and $i_5$ corner ones

In [14]:
def input_grad_1D(output_grad : np.ndarray, inp : np.ndarray, param : np.ndarray) -> np.ndarray : 
    param_len = param.shape[0]
    param_mid = param_len // 2
    inp_pad = _pad_1D(inp, param_mid)

    if output_grad is None:
        output_grad = np.ones_like(inp)
    else:
        assert(inp.shape == output_grad.shape)

    output_grad_pad = _pad_1D(output_grad)

    input_grad = np.zeros_like(inp)

    for o in range(output_grad.shape[0]):
        for p in range(param.shape[0]):
            input_grad[o] += param[p] * output_grad_pad[o+param_len - p - 1]
    
    return input_grad


    


In [16]:
def param_grad_1D(output_grad : np.ndarray, param : np.ndarray, inp : np.ndarray) -> np.ndarray:
    assert(output_grad.shape == inp.shape)
    param_len = param.shape[0]
    param_mid = param_len // 2

    param_grad = np.zeros_like(param)
    input_pad = _pad_1D(inp)

    for o in range(output_grad.shape[0]):
        for p in range(param_len):
            param_grad[p] += input_pad[o+p] * output_grad[o]
    
    return param_grad



adding a layer of complexity : batches \
now we have input of shape (batches, input) -> 2D numpy array. need to do convolution for these inputs and find gradients. \
Example : input = np.array([[0,1,2,3,4,],[5,6,7,8,9]]) 2X5 array

In [22]:
def _pad_1d_batch(inp : np.ndarray, num : int) -> np.ndarray : 
    outs = [_pad_1D(obs,num) for obs in inp]
    return np.stack(outs)

In [17]:
def conv1d_batch(inp : np.ndarray, param : np.ndarray) -> np.ndarray : 
    outs = [conv1d(obs,param) for obs in inp]
    return np.stack(outs)

In [24]:
input_1d_batch = np.array([[1,2,3,4,5],[6,7,8,9,10]])

In [27]:
input_padded = _pad_1d_batch(input_1d_batch, 1)

In [29]:
conv1d_batch(input_1d_batch, param1D)

array([[ 3.,  6.,  9., 12.,  9.],
       [13., 21., 24., 27., 19.]])

In [30]:
# backpropagation for input with batches

def input_grad_1D_batch(output_grad : np.ndarray, inp : np.ndarray, param : np.ndarray) -> np.ndarray : 
    batch_size = output_grad.shape[0]
    grads = [input_grad_1D(output_grad[i], inp[i], param) for i in range(batch_size)]
    return np.stack(grads)

In [31]:
def param_grad_1D_batch(output_grad : np.ndarray, inp : np.ndarray, param : np.ndarray) -> np.ndarray : 
    param_grad = np.zeros_like(param)
    assert(inp.shape == output_grad.shape)
    inp_pad = _pad_1d_batch(inp)
    for i in range(inp.shape[0]):
        for o in range(inp.shape[1]):
            for p in range(param.shape):
                param_grad[p] += inp_pad[i][o+p] * output_grad[i][o]
    
    return param_grad

the next step is to move on to 2D convolutions from 1D convolutions. we are not processing batches for now. that will be added after 2D convolution discussion

In [32]:
imgs_2d = np.random.randn(3,28,28) # three channel img like RGB

param_2d = np.random.randn(3,3) # 3X3 filter



In [33]:
def _pad_2D_obs(inp : np.ndarray, num : int) -> np.ndarray : 
   inp_pad = _pad_1d_batch(inp,num)
   border = np.zeros((num,inp.shape[0]+num*2))
   return np.concatenate([border, inp_pad, border])


In [34]:
def _pad_2D(inp : np.ndarray, num : int) -> np.ndarray : 
    z = [_pad_2D_obs(obs, num) for obs in inp]
    return np.stack(z)

In [35]:
_pad_2D(imgs_2d, 1).shape

(3, 30, 30)

forward pass for 2d convolution

In [36]:
def compute_2d_output_obs(inp : np.ndarray, param : np.ndarray) -> np.ndarray:
    param_mid = param.shape[0] // 2
    inp_pad = _pad_2D_obs(inp, param_mid)
    out = np.zeros_like(inp)
    for o_h in range(inp.shape[0]):
        for o_w in range(inp.shape[1]):
            for p_h in range(param.shape[0]):
                for p_w in range(param.shape[1]):
                    out[o_h][o_w] += inp_pad[o_h + p_h][o_w + p_w] * param[p_h][p_w]
    
    return out

In [38]:
def compute_2d_output(inp : np.ndarray , param : np.ndarray) : 
    outs = [compute_2d_output_obs(img,param) for img in inp]
    return np.stack(outs)

In [39]:
compute_2d_output(imgs_2d, param_2d).shape

(3, 28, 28)

In [40]:
def input_grads_2d(output_grad : np.ndarray, inp: np.ndarray, param : np.ndarray) -> np.ndarray:
    outs = []
    for i in range(output_grad.shape[0]):
        param_size = param.shape[0]
        output_grad_obs = _pad_2D_obs(output_grad[i], param_size//2)
        input_grads = np.zeros_like(inp[i])
        for i_h in range(inp[i].shape[0]):
            for i_w in range(inp[i].shape[1]):
                for p_h in range(param.shape[0]):
                    for p_w in range(param.shape[1]):
                        input_grads[i_h][i_w] += param[p_h][p_w] * output_grad_obs[i_h + param_size - p_h-1][i_w + param_size - p_w - 1]
        outs.append(input_grads)
    return np.stack(outs)




In [41]:
def param_2d_grad(output_grad : np.ndarray, param : np.ndarray, inp : np.ndarray) -> np.ndarray:
    param_grad = np.zeros_like(param)

    channels = inp.shape[0]
    param_size = param.shape[0]
    input_pad = _pad_2D(inp, param_size//2)
    for i in range(channels):
        for i_h in range(inp.shape[1]):
            for i_w in range(inp.shape[2]):
                for p_h in range(param.shape[0]):
                    for p_w in range(param.shape[1]):
                        param_grad[p_h][p_w] += output_grad[i][i_h][i_w] * input_pad[i][i_h+p_h][i_w+p_w]
    
    return param_grad



In [42]:
# verify shapes
input_2d_grad = input_grads_2d(np.ones_like(imgs_2d), imgs_2d, param_2d)
input_2d_grad.shape

(3, 28, 28)

In [43]:
input_2d_grad.flatten()[:5]

array([-2.81081701, -5.34547063, -5.34547063, -5.34547063, -5.34547063])

In [44]:
param_grad_2d = param_2d_grad(np.ones_like(imgs_2d), param_2d, imgs_2d)
param_grad_2d.shape

(3, 3)

In [45]:
param_grad_2d

array([[ -4.96823183, -13.83603079, -16.73461795],
       [ -2.86856529, -13.73812317, -18.16716972],
       [-18.72853249, -29.22406312, -33.57882624]])

there is one final step as far as convolution operation is concerned -> adding batches to 2d and channels [till now we had one feature map], now multiple feature maps!

In [46]:
# input shape : batch_size, channel, height, width
# param shape : in_channel, out_channel, filter_height, filter_width

def _pad_2d_channel(inp : np.ndarray , num : int) -> np.ndarray :
    outs = [_pad_2D_obs(channel,num) for channel in inp]
    return np.stack(outs)

In [47]:
def _pad_conv_input(inp : np.ndarray, num : int) -> np.ndarray : 
    outs = [_pad_2d_channel(obs,num) for obs in inp]
    return np.stack(outs)

In [48]:
#forward pass
def compute_output_obs(inp : np.ndarray, param : np.ndarray) -> np.ndarray :
    assert(len(inp.shape) == 3)
    assert(len(param.shape) == 4) 
    param_size = param.shape[2]
    param_mid = param_size // 2
    obs_pad = _pad_2d_channel(inp,param_mid)

    in_channels = param.shape[0]
    out_channels = param.shape[1]

    img_size = inp.shape[1]

    out = np.zeros((out_channels, inp.shape[1],inp.shape[2]))

    for c_out in range(out_channels):
        for c_in in range(in_channels):
            for i_h in range(inp.shape[1]):
                for i_w in range(inp.shape[2]):
                    for p_h in range(param.shape[2]):
                        for p_w in range(param.shape[3]):
                            out[c_out][i_h][i_w] += obs_pad[c_in][i_h + p_h][i_w + p_w] * param[c_in][c_out][p_h][p_w]
    return out





In [50]:
def compute_output(inp : np.ndarray, param : np.ndarray) -> np.ndarray :
    outs = [compute_output_obs(obs, param) for obs in inp]
    return np.stack(outs)

In [52]:
def _compute_input_grads(input_obs : np.ndarray, output_grad_obs : np.ndarray, param : np.ndarray) -> np.ndarray : 
    input_grad = np.zeros_like(input_obs)
    param_size = param.shape[2]
    param_mid = param_size // 2
    img_size = input_obs.shape[1]
    in_channels = input_obs.shape[0]
    out_channels = param.shape[1]
    output_obs_pad = _pad_2d_channel(output_grad_obs, param_mid)

    for c_in in range(in_channels):
        for c_out in range(out_channels):
            for i_h in range(input_obs.shape[1]):
                for i_w in range(input_obs.shape[2]):
                    for p_h in range(param.shape[2]):
                        for p_w in range(param.shape[3]):
                            input_grad[c_in][i_h][i_w] += output_obs_pad[c_out][i_h + param_size - p_h-1][i_w + param_size - p_w - 1] * param[c_in][p_h][p_w]
    return input_grad





In [51]:
#Backward pass

def _input_grad(output_grad : np.ndarray, inp : np.ndarray, param : np.ndarray) -> np.ndarray :
    grads = [_compute_input_grads(inp[i], output_grad[i],param) for i in range(output_grad.shape[0])]
    return np.stack(grads)



In [53]:
def _param_grad(inp : np.ndarray, output_grad : np.ndarray, param : np.ndarray) :
    param_grad = np.zeros_like(param)
    param_size = param.shape[2]
    param_mid = param_size // 2
    img_size = inp.shape[2]
    in_channels = inp.shape[1]
    out_channels = output_grad.shape[1]

    inp_pad = _pad_conv_input(inp, param_mid)
    img_shape = output_grad.shape[2:]

    for i in range(inp.shape[0]):
        for c_in in range(in_channels):
            for c_out in range(out_channels):
                for o_h in range(img_shape[0]):
                    for o_w in range(img_shape[1]):
                        for p_h in range(param_size):
                            for p_w in range(param_size):
                                param_grad[c_in][c_out][p_h][p_w] += output_grad[i][c_out][o_h][o_w] * inp_pad[i][c_in][o_h + p_h][o_w + p_w]
    
    return param_grad


Flatten layer needs to be written. to use these convolution operations and train a model, having for loops is inefficient.instead we have to write vectorized numpy code. We will write a conv2d class inheriting from Layer class from the previous notebook.Also , we will use other classes like Operation, Neural Network, Trainer etc from that notebook.

In [67]:
from typing import Callable, List, Tuple, Dict
from copy import deepcopy

In [68]:
class Loss(object):
    '''
    Loss function of the neural network
    '''
    def __init__(self):
        pass

    def forward(self, prediction : np.ndarray, target : np.ndarray) -> float :
        assert(prediction.shape == target.shape)
        self.prediction = prediction
        self.target = target
        loss_value = self._output()
        return loss_value
    
    def backward(self) -> np.ndarray :
        self.input_grad = self._input_grad()
        assert(self.input_grad.shape == self.prediction.shape)
        return self.input_grad
    
    def _output(self) -> float:
        raise NotImplementedError()
    
    def _input_grad(self) -> np.ndarray:
        raise NotImplementedError()


# Mean squared loss : a subclass of Loss class

class MeanSquaredLoss(Loss):
    def __init__(self):
        super().__init__()
    
    def _output(self) -> float:
        return np.sum(np.power((self.prediction-self.target),2)) / self.prediction.shape[0]
    
    def _input_grad(self) -> np.ndarray:
        return 2.0 * (self.prediction-self.target) / self.prediction.shape[0]
    

In [69]:
# the next step is to implement learning rate decay
# this is incorporated inside the optimizer
class Optimizer2(object):
    def __init__(self, initial_lr : float = 0.01, final_lr : float = 0., decay_type : str = "exponential"):
        self.lr = initial_lr
        self.final_lr = final_lr
        self.decay_type = decay_type
        self.first = True
    def step(self):
        pass

    def _setup_decay(self) -> None:
        if not self.decay_type:
            return
        elif self.decay_type == "exponential":
            self.decay_per_epoch = np.power(self.final_lr/self.lr, 1.0/(self.max_epochs-1))
        
        elif self.decay_type == "linear":
            self.decay_per_epoch = (self.lr - self.final_lr) / (self.max_epochs - 1)
    
    def _decay_lr(self) -> None : 
        if not self.decay_type:
            return
        elif self.decay_type == "exponential":
            self.lr *= self.decay_per_epoch
        elif self.decay_type == "linear":
            self.lr -= self.decay_per_epoch
        

class SGD2(Optimizer2):
    def __init__(self, lr : float = 0.001, final_lr : float = 0., decay_type : str = "exponential"):
        super().__init__(lr, final_lr, decay_type)

    
    def step(self) -> None:
        for (param,param_grad) in zip(self.net.params(), self.net.param_grads()):
            param -= self.lr * param_grad

In [70]:
class SGDMomentum2(Optimizer2):
    def __init__(
        self, lr: float = 0.01, final_lr : float = 0. , decay_type : str = "exponential", momentum: float = 0.9
    ) -> None:
        super().__init__(lr, final_lr,decay_type)
        self.momentum = momentum

    def step(self) -> None:
        if self.first:
            self.velocities = [np.zeros_like(param) for param in self.net.params()]
            self.first = False

        for param, param_grad, velocity in zip(
            self.net.params(), self.net.param_grads(), self.velocities
        ):
            self._update_rule(param=param, grad=param_grad, velocity=velocity)

    def _update_rule(self, **kwargs) -> None:

        # Update velocity
        kwargs["velocity"] *= self.momentum
        kwargs["velocity"] += self.lr * kwargs["grad"]

        # Use this to update parameters
        kwargs["param"] -= kwargs["velocity"]

In [72]:
#helper functions
def permute_data(X, y):
    perm = np.random.permutation(X.shape[0])
    return X[perm], y[perm]

def mae(preds: np.ndarray, actuals: np.ndarray):
    '''
    Compute mean absolute error.
    '''
    return np.mean(np.abs(preds - actuals))

def rmse(preds: np.ndarray, actuals: np.ndarray):
    '''
    Compute root mean squared error.
    '''
    return np.sqrt(np.mean(np.power(preds - actuals, 2)))

In [73]:
class Operation_final(object):
    '''
    Base class for an operation in a neural network
    '''
    def __init__(self):
        pass
    
    def forward(self, input_ : np.ndarray, inference : bool = False):
        
        '''
        Stores input in the self.input attribute. 
        store output of forward computation is self.output attribute
        '''
        self.input_ = input_

        self.output = self._output(inference)
        return self.output
    
    def backward(self, output_grad : np.ndarray) -> np.ndarray:
        '''
        Calls the self._input_grad() function
        '''
        assert(self.output.shape == output_grad.shape)
        self.input_grad = self._input_grad(output_grad)
        assert(self.input_grad.shape == self.input_.shape)
        return self.input_grad
    
    def _output(self, inference : bool = False) -> np.ndarray:
        '''
        the output method must be defined for each operation
        '''
        raise NotImplementedError()
    
    def _input_grad(self,output_grad : np.ndarray) -> np.ndarray:
        '''
        the input_grad method must be defined for each operation
        '''
        raise NotImplementedError()
    
class ParamOperation_final(Operation_final):
    def __init__(self,param : np.ndarray) -> np.ndarray:
        super().__init__()
        self.param = param
    
    def backward(self, output_grad : np.ndarray) -> np.ndarray:
        assert(self.output.shape == output_grad.shape)
    
        self.input_grad = self._input_grad(output_grad)
        self.param_grad = self._param_grad(output_grad)
        assert(self.input_grad.shape == self.input_.shape)
        assert(self.param_grad.shape == self.param.shape)

        return self.input_grad
    
    def _param_grad(self,output_grad : np.ndarray) -> np.ndarray :
        raise NotImplementedError()
    
class weightMultiply_final(ParamOperation_final):
    def __init__(self, W : np.ndarray):
        super().__init__(W)
    
    def _output(self, inference : bool = False) -> np.ndarray :
        return np.dot(self.input_, self.param)
        
    def _input_grad(self,output_grad : np.ndarray) -> np.ndarray:
        return np.dot(output_grad, np.transpose(self.param, (1,0)))
    
    def _param_grad(self,output_grad : np.ndarray) -> np.ndarray:
        return np.dot(np.transpose(self.input_,(1,0)), output_grad)

# Addition of bias term
class BiasAdd_final(ParamOperation_final):
    def __init__(self,B:np.ndarray):
        assert B.shape[0] == 1
        super().__init__(B)
    
    def _output(self, inference : bool = False):
        return self.input_ + self.param
    
    def _input_grad(self, output_grad : np.ndarray) -> np.ndarray :
        return np.ones_like(self.input_) *  output_grad
    
    def _param_grad(self, output_grad : np.ndarray) -> np.ndarray :
        param_grad = np.ones_like(self.param) * output_grad
        return np.sum(param_grad, axis=0).reshape(1, self.param.shape[1])

# sigmoid activation layer

class Sigmoid_final(Operation_final):
    def __init__(self) -> None:
        super().__init__()
    
    def _output(self, inference : bool = False) -> np.ndarray:
        return (1.0/(1.0+np.exp(-1.0 * self.input_)))
    
    def _input_grad(self, output_grad : np.ndarray) -> np.ndarray:
        sigmoid_backward = self.output * (1-self.output) # derivative of sigmoid(x) = sigmoid(x) * (1-sigmoid(x))
        return (sigmoid_backward * output_grad)

class Linear_final(Operation_final):
    def __init__(self) -> None :
        super().__init__()
    
    def _output(self, inference : bool = False) -> np.ndarray:
        return self.input_
    
    def _input_grad(self, output_grad:np.ndarray) -> np.ndarray:
        return output_grad
    
class Tanh_final(Operation_final):
    def __init__(self) -> None:
        super().__init__()
    
    def _output(self, inference : bool = False) -> np.ndarray:
        return np.tanh(self.input_)
    
    def _input_grad(self, output_grad : np.ndarray) -> np.ndarray:
        tanh_backward = 1 - self.output * self.output # derivative of tanh(x) = (1-tanh(x)^2)
        return (tanh_backward * output_grad)
    
class Dropout(Operation_final):
    def __init__(self, keep_prob : float = 0.8):
        super().__init__()
        self.keep_prob = keep_prob
    
    def _output(self, inference : bool = False) -> np.ndarray:
        if inference : 
            return self.input_ * self.keep_prob
        else:
            self.mask = np.random.binomial(1, self.keep_prob, size=self.input_.shape)
            return self.input_ * self.mask 
    
    def _input_grad(self,output_grad : np.ndarray) -> np.ndarray:
        return output_grad * self.mask
    
# abstract layer class
class Layer_final(object):
    def __init__(self, neurons : int, dropout : float = 1.) :
        self.first = True # first layer or not
        self.neurons = neurons
        self.params : List[np.ndarray] = []
        self.param_grads : List[np.ndarray] = []
        self.operations : List[Operation_final] = []
        self.dropout = dropout


    
    def _setup_layer(self, input_ : np.ndarray) -> None:
        raise NotImplementedError() # to be filled in derived class
    
    def forward(self, input_ : np.ndarray, inference : bool = False) -> np.ndarray : 
        if self.first:
            self._setup_layer(input_)
            self.first = False
            
        self.input_ = input_
        
        for operation in self.operations: 
            
            input_ = operation.forward(input_, inference)
        
        self.output = input_
        return self.output
    
    def backward(self, output_grad : np.ndarray) -> np.ndarray :
        assert(self.output.shape == output_grad.shape)

        for operation in reversed(self.operations):
            output_grad = operation.backward(output_grad)
        
        input_grad = output_grad
        self._param_grads()
        return input_grad
    
    def _param_grads(self):
        self.param_grads = []
        for operation in self.operations:
            if issubclass(operation.__class__, ParamOperation_final):
                self.param_grads.append(operation.param_grad)
            
    def _params(self):
        self.params = []
        for operation in self.operations:
            if issubclass(operation.__class__, ParamOperation_final):
                self.params.append(operation.param)

class Dense_final(Layer_final):
    def __init__(self, neurons : int, activation : Operation_final = Sigmoid_final(), weight_init = "glorot", dropout : float = 1.) -> None:
        super().__init__(neurons, dropout)
        self.activation = activation
        self.weight_init = weight_init


    
    def _setup_layer(self, input_:np.ndarray) -> None:
        if self.seed:
            np.random.seed(self.seed)

        if self.weight_init == "glorot":
            scale = 2 / (input_.shape[1] + self.neurons)
        else:
            scale = 1.0
        
        self.params = []

        self.params.append(scale * np.random.randn(input_.shape[1],self.neurons))
        self.params.append(np.random.randn(1,self.neurons))
        self.operations = [weightMultiply_final(self.params[0]),
                           BiasAdd_final(self.params[1]),
                           self.activation]
        if self.dropout < 1.:
            self.operations.append(Dropout(self.dropout))

        
        return None 
    
# neural network class
class NeuralNetwork_final(object):
   def __init__(self,layers : List[Layer_final],
                loss : Loss, seed : float = 1):
      self.layers = layers
      self.loss = loss
      self.seed = seed
      for layer in self.layers:
         setattr(layer, "seed", self.seed)
    
   def forward(self, x_batch : np.ndarray, inference : bool = False) -> np.ndarray:
      x_out = x_batch

      for layer in self.layers:
         x_out = layer.forward(x_out,inference)
      
      return x_out

   def backward(self, loss_grad : np.ndarray) -> None:
      grad = loss_grad
      for layer in reversed(self.layers):
         grad = layer.backward(grad)
   
   def train_batch(self, X_batch : np.ndarray, y_batch : np.ndarray) -> float:
      predictions = self.forward(X_batch)
      loss = self.loss.forward(predictions, y_batch)
      loss_grad = self.loss.backward()
      self.backward(loss_grad)
      return loss
   
   def params(self):
      for layer in self.layers:
         yield from layer.params # get the parameters from the generator instead of writing a loop and getting the params for the batch

   def param_grads(self):
      for layer in self.layers:
         yield from layer.param_grads
         
class Trainer_final(object):
    def __init__(self, net : NeuralNetwork_final, optim : Optimizer2) -> None:
        self.net = net
        self.optim = optim
        self.best_loss = np.inf # initial value for best loss
        setattr(self.optim, "net",self.net)

    
    def generate_batches(self,X:np.ndarray, y:np.ndarray, batch_size : int = 32) -> Tuple[np.ndarray] :
        assert X.shape[0] == y.shape[0] # shape check
        N = X.shape[0]
        for i in range(0,N,batch_size):
            X_batch, y_batch = X[i:i+batch_size], y[i:i+batch_size]
            yield X_batch, y_batch
    
    def fit(self, X_train : np.ndarray, y_train : np.ndarray, 
            X_test : np.ndarray, y_test : np.ndarray, batch_size : int = 32,
            epochs : int = 100, eval_every : int = 100, seed : int = 42, restart : bool  = True) -> None:
        np.random.seed(seed)
        setattr(self.optim, "max_epochs", epochs)
        self.optim._setup_decay()
        if restart:
            for layer in self.net.layers:
                layer.first = True
            
            self.best_loss = np.inf
        
        for i in range(epochs):
            if (i+1)%eval_every == 0:
                last_model = deepcopy(self.net)
            
            X_train, y_train = permute_data(X_train, y_train)
            batch_generator = self.generate_batches(X_train, y_train)
            for ii, (X_batch, y_batch) in enumerate(batch_generator):
                self.net.train_batch(X_batch, y_batch)
                self.optim.step()
            if self.optim.final_lr:
                self.optim._decay_lr()
            
            if (i+1)%eval_every == 0:
                test_preds = self.net.forward(X_test, inference = True)
                loss = self.net.loss.forward(test_preds, y_test)

                if loss < self.best_loss:
                    print(f"Validation loss after {i+1} epochs is {loss:.3f}")
                    self.best_loss = loss
                else:
                    self.net = last_model
                    print(f"""Loss increased after epoch {i+1}, final loss was {self.best_loss:.3f}, using the model from epoch {i+1-eval_every}""")
                    setattr(self.optim, "net", self.net)
                    break



In [149]:
# Convolution Layer class
class conv2D(Layer_final):
    def __init__(self,out_channels : int, param_size : int, 
                 dropout : int = 1., weight_init : str = "glorot", 
                 activation : Operation_final = Linear_final(), flatten : bool = False):
        super().__init__(out_channels, dropout)
        self.param_size = param_size
        self.activation = activation
        self.flatten = flatten
        self.weight_init = weight_init
        self.out_channels = out_channels
    
    def _setup_layer(self,input : np.ndarray) -> None :
        in_channels = input.shape[1]
        if self.weight_init:
            scale = 2 / (in_channels + self.out_channels)
        else:
            scale = 1
        
        conv_param = np.random.normal(
            loc = 0, scale = scale, size = (in_channels, self.out_channels, self.param_size, self.param_size )

        )
        self.params.append(conv_param)
        self.operations = []
        self.operations.append(Conv2d_op(conv_param))
        self.operations.append(self.activation)

        if self.flatten:
            self.operations.append(Flatten())
        
        if self.dropout < 1. :
            self.operations.append(Dropout(self.dropout))


        

        


        


In [136]:
class Flatten(Operation_final):
    def __init__(self):
        super().__init__()
    
    def _output(self, inference : bool = False) -> np.ndarray:
        return self.input_.reshape(self.input_.shape[0],-1)
    
    def _input_grad(self, output_grad : np.ndarray) -> np.ndarray : 
        return output_grad.reshape(self.input_.shape)


In [102]:
from scipy import special
def unnormalize(a : np.ndarray):
    return a[np.newaxis,0]

def normalize(a : np.ndarray) -> np.ndarray :
    other = 1-a
    return np.concatenate([a,1-a], axis=1)

def softmax(x: np.ndarray, axis=None) -> np.ndarray:
    return np.exp(x - special.logsumexp(x, axis=axis, keepdims=True))
# Softmax cross entropy class
class SoftmaxCrossEntropy(Loss):
    def __init__(self, eps: float = 1e-9) -> None:
        super().__init__()
        self.eps = eps
        self.single_class = False

    def _output(self) -> float:

        # if the network is just outputting probabilities
        # of just belonging to one class:
        if self.target.shape[1] == 0:
            self.single_class = True

        # if "single_class", apply the "normalize" operation defined above:
        if self.single_class:
            self.prediction, self.target = normalize(self.prediction), normalize(
                self.target
            )

        # applying the softmax function to each row (observation)
        softmax_preds = softmax(self.prediction, axis=1)

        # clipping the softmax output to prevent numeric instability
        self.softmax_preds = np.clip(softmax_preds, self.eps, 1 - self.eps)

        # actual loss computation
        softmax_cross_entropy_loss = -1.0 * self.target * np.log(self.softmax_preds) - (
            1.0 - self.target
        ) * np.log(1 - self.softmax_preds)

        return np.sum(softmax_cross_entropy_loss) / self.prediction.shape[0]

    def _input_grad(self) -> np.ndarray:

        # if "single_class", "un-normalize" probabilities before returning gradient:
        if self.single_class:
            return unnormalize(self.softmax_preds - self.target)
        else:
            return (self.softmax_preds - self.target) / self.prediction.shape[0]

In [96]:
param_pad = 3//2
param_size = 3
def _pad_1d(inp: np.ndarray) -> np.ndarray:
        z = np.array([0])
        z = np.repeat(z, param_pad)
        return np.concatenate([z, inp, z])

def _pad_1d_batch(inp: np.ndarray) -> np.ndarray:
        outs = [_pad_1d(obs) for obs in inp]
        return np.stack(outs)

def _pad_2d_obs(inp: np.ndarray):
        """
        Input is a 2 dimensional, square, 2D Tensor
        """
        inp_pad = _pad_1d_batch(inp)

        other = np.zeros((param_pad, inp.shape[0] + param_pad * 2))

        return np.concatenate([other, inp_pad, other])

def _pad_2d_channel(inp: np.ndarray):
        """
        inp has dimension [num_channels, image_width, image_height]
        """
        return np.stack([_pad_2d_obs(channel) for channel in inp])

def _get_image_patches(input_: np.ndarray):
        imgs_batch_pad = np.stack([_pad_2d_channel(obs) for obs in input_])
        patches = []
        img_height = imgs_batch_pad.shape[2]
        for h in range(img_height - param_size + 1):
            for w in range(img_height - param_size + 1):
                patch = imgs_batch_pad[:, :, h : h + param_size, w : w + param_size]
                patches.append(patch)
        return np.stack(patches)

In [97]:
input = np.random.randn(2,3,228,228)
input_patches = _get_image_patches(input)
print(input_patches.shape)

(51984, 2, 3, 3, 3)


In [98]:
batch_size = 2
img_size = input.shape[2]*input.shape[3]
img_size


51984

In [99]:
patches_reshaped = input_patches.transpose(1, 0, 2, 3, 4).reshape(2, img_size, -1)


In [100]:
patches_reshaped.shape

(2, 51984, 27)

In [148]:
class Conv2d_op(ParamOperation_final):
    def __init__(self, conv_param : np.ndarray):
        super().__init__(conv_param)
        self.param_size = conv_param.shape[2]
        self.param_pad = self.param_size // 2
    
    def _pad_1d(self, inp: np.ndarray) -> np.ndarray:
        z = np.array([0])
        z = np.repeat(z, self.param_pad)
        return np.concatenate([z, inp, z])

    def _pad_1d_batch(self, inp: np.ndarray) -> np.ndarray:
        outs = [self._pad_1d(obs) for obs in inp]
        return np.stack(outs)

    def _pad_2d_obs(self, inp: np.ndarray):
        """
        Input is a 2 dimensional, square, 2D Tensor
        """
        inp_pad = self._pad_1d_batch(inp)

        other = np.zeros((self.param_pad, inp.shape[0] + self.param_pad * 2))

        return np.concatenate([other, inp_pad, other])

    def _pad_2d_channel(self, inp: np.ndarray):
        """
        inp has dimension [num_channels, image_width, image_height]
        """
        return np.stack([self._pad_2d_obs(channel) for channel in inp])

    def _get_image_patches(self, input_: np.ndarray):
        imgs_batch_pad = np.stack([self._pad_2d_channel(obs) for obs in input_])
        patches = []
        img_height = imgs_batch_pad.shape[2]
        for h in range(img_height - self.param_size + 1):
            for w in range(img_height - self.param_size + 1):
                patch = imgs_batch_pad[:, :, h : h + self.param_size, w : w + self.param_size]
                patches.append(patch)
        return np.stack(patches)
    
    def _output(self, inference : bool = False):
        batch_size = self.input_.shape[0]
        img_height = self.input_.shape[2]
        img_size = self.input_.shape[2] * self.input_.shape[3]
        patch_size = self.param.shape[2] * self.param.shape[3] * self.param.shape[0]
        patches = self._get_image_patches(self.input_) # num_patches X batch_size X in_channels X 3X3
        patches_reshaped = patches.transpose(1,0,2,3,4).reshape(batch_size, img_size, -1) # batch_size X (hXW) X (in_channels X 3 X 3)
        params_reshaped = self.param.transpose(0,2,3,1).reshape(patch_size,-1)
        output_reshaped = np.matmul(patches_reshaped, params_reshaped).reshape(batch_size,img_height, img_height,-1).transpose(0,3,1,2)
        return output_reshaped
    
    def _input_grad(self, output_grad : np.ndarray) -> np.ndarray:
        batch_size = self.input_.shape[0]
        img_size = self.input_.shape[2] * self.input_.shape[3]
        img_height = self.input_.shape[2]

        output_grad_pad = self._get_image_patches(output_grad).transpose(1,0,2,3,4).reshape(batch_size*img_size,-1) # 2 X (hXw) X (out_channel X 3 X 3)
        param_reshaped = self.param.reshape(self.param.shape[0],-1).transpose(1,0) # (out_channel X 3 X3) X in_channel

        input_grad = np.matmul(output_grad_pad, param_reshaped).reshape(batch_size, img_height, img_height, self.param.shape[0]).transpose(0,3,1,2)
        return input_grad

    def _param_grad(self, output_grad : np.ndarray) -> np.ndarray :
        batch_size = self.input_.shape[0]
        img_height = self.input_.shape[2]
        img_size =  img_height * img_height 
        in_channel = self.param.shape[0]
        out_channel = self.param.shape[1]

        in_patches_reshape = self._get_image_patches(self.input_).transpose(1,0,2,3,4).reshape(batch_size*img_size,-1).transpose(1,0)
        output_grad_reshape = output_grad.transpose(0,2,3,1).reshape(batch_size * img_size,-1)

        param_grad = np.matmul(in_patches_reshape, output_grad_reshape).reshape(in_channel,self.param.shape[2], self.param.shape[3], out_channel).transpose(0,3,1,2)
        return param_grad

        

        

        
    

    
    

    


In [154]:
# Instantiating the model
# a singel convnet with dense layer containing 10 neurons

model = NeuralNetwork_final(
    layers=[conv2D(out_channels=16,
                   param_size=5,
                   dropout=0.8,
                   weight_init="glorot",
                   flatten=True,
                  activation=Tanh_final()),
            Dense_final(neurons=10, 
                  activation=Linear_final())],
            loss = SoftmaxCrossEntropy(), 
seed=20190402)

In [155]:
# the same MNIST dataset
from tensorflow.keras.datasets import mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

In [156]:
train_images = train_images.reshape(-1,28*28)
test_images = test_images.reshape(-1,28*28)

X_train = (train_images - np.mean(train_images)) / np.std(train_images)
X_test = (test_images - np.mean(test_images)) / np.std(test_images)

In [157]:
y_train = np.zeros((train_labels.size, train_labels.max()+1))
y_train[np.arange(train_labels.size), train_labels] = 1

y_test = np.zeros((test_labels.size, train_labels.max()+1))
y_test[np.arange(test_labels.size), test_labels] = 1

In [158]:
X_train_conv = X_train.reshape(-1,1,28,28)
X_test_conv = X_test.reshape(-1,1,28,28)

In [159]:
optimizer = SGDMomentum2(0.2,0.05,"exponential", 0.9)

In [None]:
trainer = Trainer_final(model, optimizer)
trainer.fit(X_train_conv, y_train, X_test_conv,y_test,batch_size=64,epochs=2, eval_every=1,seed=42)