In [4188]:
import tensorflow as tf
import numpy as np
import pickle
import sys
import mnist
# Print arrays in full (never summarised with "...") and without scientific
# notation for small values.
np.set_printoptions(threshold=sys.maxsize, suppress=True)

In [4189]:
# Cap TensorFlow's memory use on the first GPU at 1024 MB.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    # Guarded: on a CPU-only machine `gpus` is empty and `gpus[0]` would raise
    # an IndexError that the `except RuntimeError` below does not catch.
    try:
        tf.config.experimental.set_virtual_device_configuration(
            gpus[0],
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
    except RuntimeError as e:
        # Report the configuration failure and carry on with the defaults.
        print(e)

In [4190]:
from tensorflow import keras
# Load the Fashion-MNIST train/test splits through Keras.
fashion_mnist = keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Human-readable names, indexed by integer label.
class_names = [
    'T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
    'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot',
]

# Append a channel axis at position 3 and divide the pixel values by 255.
test_images = np.expand_dims(test_images, axis=3) / 255.0
train_images = np.expand_dims(train_images, axis=3) / 255.0

In [4191]:
class Conv:
    """2-D convolution layer with a fused ReLU, written with NumPy.

    Inputs are indexed as ``input[n, row, col, ch]`` and filters as
    ``filters[k_h, k_w, in_ch, out_ch]``. ``padding`` is implicit zero padding
    given as ``((top, bottom), (left, right))`` and ``stride`` is
    ``(vertical, horizontal)``.
    """

    def __init__(self, name, num_filters, kernel_size=3, padding=((1,1),(1,1)), stride=(1,1)):
        """Store the layer geometry and load the initial weights from disk.

        Weights are read from ``initial_weights_lenet/<name>.weight``: a pickle
        whose first two items expose ``.numpy()`` (filters, then bias).

        Raises:
            OSError: if the weight file cannot be opened.
        """
        self.name = name
        self.kernel_size = kernel_size
        self.num_filters = num_filters

        self.padding = padding
        self.stride = stride
        self.filters = None
        self.bias = None
        # NOTE(security): pickle.load can execute arbitrary code; only load
        # weight files that come from a trusted source.
        with open('initial_weights_lenet/'+self.name+'.weight', 'rb') as fp:
            variables = pickle.load(fp)
            self.filters = variables[0].numpy()
            self.bias = variables[1].numpy()

    def _output_size(self, in_h, in_w):
        """Return ``(out_h, out_w)`` for an input of spatial size in_h x in_w."""
        (u_pad, d_pad), (l_pad, r_pad) = self.padding
        out_h = int((in_h + u_pad + d_pad - self.kernel_size)//self.stride[0] + 1)
        out_w = int((in_w + l_pad + r_pad - self.kernel_size)//self.stride[1] + 1)
        return out_h, out_w

    def get_input_pixel(self, image, in_row, in_col, in_ch, height, width):
        """Return one channel of one pixel for the whole batch.

        ``in_row``/``in_col`` are coordinates in the padded image; positions
        that fall into the padding yield a zero vector of length batch size.
        """
        row = in_row - self.padding[0][0]
        col = in_col - self.padding[1][0]
        if (row<0 or col<0 or row >= height or col >= width):
            return np.zeros((image.shape[0]))
        else:
            return image[...,row,col,in_ch]

    def iterate_regions(self, image, out_h, out_w):
        """Yield ``(im_region, i, j)`` for every output position.

        ``im_region`` has shape (batch, kernel, kernel, channels). The same
        buffer is overwritten and re-yielded on each step, so consumers must
        use it before advancing the generator.
        """
        im_region = np.zeros((image.shape[0],self.kernel_size, self.kernel_size, image.shape[3]))

        for i in range(out_h):
            for j in range(out_w):
                for k_h in range(self.kernel_size):
                    for k_w in range(self.kernel_size):
                        in_h = k_h + i*self.stride[0]
                        in_w = k_w + j*self.stride[1]
                        for ch in range(image.shape[3]):
                            im_region[:,k_h,k_w,ch] = self.get_input_pixel(image,
                                                                      in_h, in_w, ch,
                                                                      image.shape[1], image.shape[2])
                yield im_region, i, j

    def forward(self, input):
        """Convolve ``input`` with the filters, add the bias and apply ReLU.

        Args:
            input: array-like indexed as (batch, height, width, channels).

        Returns:
            Array of shape (batch, out_h, out_w, num_filters).
        """
        # Accept anything array-like (e.g. tensors yielded by a tf.data pipeline).
        input = np.asarray(input)
        in_n, in_h, in_w, in_ch = input.shape

        # Private copies so the in-place updates in backward() are always legal.
        self.filters = np.array(self.filters)
        self.bias = np.array(self.bias)

        out_h, out_w = self._output_size(in_h, in_w)

        output = np.zeros((in_n, out_h, out_w, self.num_filters))
        self.last_input = input

        for im_region, i, j in self.iterate_regions(input, out_h, out_w):
            # Contract (k_h, k_w, in_ch) of the window against the filters -> (batch, out_ch).
            output[:, i, j, :] = np.tensordot(im_region, self.filters, axes=3) + self.bias

        return np.maximum(output, 0)

    def backward(self, dL_dout, out, learning_rate,update=True):
        """Back-propagate through the ReLU and the convolution.

        Args:
            dL_dout: gradient of the loss w.r.t. this layer's output.
            out: the output returned by the matching ``forward`` call (used
                for the ReLU mask).
            learning_rate: step size for the parameter update.
            update: when true, apply a gradient-descent step to the filters
                and bias using the gradient summed over the batch.

        Returns:
            Gradient of the loss w.r.t. the layer input (same shape as the
            input given to ``forward``).
        """
        # ReLU derivative: pass the gradient only where the activation was positive.
        dL_dconv = dL_dout * np.where(out>0, 1, 0)

        in_n, in_h, in_w, in_ch = self.last_input.shape
        u_pad = self.padding[0][0]
        l_pad = self.padding[1][0]
        out_h, out_w = self._output_size(in_h, in_w)
        k = self.kernel_size

        # Filter gradient, already summed over the batch.
        dL_dw = np.zeros(self.filters.shape)
        dL_din = np.zeros(self.last_input.shape)

        for im_region, i, j in self.iterate_regions(self.last_input, out_h, out_w):
            grad_ij = dL_dconv[:, i, j, :]  # (batch, out_ch)

            dL_dw += np.einsum('nhwc,nf->hwcf', im_region, grad_ij)

            # Top-left corner of the window in (unpadded) input coordinates.
            # The horizontal offset must use the horizontal stride.
            win_top = i*self.stride[0] - u_pad
            win_left = j*self.stride[1] - l_pad

            # Part of the window that overlaps the real input ...
            row_lo, row_hi = max(win_top, 0), min(win_top + k, in_h)
            col_lo, col_hi = max(win_left, 0), min(win_left + k, in_w)
            if row_hi <= row_lo or col_hi <= col_lo:
                continue  # window lies entirely in the padding

            # ... and the matching part of the kernel.
            k_top, k_left = row_lo - win_top, col_lo - win_left
            k_bottom, k_right = row_hi - win_top, col_hi - win_left

            dL_din[:, row_lo:row_hi, col_lo:col_hi, :] += np.einsum(
                'nf,hwcf->nhwc',
                grad_ij,
                self.filters[k_top:k_bottom, k_left:k_right, :, :])

        # Per-sample bias gradient: sum over the spatial positions only.
        dL_dbias = np.sum(dL_dconv, axis=(1,2))
        if self.name == 'conv1':
            print('{} filters'.format(self.name))
            print(self.bias)
            print('grads')
            print(dL_dbias)
        if update:
            self.filters -= learning_rate * dL_dw
            self.bias -= learning_rate * np.sum(dL_dbias, axis=0)

        return dL_din
    


In [4192]:
# Scratch check: summing over axes (0, 1) leaves one total per last-axis index.
a = np.array([
    [[1, 1, 2, 1], [2, 2, 3, 2], [3, 3, 4, 3]],
    [[1, 1, 5, 1], [2, 2, 6, 2], [3, 3, 7, 3]],
])
print(a)
a.sum(axis=(0, 1))

[[[1 1 2 1]
  [2 2 3 2]
  [3 3 4 3]]

 [[1 1 5 1]
  [2 2 6 2]
  [3 3 7 3]]]


array([12, 12, 27, 12])

In [4193]:
class MaxPool:
    """Max-pooling layer over inputs indexed as ``input[n, row, col, ch]``.

    ``padding`` is ``((top, bottom), (left, right))`` and ``stride`` is
    ``(vertical, horizontal)``. Padded positions are read as zeros and take
    part in the maximum.
    """

    def __init__(self, name,kernel_size=2,padding=((1,1),(1,1)), stride=(1,1)):
        """Store the pooling geometry."""
        self.name = name
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

    def _output_size(self, in_h, in_w):
        """Return ``(out_h, out_w)`` for an input of spatial size in_h x in_w."""
        (u_pad, d_pad), (l_pad, r_pad) = self.padding
        out_h = int((in_h + u_pad + d_pad - self.kernel_size)//self.stride[0] + 1)
        out_w = int((in_w + l_pad + r_pad - self.kernel_size)//self.stride[1] + 1)
        return out_h, out_w

    def get_input_pixel(self, image, in_row, in_col, in_ch, height, width):
        """Return one channel of one pixel for the whole batch.

        ``in_row``/``in_col`` are coordinates in the padded image; positions
        that fall into the padding yield a zero vector of length batch size.
        """
        row = in_row - self.padding[0][0]
        col = in_col - self.padding[1][0]
        if (row<0 or col<0 or row >= height or col >= width):
            return np.zeros((image.shape[0]))
        else:
            return image[...,row,col,in_ch]

    def iterate_regions(self, image, out_h, out_w, out_ch):
        """Yield ``(im_region, i, j, ch)`` for every output position of one channel.

        ``im_region`` has shape (batch, kernel, kernel). The same buffer is
        overwritten and re-yielded on each step.
        """
        im_region = np.zeros((image.shape[0],self.kernel_size, self.kernel_size))

        ch = out_ch
        for i in range(out_h):
            for j in range(out_w):
                for k_h in range(self.kernel_size):
                    for k_w in range(self.kernel_size):
                        in_h = k_h + i*self.stride[0]
                        in_w = k_w + j*self.stride[1]
                        im_region[...,k_h,k_w] = self.get_input_pixel(image,
                                                                      in_h, in_w, ch,
                                                                      image.shape[1], image.shape[2])
                yield im_region, i, j, ch

    def forward(self, input):
        """Return the per-window maximum of ``input``.

        Also records the input, its shape and the output shape for
        ``backward`` and for callers that need ``output_shape``.
        """
        input = np.asarray(input)
        in_n,in_h, in_w, in_ch = input.shape

        self.last_input = input
        self.input_shape = input.shape
        out_h, out_w = self._output_size(in_h, in_w)
        out_ch = in_ch
        output = np.zeros((in_n,out_h, out_w, out_ch))
        self.output_shape = output.shape
        for ch in range(out_ch):
            for im_region, i, j, ch in self.iterate_regions(input, out_h, out_w, ch):
                output[...,i,j,ch] = np.amax(im_region, axis=(1,2))

        return output

    def backward(self, d_L_d_out):
        """Route the output gradient back to the input positions that were maximal.

        Every input position equal to its window's maximum receives that
        window's gradient (ties all receive it). Contributions from
        overlapping windows are accumulated, and window positions that lie in
        the padding are skipped because they have no input cell.

        Args:
            d_L_d_out: gradient w.r.t. the output of the last ``forward`` call.

        Returns:
            Gradient w.r.t. the input, same shape as that input.
        """
        d_L_d_out = np.asarray(d_L_d_out)
        in_n,in_h, in_w, in_ch = self.input_shape
        u_pad = self.padding[0][0]
        l_pad = self.padding[1][0]

        d_L_d_input = np.zeros(self.last_input.shape)
        out_h, out_w = self._output_size(in_h, in_w)
        for ch in range(in_ch):
            for im_region, i, j, ch in self.iterate_regions(self.last_input, out_h, out_w, ch):
                window_max = np.amax(im_region, axis=(1,2))
                for k_h in range(self.kernel_size):
                    # Convert padded window coordinates to real input coordinates.
                    row = i*self.stride[0] + k_h - u_pad
                    if row < 0 or row >= in_h:
                        continue
                    for k_w in range(self.kernel_size):
                        col = j*self.stride[1] + k_w - l_pad
                        if col < 0 or col >= in_w:
                            continue
                        is_max = im_region[:, k_h, k_w] == window_max
                        d_L_d_input[:, row, col, ch] += is_max * d_L_d_out[:, i, j, ch]

        return d_L_d_input

In [4194]:
class FC:
    """Fully connected layer with an optional fused ReLU.

    ``filters`` is indexed as ``filters[in_feature, out_unit]`` and ``bias``
    as ``bias[out_unit]``. ``activation == 'relu'`` enables the ReLU; any
    other value leaves the output linear.
    """

    def __init__(self, name, num_filters, activation):
        """Store the configuration and load the initial weights from disk.

        Weights are read from ``initial_weights_lenet/<name>.weight``: a pickle
        whose first two items expose ``.numpy()`` (filters, then bias).

        Raises:
            OSError: if the weight file cannot be opened.
        """
        self.name = name
        self.num_filters = num_filters
        self.activation = activation
        # NOTE(security): pickle.load can execute arbitrary code; only load
        # weight files that come from a trusted source.
        with open('initial_weights_lenet/'+self.name+'.weight', 'rb') as fp:
            variables = pickle.load(fp)
            self.filters = variables[0].numpy()
            self.bias = variables[1].numpy()

    def forward(self, input):
        """Compute ``input @ filters + bias`` (followed by ReLU if enabled).

        Args:
            input: array-like whose first axis is the batch; all remaining
                axes are flattened into the feature axis.

        Returns:
            float64 array of shape (batch, num_filters).

        Raises:
            ValueError: if ``input`` has fewer than two dimensions.
        """
        input = np.asarray(input)
        if input.ndim < 2:
            raise ValueError(
                'FC.forward expects a batch axis plus at least one feature axis, '
                'got shape {}'.format(input.shape))

        # Private copies so the in-place updates in backward() are always legal.
        self.filters = np.array(self.filters)
        self.bias = np.array(self.bias)

        n_samples = input.shape[0]
        flattend_in = input.reshape(n_samples, int(np.prod(input.shape[1:])))

        self.input_shape = input.shape
        self.cache = flattend_in

        output = np.asarray(flattend_in @ self.filters + self.bias, dtype=np.float64)
        if self.activation == 'relu':
            output = np.maximum(output, 0)

        return output

    def backward(self, dL_dout, out, learning_rate=0.01,update=True):
        """Back-propagate through the activation and the affine map.

        Args:
            dL_dout: gradient w.r.t. this layer's output; any shape holding
                batch * num_filters values, e.g. (batch, 1, num_filters) or
                (batch, num_filters).
            out: the output of the matching ``forward`` call (ReLU mask).
            learning_rate: step size for the parameter update.
            update: when true, apply a gradient-descent step using the
                gradient summed over the batch.

        Returns:
            Gradient w.r.t. the flattened input, shape (batch, 1, in_features).
        """
        n_samples = self.cache.shape[0]
        dL_dact = np.asarray(dL_dout, dtype=float).reshape(n_samples, self.num_filters)
        if self.activation == 'relu':
            relu_mask = np.where(np.asarray(out) > 0, 1, 0)
            dL_dact = dL_dact * relu_mask.reshape(n_samples, self.num_filters)

        # Input gradient uses the weights from before the update below.
        dL_din = (dL_dact @ self.filters.T)[:, np.newaxis, :]
        dL_dw = self.cache.T @ dL_dact  # summed over the batch
        dL_dbias = dL_dact              # one row per sample

        if self.name == 'fc0':
            print('{} filters'.format(self.name))
            print(self.bias)
            print('grads')
            print(dL_dbias)
        if update:
            self.filters -= learning_rate * dL_dw
            self.bias -= learning_rate * np.sum(dL_dbias, axis=0)

        return dL_din

In [4195]:
class Softmax:
    """Row-wise softmax over a (batch, classes) array."""

    def __init__(self):
        pass

    def forward(self, input):
        """Return the softmax of each row and cache it for ``backward``.

        The row maximum is subtracted before exponentiating; this leaves the
        result unchanged but keeps ``np.exp`` from overflowing on large logits.
        """
        logits = np.asarray(input, dtype=float)
        shifted = logits - np.max(logits, axis=1, keepdims=True)
        exp = np.exp(shifted)
        prob = exp / np.sum(exp, axis=1, keepdims=True)
        self.cache = prob

        return prob

    def backward(self, dL_dout, labels):
        """Gradient w.r.t. the logits when the loss depends only on p[label].

        Args:
            dL_dout: array of shape (batch, 1) holding dL/dp[label] per sample.
            labels: integer class index per sample.

        Returns:
            Array of shape (batch, 1, classes) with dL/dz for every logit.
        """
        probs = self.cache
        n_samples = probs.shape[0]
        rows = np.arange(n_samples)
        label_idx = np.asarray(labels).astype(int).reshape(-1)
        upstream = np.asarray(dL_dout, dtype=float).reshape(n_samples, -1)[:, :1]

        p_label = probs[rows, label_idx]
        # d p_label / d z_j = -p_j * p_label for j != label ...
        dp_dz = -probs * p_label[:, np.newaxis]
        # ... and p_label * (1 - p_label) for j == label.
        dp_dz[rows, label_idx] = p_label * (1 - p_label)

        return (dp_dz * upstream)[:, np.newaxis, :]
        

In [4196]:
from decimal import Decimal
class CrossEntropy:
    """Cross-entropy loss for integer class labels."""

    def __init__(self):
        pass

    def __call__(self, predictions, labels):
        """Return the loss summed over the batch: ``-sum_i log(p_i[label_i])``.

        The sum (not the mean) is used so that the value agrees with
        ``backward``, which returns ``-1 / p`` per sample without a 1/N factor.

        Args:
            predictions: (batch, classes) array of probabilities.
            labels: integer class index per sample.

        Returns:
            The loss as ``np.float64``.
        """
        probs = np.asarray(predictions, dtype=float)
        label_idx = np.asarray(labels).astype(int).reshape(-1)
        picked = probs[np.arange(probs.shape[0]), label_idx]
        return np.float64(-np.sum(np.log(picked)))

    def backward(self, predictions, labels):
        """Return dL/dp[label] for every sample as a (batch, 1) array."""
        probs = np.asarray(predictions, dtype=float)
        label_idx = np.asarray(labels).astype(int).reshape(-1)
        picked = probs[np.arange(probs.shape[0]), label_idx]
        return (-1.0 / picked)[:, np.newaxis]


In [4197]:
# Build the layers. Conv and FC load their initial weights from
# 'initial_weights_lenet/<name>.weight' when constructed.
# Conv arguments: name, num_filters, kernel_size, padding, stride.
conv1 = Conv('conv1',6,5,((2,2),(2,2)), (1,1))
pool1 = MaxPool('pool1',2,padding=((0,0),(0,0)),stride=(2,2))
conv2 = Conv('conv2',16,5,((0,0),(0,0)), (1,1))
pool2 = MaxPool('pool2',2,padding=((0,0),(0,0)),stride=(2,2))
fc1 = FC('fc1', 120, 'relu')
fc2 = FC('fc2', 84, 'relu')
fc3 = FC('fc3', 10, 'linear')
softmax =Softmax()
loss = CrossEntropy()

# Feed the first four training samples one at a time (batch size 1).
image_dataset=tf.data.Dataset.from_tensor_slices(train_images[:4]).batch(1)
label_dataset=tf.data.Dataset.from_tensor_slices(train_labels[:4]).batch(1)

for i, (x,y) in enumerate(zip(image_dataset, label_dataset)):
    # Forward pass through every layer. (Earlier variant, whole batch at once:
    # conv1_output = conv1.forward(train_images[:4]))
    conv1_output = conv1.forward(x)
    pool1_output = pool1.forward(conv1_output)
    conv2_output = conv2.forward(pool1_output)
    pool2_output = pool2.forward(conv2_output)
    fc1_output = fc1.forward(pool2_output)
    fc2_output = fc2.forward(fc1_output)
    fc3_output = fc3.forward(fc2_output)
    out = softmax.forward(fc3_output)
    print('model output')
    print(out)
    print('loss')
    print(loss(out,y))

    # Backward pass in reverse layer order. Each layer updates its own weights
    # in place (update=True) and returns the gradient w.r.t. its input.
    # (Earlier whole-batch variant of the next two calls used train_labels[:4].)
    dL_dsm = loss.backward(out, y)
    dsm_dfc3 = softmax.backward(dL_dsm, y)
    # The FC layers use their default learning_rate of 0.01.
    dfc3_dfc2 = fc3.backward(dsm_dfc3, fc3_output,update=True)
    dfc2_dfc1 = fc2.backward(dfc3_dfc2,fc2_output,update=True)
    dfc1_dpool2 = fc1.backward(dfc2_dfc1, fc1_output,update=True)
    # fc1 returns a gradient for the flattened input; restore pool2's output shape.
    dfc1_dpool2 = dfc1_dpool2.reshape((pool2.output_shape[0],pool2.output_shape[1], pool2.output_shape[2], pool2.output_shape[3]))
    dpool2_dconv2 = pool2.backward(dfc1_dpool2)
    # The conv layers are given the learning rate 0.01 explicitly.
    dconv2_dpool1 = conv2.backward(dpool2_dconv2,conv2_output,0.01,update=True)
    dpool1_dconv1 = pool1.backward(dconv2_dpool1)
    dconv1_dinput = conv1.backward(dpool1_dconv1,conv1_output,0.01,update=True)




model output
[[0.10315515 0.0828084  0.089298   0.10259349 0.09754225 0.08758666
  0.1112497  0.10728637 0.09372789 0.12475208]]
loss
2.08142688828893
conv1 filters
[0. 0. 0. 0. 0. 0.]
grads
[[-0.07368661 -0.18158476  0.11003846 -0.26580147 -0.09740951  0.08516611]]
model output
[[0.10172734 0.07764438 0.08911259 0.09566537 0.09221403 0.09939929
  0.10701083 0.11388674 0.09024125 0.13309818]]
loss
2.285459151272034
conv1 filters
[ 0.00073687  0.00181585 -0.00110038  0.00265801  0.0009741  -0.00085166]
grads
[[-0.04435083  0.19257417 -0.09588984  0.04751892 -0.22196138  0.0213418 ]]
model output
[[0.10505752 0.09077225 0.09642118 0.09976182 0.09310231 0.09788315
  0.10053141 0.10240769 0.09307689 0.12098578]]
loss
2.253247305604628
conv1 filters
[ 0.00118037 -0.00010989 -0.00014149  0.00218283  0.00319371 -0.00106508]
grads
[[-0.58939721  0.04575713 -0.27544069 -0.20367812 -0.32374471  0.00046923]]
model output
[[0.11077128 0.08741872 0.09245274 0.10430553 0.08783333 0.09546447
  0.1025

In [None]:
from random import *
import matplotlib.pyplot as plt
# Pick one random test sample; the image shown, the label printed and the
# prediction all refer to this same sample.
test_num = randint(1,1000)
sample_image = test_images[test_num]
sample_label = test_labels[test_num]

plt.figure()
# Drop the trailing channel axis so imshow receives a plain 2-D image.
plt.imshow(np.squeeze(sample_image, axis=-1))
print(sample_image.shape)
plt.colorbar()
plt.grid(False)
plt.show()
# NOTE(review): `forward` is not defined in the visible part of this notebook;
# it is presumably a helper that runs the whole network -- confirm it exists
# before running this cell.
prediction = forward(sample_image, sample_label)
print(sample_label)
print(prediction)
print(np.argmax(prediction[0]))
