In [1]:
import numpy as np
import math
from keras.datasets import mnist
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

In [2]:
class SimpleInitializerConv2d:
    def __init__(self, sigma=0.01):
        self.sigma = sigma
    def W(self, F, C, FH, FW):
        return self.sigma * np.random.randn(F,C,FH,FW)
    def B(self, F):
        return np.zeros(F)

In [3]:
class ReLU:
    def forward(self, A):
        self.A = A
        return np.clip(A, 0, None)
    def backward(self, dZ):
        return dZ * np.clip(np.sign(self.A), 0, None)

In [4]:
class SGD:
    def __init__(self, lr):
        self.lr = lr
    def update(self, layer):
        layer.W -= self.lr * layer.dW
        layer.B -= self.lr * layer.dB
        return

In [5]:
class SimpleConv2d():
    def __init__(self, F, C, FH, FW, P, S,initializer=None,optimizer=None,activation=None):
        self.P = P
        self.S = S
        self.initializer = initializer
        self.optimizer = optimizer
        self.activation = activation
        self.W = self.initializer.W(F,C,FH,FW)
        self.B = self.initializer.B(F)

    def output_shape2d(self,H,W,PH,PW,FH,FW,SH,SW):
        OH = (H +2*PH -FH)/SH +1
        OW = (W +2*PW -FW)/SW +1
        return int(OH),int(OW)

    def forward(self, X, debug=False):
        self.X = X
        N,C,H,W = self.X.shape
        F,C,FH,FW = self.W.shape
        OH,OW = self.output_shape2d(H,W,self.P,self.P,FH,FW,self.S,self.S)
        self.params = N,C,H,W,F,FH,FW,OH,OW
        A = np.zeros([N,F,OH,OW])
        self.X_pad = np.pad(self.X,((0,0),(0,0),(self.P,self.P),(self.P,self.P)))
        for n in range(N):
            for ch in range(F):
                for row in range(0,H,self.S):
                    for col in range(0,W,self.S):
                        if self.P == 0 and (W-2 <= col or H-2<=row):
                            continue
                        A[n,ch,row,col] = np.sum(self.X_pad[n,:,row:row+FH,col:col+FW]*self.W[ch,:,:,:]) +self.B[ch]
        if debug==True:
            return A
        else:
            return  self.activation.forward(A)

    def backward(self, dZ, debug=False):
        if debug==True:
            dA = dZ
        else:
            dA = self.activation.backward(dZ)
            
        N,C,H,W,F,FH,FW,OH,OW = self.params
        dZ = np.zeros(self.X_pad.shape)
        self.dW = np.zeros(self.W.shape)
        self.dB = np.zeros(self.B.shape)
        for n in range(N):
            for ch in range(F):
                for row in range(0,H,self.S):
                    for col in range(0,W,self.S):
                        if self.P == 0 and (W-2 <= col or H-2<=row):
                            continue
                        dZ[n,:,row:row+FH,col:col+FW] += dA[n,ch,row,col]*self.W[ch,:,:,:]
        if self.P == 0:
            dZ = np.delete(dZ,[0,H-1],axis=2)
            dZ = np.delete(dZ,[0,W-1],axis=3)
        else:
            dl_rows = range(self.P),range(H+self.P,H+2*self.P,1)
            dl_cols = range(self.P),range(W+self.P,W+2*self.P,1)
            dZ = np.delete(dZ,dl_rows,axis=2)
            dZ = np.delete(dZ,dl_cols,axis=3)
        for n in range(N):
            for ch in range(F):
                for row in range(OH):
                    for col in range(OW):
                        self.dW[ch,:,:,:] += dA[n,ch,row,col]*self.X_pad[n,:,row:row+FH,col:col+FW]
        for ch in range(F):
            self.dB[ch] = np.sum(dA[:,ch,:,:])
        self = self.optimizer.update(self)
        return dZ

In [6]:
class MaxPool2D():
    def __init__(self,P):
        self.P = P
        self.PA = None
        self.Pindex = None

    def forward(self,A):
        N,F,OH,OW = A.shape
        PH,PW = int(OH/self.P),int(OW/self.P)
        self.params = N,F,OH,OW,self.P,PH,PW
        self.PA = np.zeros([N,F,PH,PW])
        self.Pindex = np.zeros([N,F,PH,PW])
        for n in range(N):
            for ch in range(F):
                for row in range(PH):
                    for col in range(PW):
                        self.PA[n,ch,row,col] = np.max(A[n,ch,row*self.P:row*self.P+self.P,col*self.P:col*self.P+self.P])
                        self.Pindex[n,ch,row,col] = np.argmax(A[n,ch,row*self.P:row*self.P+self.P,col*self.P:col*self.P+self.P])
        return self.PA

    def backward(self,dA):
        N,F,OH,OW,PS,PH,PW = self.params
        dP = np.zeros([N,F,OH,OW])
        for n in range(N): 
            for ch in range(F):
                for row in range(PH):
                    for col in range(PW):
                        idx = self.Pindex[n,ch,row,col]
                        tmp = np.zeros((PS*PS))
                        for i in range(PS*PS):
                            if i == idx:
                                tmp[i] = dA[n,ch,row,col]
                            else:
                                tmp[i] = 0
                        dP[n,ch,row*PS:row*PS+PS,col*PS:col*PS+PS] = tmp.reshape(PS,PS)
        return dP

In [7]:
class AvgPool2D():
    def __init__(self,P):
        self.P = P
        self.PA = None
        self.Pindex = None

    def forward(self,A):
        N,F,OH,OW = A.shape
        PH,PW = int(OH/self.P),int(OW/self.P)
        self.params = N,F,OH,OW,self.P,PH,PW
        self.PA = np.zeros([N,F,PH,PW])
        #self.Pindex = np.zeros([N,F,PH,PW])
        for n in range(N):
            for ch in range(F):
                for row in range(PH):
                    for col in range(PW):
                        self.PA[n,ch,row,col] = np.mean(A[n,ch,row*self.P:row*self.P+self.P,col*self.P:col*self.P+self.P])
                        #self.Pindex[n,ch,row,col] = np.argmax(A[n,ch,row*self.P:row*self.P+self.P,col*self.P:col*self.P+self.P])
        return self.PA

    def backward(self,dA):
        N,F,OH,OW,PS,PH,PW = self.params
        dP = np.zeros([N,F,OH,OW])
        for n in range(N): 
            for ch in range(F):
                for row in range(PH):
                    for col in range(PW):
                        #idx = self.Pindex[n,ch,row,col]
                        tmp = np.zeros((PS*PS))
                        for i in range(PS*PS):
                            #if i == idx:
                            tmp[i] += dA[n,ch,row,col]
                            # else:
                            #     tmp[i] = 0
                        dP[n,ch,row*PS:row*PS+PS,col*PS:col*PS+PS] = tmp.reshape(PS,PS)
        return dP

In [8]:
class Flatten:
    def __init__(self):
        pass
    def forward(self,X):
        self.shape = X.shape
        return X.reshape(len(X),-1)
    def backward(self,X):
        return X.reshape(self.shape)  

In [9]:
### CNN NETWORK
from sklearn.metrics import accuracy_score

In [38]:
class Scratch2dCNNClassifier():
    def __init__(self, NN, CNN, n_epoch=5, n_batch=1, verbose = False):
        self.NN = NN
        self.CNN = CNN
        self.n_epoch = n_epoch
        self.n_batch = n_batch
        self.verbose = verbose
        self.log_loss = np.zeros(self.n_epoch)
        self.log_acc = np.zeros(self.n_epoch)

    def loss_function(self,y,yt):
        delta = 1e-7
        return -np.mean(yt*np.log(y+delta))
        
    def accuracy(self,Z,Y):
        return accuracy_score(Y,Z)

    def fit(self, X, y, X_val=False, y_val=False):
        for epoch in range(self.n_epoch):
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.n_batch)
            self.loss = 0
            
            for mini_X_train, mini_y_train in get_mini_batch:              
                forward_data = mini_X_train[:,np.newaxis,:,:]
                for layer in range(len(self.CNN)):
                    forward_data = self.CNN[layer].forward(forward_data)
                flt = Flatten()
                forward_data = flt.forward(forward_data)
                for layer in range(len(self.NN)):
                    forward_data = self.NN[layer].forward(forward_data)
                    Z = forward_data
                
                backward_data = (Z - mini_y_train)/self.n_batch
                for layer in range(len(self.NN)-1,-1,-1):
                    backward_data = self.NN[layer].backward(backward_data)
                    backward_data = flt.backward(backward_data)
                
                for layer in range(len(self.CNN)-1,-1,-1):
                    backward_data = self.CNN[layer].backward(backward_data)
                   
                self.loss += self.loss_function(Z,mini_y_train)
                if self.verbose:
                    print('batch loss %f'%self.loss_function(Z,mini_y_train))
                    
            if self.verbose:
                print(self.loss/len(get_mini_batch), self.accuracy(self.predict(X),np.argmax(y,axis=1)))
                
            self.log_loss[epoch] = self.loss/len(get_mini_batch)
            self.log_acc[epoch] = self.accuracy(self.predict(X),np.argmax(y,axis=1))

    def predict(self, X):
        pred_data = X[:,np.newaxis,:,:]
        for layer in range(len(self.CNN)):
            pred_data = self.CNN[layer].forward(pred_data)
        
        flt = Flatten()
        pred_data = flt.forward(pred_data)
        
        for layer in range(len(self.NN)):
            pred_data = self.NN[layer].forward(pred_data)
        return np.argmax(pred_data,axis=1)

In [37]:
# Activation functions 

class Sigmoid:
    def forward(self, A):
        self.A = A
        return self.sigmoid(A)
    def backward(self, dZ):
        _sig = self.sigmoid(self.A)
        return dZ * (1 - _sig)*_sig
    def sigmoid(self, X):
        return 1 / (1 + np.exp(-X))

class Tanh:
    def forward(self, A):
        self.A = A
        return np.tanh(A)
    def backward(self, dZ):
        return dZ * (1 - (np.tanh(self.A))**2)

class Softmax:
    def forward(self, X):
        self.Z = np.exp(X) / np.sum(np.exp(X), axis=1).reshape(-1,1)
        return self.Z
    def backward(self, Y):
        self.loss = self.loss_func(Y)
        return self.Z - Y
    def loss_func(self, Y, Z=None):
        if Z is None:
            Z = self.Z
        return (-1)*np.average(np.sum(Y*np.log(Z), axis=1))

In [29]:
# FC layer

class FC:
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer, activation):
        self.optimizer = optimizer
        self.activation = activation
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)
    def forward(self, X):
        self.X = X
        A = X@self.W + self.B
        A = self.activation.forward(A)
        return A
    def backward(self, dA):
        dA = self.activation.backward(dA)
        dZ = dA@self.W.T
        self.dB = np.sum(dA, axis=0)
        self.dW = self.X.T@dA
        self.optimizer.update(self)
        return dZ

In [13]:
# initializers
class XavierInitializer:
    def W(self, n_nodes1, n_nodes2):
        self.sigma = math.sqrt(1 / n_nodes1)
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
    def B(self, n_nodes2):
        B = self.sigma * np.random.randn(n_nodes2)
        return B
    
class HeInitializer():
    def W(self, n_nodes1, n_nodes2):
        self.sigma = math.sqrt(2 / n_nodes1)
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
    def B(self, n_nodes2):
        B = self.sigma * np.random.randn(n_nodes2)
        return B
        
class SimpleInitializer:
    def __init__(self, sigma):
        self.sigma = sigma
    def W(self, *shape):
        W = self.sigma * np.random.randn(*shape)
        return W
    def B(self, *shape):
        B = self.sigma * np.random.randn(*shape)
        return B

In [14]:
class AdaGrad:
    def __init__(self, lr):
        self.lr = lr
        self.HW = 1
        self.HB = 1
    def update(self, layer):
        self.HW += layer.dW**2
        self.HB += layer.dB**2
        layer.W -= self.lr * np.sqrt(1/self.HW) * layer.dW
        layer.B -= self.lr * np.sqrt(1/self.HB) * layer.dB

In [15]:
#batch size

class GetMiniBatch:
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype('int')
    def __len__(self):
        return self._stop
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1] 
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

Test

In [16]:
import numpy as np
x = np.array([[[[ 1,  2,  3,  4],[ 5,  6,  7,  8],[ 9, 10, 11, 12],[13, 14, 15, 16]]]])
w = np.array([[[[ 0.,  0.,  0.],[ 0.,  1.,  0.],[ 0., -1.,  0.]]],[[[ 0.,  0.,  0.],[ 0., -1.,  1.],[ 0.,  0.,  0.]]]])

In [17]:
x.shape, w.shape

((1, 1, 4, 4), (2, 1, 3, 3))

In [18]:
simple_conv_2d = SimpleConv2d(F=2, C=1, FH=3, FW=3, P=0, S=1, initializer=SimpleInitializerConv2d(),optimizer=SGD(lr=0.001),activation=ReLU())
simple_conv_2d.W = w
A = simple_conv_2d.forward(x,True)
print(A.shape)

(1, 2, 2, 2)


In [19]:
da = np.array([[[[ -4,  -4], [ 10,  11]],[[  1,  -7],[  1, -11]]]])
dZ = simple_conv_2d.backward(da,True)
print(dZ)

[[[[-5.  4.]
   [13. 27.]]]]


In [20]:
simple_conv_2d.output_shape2d(H=6,W=6,PH=0,PW=0,FH=3,FW=3,SH=1,SW=1)

(4, 4)

test 2

In [21]:
import keras
(X_train, y_train), (X_test, y_test) = keras.datasets.mnist.load_data()

X_train = X_train / 255.0
X_test = X_test / 255.0

X_train = X_train[:100, :] #because of computation
X_test = X_test[:100, :]
y_train = y_train[:100]
y_test = y_test[:100]

In [22]:
enc = OneHotEncoder(handle_unknown='ignore')
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis]).toarray()
y_test_one_hot = enc.transform(y_test[:, np.newaxis]).toarray()

In [31]:
NN = {
    0:FC(400, 120, HeInitializer(), AdaGrad(0.01), ReLU()),
    1:FC(120, 84, HeInitializer(), AdaGrad(0.01), ReLU()),
    2:FC(84, 10, SimpleInitializer(0.01), AdaGrad(0.01), Softmax()),
}
CNN = {
    0:SimpleConv2d(F=10, C=1, FH=3, FW=3, P=1, S=1,initializer=SimpleInitializerConv2d(),optimizer=AdaGrad(lr=0.001),activation=ReLU()),
    0:SimpleConv2d(F=10, C=1, FH=3, FW=3, P=1, S=1,initializer=SimpleInitializerConv2d(),optimizer=AdaGrad(lr=0.001),activation=ReLU()),
    #1:MaxPool2D(2),
}

In [39]:
cnn1 = Scratch2dCNNClassifier(NN=NN, CNN=CNN, n_epoch=5, n_batch=20,verbose=True)
cnn1.fit(X_train,y_train_one_hot)

ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 400 is different from 7840)