# Import Libraries

In [1]:
import numpy as np
import time

# Seed NumPy's global random generator so that every np.random call made
# later in the notebook produces the same values on a fresh run.
np.random.seed(97)

# 1. Layer

In [111]:
class Layer:
    """Un-padded 2-D convolution followed by a sigmoid activation.

    All tensors are channels-last:
        input   X : (batch, row_i, column_i, channel_i)
        output  a : (batch, row_o, column_o, channel_o)

    Attributes:
        INPUT_DIM:  (row_i, column_i, channel_i)
        OUTPUT_DIM: (row_o, column_o, channel_o)
        KERNEL_DIM: (row_k, column_k, channel_i, channel_o)
        stride:     (row_s, column_s)
        w:          kernel weights, shape KERNEL_DIM, initialised to zero
        b:          one bias per output channel, initialised to zero
        z, a:       pre-activation / activation cached by the last forward()
    """

    def __init__(self, INPUT_DIM=(1,2,1), kernel=(1,2,1,1), stride=(1,1), OUTPUT_DIM=(1,1,1)):
        """Store the layer geometry and create zero-initialised parameters.

        The caller is responsible for passing an OUTPUT_DIM that is consistent
        with INPUT_DIM, kernel and stride; no consistency check is performed.
        """
        self.INPUT_DIM = INPUT_DIM                                  # (row_i, column_i, channel_i)
        self.OUTPUT_DIM = OUTPUT_DIM                                # (row_o, column_o, channel_o)
        self.KERNEL_DIM = kernel                                    # (row_k, column_k, channel_i, channel_o)
        self.stride = stride                                        # (row_s, column_s)

        self.w = np.zeros(self.KERNEL_DIM, dtype=np.float64)        # (row_k, column_k, channel_i, channel_o)
        self.b = np.zeros(self.OUTPUT_DIM[-1], dtype=np.float64)    # (channel_o,)

    def forward(self, X):
        """Compute sigmoid(conv(X, w) + b) and cache z and a for backward().

        Args:
            X: array of shape (batch, row_i, column_i, channel_i).

        Returns:
            Activations of shape (batch, row_o, column_o, channel_o), clipped
            to [MIN_MARGIN, 1 - MIN_MARGIN] so that a downstream log / division
            never sees exactly 0 or 1.
        """
        MIN_MARGIN = 2 ** -53

        # Write every (row, column, channel) result straight into its slot.
        # Growing a flat array with np.concatenate only worked when the layer
        # had a single output element.
        z = np.zeros((X.shape[0],) + tuple(self.OUTPUT_DIM), dtype=np.float64)

        for row_idx in range(self.OUTPUT_DIM[0]):
            row_s = row_idx * self.stride[0]
            row_e = row_s + self.KERNEL_DIM[0]
            for column_idx in range(self.OUTPUT_DIM[1]):
                column_s = column_idx * self.stride[1]
                column_e = column_s + self.KERNEL_DIM[1]
                patch = X[:, row_s:row_e, column_s:column_e, :]     # (batch, row_k, column_k, channel_i)
                for channel_o_idx in range(self.OUTPUT_DIM[2]):
                    weighted = self.w[:, :, :, channel_o_idx] * patch
                    z[:, row_idx, column_idx, channel_o_idx] = (
                        np.sum(weighted, axis=(1, 2, 3)) + self.b[channel_o_idx]
                    )

        self.z = z                                                  # (batch, row_o, column_o, channel_o)
        self.a = 1 / (1 + np.exp(-self.z))
        self.a = np.maximum(np.minimum(1 - MIN_MARGIN, self.a), MIN_MARGIN)
        return self.a

    def backward(self, X, dx_next, learning_rate):
        """Back-propagate through the layer and apply one gradient-descent step.

        Must be called after forward() on the same X, because it reads the
        cached activation self.a.

        Args:
            X: the input that was given to forward(),
               shape (batch, row_i, column_i, channel_i).
            dx_next: d(loss)/d(a) for this layer's output,
               shape (batch, row_o, column_o, channel_o).
            learning_rate: step size used for the in-place update of w and b.

        Returns:
            d(loss)/d(X), same shape as X, computed with the weights as they
            were before this call's update.
        """
        da = dx_next                                                # (batch, row_o, column_o, channel_o)
        dz = self.a * (1 - self.a) * da                             # sigmoid'(z) * da
        dx = np.zeros(X.shape, dtype=np.float64)                    # (batch, row_i, column_i, channel_i)

        # Handle one filter (output channel) at a time.
        for channel_o_idx in range(self.OUTPUT_DIM[2]):
            # View of this filter's weights; they are only modified after the
            # position loops below, so dx always sees the pre-update values.
            w_selected = self.w[:, :, :, channel_o_idx]             # (row_k, column_k, channel_i)
            dw = np.zeros((X.shape[0],) + w_selected.shape, dtype=np.float64)

            for row_idx in range(self.OUTPUT_DIM[0]):
                row_s = row_idx * self.stride[0]
                row_e = row_s + self.KERNEL_DIM[0]
                for column_idx in range(self.OUTPUT_DIM[1]):
                    column_s = column_idx * self.stride[1]
                    column_e = column_s + self.KERNEL_DIM[1]

                    X_selected = X[:, row_s:row_e, column_s:column_e, :]                        # (batch, row_k, column_k, channel_i)
                    dz_selected = dz[:, row_idx, column_idx, channel_o_idx].reshape(-1, 1, 1, 1)  # (batch, 1, 1, 1)

                    # Per-sample weight gradient contributed by this position.
                    dw += X_selected * dz_selected
                    # Scatter the input gradient back onto the receptive field.
                    dx[:, row_s:row_e, column_s:column_e, :] += dz_selected * w_selected

            dw = np.mean(dw, axis=0)                                                # average over the batch
            db = np.mean(np.sum(dz[:, :, :, channel_o_idx], axis=(1, 2)), axis=0)  # scalar

            # Gradient-descent update of the current filter.
            self.w[:, :, :, channel_o_idx] -= learning_rate * dw
            self.b[channel_o_idx] -= learning_rate * db

        # Return the accumulated gradient. The previous code replaced it with
        # `dz * self.w`, which only broadcast for the default 1x2 kernel and
        # used the already-updated weights.
        return dx

# 2. Interface of neural network

In [113]:
class Network:
    """Sequential stack of layers trained with binary cross-entropy.

    Every layer must provide ``forward(X)`` and
    ``backward(X, dx_next, learning_rate)``.
    """

    def __init__(self, layers):
        """Keep the layers and reserve one cached-input slot per layer."""
        self.layers = layers
        # Placeholders; forward() overwrites each slot with that layer's input.
        self.inputs = ['tmp' for _ in layers]

    def forward(self, X, option="NULL"):
        """Run X through every layer in order, remembering each layer's input.

        With option == 'print' the shape of every layer output is printed.
        Returns the output of the last layer.
        """
        activation = X
        for position, current_layer in enumerate(self.layers):
            self.inputs[position] = activation
            activation = current_layer.forward(activation)
            if option == 'print':
                print(activation.shape)
        return activation

    def backward(self, dx_next, learning_rate):
        """Propagate d(loss)/d(output) from the last layer back to the first.

        Each layer receives the input cached by the latest forward() call and
        updates its own parameters. Nothing is returned.
        """
        gradient = dx_next
        cached_pairs = zip(reversed(self.layers), reversed(self.inputs))
        for current_layer, layer_input in cached_pairs:
            gradient = current_layer.backward(layer_input, gradient, learning_rate)

    def train(self, X, y, learning_rate, option='NULL'):
        """Perform one forward/backward pass on (X, y)."""
        prediction = self.forward(X, option)
        # Derivative of binary cross-entropy with respect to the prediction.
        positive_term = -y / prediction
        negative_term = (1 - y) / (1 - prediction)
        self.backward(positive_term + negative_term, learning_rate)

    def predict(self, X):
        """Return the network output rounded to the nearest integer."""
        probabilities = self.forward(X)
        return np.round(probabilities)

    def loss(self, X, y):
        """Return the mean binary cross-entropy of the network on (X, y)."""
        prediction = self.forward(X)
        log_likelihood = y * np.log(prediction) + (1 - y) * np.log(1 - prediction)
        return -np.mean(log_likelihood)


# 3. Train Model

In [114]:
def create_samples(sample_num):
    """Draw `sample_num` random 2-feature points with a sign-of-sum label.

    Returns:
        X: (sample_num, 2) array, uniform in [-10, 10).
        y: (sample_num,) float array, 1.0 where the two features sum to a
           positive value and 0.0 otherwise.
    """
    features = np.random.uniform(low=-10, high=10, size=(sample_num, 2))
    feature_sum = np.sum(features, axis=1)
    labels = (feature_sum > 0).astype(float)
    return features, labels


In [115]:
def create_samples_2(sample_num):
    """Draw `sample_num` random (2, 2, 2) tensors and label them by sign of sum.

    Returns:
        X: (sample_num, 2, 2, 2) array, uniform in [-10, 10).
        y: (sample_num, 2, 2) float array; each entry is 1.0 where the sum of
           X over axis 1 is positive and 0.0 otherwise.
    """
    features = np.random.uniform(low=-10, high=10, size=(sample_num, 2, 2, 2))
    # Only axis 1 is reduced, so the labels keep the two trailing axes.
    axis_sum = np.sum(features, axis=1)
    labels = (axis_sum > 0).astype(float)
    return features, labels


In [116]:
def run_practice_2(train_samples, test_samples, learning_rate, epochs, option='NULL'):
    """Train a single 1x2-kernel Layer on the sign-of-sum task and report results.

    Args:
        train_samples: number of training points to generate.
        test_samples: number of test points to generate.
        learning_rate: step size passed to every training pass.
        epochs: number of full-batch training passes.
        option: 'print' to log layer output shapes and the parameters after
            every epoch; any other value trains silently.

    Returns:
        dict with the learned 'w' (reshaped to (1, 2)) and 'b', the
        'train_loss' / 'test_loss', and 'train_acc' / 'test_acc' in percent.
    """
    # Training data is drawn before test data, which fixes the RNG order.
    train_X, train_y = create_samples(train_samples)
    test_X, test_y = create_samples(test_samples)
    print('shape of samples(before): ', train_X.shape, train_y.shape)

    # The layer expects (batch, row, column, channel) tensors.
    feature_shape = (-1, 1, 2, 1)
    label_shape = (-1, 1, 1, 1)
    train_X = train_X.reshape(feature_shape)
    train_y = train_y.reshape(label_shape)
    test_X = test_X.reshape(feature_shape)
    test_y = test_y.reshape(label_shape)
    print('shape of samples(after) : ', train_X.shape, train_y.shape)

    # One layer whose kernel covers the whole 1x2 input.
    model = Network(
        layers=[
            Layer(INPUT_DIM=(1,2,1), kernel=(1,2,1,1), stride=(1,1), OUTPUT_DIM=(1,1,1)),
        ]
    )

    verbose = option == 'print'
    for epoch_number in range(1, epochs + 1):
        model.train(train_X, train_y, learning_rate, option)
        if verbose:
            print('\nepoch #' + str(epoch_number))
            print('w :' + str(model.layers[0].w.reshape((1,2))))
            print('b : ' + str(model.layers[0].b))

    def accuracy(features, labels):
        # Percentage of rounded predictions that match the labels.
        matches = model.predict(features).astype(bool) == labels.astype(bool)
        return 100 * np.mean(matches)

    trained_layer = model.layers[0]
    report = {'w': trained_layer.w.reshape((1,2)), 'b': trained_layer.b}
    report['train_loss'] = model.loss(train_X, train_y)
    report['test_loss'] = model.loss(test_X, test_y)
    report['train_acc'] = accuracy(train_X, train_y)
    report['test_acc'] = accuracy(test_X, test_y)
    return report

# 4. Run Tasks

In [117]:
train_samples = 100 # m: number of training samples to generate
test_samples = 100 # n: number of test samples to generate
learning_rate = 1e-2 # step size passed to run_practice_2
epochs = 100 # K: number of training passes over the training set

In [118]:
# Train with the settings above; the returned dict (weights, bias, losses,
# accuracies) is displayed as the cell output.
run_practice_2(train_samples, test_samples, learning_rate, epochs, option='NULL')

shape of samples(before):  (100, 2) (100,)
shape of samples(after) :  (100, 1, 2, 1) (100, 1, 1, 1)


{'w': array([[0.39703262, 0.42310451]]),
 'b': array([0.03871357]),
 'train_loss': 0.18211930090828615,
 'test_loss': 0.15344904172458768,
 'train_acc': 99.0,
 'test_acc': 96.0}