In [None]:
import numpy as np
from numpy.lib.stride_tricks import as_strided

class Convolve():
    """2-D convolution layer for NHWC inputs with optional pooling/activation.

    The forward pass applies ``convolution -> pooling -> activation`` and the
    backward pass undoes them in the reverse order.  The filter bank is created
    lazily on the first ``forward`` call (the channel count is only known
    then) and is kept between calls so that ``update`` can actually train it.

    Attributes set after ``forward``:
        filter: array of shape (filter_height, filter_width, channel, num_filters).
        patches: array of shape (batch, out_h, out_w, filter_height*filter_width*channel).
    Attributes set after ``backward``:
        filter_grad: gradient of the loss w.r.t. ``filter`` (same shape).
        gradient_input: gradient of the loss w.r.t. the (unpadded) input.
    """

    def __init__(self, activation=None, pooling=None, filter_size=(3, 3), num_filters=1, stride=1, padding=0):
        """Store the layer configuration.

        Args:
            activation: optional object exposing ``forward``/``backward``.
            pooling: optional object exposing ``forward``/``backward``.
            filter_size: (height, width) of each filter.
            num_filters: number of output channels.
            stride: step of the sliding window along height and width.
            padding: number of zero rows/columns added on every spatial side.
        """
        self.filter_size = filter_size
        self.filter_height, self.filter_width = self.filter_size
        self.num_filters = num_filters
        self.stride = stride
        self.padding = padding
        self.pool = pooling
        self.activation = activation
        self.t = 0  # number of Adam steps taken so far

        # Created lazily: the filter needs the input channel count, and the
        # Adam moments need the filter shape.
        self.filter = None
        self.sdw = None
        self.vdw = None

    def _output_size(self, size, kernel):
        """Return the number of window positions along one spatial axis."""
        if size < kernel:
            raise ValueError(
                f"input size {size} is smaller than the filter size {kernel}"
            )
        return (size - kernel) // self.stride + 1

    def get_patches(self, input_array, backward=False):
        """Return a read-only sliding-window view of ``input_array``.

        Args:
            input_array: array of shape (batch, height, width, channel).
            backward: when True the array is first zero-padded by
                ``self.padding`` on each spatial side, so the window grid
                never extends past the underlying buffer.

        Returns:
            View of shape (batch, out_h, out_w, filter_height, filter_width,
            channel).  The method does not modify any layer state.

        Raises:
            ValueError: if the (padded) array is smaller than the filter.
        """
        input_array = np.asarray(input_array)
        if backward and self.padding > 0:
            pad = self.padding
            input_array = np.pad(
                input_array, ((0, 0), (pad, pad), (pad, pad), (0, 0)), mode='constant'
            )

        batch, height, width, channel = input_array.shape
        out_h = self._output_size(height, self.filter_height)
        out_w = self._output_size(width, self.filter_width)
        s_batch, s_row, s_col, s_chan = input_array.strides
        window_shape = (batch, out_h, out_w, self.filter_height, self.filter_width, channel)
        window_strides = (s_batch, self.stride * s_row, self.stride * s_col, s_row, s_col, s_chan)
        # writeable=False: overlapping windows alias the same memory, so an
        # accidental in-place write through the view would corrupt the input.
        return as_strided(input_array, shape=window_shape, strides=window_strides, writeable=False)

    def forward(self, input_array):
        """Convolve ``input_array`` and apply the optional pooling/activation.

        Args:
            input_array: array of shape (batch, height, width, channel).

        Returns:
            Output of the last enabled stage (conv, then pool, then activation).

        Raises:
            ValueError: if the channel count differs from the one the filter
                was created with, or the padded input is smaller than the filter.
        """
        self.input_array = np.asarray(input_array)
        if self.padding > 0:
            pad = self.padding
            self.input_array_padded = np.pad(
                self.input_array, ((0, 0), (pad, pad), (pad, pad), (0, 0)), mode='constant'
            )
        else:
            self.input_array_padded = self.input_array

        # height/width describe the padded input; backward relies on that.
        self.batch_size, self.height, self.width, self.channel = self.input_array_padded.shape

        if self.filter is None:
            # Initialise once; re-drawing the weights on every call would
            # discard everything learned by update().
            self.filter = np.random.randn(
                self.filter_height, self.filter_width, self.channel, self.num_filters
            )
        elif self.filter.shape[2] != self.channel:
            raise ValueError(
                f"expected {self.filter.shape[2]} input channels, got {self.channel}"
            )

        windows = self.get_patches(self.input_array_padded)
        self.output_height, self.output_width = windows.shape[1], windows.shape[2]
        patch_len = self.filter_height * self.filter_width * self.channel
        # The reshape copies the strided view into a dense (B, oh, ow, K) array
        # that is reused by backward().
        self.patches = windows.reshape(
            self.batch_size, self.output_height, self.output_width, patch_len
        )

        flat_filter = self.filter.reshape(patch_len, self.num_filters)
        self.output_array = np.tensordot(self.patches, flat_filter, axes=([3], [0]))

        if self.pool is not None:
            self.output_array = self.pool.forward(self.output_array)
        if self.activation is not None:
            self.output_array = self.activation.forward(self.output_array)
        return self.output_array

    def l2(self):
        """Return the sum of squared filter weights (0.0 before the first forward)."""
        if self.filter is None:
            return 0.0
        return np.sum(self.filter ** 2)

    def backward(self, gradient):
        """Back-propagate ``gradient`` through activation, pooling and convolution.

        Args:
            gradient: gradient of the loss w.r.t. this layer's output.

        Returns:
            Gradient of the loss w.r.t. the layer input, with the same shape
            as the array given to ``forward``.
        """
        if self.activation is not None:
            gradient = self.activation.backward(gradient)
        if self.pool is not None:
            gradient = self.pool.backward(gradient)
        self.gradient = gradient

        patch_len = self.filter_height * self.filter_width * self.channel
        flat_filter = self.filter.reshape(patch_len, self.num_filters)

        # dL/dW: correlate every input patch with the matching output gradient.
        self.filter_grad = np.tensordot(
            self.patches, gradient, axes=([0, 1, 2], [0, 1, 2])
        ).reshape(self.filter.shape)

        # dL/d(patch): project the output gradient back through the filter,
        # then scatter each patch gradient onto the padded input positions it
        # was read from (overlapping windows accumulate).
        patch_grad = np.tensordot(gradient, flat_filter, axes=([3], [1])).reshape(
            self.batch_size, self.output_height, self.output_width,
            self.filter_height, self.filter_width, self.channel,
        )
        padded_grad = np.zeros(
            (self.batch_size, self.height, self.width, self.channel), dtype=patch_grad.dtype
        )
        step = self.stride
        for row in range(self.filter_height):
            row_stop = row + step * (self.output_height - 1) + 1
            for col in range(self.filter_width):
                col_stop = col + step * (self.output_width - 1) + 1
                padded_grad[:, row:row_stop:step, col:col_stop:step, :] += patch_grad[:, :, :, row, col, :]

        if self.padding > 0:
            pad = self.padding
            self.gradient_input = padded_grad[:, pad:self.height - pad, pad:self.width - pad, :]
        else:
            self.gradient_input = padded_grad
        return self.gradient_input

    def calculate(self, optimizer):
        """Update the optimizer state from the latest ``filter_grad``.

        Only ``optimizer == 'adam'`` keeps state; any other value is a no-op.
        """
        if optimizer != 'adam':
            return

        # Moments persist across steps; they are only (re)created when missing
        # or when the filter shape changed.
        if self.sdw is None or self.sdw.shape != self.filter_grad.shape:
            self.sdw = np.zeros_like(self.filter_grad)
            self.vdw = np.zeros_like(self.filter_grad)

        self.t += 1
        beta1, beta2 = 0.9, 0.999

        self.sdw = beta2 * self.sdw + (1 - beta2) * (self.filter_grad ** 2)
        self.vdw = beta1 * self.vdw + (1 - beta1) * self.filter_grad

        # Bias correction compensates for the zero-initialised running averages.
        self.sdw_corrected = self.sdw / (1 - beta2 ** self.t)
        self.vdw_corrected = self.vdw / (1 - beta1 ** self.t)

    def update(self, learning_rate, optimizer):
        """Apply one parameter step to ``self.filter``.

        Args:
            learning_rate: step size.
            optimizer: ``'adam'`` uses the state prepared by ``calculate``;
                anything else performs plain gradient descent.
        """
        if optimizer == 'adam':
            self.filter -= learning_rate * self.vdw_corrected / (np.sqrt(self.sdw_corrected) + 1e-8)
        else:
            self.filter -= learning_rate * self.filter_grad



class MaxPool():
    """Max-pooling layer for NHWC inputs.

    ``forward`` remembers which element of every window was the maximum so
    that ``backward`` can route the incoming gradient to exactly those input
    positions.
    """

    def __init__(self, pool_size=(2, 2), stride=2):
        """Store the window size ``(height, width)`` and the window stride."""
        self.pool_size = pool_size
        self.pool_height, self.pool_width = self.pool_size
        self.stride = stride

    def get_patches(self, input_array):
        """Return a sliding-window view of ``input_array``.

        Args:
            input_array: array of shape (batch, height, width, channel).

        Returns:
            Read-only view of shape (batch, out_h, out_w, pool_height,
            pool_width, channel).  The input shape and the output grid size
            are cached on the instance for use by ``backward``.

        Raises:
            ValueError: if the input is smaller than the pooling window.
        """
        self.input_array = np.asarray(input_array)
        self.batch_size, self.height, self.width, self.channel = self.input_array.shape
        if self.height < self.pool_height or self.width < self.pool_width:
            raise ValueError("input is smaller than the pooling window")
        self.output_height = (self.height - self.pool_height) // self.stride + 1
        self.output_width = (self.width - self.pool_width) // self.stride + 1

        s_batch, s_row, s_col, s_chan = self.input_array.strides
        self.new_shape = (self.batch_size, self.output_height, self.output_width,
                          self.pool_height, self.pool_width, self.channel)
        self.new_strides = (s_batch, self.stride * s_row, self.stride * s_col,
                            s_row, s_col, s_chan)
        # Read-only: overlapping windows share memory with the input.
        self.patches = as_strided(self.input_array, shape=self.new_shape,
                                  strides=self.new_strides, writeable=False)
        return self.patches

    def forward(self, input_array):
        """Return the per-window maximum, shape (batch, out_h, out_w, channel)."""
        windows = self.get_patches(input_array)
        # Collapse the two window axes so max/argmax run over a single axis.
        self.patches2 = windows.reshape(
            self.batch_size, self.output_height, self.output_width,
            self.pool_height * self.pool_width, self.channel,
        )
        self.output = np.max(self.patches2, axis=3)
        self.max_indices = np.argmax(self.patches2, axis=3)
        return self.output

    def backward(self, gradient):
        """Route ``gradient`` back to the input positions that were the maxima.

        Args:
            gradient: array with the shape returned by ``forward``.

        Returns:
            Array with the shape of the forward input; every position that
            was not a window maximum receives zero.
        """
        flat_gradient = np.asarray(gradient).reshape(-1)
        flat_argmax = self.max_indices.reshape(-1)

        # Use a dtype that can hold the gradient even if the input was integer.
        grad_dtype = np.result_type(self.input_array.dtype, flat_gradient.dtype)
        self.maxpool_gradient = np.zeros(self.input_array.shape, dtype=grad_dtype)

        batch_idx, row_idx, col_idx, chan_idx = np.indices(
            (self.batch_size, self.output_height, self.output_width, self.channel)
        )
        # The argmax is an offset into the flattened (pool_height, pool_width)
        # window; split it back into a row and a column offset.
        indexes = (
            batch_idx.reshape(-1),
            row_idx.reshape(-1) * self.stride + flat_argmax // self.pool_width,
            col_idx.reshape(-1) * self.stride + flat_argmax % self.pool_width,
            chan_idx.reshape(-1),
        )

        if self.stride < self.pool_height or self.stride < self.pool_width:
            # Overlapping windows can select the same input element several
            # times; fancy-index "+=" would keep only one contribution.
            np.add.at(self.maxpool_gradient, indexes, flat_gradient)
        else:
            # Disjoint windows: every target index is unique.
            self.maxpool_gradient[indexes] = flat_gradient
        return self.maxpool_gradient

class Flatten():
    """Reshape ``(batch, ...)`` inputs to ``(batch, features)`` and back.

    The layer has no parameters, so ``calculate``/``update`` are no-ops and
    ``l2`` contributes nothing to a regularisation term.
    """

    def __init__(self):
        pass

    def forward(self, input_array):
        """Collapse every axis after the first into a single feature axis.

        Args:
            input_array: array with at least one axis; axis 0 is the batch.

        Returns:
            Array of shape (batch, product of the remaining axis lengths).
        """
        self.input_array = np.asarray(input_array)
        input_shape = self.input_array.shape
        self.batch_size = input_shape[0]
        if self.input_array.ndim == 4:
            # Kept for callers that inspect the image dimensions.
            _, self.height, self.width, self.channel = input_shape
        # An explicit product (instead of -1) also works for an empty batch.
        feature_count = int(np.prod(input_shape[1:]))
        self.new_shape = (self.batch_size, feature_count)
        self.output_array = self.input_array.reshape(self.new_shape)
        return self.output_array

    def l2(self):
        """Return 0: the layer has no weights to regularise."""
        return 0

    def backward(self, gradient):
        """Reshape ``gradient`` back to the shape of the forward input."""
        self.gradient = gradient
        return self.gradient.reshape(self.input_array.shape)

    def calculate(self, optimizer):
        """No optimizer state to compute."""
        pass

    def update(self, learning_rate, optimizer):
        """No parameters to update."""
        pass
'''
input_array=np.random.randn(2000,50,50,3)
print(input_array.shape)
filter_size=(3,3)
layer4=Convolve(input_array,filter_size,30,2,1)
output_array=layer4.forward()
print(output_array.shape)
layer5=Max_Pool(output_array)
output_array2=layer5.forward()
print(output_array2.shape)
'''


'\ninput_array=np.random.randn(2000,50,50,3)\nprint(input_array.shape)\nfilter_size=(3,3)\nlayer4=Convolve(input_array,filter_size,30,2,1)\noutput_array=layer4.forward()\nprint(output_array.shape)\nlayer5=Max_Pool(output_array)\noutput_array2=layer5.forward()\nprint(output_array2.shape)\n'

In [None]:
class Dense():
    """Fully connected layer with an optional activation.

    Supports plain gradient descent and Adam; the Adam running averages live
    on the instance and are advanced by ``calculate``.
    """

    def __init__(self, ninputs, nnodes, activation=None):
        # Standard-normal weights scaled by sqrt(2 / ninputs); small uniform bias.
        weight_shape = (ninputs, nnodes)
        self.weight = np.random.randn(*weight_shape) * np.sqrt(2. / ninputs)
        self.bias = np.random.rand(nnodes) * 0.01

        # Adam state: s* track squared gradients, v* track gradients.
        self.sdw = np.zeros(weight_shape)
        self.sdb = np.zeros(nnodes)
        self.vdw = np.zeros(weight_shape)
        self.vdb = np.zeros(nnodes)
        self.t = 0
        self.activation = activation

    def forward(self, inputs):
        """Return ``activation(inputs @ weight + bias)`` (activation optional)."""
        self.input = inputs
        affine = np.dot(inputs, self.weight) + self.bias
        if self.activation:
            self.output = self.activation.forward(affine)
        else:
            self.output = affine
        return self.output

    def backward(self, gradient):
        """Store the weight/bias gradients and return the input gradient."""
        upstream = self.activation.backward(gradient) if self.activation else gradient
        self.gradient_weight = np.dot(self.input.T, upstream)
        self.gradient_bias = np.sum(upstream, axis=0)
        self.gradient_input = np.dot(upstream, self.weight.T)
        return self.gradient_input

    def calculate(self, optimizer):
        """Advance the Adam moments; does nothing for other optimizers."""
        if optimizer != 'adam':
            return

        self.t += 1
        beta1, beta2 = 0.9, 0.999
        grad_w, grad_b = self.gradient_weight, self.gradient_bias

        self.sdw = beta2 * self.sdw + (1 - beta2) * (grad_w ** 2)
        self.sdb = beta2 * self.sdb + (1 - beta2) * (grad_b ** 2)
        self.vdw = beta1 * self.vdw + (1 - beta1) * grad_w
        self.vdb = beta1 * self.vdb + (1 - beta1) * grad_b

        # Bias correction for the zero-initialised exponentially weighted averages.
        second_moment_scale = 1 - beta2 ** self.t
        first_moment_scale = 1 - beta1 ** self.t
        self.sdw_corrected = self.sdw / second_moment_scale
        self.sdb_corrected = self.sdb / second_moment_scale
        self.vdw_corrected = self.vdw / first_moment_scale
        self.vdb_corrected = self.vdb / first_moment_scale

    def update(self, learning_rate, optimizer):
        """Apply one step to ``weight`` and ``bias`` in place."""
        if optimizer != 'adam':
            self.weight -= learning_rate * self.gradient_weight
            self.bias -= learning_rate * self.gradient_bias
            return

        weight_denominator = np.sqrt(self.sdw_corrected) + 1e-8
        bias_denominator = np.sqrt(self.sdb_corrected) + 1e-8
        self.weight -= learning_rate * self.vdw_corrected / weight_denominator
        self.bias -= learning_rate * self.vdb_corrected / bias_denominator

    def l2(self):
        """Return the sum of squared weights."""
        return np.sum(self.weight ** 2)

class Relu():
    """Rectified linear unit: passes positive values, zeroes the rest."""

    def forward(self, inputs):
        """Cache ``inputs`` and return the element-wise ``max(0, inputs)``."""
        self.input = inputs
        rectified = np.maximum(0, inputs)
        self.output = rectified
        return rectified

    def backward(self, gradients):
        """Let the gradient through only where the cached input was positive."""
        # The mask is taken from the input so it is a boolean array.
        positive_mask = self.input > 0
        self.gradient = gradients * positive_mask
        return self.gradient

class Sigmoid():
    """Logistic activation ``1 / (1 + exp(-x))``."""

    def forward(self, inputs):
        """Cache the input and return its element-wise sigmoid."""
        self.input = inputs
        denominator = 1 + np.exp(-inputs)
        self.output = 1 / denominator
        return self.output

    def backward(self, dvalues):
        """Scale ``dvalues`` by the sigmoid slope ``output * (1 - output)``."""
        activated = self.output
        complement_scaled = dvalues * (1 - activated)
        self.dinputs = complement_scaled * activated
        return self.dinputs

class Softmax():
    """Row-wise softmax activation.

    Args:
        final: set to True when this is the last activation and the loss
            already returns the gradient w.r.t. the softmax *input* (as
            ``CategoricalCrossEntropyLoss.backward`` does with
            ``probs - true_outputs``); ``backward`` then passes the gradient
            through unchanged.
    """

    def __init__(self, final=False):
        self.final = final

    def forward(self, inputs):
        """Return softmax probabilities along axis 1.

        The row maximum is subtracted before exponentiating so large logits
        do not overflow; this does not change the result.
        """
        self.input = inputs
        exp = np.exp(inputs - np.max(inputs, axis=1, keepdims=True))
        self.output = exp / np.sum(exp, axis=1, keepdims=True)
        return self.output

    def backward(self, gradient):
        """Return the gradient w.r.t. the softmax input.

        For a non-final softmax this is the full Jacobian-vector product
        ``s * (g - sum(g * s))`` per row.  Using only the diagonal term
        ``g * s * (1 - s)`` would ignore the coupling between classes
        introduced by the normalisation.
        """
        if self.final:
            return gradient
        weighted_sum = np.sum(gradient * self.output, axis=1, keepdims=True)
        self.dinputs = self.output * (gradient - weighted_sum)
        return self.dinputs

class CategoricalCrossEntropyLoss():
    def forward(self, probs, true_outputs, layers,lamda=0):
        clipped_probs = np.clip(probs, 1e-7, 1 - 1e-7)
        loss_data = -np.sum(true_outputs * np.log(clipped_probs)) / (len(true_outputs) + 1e-8)

        l2_terms = [lamda * np.sum(layer.l2()) for layer in layers]
        loss_weight = 0.5 * np.sum(l2_terms) / (len(true_outputs) +  1e-8)
        return loss_data + loss_weight

    def accuracy(self, probs, true_outputs):

        prediction=np.argmax(probs, axis=1)
        true_label=np.argmax(true_outputs, axis=1)
        accuracy=np.mean(prediction == true_label)
        return accuracy

    def backward(self, probs, true_outputs):
        samples = len(true_outputs)

        self.dinputs = (probs - true_outputs) / samples
        return self.dinputs

class BinaryCrossEntropyLoss():
    """Binary cross-entropy loss for probabilities in (0, 1)."""

    def forward(self, y_pred, y_true, layers=None):
        """Return the mean binary cross-entropy.

        Args:
            y_pred: predicted probabilities; clipped to ``[1e-7, 1 - 1e-7]``
                before the logarithm.
            y_true: targets with the same shape as ``y_pred``.
            layers: accepted so the call signature matches the other loss
                class; it is not used.
        """
        y_pred = np.clip(y_pred, 1e-7, 1 - 1e-7)
        loss_data = -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
        return loss_data

    def accuracy(self, probs, true_outputs):
        """Return the fraction of predictions on the correct side of 0.5.

        Both arguments are flattened first so ``(N, 1)`` and ``(N,)`` arrays
        can be mixed without accidental broadcasting.
        """
        predicted = np.ravel(probs) > 0.5
        expected = np.ravel(true_outputs) > 0.5
        return np.mean(predicted == expected)

    def backward(self, dvalues, y_true):
        """Return ``(clipped predictions - y_true) / len(y_true)``."""
        dvalues = np.clip(dvalues, 1e-7, 1 - 1e-7)
        self.dinputs = (dvalues - y_true) / len(y_true)
        return self.dinputs


In [None]:
class NeuralNetwork():
    """Sequential container that trains a list of layers with mini-batches.

    Every layer must expose ``forward``, ``backward``, ``calculate`` and
    ``update``; the loss object must expose ``forward``, ``backward`` and
    ``accuracy``.
    """

    def __init__(self, loss_function='CategoricalCrossEntropyLoss()', optimizer='adam', learning_rate=0.001):
        """Create an empty network.

        Args:
            loss_function: a loss object, or the name of a loss class defined
                in this module (a trailing ``()`` is ignored), which is then
                instantiated.
            optimizer: optimizer name forwarded to every layer.
            learning_rate: step size forwarded to every layer.

        Raises:
            ValueError: if a loss name cannot be resolved to a loss class.
        """
        self.layers = []
        self.loss_function = self._resolve_loss(loss_function)
        self.learning_rate = learning_rate
        self.optimizer = optimizer

    @staticmethod
    def _resolve_loss(loss_function):
        """Turn a loss class name into an instance; pass objects through."""
        if not isinstance(loss_function, str):
            return loss_function
        name = loss_function.strip().rstrip('()').strip()
        candidate = globals().get(name)
        # Only accept classes that look like a loss, not arbitrary globals.
        if not (callable(candidate) and hasattr(candidate, 'forward') and hasattr(candidate, 'backward')):
            raise ValueError(f"Unknown loss function name: {loss_function!r}")
        return candidate()

    def add(self, layer):
        """Append ``layer`` to the end of the network."""
        self.layers.append(layer)

    def _forward(self, x):
        """Run ``x`` through every layer in order and return the result."""
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def _evaluate(self, X, y, batch_size):
        """Return (loss, accuracy) over ``X``/``y``, averaged per sample.

        The data is processed in chunks of ``batch_size`` so that evaluation
        never needs more memory than a training step.
        """
        total = len(X)
        if total == 0:
            return float('nan'), float('nan')
        loss_sum = 0.0
        accuracy_sum = 0.0
        for start in range(0, total, batch_size):
            inputs = X[start:start + batch_size]
            targets = y[start:start + batch_size]
            outputs = self._forward(inputs)
            weight = len(inputs)
            loss_sum += self.loss_function.forward(outputs, targets, self.layers) * weight
            accuracy_sum += self.loss_function.accuracy(outputs, targets) * weight
        return loss_sum / total, accuracy_sum / total

    def fit(self, X_train, y_train, X_test, y_test, batch_size, epochs=10):
        """Train the network and report validation metrics after every epoch.

        Args:
            X_train, y_train: training inputs and targets.
            X_test, y_test: validation inputs and targets.
            batch_size: number of samples per training/validation chunk.
            epochs: number of passes over the training data.

        Returns:
            Dict with the per-epoch lists ``'loss'`` (mean training batch
            loss), ``'val_loss'`` and ``'val_accuracy'``; also stored as
            ``self.history``.

        Raises:
            ValueError: if ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        self.epochs = epochs
        history = {'loss': [], 'val_loss': [], 'val_accuracy': []}

        for epoch in range(self.epochs):
            epoch_loss = 0
            batch_count = 0
            for start in range(0, len(X_train), batch_size):
                batch_inputs = X_train[start:start + batch_size]
                batch_true_outputs = y_train[start:start + batch_size]

                outputs = self._forward(batch_inputs)
                epoch_loss += self.loss_function.forward(outputs, batch_true_outputs, self.layers)
                batch_count += 1

                gradient = self.loss_function.backward(outputs, batch_true_outputs)
                for layer in reversed(self.layers):
                    gradient = layer.backward(gradient)

                # Compute every layer's optimizer state before any parameter
                # is changed, then apply the updates.
                for layer in self.layers:
                    layer.calculate(self.optimizer)
                for layer in self.layers:
                    layer.update(self.learning_rate, self.optimizer)

            # Average over the batches actually processed (the last batch may
            # be smaller than batch_size).
            mean_loss = epoch_loss / batch_count if batch_count else float('nan')
            print(f"Epoch {epoch + 1}, Loss: {mean_loss}")

            epoch_loss_val, epoch_accuracy = self._evaluate(X_test, y_test, batch_size)
            print(f"Epoch {epoch + 1}, val_Loss: {epoch_loss_val},val_accuracy:{epoch_accuracy}")

            history['loss'].append(mean_loss)
            history['val_loss'].append(epoch_loss_val)
            history['val_accuracy'].append(epoch_accuracy)

        self.history = history
        return history





In [None]:
import tensorflow as tf

# Load MNIST through the Keras dataset helper.
data = tf.keras.datasets.mnist
(X_train, y_train), (X_test, y_test) = data.load_data()

# Append a channel axis (axis 3) and divide the pixel values by 255.
X_train = np.expand_dims(X_train, axis=3) / 255
X_test = np.expand_dims(X_test, axis=3) / 255

# One-hot encode the labels by indexing rows of a 10x10 identity matrix.
y_train = np.eye(10)[y_train]
y_test = np.eye(10)[y_test]
print(y_train.shape)
print(y_test.shape)

# Training configuration.
loss_function = CategoricalCrossEntropyLoss()
learning_rate = 0.0001
optimizer = 'adam'

# Two conv/pool/ReLU stages, then a dense classifier.
nn = NeuralNetwork(loss_function, optimizer, learning_rate)
nn.add(Convolve(activation=Relu(), pooling=MaxPool((2, 2)), filter_size=(3, 3),
                num_filters=8, stride=1, padding=1))
nn.add(Convolve(activation=Relu(), pooling=MaxPool((2, 2)), filter_size=(3, 3),
                num_filters=16, stride=1, padding=1))
nn.add(Flatten())
# 784 = 7 * 7 * 16: two 2x2 poolings of a 28x28 image with 16 final filters.
nn.add(Dense(784, 128, activation=Relu()))
nn.add(Dense(128, 10, activation=Softmax(final=True)))

batch_size = 30

# NOTE(review): `epochs` is assigned but the fit call below trains for 30
# epochs; confirm which value is intended.
epochs = 10
nn.fit(X_train, y_train, X_test, y_test, batch_size, epochs=30)

  # ''', 28, 28, 9
  # layer1 = Network(128, 784, lamda=0.01)
  # layer2 = Network(64, 128, lamda=0.01)
  # layer3 = Network(10, 64, lamda=0)
  # relu1 = Relu()

  # relu2 = Relu()
  # softmax = Softmax()


  # layers_for_fit = [layer1, relu1, layer2, relu2, layer3, softmax]
  # layers = [layer1, layer2, layer3]
  # '''
  # # Train model

(60000, 10)
(10000, 10)
Epoch 1, Loss: 2.7270918684342855
Epoch 1, val_Loss: 1.2651775577689803,val_accuracy:0.6079
Epoch 2, Loss: 1.241023995501714
Epoch 2, val_Loss: 1.04431728576858,val_accuracy:0.6423
Epoch 3, Loss: 0.9636371621728624
Epoch 3, val_Loss: 0.7946877669171629,val_accuracy:0.7304
Epoch 4, Loss: 0.8357104757592844
Epoch 4, val_Loss: 0.7970775387559589,val_accuracy:0.7295
Epoch 5, Loss: 0.754986335644144
Epoch 5, val_Loss: 0.6272306992364806,val_accuracy:0.8009


KeyboardInterrupt: 