In [1]:
import numpy as np
import random

In [2]:
class Conv:
    def __init__(self, kernel_size, in_chanels, out_chanels):
        self.kernel = np.random.rand(out_chanels, in_chanels, kernel_size[0], kernel_size[1])
        self.stride = 1
        self.inputz = None
        
    def corr(self, x, k):
        h, w = k.shape
        stride = self.stride
        height = (x.shape[0] - h + stride)/stride
        width = (x.shape[1] - w + stride)/stride
        Y = np.zeros((int(height), int(width)))
        
        for i in range(Y.shape[0]):
            for j in range(Y.shape[1]):
                Y[i, j] = (x[int(i*stride):int(i*stride) + h, int(j*stride):int(j*stride) + w] * k).sum() 
        return Y
    
    def corr2d_multi_in(self,X, K, pad):
        return sum(self.corr(np.pad(x, pad), k)for x, k in zip(X, K))
    
    def back(self, X, der, pad=0, stride=1):
        return np.stack([self.back_corr(x, der, pad, stride) for x in X], 0)

    def back_corr(self, X, der, pad, stride):
        return sum(self.corr(np.pad(x, pad), d)for x, d in zip(X, der))
    
    def foward(self, X, pad=0, stride=0):
        if stride:
            self.stride = stride
            
        self.inputz = X
        return np.stack([self.corr2d_multi_in(X, k, pad) for k in self.kernel], 0)
    
    def backward(self,der, lr, pad):
        X = np.asarray(self.inputz)
        
        dk = self.back(np.expand_dims(X, axis=1), der, pad=0, stride=1)
        dx = np.stack([self.corr2d_multi_in(der, k, pad) for k in np.flip(np.flip(self.kernel, 2), 1).transpose(1,0,2,3)], 0)
        
        for k in self.kernel:
            k = k - (-lr*dk)
        return dx

In [3]:
class max_pooling:
    def __init__(self, size, stride):
        self.h, self.w = size
        self.stride = stride
        self.gradient = None
        self.Y = None
        self.input = None
        
        
    def fpass(self, x):
        height = (x.shape[0] - self.h + self.stride)/self.stride
        width = (x.shape[1] - self.w + self.stride)/self.stride
        Y = np.zeros((int(height), int(width)))
        for i in range(Y.shape[0]):
            for j in range(Y.shape[1]):
                Y[i, j] = x[i*self.stride:i*self.stride + self.h, j*self.stride:j*self.stride + self.w].max()
        return Y
    
    
    def find_gradient(self, x, Y, der):
        grad = np.zeros((x.shape[0], x.shape[1]))
        for i in range(Y.shape[0]):
            for j in range(Y.shape[1]):
                a, b = np.where(x[i*self.stride:i*self.stride + self.h, j*self.stride:j*self.stride + self.w] == Y[i, j])
                grad[i*self.stride+a[0], j*self.stride+b[0]] = der[i, j]
        return grad
    
    
    def foward(self, X):
        self.input = X
        self.Y =  np.stack([self.fpass(x) for x in X], 0)
        return self.Y
        
        
    def backward(self, Der):
        X = self.input
        self.gradient =  np.stack([self.find_gradient(x, y, der) for x, y, der in zip(X, self.Y, Der)], 0)
        return self.gradient
    

In [4]:
class flatten_layer:
    def __init__(self):
        self.shape = None
        
    def flatten(self, X):
        self.shape = X.shape
        return np.expand_dims(X.flatten(), 0)
    
    def back(self, X):
        return X.resize(self.shape)

In [5]:
class FCL:
    def __init__(self, input_size, output_size):
        self.input_size = input_size
        self.weights = np.random.rand(input_size, output_size)
        self.bias = np.random.rand(1,output_size)
        self.input = None
    
    def foward(self,x):
        self.input = x
        y = x.dot(self.weights) + self.bias
        return y
    
    def backward(self, der, lr):
        x = self.input
        dw = x.T.dot(der)
        db = der
        dx = der.dot(self.weights.T)
        
        self.weights = self.weights - (-lr*dw)
        self.bias = self.bias - (-lr*db)
        
        #print(f"dw: {dw.shape} dx: {dx.shape}  db: {db.shape}")
        return dx

In [6]:
class tanh:
    def __init__(self):
        self.input = None

    def foward(self, x):
        self.input=x
        return np.tanh(x)

    def back(self):
        return 1 - np.tanh(self.input)**2
    
def softmax_cross_entropy_loss(yhat, y):
    yhat -= np.max(yhat, axis=1, keepdims=True)
    exp_scores = np.exp(yhat)
    probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
    logprobs = np.zeros([len(yhat),1])
    for r in range(len(yhat)): # For each element in the batch
        scale_factor = 1 / float(np.count_nonzero(y[r, :]))
        for c in range(len(y[r,:])): # For each class 
            if y[r,c] != 0:  # Positive classes
                logprobs[r] += -np.log(probs[r,c]) * y[r,c] * scale_factor # We sum the loss per class for each element of the batch
    data_loss = np.sum(logprobs) / len(yhat)    
    return probs, data_loss

def cross_backward(y, yhat, probs):
    delta = probs   # If the class label is 0, the gradient is equal to probs
    labels = y
    for r in range(len(yhat)):  # For each element in the batch
        scale_factor = 1 / float(np.count_nonzero(labels[r, :]))
        for c in range(len(labels[r,:])):  # For each class
            if labels[r, c] != 0:  # If positive class
                delta[r, c] = scale_factor * (delta[r, c] - 1) + (1 - scale_factor) * delta[r, c]
    return delta

In [7]:
class Model:
    def __init__(self):

        self.con1 = Conv([2,2],1,4)
        self.pool1 = max_pooling([2,2], 2)
        self.con2 = Conv([3,3],4,2)
        self.con3 = Conv([2,2],2,1)
        self.pool2 = max_pooling([2,2], 2)
        self.flat = flatten_layer()
        self.aclayer1 = tanh()
        self.layer1 = FCL(25,15)
        self.aclayer2 = tanh()
        self.layer2 = FCL(15,10)
        
    def foward(self, X, Y):
        out1 = self.con1.foward(X, stride=1, pad=0)
        #print(out1.shape)
        out2 = self.pool1.foward(out1)
        #print(out2.shape)
        out3 = self.con2.foward(out2, stride=1, pad=0)
        #print(out3.shape)
        out4 = self.con3.foward(out3, stride=1, pad=0)
        out5 = self.pool2.foward(out4)
        out6 = self.flat.flatten(out5)
        z1 = self.aclayer1.foward(out6)
        out7 = self.layer1.foward(z1)
        z2 = self.aclayer2.foward(out7)
        self.final = self.layer2.foward(z2)
        probs, loss = softmax_cross_entropy_loss(self.final, Y[0])
        return probs, loss
    
    def backward(self, probs, lr, Y):
        dl = cross_backward(Y[0], self.final, probs)
        d1 = self.layer2.backward(dl, lr)
        dz1 = d1 * self.aclayer2.back()
        d2 = self.layer1.backward(dz1, lr)
        dz2 = d2 * self.aclayer1.back()
        self.flat.back(dz2)
        d3 = self.pool2.backward(dz2)
        d4 = self.con3.backward(d3, lr, 1)
        #print(d4.shape)
        d5 = self.con2.backward(d4, lr, 2)
        #print(d5.shape)
        d6 = self.pool1.backward(d5)
        #print(d6.shape) 
        d7 = self.con1.backward(d6, lr, 1)
        #print(d7.shape)
        

In [8]:
from keras.datasets import mnist
from keras.utils import to_categorical

(train_image, train_label), (test_image, test_label) = mnist.load_data()

2023-09-20 23:52:32.381506: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [9]:
train_image = train_image.astype('float32')/255
test_image = test_image.astype('float32')/255
train_image = np.expand_dims(train_image, 1)
test_image = np.expand_dims(test_image, 1)


train_label = to_categorical(train_label)
test_label = to_categorical(test_label)
train_label = np.expand_dims(train_label, 1)
test_label = np.expand_dims(test_label, 1)

In [10]:
test = Model()

In [11]:
from tqdm import tqdm

In [13]:
avg=0
labels = list()
t=0
for i in tqdm(range(len(train_image))):
    probs, loss = test.foward(train_image[i], np.array([train_label[i]]))
    avg += loss
    labels.append(probs)
    
    if i % 500 == 1:
        test.backward(probs, .005, train_label[t:i])
        labels = list()
        t = i
        
    if i % 6000 == 1:
        print(f"loss: {loss}  avg: {avg/i}  \ny = {train_label[i]} \nyhat = {probs}")
        
print(avg/len(train_image))

  0%|          | 12/60000 [00:00<18:42, 53.45it/s]

loss: 3.4391551911008302  avg: 6.4957609627493635  
ry = [[1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]] 
yhat = [[ 0.03209179  0.13208643  0.04602457  0.24766555  0.01973615 -0.95295289
   0.07779267  0.23132864  0.03498796  0.13123914]]


 10%|█         | 6024/60000 [01:40<15:07, 59.49it/s]

loss: 3.717721247854944  avg: 2.643509091455922  
ry = [[1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]] 
yhat = [[ 0.02428925  0.14092945  0.0450502   0.254894    0.01884076  0.04241727
   0.07854595  0.25059993  0.02660291 -0.88216972]]


 20%|██        | 12014/60000 [03:19<13:46, 58.05it/s]

loss: 4.0216271620613275  avg: 2.668580211151948  
ry = [[0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]] 
yhat = [[-0.97859022  0.15154042  0.04394974  0.28377421  0.01792378  0.03801267
   0.07289356  0.25408756  0.02002556  0.09638272]]


 30%|███       | 18010/60000 [05:00<12:05, 57.87it/s]

loss: 1.0999386592922265  avg: 2.706014370103567  
ry = [[0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]] 
yhat = [[ 0.01625186  0.10906076  0.03402533  0.3328915   0.01356172  0.0344398
   0.06302046  0.2879228   0.01646518 -0.90763941]]


 40%|████      | 24016/60000 [06:40<10:19, 58.08it/s]

loss: 3.479364503454057  avg: 2.7564159720979373  
ry = [[0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]] 
yhat = [[ 0.01551245 -0.88586767  0.03049533  0.33675119  0.0119102   0.030827
   0.06294727  0.30894809  0.01450784  0.07396831]]


 50%|█████     | 30020/60000 [08:21<08:31, 58.64it/s]

loss: 1.0918883049248718  avg: 2.8051088113768383  
ry = [[0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]] 
yhat = [[ 0.01252615  0.09264876 -0.97293978  0.37488261  0.00816495  0.02521326
   0.0531298   0.33558221  0.01268448  0.05810757]]


 60%|██████    | 36013/60000 [10:01<06:59, 57.23it/s]

loss: 3.8463733964907005  avg: 2.862528915429319  
ry = [[0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]] 
yhat = [[ 0.0097712   0.0716068   0.02135705  0.46773267  0.00634341  0.01692484
   0.04661529  0.2985306  -0.99010322  0.05122138]]


 70%|███████   | 42021/60000 [11:42<05:07, 58.40it/s]

loss: 4.251908836012375  avg: 2.940383338991663  
ry = [[0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]] 
yhat = [[ 0.00699692  0.05398883  0.01423703  0.59385138  0.00490734  0.01219813
   0.03448011  0.23477896  0.00653922 -0.96197791]]


 80%|████████  | 48011/60000 [13:22<03:28, 57.63it/s]

loss: 1.8317429488949826  avg: 3.036695803776055  
ry = [[0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]] 
yhat = [[ 0.00463431 -0.96571539  0.00745413  0.73542402  0.00299532  0.00748768
   0.0233591   0.16013422  0.0039958   0.0202308 ]]


 90%|█████████ | 54010/60000 [15:02<01:43, 57.70it/s]

loss: 0.1454346455774124  avg: 3.170567044068309  
ry = [[0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]] 
yhat = [[ 0.00253023  0.01502502  0.00320764  0.8646464   0.00139159  0.00409695
   0.01293667 -0.91686976  0.00185789  0.01117736]]


100%|██████████| 60000/60000 [16:43<00:00, 59.81it/s]

3.3369736824896012



