In [82]:
import numpy as np
import math
from scipy.ndimage.filters import gaussian_filter

from Layers import Base
from scipy import signal
from Layers import Flatten
class L2Loss:

    def __init__(self):
        self.input_tensor = None

    def forward(self, input_tensor, label_tensor):
        self.input_tensor = input_tensor
        return np.sum(np.square(input_tensor - label_tensor))

    def backward(self, label_tensor):
        return 2*np.subtract(self.input_tensor, label_tensor)

In [103]:
import numpy as np
import math
from scipy.signal import correlate, correlate2d
from Layers.Base import BaseLayer
class Conv(Base.BaseLayer):
    def __init__(self, stride_shape, convolution_shape, num_kernels):

        super().__init__()
        self.stride_shape = stride_shape
        self.convolution_shape = convolution_shape
        self.num_kernels = num_kernels

        if len(convolution_shape)==3:
            self.weights = np.random.rand(num_kernels, convolution_shape[0], convolution_shape[1], convolution_shape[2])
        elif len(convolution_shape)==2:
            self.weights = np.random.rand(num_kernels, convolution_shape[0], convolution_shape[1])
            
        self.bias = np.random.rand(num_kernels)
        self._optimizer = None 
        self._bias_optimizer = None
        self._gradient_weights = np.zeros_like(self.weights)
        self._gradient_bias = None



    def forward(self, input_tensor):
        
        self.input_tensor = input_tensor
        
        #initialize zeros, based on input tensor, number of kernels, and the stride shape.       
        if len(self.convolution_shape) == 3: #If the convolution shape is 3D 
            result = np.zeros((input_tensor.shape[0], 
                               self.num_kernels, 
                               int(np.ceil(input_tensor.shape[2]/self.stride_shape[0])),
                               int(np.ceil(input_tensor.shape[3]/self.stride_shape[1]))))
            #result.shape = (batch_size, num_kernels, output_height, output_width)      
        elif len(self.convolution_shape) == 2: #If the convolution shape is 2D
            result = np.zeros((input_tensor.shape[0], 
                               self.num_kernels, 
                               int(np.ceil(input_tensor.shape[2]/self.stride_shape[0]))))
            #result.shape = (batch_size, num_kernels, output_height)
            
        result = [] #stores results of convolution layer
        
        for batch in range(input_tensor.shape[0]): # Iterate over input tensor
            result_batch = [] #stores results of convolution layer of that batch
            
            for output_channel in range(self.weights.shape[0]): # Iterate over each kernel's output channel
                
                conv_sum = 0 # track sum of convolutions
                
                # Iterate over each kernel's input channel
                for input_channel in range(self.weights.shape[1]): #Iterate over each input channel
                    #conv_out = stores result for each input channel
                    conv_out = signal.correlate(input_tensor[batch, input_channel],
                                                self.weights[output_channel, input_channel], 
                                                mode='same', method='direct') # Perform convolution
                    
                    conv_sum += conv_out 

                # Check convolution shape after processing all input channels
                
                # If 3D, select every stride_shape[0]-th element along the height 
                # and every stride_shape[1]-th element along the width of the conv_sum
                if len(self.convolution_shape) == 3:
                    conv_sum = conv_sum[::self.stride_shape[0], ::self.stride_shape[1]]
                # If 2D, subsample along the height dimension
                elif len(self.convolution_shape) == 2:
                    conv_sum = conv_sum[::self.stride_shape[0]]
   
                result_batch.append(conv_sum + self.bias[output_channel]) # Add the bias value corresponding to the current output channel

            result.append(result_batch) # Append result_batch to the final result

        output_tensor = np.array(result) # Convert to numpy array
        
        self.output_shape  = output_tensor.shape # used in backward
        
        return output_tensor
    
    def backward(self, error_tensor):
        
        self.error_tensor_reshaped = error_tensor.reshape(self.output_shape) #reshaping to match shape of output_tensor
        
        if len(self.convolution_shape) == 2: # check for 2D
            self.input_tensor = self.input_tensor[:, :, :, np.newaxis]
        
        # Initialize with zeros, to store upsampled error tensor
        self.upsampled_error_tensor = np.zeros((self.input_tensor.shape[0], self.num_kernels, *self.input_tensor.shape[2:]))
        
        #Initialize return tensor to return gradient weights
        return_tensor = np.zeros(self.input_tensor.shape)
        
        # Initialize with zero, stores input tensor after padding
        self.padded_input_tensor = np.zeros((*self.input_tensor.shape[:2], self.input_tensor.shape[2] + self.convolution_shape[1] - 1,
                                   self.input_tensor.shape[3] + self.convolution_shape[2] - 1))
        
        # Initialize gradient bias with zeros, will store the gradient with respect to the bias terms 
        self.gradient_bias = np.zeros(self.num_kernels)
        
        # Initialize gradient weights with zeros, will store the gradient with respect to the weights
        self.gradient_weights = np.zeros(self.weights.shape)

        # padding along the height dimension (verticle) of the input tensor
        padding_verticle = int(np.floor(self.convolution_shape[2] / 2))  
        
        # padding along the width dimension (horizontal) of the input tensor
        padding_horizontal = int(np.floor(self.convolution_shape[1] / 2))

        # Upsampling and filling unsampled error tensor
        for batch in range(self.upsampled_error_tensor.shape[0]):
            for kernel in range(self.upsampled_error_tensor.shape[1]):
                # gradient with respect to the bias
                self.gradient_bias[kernel] += np.sum(error_tensor[batch, kernel, :])

                for h in range(self.error_tensor_reshaped.shape[2]):
                    for w in range(self.error_tensor_reshaped.shape[3]):
                        self.upsampled_error_tensor[batch, kernel, h * self.stride_shape[0], w * self.stride_shape[1]] = self.error_tensor_reshaped[batch, kernel, h, w]  

                for channel in range(self.input_tensor.shape[1]):  
                    return_tensor[batch, channel, :] += signal.convolve2d(self.upsampled_error_tensor[batch, kernel, :], self.weights[kernel, channel, :], 'same')  # zero padding

            # Delete the padding
            for n in range(self.input_tensor.shape[1]):
                for h in range(self.padded_input_tensor.shape[2]):
                    for w in range(self.padded_input_tensor.shape[3]):
                        if (h > padding_horizontal - 1) and (h < self.input_tensor.shape[2] + padding_horizontal):
                            if (w > padding_verticle - 1) and (w < self.input_tensor.shape[3] + padding_verticle):
                                self.padded_input_tensor[batch, n, h, w] = self.input_tensor[batch, n, h - padding_horizontal, w - padding_verticle]

            for kernel in range(self.num_kernels):
                for c in range(self.input_tensor.shape[1]):
                    # convolution of the upsampled error tensor with the padded input tensor
                    self.gradient_weights[kernel, c, :] += correlate2d(self.padded_input_tensor[batch, c, :], self.upsampled_error_tensor[batch, kernel, :], 'valid')  # valid padding


        if self._optimizer is not None:
            self.weights = self._optimizer.weights.calculate_update(self.weights, self.gradient_weights)
            self.bias = self._optimizer.bias.calculate_update(self.bias, self.gradient_bias)
        
        # In case of 2D input
        if len(self.convolution_shape) == 2:
            return_tensor = return_tensor.squeeze(axis = 3) 
            
        return return_tensor


In [104]:
class TestInitializer:
    def __init__(self):
        self.fan_in = None
        self.fan_out = None

    def initialize(self, shape, fan_in, fan_out):
        self.fan_in = fan_in
        self.fan_out = fan_out
        weights = np.zeros((1, 3, 3, 3))
        weights[0, 1, 1, 1] = 1
        return weights
    
batch_size = 2
input_shape = (3, 10, 14)
input_size = 14 * 10 * 3
uneven_input_shape = (3, 11, 15)
uneven_input_size = 15 * 11 * 3
spatial_input_shape = np.prod(input_shape[1:])
kernel_shape = (3, 5, 8)
num_kernels = 4
hidden_channels = 3

categories = 105
label_tensor = np.zeros([batch_size, categories])
for i in range(batch_size):
    label_tensor[i, np.random.randint(0, categories)] = 1

In [105]:
conv = Conv((1, 1), kernel_shape, num_kernels)
input_tensor = np.array(range(np.prod(input_shape) * batch_size), dtype=float)
input_tensor = input_tensor.reshape(batch_size, *input_shape)
output_tensor = conv.forward(input_tensor)
error_tensor = conv.backward(output_tensor)

In [106]:
difference

0.10281588892389149