In [33]:
# CNN model from scratch 

# Using pytorch only for data loading 
# import keras
import numpy as np
import tensorflow.keras as tk
from scipy import signal
from keras.utils import np_utils

In [34]:
(x_train, y_train), (x_test, y_test) = tk.datasets.cifar10.load_data()
assert x_train.shape == (50000, 32, 32, 3)
assert x_test.shape == (10000, 32, 32, 3)
assert y_train.shape == (50000, 1)
assert y_test.shape == (10000, 1)

In [35]:
batch_size = 32

def preprocess_data(x, y, size):
   
    x = x.astype("float32") / 255.0
    # encode output which is a number in range [0,9] into a vector of size 10
    # e.g. number 3 will become [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
    y = np_utils.to_categorical(y)
    y = y.reshape(y.shape[0], 10, 1)
    return x[:size], y[:size]


x_train , y_train = preprocess_data(x_train , y_train, 10000)
x_test, y_test = preprocess_data(x_test, y_test, 2000)

data_batches = []
for i in range(0, len(x_train), batch_size):
    images_batch = x_train[i:i+batch_size]
    labels_batch = y_train[i:i+batch_size]
    data_batches.append((np.array(images_batch), np.array(labels_batch)))

In [None]:
print(data_batches[0])

# Layers 

In [37]:
class Layer:
    def __init__(self):
        self.input = None
        self.output = None

    def forward(self, input):
        pass

    def backward(self, output_gradient, learning_rate):
        pass

In [38]:
# ================================================================
# ---------------------- Pooling Layer -------------------------
#=================================================================
class maxPool(Layer):
  def __init__(self,input_shape,kernel, stride):
    input_depth, input_height, input_width = input_shape
    self.depth = input_depth
    self.height = input_height
    self.width = input_width
    self.kernel_size = kernel
    self.stride = stride
    self.out_h = int(1 + (self.height - self.kernel_size) / self.stride)
    self.out_w = int(1 + (self.width - self.kernel_size) / self.stride)
    # self.batch_size = batch_size

  def forward(self, input):
    out = np.zeros((self.depth, self.out_h, self.out_w))
    self.input = input
    # for n in range(self.batch_size):
    for c in range(self.depth):
          for hi in range(self.out_h):
              for wi in range(self.out_w):
                  out[c, hi, wi] = np.max(input[c, hi * self.stride : hi * self.stride + self.kernel_size,
                                                wi * self.stride : wi * self.stride + self.kernel_size ]) 
    return out

  def backward(self, output_gradient, learning_rate):
    # dout = np.random.randn( self.depth, self.height, self.width) # output gradients
    
    dx = np.zeros_like(self.input)
    # for n in range(self.batch_size):
    for c in range(self.depth):
          for i in range(self.out_h):
              for j in range(self.out_w):
                start_i = i*self.stride
                start_j = j*self.stride
                end_i = start_i + self.kernel_size
                end_j = start_j + self.kernel_size
                max_idx = np.argmax(self.input[c,start_i:end_i, start_j:end_j])
                (idx_i,idx_j) = np.unravel_index(max_idx,(self.kernel_size,self.kernel_size))
                dx[c,start_i+idx_i,start_j+idx_j] += output_gradient[c,i,j]
    return dx 

In [93]:
class Convolutional(Layer):
    def __init__(self, input_shape, kernel_size, depth):
        input_depth, input_height, input_width = input_shape
        self.depth = depth
        self.input_shape = input_shape
        self.input_depth = input_depth
        self.output_shape = (depth, input_height - kernel_size + 1, input_width - kernel_size + 1)
        self.kernels_shape = (depth, input_depth, kernel_size, kernel_size)
        self.kernels = np.random.randn(*self.kernels_shape)
        self.biases = np.random.randn(*self.output_shape)

    def forward(self, input):
        self.input = input
        self.output = np.copy(self.biases)
        for i in range(self.depth):
            for j in range(self.input_depth):
                self.output[i] += signal.correlate2d(self.input[j], self.kernels[i, j], "valid")
        return self.output

    def backward(self, output_gradient, learning_rate):
        kernels_gradient = np.zeros(self.kernels_shape)
        input_gradient = np.zeros(self.input_shape)

        for i in range(self.depth):
            for j in range(self.input_depth):
                # print("===+")
                kernels_gradient[i, j] = signal.correlate2d(self.input[j], output_gradient[i], "valid")
                # print("===-")
                input_gradient[j] += signal.convolve2d(output_gradient[i], self.kernels[i, j], "full")

        self.kernels -= learning_rate * kernels_gradient
        self.biases -= learning_rate * output_gradient
        return input_gradient

# class Convolutional:                                        # convolution layer using 3x3 filters

#     def __init__(self, num_filters=16, stride=1, size=3, activation=None):
#         # self.name = name
#         self.filters = np.random.randn(num_filters, 3, 3) * 0.1
#         self.stride = stride
#         self.size = size
#         self.activation = activation
#         self.last_input = None
#         # self.leakyReLU = np.vectorize(leakyReLU)
#         # self.leakyReLU_derivative = np.vectorize(leakyReLU_derivative)

#     def forward(self, image):
#         self.last_input = image                             # keep track of last input for later backward propagation

#         input_dimension = image.shape[1]                                                # input dimension
#         output_dimension = int((input_dimension - self.size) / self.stride) + 1         # output dimension

#         out = np.zeros((self.filters.shape[0], output_dimension, output_dimension))     # create the matrix to hold the
#                                                                                         # values of the convolution

#         for f in range(self.filters.shape[0]):              # convolve each filter over the image,
#             tmp_y = out_y = 0                               # moving it vertically first and then horizontally
#             while tmp_y + self.size <= input_dimension:
#                 tmp_x = out_x = 0
#                 while tmp_x + self.size <= input_dimension:
#                     patch = image[:, tmp_y:tmp_y + self.size, tmp_x:tmp_x + self.size]
#                     out[f, out_y, out_x] += np.sum(self.filters[f] * patch)
#                     tmp_x += self.stride
#                     out_x += 1
#                 tmp_y += self.stride
#                 out_y += 1
#         # if self.activation == 'relu':                       # apply ReLU activation function
#         #     self.leakyReLU(out)
#         return out

#     def backward(self, din, learn_rate=0.005):
#         input_dimension = self.last_input.shape[1]          # input dimension

#         # if self.activation == 'relu':                       # back propagate through ReLU
#         #    self.leakyReLU_derivative(din)

#         dout = np.zeros(self.last_input.shape)              # loss gradient of the input to the convolution operation
#         dfilt = np.zeros(self.filters.shape)                # loss gradient of filter

#         for f in range(self.filters.shape[0]):              # loop through all filters
#             tmp_y = out_y = 0
#             while tmp_y + self.size <= input_dimension:
#                 tmp_x = out_x = 0
#                 while tmp_x + self.size <= input_dimension:
#                     patch = self.last_input[:, tmp_y:tmp_y + self.size, tmp_x:tmp_x + self.size]
#                     dfilt[f] += np.sum(din[f, out_y, out_x] * patch, axis=0)
#                     dout[:, tmp_y:tmp_y + self.size, tmp_x:tmp_x + self.size] += din[f, out_y, out_x] * self.filters[f]
#                     tmp_x += self.stride
#                     out_x += 1
#                 tmp_y += self.stride
#                 out_y += 1
#         self.filters -= learn_rate * dfilt                  # update filters using SGD
#         return dout     

In [90]:
# import numpy as np

# Define a 2D convolution layer
class Conv2D(Layer):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        
        # Initialize the weights and biases
        self.weights = np.random.randn(out_channels, in_channels, kernel_size, kernel_size)
        self.bias = np.zeros((out_channels,))
        
    def forward(self, x):
        self.input = x
        in_channels, height, width = x.shape
        out_height = int((height + 2*self.padding - self.kernel_size) / self.stride + 1)
        out_width = int((width + 2*self.padding - self.kernel_size) / self.stride + 1)
        
        # Add padding to the input
        padded_x = np.pad(x, ( (0,0), (self.padding, self.padding), (self.padding, self.padding)), mode='constant')
        
        # Initialize the output
        out = np.zeros(( self.out_channels, out_height, out_width))
        
        # Perform the convolution operation
        
        for c in range(self.out_channels):
            for i in range(out_height):
                for j in range(out_width):
                    out[ c, i, j] = np.sum(self.weights[c] * padded_x[ :, i*self.stride:i*self.stride+self.kernel_size, j*self.stride:j*self.stride+self.kernel_size]) + self.bias[c]
                    
        return out

    def backward(self, grad_out,learning_rate):
        batch_size, in_channels, height, width = self.input.shape
        grad_x = np.zeros_like(self.input)
        grad_weights = np.zeros_like(self.weights)
        grad_bias = np.zeros_like(self.bias)
        
        # Add padding to the input
        padded_x = np.pad(self.input, ( (0,0), (self.padding, self.padding), (self.padding, self.padding)), mode='constant')
        
        # Compute the gradients
        
        for c in range(self.out_channels):
            for i in range(grad_out.shape[2]):
                for j in range(grad_out.shape[3]):
                    grad_x[ :, i*self.stride:i*self.stride+self.kernel_size, j*self.stride:j*self.stride+self.kernel_size] += grad_out[ c, i, j] * self.weights[c]
                    grad_weights[c] += grad_out[ c, i, j] * padded_x[ :, i*self.stride:i*self.stride+self.kernel_size, j*self.stride:j*self.stride+self.kernel_size]
                    grad_bias[c] += grad_out[ c, i, j]
                    
        # Remove padding from the gradients
        grad_x = grad_x[ :, self.padding:-self.padding, self.padding:-self.padding]
        self.bias -= learning_rate * grad_bias
        self.weights -= learning_rate*grad_weights
        
        return grad_x


In [40]:
# Reshape Layer 

In [41]:
# class Reshape(Layer):
#     def __init__(self, input_shape, output_shape, batch_size):
#         input_depth, input_height, input_width = input_shape
#         output_depth = output_shape[0]
#         self.input_shape = input_shape
#         self.output_shape = output_shape
#         self.batch_size = batch_size
#         self.outshape = (batch_size,output_depth)
#         self.inshape = (batch_size,)

#     def forward(self, input):
#         output = np.zeros(self.outshape)
#         for n in range(self.batch_size):
#           output[n]+= np.reshape(input, self.output_shape)
#         return output

#     def backward(self, output_gradient, learning_rate):
#         in_grad = np.zeros(self.inshape)
#         for n in range(self.batch_size):
#           in_grad[n]+= np.reshape(output_gradient, self.input_shape)
#         return in_grad



In [42]:
# Fully Connected Layer 

In [43]:
class Reshape(Layer):
    def __init__(self, input_shape, output_shape):
        self.input_shape = input_shape
        self.output_shape = output_shape

    def forward(self, input):
        return np.reshape(input, self.output_shape)

    def backward(self, output_gradient, learning_rate):
        return np.reshape(output_gradient, self.input_shape)

In [44]:
class ReLU(Layer):
  def __init__(self):
    pass
  def forward(self, x):
      self.input = x
      return np.maximum(0, x)

  def backward(self, output_gradient, learning_rate):
      return output_gradient * (self.input > 0)

In [84]:
class FullyConnected(Layer):
    def __init__(self, input_size, output_size):
        self.weights_shape = (output_size, input_size)
        self.bias_shape = (output_size, 1)
        weights = np.random.randn(* self.weights_shape)
        std = np.std(weights)
        self.weights =weights/std
        bias = np.random.randn(* self.bias_shape)
        std = np.std(bias)
        self.bias = bias/std

    def forward(self, input):
        self.input = input
        return np.dot(self.weights, self.input) + self.bias

    def backward(self, output_gradient, learning_rate):
        weights_gradient = np.dot(output_gradient, self.input.T)
        input_gradient = np.dot(self.weights.T, output_gradient)
        self.weights -= learning_rate * weights_gradient
        self.bias -= learning_rate * output_gradient
        return input_gradient

In [46]:
# Loss Functions 

def mse(y_true, y_pred):
    return np.mean(np.power(y_true - y_pred, 2))

def mse_prime(y_true, y_pred):
    return 2 * (y_pred - y_true) / np.size(y_true)

# def binary_cross_entropy(y_true, y_pred):
#     return np.mean(-y_true * np.log(y_pred) - (1 - y_true) * np.log(1 - y_pred))

# def binary_cross_entropy_prime(y_true, y_pred):
#     return ((1 - y_true) / (1 - y_pred) - y_true / y_pred) / np.size(y_true)

# CNN Model 

In [94]:


class Net:
  def __init__(self):
    self.conv1 = Convolutional((3,32,32),3,32)  # Kernel size = 3x3  in: 3 out 32
    self.pool1 = maxPool((32,30,30),2,1)     # Kernel size = 2x2
    self.conv2 = Convolutional((32,29,29),5,64) # Kernel size = 3x3  in:  out 64
    self.pool2 = maxPool((64,25,25),2,1)     # Kernel size = 2x2
    self.conv3 = Convolutional((64,24,24),3,64)
    self.reshape = Reshape((64,22,22),(64*22*22,1))
    self.fc1 = FullyConnected(64*22*22,64)
    self.fc2 = FullyConnected(64,10)
    self.relu1 = ReLU()  
    self.relu2 = ReLU()  
    self.relu3 = ReLU()  
    self.relu4 = ReLU()  

  def forward(self,input):
    x = self.conv1.forward(input)
    x = self.relu1.forward(x)
    x = self.pool1.forward(x)
    x = self.conv2.forward(x)
    x = self.relu2.forward(x)
    x = self.pool2.forward(x)
    x = self.conv3.forward(x)
    x = self.relu3.forward(x)
    x = self.reshape.forward(x)
    x = self.fc1.forward(x)
    x = self.relu4.forward(x)
    x = self.fc2.forward(x)
    return x

  def backward(self, out_grad , learning_rate):
      x = self.fc2.backward(out_grad , learning_rate)
      x = self.relu4.backward(x , learning_rate)
      x = self.relu4.backward(x , learning_rate)
      x = self.fc1.backward(x , learning_rate)
      x = self.reshape.backward(x , learning_rate)
      x = self.relu3.backward(x , learning_rate)
      x = self.conv3.backward(x , learning_rate)
      x = self.pool2.backward(x , learning_rate)
      x = self.relu2.backward(x , learning_rate)
      x = self.conv2.backward(x , learning_rate)
      x = self.pool1.backward(x , learning_rate)
      x = self.relu1.backward(x , learning_rate)
      x = self.conv1.backward(x , learning_rate)
     
      return x

     
     
    
def Accuracy(model , X , Y):
  correct_pred = 0
  for x, y in zip(X, Y):
    output = model.forward(x)
    pred = np.argmax(output)
    trueVal = np.argmax(y)
    if trueVal == pred : 
      correct_pred+=1
  acc = float(correct_pred)/float(len(X))
  # print("Accuracy : ",acc)
  return acc

class CNNModel:
  def __init__(self,trainX,trainY,valX, valY,epoch, learning_rate):
    self.train_X = trainX
    self.train_Y = trainY
    self.val_X = valX
    self.val_Y = valY
    self.best = None
    self.Epoch = epoch
    self.learning_rate = learning_rate
    self.models = []
    self.AccT = []
    self.AccV = []


  def train(self):
    model = Net()
    Models = []
    AccT =[]
    AccV = []
    Epoch = []

    for e in range(self.Epoch):
       error = 0 
       i=0
       for x in self.train_X:
        #  print(x)
         y= self.train_Y[i]
         i+=1
         output = model.forward(x)
        #  print(output.shape)
         error+= mse(y, output)
         print("=====================================")
         print(output)
         print(y)
         grad = mse_prime(y, output)
         grad = model.backward(grad, self.learning_rate)
       error /= len(self.train_X)
       print(f"{e + 1}/{self.Epoch}, error={error}")
       accT = Accuracy(model, self.train_X,self.train_Y)
       accV = Accuracy(model , self.val_X,self.val_Y)
       AccT.append(accT)
       AccV.append(accV)
       print(f'Accuracy of the network on the 10000 test images: {AccT} %')
       print(f'Accuracy of the network on the train images: {AccV} %')
       M = model
       Models.append(M)
       Epoch.append(e+1)
    self.models = Models
    self.AccT = AccT
    self.AccV = AccV
    print("Training Done!")

M = CNNModel(x_train , y_train, x_test, y_test, 5 , 0.005)
M.train()


Shape of Output of Reshape :  (30976, 1)
Shape of Output of FC1 :  (64, 1)
Shape of Output of FC2 :  (10, 1)
[[ 958905.69178429]
 [-764103.62866035]
 [-715950.0312968 ]
 [-378893.2879596 ]
 [ 823565.52679697]
 [ 498195.67681581]
 [-105623.30512001]
 [ 194363.68465366]
 [1622740.84396926]
 [-444464.55665676]]
[[0.]
 [0.]
 [0.]
 [0.]
 [0.]
 [0.]
 [1.]
 [0.]
 [0.]
 [0.]]
----------------
----------------
----------------
----------------
----------------
----------------
----------------
----------------
(32, 30, 30)
-3---------------
(32, 30, 30)


ValueError: ignored