## [Problem 1] Creating a one-dimensional convolutional layer class that limits the number of channels to one

In [1]:
import numpy as np
import math
from keras.datasets import mnist
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

In [33]:
# Activation functions
class Sigmoid:
    """Logistic sigmoid activation layer."""

    def sigmoid(self, X):
        """Return the element-wise logistic function ``1 / (1 + exp(-X))``."""
        denominator = 1 + np.exp(-X)
        return 1 / denominator

    def forward(self, A):
        """Remember the pre-activation ``A`` and return ``sigmoid(A)``."""
        self.A = A
        activated = self.sigmoid(A)
        return activated

    def backward(self, dZ):
        """Scale the upstream gradient ``dZ`` by the sigmoid derivative at ``A``."""
        s = self.sigmoid(self.A)
        # Same evaluation order as dZ * (1 - s) * s.
        partial = dZ * (1 - s)
        return partial * s

class Tanh:
    """Hyperbolic tangent activation layer."""

    def forward(self, A):
        """Remember the pre-activation ``A`` and return ``tanh(A)``."""
        self.A = A
        activated = np.tanh(A)
        return activated

    def backward(self, dZ):
        """Scale the upstream gradient ``dZ`` by ``1 - tanh(A)**2``."""
        t = np.tanh(self.A)
        local_grad = 1 - t**2
        return dZ * local_grad

class Softmax:
    """Softmax output layer combined with the cross-entropy loss."""

    def forward(self, X):
        """Return row-wise softmax probabilities of the 2-D logits ``X``.

        The row maximum is subtracted before exponentiation. This leaves the
        result mathematically unchanged but prevents ``np.exp`` from
        overflowing to ``inf`` (and the output from becoming NaN) when the
        logits are large.
        """
        shifted = X - np.max(X, axis=1, keepdims=True)
        exp_shifted = np.exp(shifted)
        self.Z = exp_shifted / np.sum(exp_shifted, axis=1, keepdims=True)
        return self.Z

    def backward(self, Y):
        """Store the loss against targets ``Y`` in ``self.loss`` and return ``Z - Y``.

        ``Z - Y`` is the gradient of the summed cross-entropy with respect to
        the logits when ``Y`` is one-hot.
        """
        self.loss = self.loss_func(Y)
        return self.Z - Y

    def loss_func(self, Y, Z=None):
        """Return the batch-averaged cross-entropy between ``Y`` and ``Z``.

        ``Z`` defaults to the probabilities saved by the last ``forward``.
        """
        if Z is None:
            Z = self.Z
        # A probability that underflowed to exactly 0 would give 0 * log(0) = NaN;
        # clamp to the smallest positive normal float so the log stays finite.
        safe_Z = np.clip(Z, np.finfo(float).tiny, None)
        return (-1) * np.average(np.sum(Y * np.log(safe_Z), axis=1))

class ReLU:
    """Rectified linear unit activation layer."""

    def forward(self, A):
        """Remember the pre-activation ``A`` and return it with negatives set to 0."""
        self.A = A
        rectified = np.clip(A, a_min=0, a_max=None)
        return rectified

    def backward(self, dZ):
        """Pass ``dZ`` through where ``A`` was positive and zero it elsewhere."""
        # sign(A) is -1/0/+1; clipping at 0 turns it into a 0/1 gate.
        gate = np.clip(np.sign(self.A), a_min=0, a_max=None)
        return dZ * gate

# FC = fully connected (affine) layer
class FC:
    """Fully connected (affine) layer computing ``A = X @ W + B``.

    Parameters
    ----------
    n_nodes1 : int
        Number of input features.
    n_nodes2 : int
        Number of output features.
    initializer : object
        Provides ``W(n_nodes1, n_nodes2)`` and ``B(n_nodes2)`` returning the
        initial weight matrix and bias vector.
    optimizer : object
        Provides ``update(layer)``; called at the end of every ``backward``.
    dropout_rate : float
        Probability of dropping a unit in ``dropout_forward``.
    """

    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer, dropout_rate=0.5):
        self.optimizer = optimizer
        self.W = initializer.W(n_nodes1, n_nodes2)
        self.B = initializer.B(n_nodes2)
        self.W_feedback = 0
        self.B_feedback = 0
        self.dZ = 0
        self.dA = 0
        self.dW = 0
        self.dB = 0
        self.dropout_rate = dropout_rate
        self.mask = None
        self.input_X_forward = 0

    def forward(self, X):
        """Cache ``X`` for the backward pass and return ``X @ W + B``."""
        self.input_X_forward = X
        A = np.dot(X, self.W) + self.B

        return A

    def backward(self, dA):
        """Compute gradients, let the optimizer update the layer, return dL/dX.

        ``dW``/``dB`` hold the batch-summed gradients; ``W_feedback``/
        ``B_feedback`` hold the batch-averaged ones. Both pairs are set
        because the optimizers in this file read different attributes
        (``SGD`` uses ``*_feedback``, ``AdaGrad`` uses ``dW``/``dB``).
        """
        self.dA = dA
        self.dW = np.dot(self.input_X_forward.T, dA)
        self.dB = np.sum(dA, axis=0)
        # The input gradient must use the weights from before the update.
        self.dZ = np.dot(dA, self.W.T)

        self.W_feedback = self.dW / dA.shape[0]
        self.B_feedback = np.average(dA, axis=0)

        # The optimizer mutates this layer; its return value is not needed.
        self.optimizer.update(self)
        return self.dZ

    def dropout_forward(self, X, flag):
        """Apply dropout to ``X``.

        With ``flag`` true (training) a fresh random mask keeps each unit with
        probability ``1 - dropout_rate``. Otherwise (inference) the input is
        scaled by ``1 - dropout_rate`` so expected magnitudes match training.
        """
        if flag:
            self.mask = np.random.rand(*X.shape) > self.dropout_rate
            return X * self.mask
        else:
            return X * (1.0 - self.dropout_rate)

    def dropout_backward(self, X):
        """Route the gradient ``X`` through the mask of the last training pass."""
        return X * self.mask


# Weight initialization classes
class XavierInitializer:
    """Gaussian initializer with standard deviation ``sqrt(1 / n_nodes1)``.

    ``W`` must be called before ``B``: ``B`` reuses the ``sigma`` that ``W``
    stores on the instance.
    """

    def W(self, n_nodes1, n_nodes2):
        """Return an ``(n_nodes1, n_nodes2)`` weight matrix and remember sigma."""
        self.sigma = math.sqrt(1 / n_nodes1)
        return self.sigma * np.random.randn(n_nodes1, n_nodes2)

    def B(self, n_nodes2):
        """Return a bias vector of length ``n_nodes2`` drawn with the stored sigma."""
        return self.sigma * np.random.randn(n_nodes2)

class HeInitializer():
    """Gaussian initializer with standard deviation ``sqrt(2 / n_nodes1)``.

    ``W`` must be called before ``B``: ``B`` reuses the ``sigma`` that ``W``
    stores on the instance.
    """

    def W(self, n_nodes1, n_nodes2):
        """Return an ``(n_nodes1, n_nodes2)`` weight matrix and remember sigma."""
        self.sigma = math.sqrt(2 / n_nodes1)
        return self.sigma * np.random.randn(n_nodes1, n_nodes2)

    def B(self, n_nodes2):
        """Return a bias vector of length ``n_nodes2`` drawn with the stored sigma."""
        return self.sigma * np.random.randn(n_nodes2)

class SimpleInitializer:
    """Gaussian initializer with a fixed, caller-supplied standard deviation."""

    def __init__(self, sigma):
        self.sigma = sigma

    def W(self, *shape):
        """Return weights of the given shape drawn from ``N(0, sigma**2)``."""
        return self.sigma * np.random.randn(*shape)

    def B(self, *shape):
        """Return biases of the given shape drawn from ``N(0, sigma**2)``."""
        return self.sigma * np.random.randn(*shape)

class SimpleInitializerConv1d:
    """Fixed-sigma Gaussian initializer for convolution filters and biases."""

    def __init__(self, sigma):
        self.sigma = sigma

    def W(self, *shape):
        """Return filter weights of the given shape drawn from ``N(0, sigma**2)``."""
        return self.sigma * np.random.randn(*shape)

    def B(self, *shape):
        """Return biases of the given shape drawn from ``N(0, sigma**2)``."""
        return self.sigma * np.random.randn(*shape)

# Optimizer (gradient update) classes
class SGD:
    """Plain stochastic gradient descent."""

    def __init__(self, lr):
        self.lr = lr

    def update(self, layer):
        """Step ``layer.B`` and ``layer.W`` against their ``*_feedback`` gradients.

        The parameters are rebound to new arrays rather than modified in
        place, so integer-typed parameters are promoted instead of raising.
        Returns the same ``layer`` object.
        """
        step_size = self.lr
        new_bias = layer.B - step_size * layer.B_feedback
        layer.B = new_bias
        new_weights = layer.W - step_size * layer.W_feedback
        layer.W = new_weights
        return layer

class AdaGrad:
    """AdaGrad optimizer with per-layer squared-gradient accumulators.

    One optimizer instance may be shared by several layers (the notebook
    passes the same object to every layer), so the accumulators are stored
    per layer. A single shared accumulator would take the shape of the first
    layer and fail on the next layer with a different shape.

    ``HW`` / ``HB`` expose the accumulators of the most recently updated
    layer. Accumulators start at 1, which keeps the first division finite
    without a separate epsilon term.
    """

    def __init__(self, lr):
        self.lr = lr
        self.HW = 1
        self.HB = 1
        # id(layer) -> (layer, HW, HB). Holding the layer keeps it alive so
        # its id cannot be reused by a different object.
        self._state = {}

    def update(self, layer):
        """Apply one AdaGrad step using ``layer.dW`` and ``layer.dB``.

        Returns ``layer``, matching ``SGD.update``.
        """
        key = id(layer)
        if key in self._state:
            _, hw, hb = self._state[key]
        else:
            hw, hb = 1, 1

        hw = hw + layer.dW ** 2
        hb = hb + layer.dB ** 2
        self._state[key] = (layer, hw, hb)
        self.HW, self.HB = hw, hb

        # Rebind instead of using -= so integer parameters are promoted to float.
        layer.W = layer.W - self.lr * np.sqrt(1 / hw) * layer.dW
        layer.B = layer.B - self.lr * np.sqrt(1 / hb) * layer.dB
        return layer

# Defining a mini-batch generation iterator
class GetMiniBatch:
    """Iterator over shuffled mini-batches of ``(X, y)``.

    The global NumPy random state is seeded with ``seed`` before the rows are
    shuffled once; every pass then yields the same batches in the same order.
    The final batch may be smaller than ``batch_size``.
    """

    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        n_samples = X.shape[0]
        order = np.random.permutation(np.arange(n_samples))
        self._X = X[order]
        self._y = y[order]
        # Number of batches, rounding up so the remainder forms a last batch.
        self._stop = np.ceil(n_samples/self.batch_size).astype(np.int64)

    def __len__(self):
        return self._stop

    def __getitem__(self,item):
        """Return the ``item``-th batch as an ``(X_batch, y_batch)`` tuple."""
        start = item*self.batch_size
        stop = start + self.batch_size
        return self._X[start:stop], self._y[start:stop]

    def __iter__(self):
        self._counter = 0
        return self

    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        batch = self[self._counter]
        self._counter += 1
        return batch

In [30]:
class SimpleConv1d():
    """Single-channel 1-D convolution layer (no padding, stride 1).

    Parameters
    ----------
    n_input_hight : int
        Length of the 1-D input signal.
    f_w : ndarray, shape (filter_size,)
        Filter weights.
    f_b : ndarray, shape (1,)
        Bias.
    optimizer : object
        Provides ``update(layer)``; called at the end of every ``backward``.
    """

    def __init__(self, n_input_hight, f_w, f_b, optimizer):
        DIM = 1  # number of channels handled by this layer

        self.optimizer = optimizer
        # The filter and bias are supplied by the caller and stored as columns.
        self.f_hight = len(f_w)
        self.n_input_hight = n_input_hight
        self.W = f_w[:, np.newaxis]
        self.B = f_b[:, np.newaxis]
        self.dZ = 0
        self.dA = 0
        self.dB = 0
        print("N_input:{} F_hight:{}".format(self.n_input_hight, self.f_hight))
        self.n_output_hight = self.n_input_hight - self.f_hight + 1
        self.input_X_forward = 0
        self.output_X_forward = np.zeros([self.n_output_hight, DIM])
        self.W_feedback = np.zeros([self.f_hight, DIM])
        self.B_feedback = 0
        self.Z_feedback = np.zeros([self.n_input_hight, DIM])

    def forward(self, X):
        """Slide the filter over the 1-D array ``X``.

        Returns an array of shape ``(n_output_hight, 1)``. A fresh array is
        allocated on every call so earlier results are not overwritten.
        """
        self.input_X_forward = X
        output = np.zeros([self.n_output_hight, 1])
        for h in range(self.n_output_hight):
            X_seg = X[h:h + self.f_hight]
            output[h] = np.dot(X_seg, self.W) + self.B

        self.output_X_forward = output
        return output

    def backward(self, dA):
        """Back-propagate the 1-D upstream gradient ``dA``.

        Sets ``W_feedback`` (filter gradient), ``B_feedback`` (bias gradient)
        and ``Z_feedback`` (input gradient, shape ``(n_input_hight, 1)``),
        lets the optimizer update the layer, and returns ``Z_feedback``.
        """
        dA = dA[:, np.newaxis]

        # dL/dW[i] = sum_h dA[h] * X[h + i]
        W_feedback = np.zeros([self.f_hight, 1])
        for i in range(self.f_hight):
            X_seg = self.input_X_forward[i:(i + self.n_output_hight)]
            X_seg = X_seg[:, np.newaxis]
            W_feedback[i] = np.dot(X_seg.T, dA)
        self.W_feedback = W_feedback

        self.B_feedback = np.sum(dA, axis=0)

        # dL/dX is the full convolution of dA with the filter: zero-pad dA by
        # (filter_size - 1) on both sides and dot each reversed window with W.
        dA_padding = np.zeros([self.f_hight - 1, 1])
        dA_padded = np.concatenate((dA_padding, dA, dA_padding), axis=0)
        Z_feedback = np.zeros([self.n_input_hight, 1])
        for h in range(self.n_input_hight):
            dA_seg = dA_padded[h:h + self.f_hight][::-1]
            Z_feedback[h] = np.dot(dA_seg.T, self.W)
        self.Z_feedback = Z_feedback

        # Do not rebind ``self`` to the optimizer's return value: an optimizer
        # whose update returns None would break the return statement below.
        self.optimizer.update(self)
        return self.Z_feedback

## [Problem 2] Output size calculation after one-dimensional convolution

In [31]:
def output_size_calculation(n_in, filter_size, padding=0, stride=1):
    """
    Calculate the output size after a 1-D convolution.

    Parameters
    -----------------
    n_in: input size (number of features)
    filter_size: filter size
    padding: number of padding elements added on each side
    stride: stride of the filter

    Return
    -----------------
    n_out: size of the output (int)
    """
    # Floor division keeps the arithmetic exact for integers of any size;
    # true division goes through float and loses precision for large inputs.
    n_out = int((n_in + 2 * padding - filter_size) // stride) + 1
    return n_out

# Sanity check: input size 4, filter size 3, no padding, stride 1.
a = output_size_calculation(4,3,0,1)
print("output:", a)

output: 2


## [Problem 3] Experiment of one-dimensional convolutional layer with small array

In [36]:
# Small hand-checkable example: a length-4 input, a size-3 filter and a scalar bias.
x = np.array([1,2,3,4])
w = np.array([3, 5, 7])
b = np.array([1])

#initializer = SimpleInitializer()
optimizer = SGD(0.001)

scv = SimpleConv1d(len(x), w, b, optimizer)

# Forward pass; the layer caches x for the backward pass.
scv.forward(x)

# Upstream gradient, one value per output position.
delta_a = np.array([10, 20])

# Fills B_feedback / W_feedback / Z_feedback and applies the SGD update.
scv.backward(delta_a)

print("delta_b:",scv.B_feedback)
print("delta_w:",scv.W_feedback)
print("delta_x:",scv.Z_feedback)

# Reference gradients for this example, printed for visual comparison.
delta_b = np.array([30])
delta_w = np.array([50, 80, 110])
delta_x = np.array([30, 110, 170, 140])

print(delta_b)
print(delta_w)
print(delta_x)

N_input:4 F_hight:3
delta_b: [30]
delta_w: [[ 50.]
 [ 80.]
 [110.]]
delta_x: [[ 30.]
 [110.]
 [170.]
 [140.]]
[30]
[ 50  80 110]
[ 30 110 170 140]


## [Problem 4] Creating a one-dimensional convolutional layer class that does not limit the number of channels

In [37]:
class Conv1d():
    """Multi-channel 1-D convolution layer (no padding, stride 1).

    Parameters
    ----------
    n_input_hight : int
        Number of features (length) of each input channel.
    f_w : ndarray, shape (n_output, n_input_ch, filter_size)
        Filter weights.
    f_b : ndarray, shape (1, n_input_ch, n_output)
        Bias. One value per (input channel, output channel) pair; the forward
        pass adds their sum over the input channels to each output channel.
    initializer : object
        Unused; kept so the constructor signature stays unchanged.
    optimizer : object
        Provides ``update(layer)``; called at the end of every ``backward``.
    """

    def __init__(self, n_input_hight, f_w, f_b, initializer, optimizer):
        self.optimizer = optimizer

        self.n_input_hight = n_input_hight
        self.W = f_w    # (n_output, n_ch, f_size)
        self.B = f_b    # (1, n_ch, n_output)
        self.n_output = self.W.shape[0]
        self.n_input_ch = self.W.shape[1]
        self.f_hight = f_w.shape[2]
        self.n_output_hight = self.n_input_hight - self.f_hight + 1
        self.input_X_forward = 0
        self.output_X_forward = np.zeros((self.n_output, self.n_output_hight))
        # Gradient buffers are always float: with zeros_like() an integer
        # bias/filter would silently truncate its gradient to integers.
        self.W_feedback = np.zeros(self.W.shape, dtype=float)
        self.B_feedback = np.zeros(self.B.shape, dtype=float)
        self.Z_feedback = 0

    def forward(self, X):
        """Convolve ``X`` of shape ``(batch, n_input_ch, n_input_hight)``.

        Returns an array of shape ``(batch, n_output, n_output_hight)``.
        """
        self.input_X_forward = X
        batch_size = X.shape[0]
        A = np.zeros((batch_size, self.n_output, self.n_output_hight))
        # Looping over the filter taps keeps the loop short: every tap k
        # contributes W[:, :, k] times the input shifted by k.
        for k in range(self.f_hight):
            X_shift = X[:, :, k:k + self.n_output_hight]    # (batch, n_ch, n_out)
            A += np.einsum('bch,oc->boh', X_shift, self.W[:, :, k])

        # Each input channel carries its own bias for every output channel.
        A += np.sum(self.B[0], axis=0)[np.newaxis, :, np.newaxis]
        self.output_X_forward = A
        return A

    def backward(self, dA):
        """Back-propagate ``dA`` of shape ``(batch, n_output, n_output_hight)``.

        Sets ``W_feedback`` and ``B_feedback`` (batch-averaged, same shapes as
        ``W`` and ``B``) and ``Z_feedback`` (input gradient, same shape as the
        cached input, not averaged), lets the optimizer update the layer and
        returns ``Z_feedback``.
        """
        X = self.input_X_forward
        batch_size = X.shape[0]

        dW = np.zeros(self.W.shape, dtype=float)
        dX = np.zeros(X.shape, dtype=float)
        for k in range(self.f_hight):
            X_shift = X[:, :, k:k + self.n_output_hight]    # (batch, n_ch, n_out)
            # dL/dW[o, c, k] = mean_b sum_h dA[b, o, h] * X[b, c, h + k]
            dW[:, :, k] = np.einsum('boh,bch->oc', dA, X_shift) / batch_size
            # dL/dX[b, c, h + k] += sum_o dA[b, o, h] * W[o, c, k]
            dX[:, :, k:k + self.n_output_hight] += np.einsum(
                'boh,oc->bch', dA, self.W[:, :, k])

        # The bias gradient is identical for every input channel because the
        # forward pass adds each channel's bias to the same output.
        dB = np.sum(dA, axis=(0, 2)) / batch_size           # (n_output,)
        B_feedback = np.zeros(self.B.shape, dtype=float)
        B_feedback[:] = dB

        self.W_feedback = dW
        self.B_feedback = B_feedback
        self.Z_feedback = dX

        # The input gradient above used the pre-update weights; update now.
        self.optimizer.update(self)
        return self.Z_feedback

## [Problem 8] Learning and estimation

In [38]:
# Default network design: three layers (tanh, tanh, softmax) on flattened
# 28x28 MNIST images, trained with SGD.
default_dnn_design = {
    'learning_rate':0.001,
    'total_layer':3,
    'func_layer1':'tanh',
    'func_layer2':'tanh',
    'func_layer3':'softmax',
    'node_layer0':784,  # input size: 28 * 28 pixels (was mistyped as 786)
    'node_layer1':400,
    'node_layer2':200,
    'node_layer3':10,
    'initializer':'SimpleInitializer',
    'initializer_sigma':0.05,
    'optimizer':'SGD',
}

class ScratchCNNClassifier():
    """Sequential container that trains and evaluates a stack of layers.

    Every layer must provide ``forward(X)`` and ``backward(d)``. The last
    layer's ``backward`` receives the one-hot targets (as ``Softmax`` in this
    file expects) and each earlier layer receives the gradient returned by
    the layer after it.

    Parameters
    ----------
    n_epoch : int
        Number of passes over the training data.
    batch_size : int
        Mini-batch size.
    verbose : bool
        Print the losses after every epoch.
    """

    def __init__(self, n_epoch, batch_size, verbose = False):
        self.verbose = verbose
        self.batch_size = batch_size
        self.n_epoch = n_epoch
        self.loss = 0
        self.loss_val = 0
        self.activation_func = 0
        self.affine_func = 0
        self.n_layer = 0
        # Pre-sized slots (0 = empty); add_layer grows the list when full.
        self.layer_instance = [0 for _ in range(64)]

    def _crossentropy(self, y_pred, y):
        """Return the per-sample cross-entropy between ``y_pred`` and one-hot ``y``."""
        INF_AVOIDANCE = 1e-8  # keeps log() finite for zero probabilities
        cross_entropy = -1 * y * np.log(y_pred + INF_AVOIDANCE)
        return np.sum(cross_entropy, axis=1)

    def add_layer(self, model):
        """Append ``model`` to the end of the layer stack."""
        if self.n_layer < len(self.layer_instance):
            self.layer_instance[self.n_layer] = model
        else:
            self.layer_instance.append(model)
        self.n_layer += 1
        return

    def delet_all_layer(self):
        """Remove every registered layer."""
        # A list slice can only be assigned an iterable, not a bare 0.
        self.layer_instance[0:self.n_layer] = [0] * self.n_layer
        self.n_layer = 0

        return

    def fit(self, X, y, X_val=None, y_val=None):
        """Train on ``(X, y)``; optionally track the loss on ``(X_val, y_val)``.

        After the call ``self.loss[e]`` holds the mean training cross-entropy
        per sample of epoch ``e`` and ``self.loss_val[e]`` the mean validation
        cross-entropy (0 when no validation data is given).
        """
        self.loss = [0 for _ in range(self.n_epoch)]
        self.loss_val = [0 for _ in range(self.n_epoch)]

        # Train on the arguments, not on notebook-level globals.
        get_mini_batch = GetMiniBatch(X, y, self.batch_size)
        for epoch in range(self.n_epoch):
            total_loss = 0.0
            n_samples = 0
            for mini_X_train, mini_y_train in get_mini_batch:
                # Forward pass
                y_pred = self._predict(mini_X_train)

                # Sum per-sample losses so a smaller final batch is handled.
                total_loss += float(np.sum(self._crossentropy(y_pred, mini_y_train)))
                n_samples += len(mini_X_train)

                # Backward pass, last layer first
                dz = mini_y_train
                for layer in reversed(range(0, self.n_layer)):
                    dz = self.layer_instance[layer].backward(dz)

            if n_samples:
                self.loss[epoch] = total_loss / n_samples
            if X_val is not None and y_val is not None:
                y_val_pred = self._predict(X_val)
                self.loss_val[epoch] = float(np.mean(self._crossentropy(y_val_pred, y_val)))

            if self.verbose:
                print("Epoch:{} Loss:{} Loss(val):{}".format(epoch, self.loss[epoch], self.loss_val[epoch]))

        return

    def predict(self, X):
        """Return one-hot predictions (1 at each row's highest score).

        The network output is left untouched; on ties the first maximum wins.
        """
        scores = self._predict(X)
        one_hot = np.zeros_like(scores)
        one_hot[np.arange(scores.shape[0]), np.argmax(scores, axis=1)] = 1

        return one_hot

    def _predict(self, X):
        """Run the forward pass through all layers and return the raw output."""
        for layer in range(self.n_layer):
            X = self.layer_instance[layer].forward(X)

        return X

In [43]:
class Flatten():
    """Reshape layer that flattens everything after the batch axis."""

    def __init__(self):
        self.input_X_shape = 0

    def forward(self, X):
        """
        Flatten ``X`` of shape (batch_size, d1, d2, ...) to 2-D.

        return (batch_size, d1 * d2 * ...)
        """
        # Remember the shape so backward can restore it.
        self.input_X_shape = X.shape
        output = X.reshape(X.shape[0], -1)
        return output

    def backward(self, X):
        """Reshape the gradient ``X`` back to the shape seen by ``forward``."""
        output = X.reshape(self.input_X_shape)
        return output

In [42]:
# Load MNIST and flatten every image into a 784-element row.
(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.reshape(-1, 784)
x_test = x_test.reshape(-1, 784)


# Convert to float32 and divide by 255 to rescale the pixel values.
x_train = x_train.astype(np.float32)
x_test = x_test.astype(np.float32)
x_train /= 255
x_test /= 255

# One-hot encode the labels (fit on the training labels, reused for the test labels).
enc = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])


# Insert a channel axis: (n_samples, 784) -> (n_samples, 1, 784).
x_train = x_train[:,np.newaxis,:]
x_test = x_test[:,np.newaxis,:]

# test_size=0.95 keeps only 5% of the samples for training; the other 95% become the validation set.
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train_one_hot, test_size=0.95)
print(x_train.shape)
print(x_val.shape)

(3000, 1, 784)
(57000, 1, 784)


In [44]:
# Positional arguments are n_epoch=1 and batch_size=1.
CNN = ScratchCNNClassifier(1, 1)

In [45]:
# Filter weights (n_output=3, n_input_ch=1, filter_size=28), all ones.
f_w = np.ones((3,1,28))
# Bias with shape (1, n_input_ch=1, n_output=3).
f_b = np.array([[[1,1,1]]])

# The same optimizer and initializer objects are passed to every layer below.
optimizer = SGD(0.01)
initializer = XavierInitializer()

In [47]:
# Network: Conv1d -> Flatten -> FC(100) -> Sigmoid -> FC(10) -> Softmax.
CNN.add_layer(Conv1d(x_train.shape[2], f_w, f_b, initializer, optimizer))
CNN.add_layer(Flatten())
# FC input size = n_output channels * convolution output length (n_in - filter_size + 1).
CNN.add_layer(FC(f_w.shape[0] * (x_train.shape[2] - f_w.shape[2] + 1), 100, initializer, optimizer))
CNN.add_layer(Sigmoid())
CNN.add_layer(FC(100, 10, initializer, optimizer))
CNN.add_layer(Softmax())

In [48]:
# Train the network; the validation split is passed for the validation loss.
CNN.fit(x_train, y_train, x_val, y_val)

In [50]:
# One-hot predictions for the validation split.
y_pred = CNN.predict(x_val)

In [51]:
# Show the predictions next to the one-hot validation labels.
print("Pred=\n", y_pred)
print("Yval=\n", y_val)

Pred=
 [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]
 ...
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
Yval=
 [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]


In [52]:
# Accuracy of the one-hot predictions against the one-hot validation labels.
print("Accuracy score={:.3f}".format(accuracy_score(y_pred, y_val)))

Accuracy score=0.297
