In [1]:
import numpy as np
import random
from IPython.core.debugger import set_trace

def compute_y_shape(x_shape, stride, kernel, padding):
    x_h, x_w = x_shape
    s_h, s_w = stride
    k_h, k_w = kernel
    
    if padding == "valid":
        y_h = int(np.ceil((x_h - k_h + 1) / s_h))
        y_w = int(np.ceil((x_w - k_w + 1) / s_w))
    else: # "same"
        y_h = int(np.ceil(x_h / s_h))
        y_w = int(np.ceil(x_w / s_w))

    y_shape = (y_h, y_w)
    return y_shape

class Conv2D:
                     
    def __init__(self, n_in, n_out, kernel, 
                 padding="same", stride=(1,1), W=None, b=None):
        
        self.n_in = n_in
        self.n_out = n_out
        assert kernel != None and len(kernel) == 2
        self.kernel = kernel

        assert padding == "valid" or padding == "same"
        self.padding = padding 
        self.pads = None
        self.stride = stride

        if W is None:
            k_h, k_w = kernel
            size = n_out * n_in * k_h * k_w 
            weights = np.random.uniform(size=size).reshape(n_out, n_in, k_h, k_w)
            scale = np.sqrt(2./size)
            W = weights * scale       
        if b is None:
            b = np.zeros(n_out)
       
        self.W = W
        self.b = b
        self.X = None
        self.Y = None
        self.X_shape = None
        self.Y_shape = None
        
        self.g_W = None
        self.g_b = None
        self.g_X = None
        self.g_Y = None
        
        return
   
    def _compute_shapes_and_pads(self):
        X_shape = self.X.shape
        x_h = X_shape[1]
        x_w = X_shape[2]

        y_h, y_w = compute_y_shape((x_h, x_w), self.stride, self.kernel, self.padding)        
        k_h, k_w = self.kernel
        s_h, s_w = self.stride

        if self.padding == "valid":
            zeros_h, zeros_w = (0,0)
        else: # "same"
            # (y-1): index of last number of y, (y-1)*s: mapped to index in x
            # (y-1)*s+k: count of needed elements of x, like (y-1)*s+1 + k-1 
            # (y-1)*s+k-x: count of extra elements from x, possibly negative when x is enough to cover
            zeros_h = max((y_h - 1) * s_h + k_h - x_h, 0)
            zeros_w = max((y_w - 1) * s_w + k_w - x_w, 0)

        pad0 = zeros_h // 2
        pad1 = zeros_h - pad0
        pad2 = zeros_w // 2
        pad3 = zeros_w - pad2
        self.pads = (pad0, pad1, pad2, pad3)
        
        self.Y_shape = (self.n_out, y_h, y_w)
        self.X_shape = (self.n_in, x_h+zeros_h, x_w+zeros_w) 
        
        # padding X
        if (np.array(self.pads) == 0).all():
            return self.X

        new_h = x_h + zeros_h
        new_w = x_w + zeros_w

        padded_X = np.zeros(shape=(self.n_in, new_h, new_w))
        padded_X[:, pad0:-pad1, pad2:-pad3] = self.X
        
        return padded_X
        
    def forward(self, X):
    
        assert X.ndim == 3
        assert (np.array(X[0].shape) > np.array(self.kernel)).all()

        self.X = X
        if self.X_shape is None:
            self.X = self._compute_shapes_and_pads() 
        else:
            assert X.shape == self.X_shape

        Y = np.zeros(shape=self.Y_shape)
                
        y_h = self.Y_shape[1]
        y_w = self.Y_shape[2]
        k_h, k_w = self.kernel
        s_h, s_w = self.stride

        for i_Y in range(self.n_out):
            for i_X in range(self.n_in):
                k = self.W[i_Y,i_X]
                x = self.X[i_X]
                for i in range(y_h):
                    for j in range(y_w):
                        r = i * s_h
                        c = j * s_w
                        Y[i_Y, i, j] += (x[r:r+k_h, c:c+k_w] * k).sum()
             
            Y[i_Y] += self.b[i_Y]
        
        self.Y = Y
        return Y
    
    def backward(self, g_Y):
        
        assert g_Y.shape == self.Y_shape
        n_out, y_h, y_w = self.Y_shape

        s_h, s_w = self.stride
        k_h, k_w = self.kernel

        self.g_Y = g_Y
        g_W = np.zeros(shape=(n_out, self.n_in, k_h, k_w))
        g_b = np.zeros(n_out)
        

        for i_Y in range(self.n_out):
            g_y = self.g_Y[i_Y]
            for i_X in range(self.n_in):
                x = self.X[i_X]
                for i_kh in range(k_h):
                    for i_kw in range(k_w):
                        hs = i_kh; he = hs + y_h * s_h
                        ws = i_kw; we = ws + y_w * s_w
                        g_W[i_Y,i_X,i_kh,i_kw] = (g_y * x[hs:he:s_h, ws:we:s_w]).sum()
        
            g_b[i_Y] = g_y.sum()

        g_X = np.zeros(shape=self.X_shape)

        for i_yh in range(y_h):
            i_xh = i_yh * s_h
            for i_yw in range(y_w):
                i_xw = i_yw * s_w
                for i_Y in range(self.n_out):
                    g_X[:, i_xh:i_xh+k_h, i_xw:i_xw+k_w] += self.W[i_Y] * g_Y[i_Y, i_yh, i_yw]  
            
        self.g_Y = g_Y
        self.g_W = g_W
        self.g_b = g_b
        self.g_X = g_X
        
        return g_X
    
    def update(self, g_Y, learning=0.1):
        
        self.W -= learning * self.g_W
        self.b -= learning * self.g_b
        
            
    def __str__(self):
        
        s = "\nX is:" + ("" if self.X is None else str(self.X.shape))
        s += "\n" + str(self.X)
        s += "\npadding is: " + str(self.pads)
        s += "\nY is:" + ("" if self.Y is None else str(self.Y.shape))
        s += "\n" + str(self.Y)
        s += "\nkernel is: " + str(self.W.shape)
        s += "\n" + str(self.W)
        s += "\nbias is: " + str(self.b.shape)
        s += "\n" + str(self.b)
        s += "\nstride is:\n" + str(self.stride)
        
        s += "\ng_Y is:" + ("" if self.g_Y is None else str(self.g_Y.shape))
        s += "\n" + str(self.g_Y)
        s += "\ng_W is:" + ("" if self.g_W is None else str(self.g_W.shape))
        s += "\n" + str(self.g_W)
        s += "\ng_b is:" + ("" if self.g_b is None else str(self.g_b.shape))
        s += "\n" + str(self.g_b)
        s += "\ng_X is:" + ("" if self.g_X is None else str(self.g_X.shape))
        s += "\n" + str(self.g_X)
        return s

# when we set all the kernels, biases, g_Y as ones, the resulted g_X shows 
# how many times one input x element has been used in forwarding

In [2]:
class Pooling:
    def __init__(self, pool="MAX"):
        return
        
    def forward(self, X):
        return X
    
    def backward(self, X):
        return X

In [3]:
class ConvolutionLayer:
    def __init__(self, conv, acti=None, pool=None): 
        if conv is None: raise ValueError("Convolution layer is not assigned.")
        self.conv = conv

        if acti is not None:
            self.acti = Activation(acti)
            self.A = None
        else:
            self.acti = None

        
        self.pool = pool

        self.X = None
        self.Y = None
        self.Y_shape = None
        return
    
    def forward(self, X):
        self.X = X
        self.Y = self.conv.forward(X)
        
        if self.acti is not None:
            self.A = self.acti.func(self.Y)
            self.Y = self.A
        if self.pool is not None:
            self.Y = self.pool.forward(self.Y)
        
        if self.Y_shape is None:
            self.Y_shape = self.Y.shape
        else:
            assert self.Y_shape == self.Y.shape
        
        return self.Y
    
    def backward(self, g_Y):
        self.g_Y = g_Y
    
        if self.pool is not None:
            g_Y = self.pool.backward(g_Y)
        if self.acti is not None:
            g_acti = self.acti.grad(self.A)
            g_Y = g_acti * g_Y
            
        g_Y = self.conv.backward(g_Y)
        return g_Y
        
    def update(self, learning):
        self.conv.update(learning)
        return 
    
    def OutputToFC(self):
        return self.Y.reshape(-1, 1)

    def InputFromFC(self, g_Y):
        return g_Y.reshape(self.Y_shape)
    
    def __str__(self):
        pass

In [4]:
class CNN:
    def __init__(self, learning=0.001):
        
        self.type = "CNN"
        self.learning = learning
        
        self.convlayers = []

        n_in = 1; n_out = 5; kernel = (5,5); stride = (2,2); padding="valid"
        c = Conv2D(n_in, n_out, kernel=kernel, padding=padding, stride=stride)
        convlayer1 = ConvolutionLayer(c, acti="RELU")
        self.convlayers.append(convlayer1)

        x_shape = (28, 28)
        y_h, y_w = compute_y_shape(x_shape, stride, kernel, padding)  
        
        # this layer can be skipped
        # -------------
        # n_in = n_out; n_out = 50; kernel = (5,5); stride = (2,2); padding="valid"
        # c = Conv2D(n_in, n_out, kernel=kernel, padding=padding, stride=stride)
        # convlayer2 = ConvolutionLayer(c, acti="RELU")
        # self.convlayers.append(convlayer2)
        # x_shape = (y_h, y_w)
        # y_h, y_w = compute_y_shape(x_shape, stride, kernel, padding)  
        # -------------

        n_in = n_out * y_h * y_w; n_out = 100
        f = PercepLayer(n_in, n_out, acti="RELU")
        self.perceplayer = f
        
        n_in = 100; n_out = 10
        f = SoftMaxLayer(n_in, n_out)
        self.outlayer = f
        return
    
    def forward(self, X):
        for convlayer in self.convlayers:
            X = convlayer.forward(X)
            
        X = convlayer.OutputToFC()
        X = self.perceplayer.forward(X)
        return self.outlayer.forward(X)
    
    def backward(self, label):
        g_Y = self.outlayer.backward(label)
        g_Y = self.perceplayer.backward(g_Y)
        g_Y = self.convlayers[-1].InputFromFC(g_Y)
        
        for convlayer in self.convlayers[-1::-1]:
            g_Y = convlayer.backward(g_Y)
        return
    
    def update(self, learning):
        self.outlayer.update(learning)
        self.perceplayer.update(learning)
        for convlayer in self.convlayers:
            convlayer.update(learning)

        return
    
    def train_1sample(self, X, label):
        self.forward(X)
        self.backward(label)
        self.update(self.learning)
        return
 
    def predict_1sample(self, X):
        predict = self.forward(X)
        return predict


In [5]:
%%capture
%run 'multilayer-perceptron.ipynb'

def is_main_module():
    return __name__ == '__main__' and '__file__' not in globals()

In [7]:
def run_cnn_test1():
# when we set all the kernels, biases, g_Y as ones, the resulted g_X shows 
# how many times one input x element has been used in forwarding
    n_out = 2; n_in = 2; kernel = (2,3); stride = (1,3); padding="same"
    W = np.ones(shape=(n_out, n_in, kernel[0], kernel[1]))
    bias = np.ones(n_out)

    c = Conv2D(n_out, n_in, kernel=kernel, padding=padding, stride=stride, W=W, b=bias)
    
    l = ConvolutionLayer(c)
    
    a = np.ones(15).reshape(3,5)
    b = np.array([a,a])
    #print(b)

    l.forward(b)
    #print(c)

    g = np.ones(shape=c.Y_shape)
    #print(g)
    l.backward(g)
    #print(c)

    result_g_W = np.array(
        [[[[ 6.,  6.,  3.],
           [ 4.,  4.,  2.]],

          [[ 6.,  6.,  3.],
           [ 4.,  4.,  2.]]],

         [[[ 6.,  6.,  3.],
           [ 4.,  4.,  2.]],

          [[ 6.,  6.,  3.],
           [ 4.,  4.,  2.]]]])
        
    result_g_b = np.array([ 6.,  6.])
    
    result_g_X = np.array(
        [[[ 2.,  2.,  2.,  2.,  2.,  2.],
          [ 4.,  4.,  4.,  4.,  4.,  4.],
          [ 4.,  4.,  4.,  4.,  4.,  4.],
          [ 2.,  2.,  2.,  2.,  2.,  2.]],

         [[ 2.,  2.,  2.,  2.,  2.,  2.],
          [ 4.,  4.,  4.,  4.,  4.,  4.],
          [ 4.,  4.,  4.,  4.,  4.,  4.],
          [ 2.,  2.,  2.,  2.,  2.,  2.]]])
        
    assert (c.g_W == result_g_W).all()
    assert (c.g_b == result_g_b).all()
    assert (c.g_X == result_g_X).all()
    
    
def run_cnn_test2():
    n_in = 3; n_out = 2; kernel = (3,3); stride = (2,2); padding="valid"
    
    a = np.array([
        [[0, 0, 0, 0, 0, 0, 0],
        [0, 2, 2, 0, 2, 0, 0],
        [0, 2, 0, 2, 0, 0, 0],
        [0, 1, 0, 2, 2, 0, 0],
        [0, 2, 2, 0, 0, 2, 0],
        [0, 2, 1, 2, 2, 1, 0],
        [0, 0, 0, 0, 0, 0, 0]],
        
        [[0, 0, 0, 0, 0, 0, 0],
        [0, 0, 2, 2, 1, 0, 0],
        [0, 0, 1, 1, 2, 1, 0],
        [0, 2, 2, 1, 0, 0, 0],
        [0, 1, 0, 0, 0, 0, 0],
        [0, 2, 2, 2, 2, 0, 0],
        [0, 0, 0, 0, 0, 0, 0]],
        
        [[0, 0, 0, 0, 0, 0, 0],
        [0, 2, 0, 2, 1, 0, 0],
        [0, 0, 2, 0, 0, 0, 0],
        [0, 0, 1, 0, 1, 0, 0],
        [0, 0, 1, 1, 0, 2, 0],
        [0, 1, 2, 0, 2, 0, 0],
        [0, 0, 0, 0, 0, 0, 0]]
        ])

    W = np.array([
        [
            [[0,  1, 0],
            [-1, 0, 0],
            [1, 0,  1]],
        
            [[1, -1, 1],
            [0, 0, 0],
            [-1, -1, 1]],
        
            [[1, 0, 0],
            [1, -1, 0],
            [-1, 0, 1]]
        ],
        
        [
            [[-1, 0, 1],
            [-1, 1, 0],
            [0, 0, -1]],

            [[0, 1, -1],
            [0, 0, 0],
            [0, -1, 0]],

            [[0, -1, -1],
            [-1, 1, -1],
            [1, -1, -1]],
        ]
        ])
    
    bias = np.array([1,0])
    
    g_Y = np.array([
         [[ 1., -5., -3.],
          [ 6.,  9.,  1.],
          [ 1.,  3.,  3.]],

         [[ 1.,  0., -4.],
          [-7., -1., -4.],
          [ 3., -6., -5.]]
        ])

    result_Y = np.array([
         [[ 2., -5., -3.],
          [ 6.,  9.,  1.],
          [ 1.,  3.,  3.]],

         [[ 2.,  0., -4.],
          [-7., -1., -4.],
          [ 3., -6., -5.]]
        ])
    
    result_g_X = np.array(
    [
        [[ -1.0, 1.0, 1.0, -5.0, 4.0, -3.0, -4.0],
        [ -2.0, 1.0, 5.0, 0.0, 7.0, -4.0, 0.0],
        [8.0, 6.0, -11.0, 9.0, -5.0, 1.0, -3.0],
        [1.0, -7.0, -8.0, -1.0, 3.0, -4.0, 0.0],
        [3.0, 1.0, 31.0, 3.0, 10.0, 3.0, 0.0],
        [ -4.0, 3.0, 3.0, -6.0, 2.0, -5.0, 0.0],
        [1.0, 0.0, 1.0, 0.0, 12.0, 0.0, 8.0]],

        [[1.0, 0.0, -5.0, 5.0, -8.0, -1.0, 1.0],
        [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
        [5.0, -15.0, 28.0, -5.0, 9.0, 2.0, 2.0],
        [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
        [ -5.0, 3.0, -2.0, -17.0, 20.0, -5.0, 9.0],
        [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
        [ -1.0, -4.0, -2.0, 3.0, 0.0, 2.0, 3.0]],
        
        [[1.0, -1.0, -6.0, 0.0, -3.0, 4.0, 4.0],
        [0.0, 0.0, -6.0, 5.0, 1.0, -1.0, 4.0],
        [6.0, 6.0, 21.0, 1.0, -4.0, 8.0, 5.0],
        [ 13.0, -13.0, 17.0, -10.0, 6.0, -5.0, 4.0],
        [-12.0, 4.0, 3.0, 7.0, 14.0, 9.0, 10.0],
        [ -2.0, 2.0, 6.0, -9.0, 14.0, -8.0, 5.0],
        [2.0, -3.0, -11.0, 6.0, 1.0, 5.0, 8.0]],
    ])

    result_g_W = np.array([
        [
            [[ 6.0, 38.0, 2.0],
            [ -5.0, 37.0, 17.0],
            [ 18.0, 6.0, 12.0]],
            
            [[ 11.0, 11.0, 24.0],
            [ 17.0, 19.0, 17.0],
            [-11.0, -2.0, -9.0]],
            
            [[ 21.0, 9.0, 13.0],
            [ 19.0, -7.0, 18.0],
            [ -1.0, 11.0, 8.0]]
        ],
        [
            [[-12.0, -20.0, 6.0],
            [-32.0, -18.0, -9.0],
            [ -2.0, -20.0, -14.0]],
            
            [[ -9.0, -2.0, -9.0],
            [-28.0, -21.0, -18.0],
            [ -8.0, -11.0, 1.0]],
            
            [[ -8.0, -16.0, -11.0],
            [-31.0, 5.0, -14.0],
            [ -1.0, -9.0, -5.0]]
        ]
        ])

    result_g_b = np.array([16.0, -23.0])
    
    c = Conv2D(n_in, n_out, kernel=kernel, padding=padding, stride=stride, W=W, b=bias)
    
    cl = ConvolutionLayer(c)
  
    cl.forward(a)
   
    cl.backward(g_Y)

    assert (c.Y == result_Y).all()
    assert (c.g_W == result_g_W).all()
    assert (c.g_b == result_g_b).all()
    assert (c.g_X == result_g_X).all()
    
    #print(c)
    
if is_main_module():
    run_cnn_test1()
    run_cnn_test2()

In [6]:
def run_cnn_mnist():
    #set_trace()
    cnn = CNN()
    mnist = MNIST(cnn)
    for i in range(5):
        mnist.train(-1)
        accuracy = mnist.test(-1)
        print("\nEpoch {} accuracy: {}".format(i, accuracy))

if is_main_module():
    np.seterr(all='raise')
    run_cnn_mnist()
            


Epoch 0 accuracy: 0.9578

Epoch 1 accuracy: 0.9657

Epoch 2 accuracy: 0.9661

Epoch 3 accuracy: 0.9697

Epoch 4 accuracy: 0.9726
