In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from collections import OrderedDict

In [5]:
# Seed NumPy's global RNG so every np.random-based step below
# (weight initialisation, dropout masks) is repeatable.
np.random.seed(42)

# Number of label classes; used later to build one-hot targets.
num_classes = 10

# Fetch the MNIST train/test splits through the Keras dataset helper.
mnist = tf.keras.datasets.mnist
(X_train, Y_train), (X_test, Y_test) = mnist.load_data()
print(X_train.shape)

(60000, 28, 28)


In [12]:
# Work on a subset (10,000 training / 3,000 test images) to keep training fast.
# Each 28x28 image is flattened into a 784-vector and cast to float32.
# The divisor is 255.0 (the largest raw pixel intensity), which maps the
# inputs into [0, 1]; dividing by .255 would instead blow them up ~1000x.
# `astype` already returns a fresh array, so no explicit copy is needed.
x_train = X_train[:10000].reshape(-1, 28 * 28).astype(np.float32) / 255.0
x_test = X_test[:3000].reshape(-1, 28 * 28).astype(np.float32) / 255.0

# One-hot encode the training labels by picking rows of the identity matrix.
y_train = np.eye(num_classes)[Y_train[:10000]]

# Test labels stay as integer class ids (copied so the raw array is untouched).
y_test = Y_test[:3000].copy()
print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)

(10000, 784) (3000, 784) (10000, 10) (3000,)


In [13]:
# Training hyperparameters.
epochs = 1000
batch_size = 256
learning_rate = 0.01

# Number of training samples, and how many mini-batches make up one pass
# over them.  True division is used, so this is a float, floored at 1.
train_size = x_train.shape[0]
iter_per_epoch = max(train_size / batch_size, 1)

In [14]:
def softmax(x):
    """Numerically stable softmax.

    For a 2-D array the softmax is taken independently over each row;
    for any other rank it is taken over all elements at once.  The
    maximum is subtracted before exponentiating to avoid overflow.
    """
    if x.ndim != 2:
        shifted = x - np.max(x)
        return np.exp(shifted) / np.sum(np.exp(shifted))

    # Work column-wise on the transpose so the per-sample reductions
    # broadcast along axis 0, then transpose back.
    cols = x.T
    cols = cols - np.max(cols, axis=0)
    probs = np.exp(cols) / np.sum(np.exp(cols), axis=0)
    return probs.T

def mean_squared_error(y, t):
    """Return half the sum of squared differences between `y` and `t`."""
    residual = y - t
    squared = residual ** 2
    return 0.5 * np.sum(squared)

def cross_entropy_error(pred_y, true_y):
    """Mean cross-entropy between predicted probabilities and targets.

    `pred_y` holds per-class probabilities, one row per sample (a 1-D
    input is treated as a single sample).  `true_y` may be one-hot
    encoded (same size as `pred_y`) or an array of class indices.
    A small constant (1e-7) is added inside the log to avoid log(0).
    """
    # Promote a single sample to a batch of one.
    if pred_y.ndim == 1:
        pred_y = pred_y.reshape(1, -1)
        true_y = true_y.reshape(1, -1)

    # Same element count means the targets are one-hot: collapse to indices.
    labels = true_y.argmax(axis=1) if true_y.size == pred_y.size else true_y

    n_samples = pred_y.shape[0]
    picked = pred_y[np.arange(n_samples), labels]
    return -np.sum(np.log(picked + 1e-7)) / n_samples

In [15]:
class ReLU:
    """Rectified linear unit layer that caches a mask for backpropagation."""

    def __init__(self):
        # Boolean array set by forward(): True where the input was <= 0.
        self.mask = None

    def forward(self, input_data):
        """Return a copy of `input_data` with every entry <= 0 replaced by 0."""
        clipped = input_data.copy()
        self.mask = input_data <= 0
        clipped[self.mask] = 0
        return clipped

    def backward(self, dout):
        """Return a copy of `dout`, zeroed wherever the forward input was <= 0."""
        grad = dout.copy()
        grad[self.mask] = 0
        return grad

In [16]:
class Sigmoid:
    """Logistic sigmoid activation layer."""

    def __init__(self):
        # Output of the most recent forward pass; reused by backward().
        self.out = None

    def forward(self, input_data):
        """Apply 1 / (1 + exp(-x)) element-wise and cache the result."""
        out = 1 / (1 + np.exp(-input_data))
        self.out = out
        return out

    def backward(self, dout):
        """Propagate `dout` through the sigmoid.

        The derivative of the sigmoid is y * (1 - y), where y is the cached
        forward output, so the upstream gradient is scaled by that factor.
        """
        dx = dout * (1.0 - self.out) * self.out
        return dx

In [17]:
class Layer:
    """Fully connected (affine) layer computing ``x . W + b``."""

    def __init__(self, W, b):
        # Parameters are stored by reference, not copied.
        self.W, self.b = W, b

        # Cached by forward(): the input flattened to 2-D and its original shape.
        self.input_data = self.input_data_shape = None

        # Parameter gradients, filled in by backward().
        self.dW = self.db = None

    def forward(self, input_data):
        """Flatten each sample to a row vector and apply the affine map."""
        self.input_data_shape = input_data.shape
        n_rows = input_data.shape[0]
        self.input_data = input_data.reshape(n_rows, -1)
        return np.dot(self.input_data, self.W) + self.b

    def backward(self, dout):
        """Store dW/db and return the gradient w.r.t. the input.

        The returned gradient is reshaped back to the shape the input had
        when forward() was called.
        """
        grad_flat = np.dot(dout, self.W.T)
        self.dW = np.dot(self.input_data.T, dout)
        self.db = np.sum(dout, axis=0)
        return grad_flat.reshape(*self.input_data_shape)

In [18]:
class BatchNormalization:
    """Batch normalization layer with learnable scale (gamma) and shift (beta).

    In training mode the input is normalised with the statistics of the
    current mini-batch and exponential moving averages of mean/variance are
    updated.  In inference mode those moving averages are used instead.
    """

    def __init__(self, gamma, beta, momentum=0.9, running_mean=None, running_var=None):
        self.gamma = gamma
        self.beta = beta
        self.momentum = momentum
        # Shape of the most recent forward input, used to restore output shapes.
        self.input_shape = None

        # Moving averages used at inference time (lazily created if None).
        self.running_mean = running_mean
        self.running_var = running_var

        # Intermediate values cached by a training-mode forward pass.
        self.batch_size = None
        self.xc = None   # centred input
        self.xn = None   # normalised input
        self.std = None  # per-feature standard deviation (with epsilon)

        # Parameter gradients, filled in by backward().
        self.dgamma = None
        self.dbeta = None

    def forward(self, input_data, is_train=True):
        """Normalise `input_data` and return an array of the same shape.

        Inputs that are not 2-D are flattened to (batch, features) for the
        computation and reshaped back afterwards.
        """
        self.input_shape = input_data.shape
        if input_data.ndim != 2:
            input_data = input_data.reshape(input_data.shape[0], -1)

        out = self.__forward(input_data, is_train)
        return out.reshape(*self.input_shape)

    def __forward(self, input_data, is_train):
        """Core forward pass on a 2-D (batch, features) array."""
        if self.running_mean is None:
            N, D = input_data.shape
            self.running_mean = np.zeros(D)
            self.running_var = np.zeros(D)

        if is_train:
            mu = input_data.mean(axis=0)
            xc = input_data - mu
            var = np.mean(xc**2, axis=0)
            # 10e-7 (== 1e-6) keeps the division finite when var is 0.
            std = np.sqrt(var + 10e-7)
            xn = xc / std

            # Cache everything backward() needs.
            self.batch_size = input_data.shape[0]
            self.xc = xc
            self.xn = xn
            self.std = std

            # Exponential moving averages for inference.
            self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * mu
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * var
        else:
            xc = input_data - self.running_mean
            xn = xc / np.sqrt(self.running_var + 10e-7)

        out = self.gamma * xn + self.beta
        return out

    def backward(self, dout):
        """Back-propagate `dout`; stores dgamma/dbeta and returns dL/dx.

        Must follow a training-mode forward pass, whose cached values it uses.
        """
        if dout.ndim != 2:
            dout = dout.reshape(dout.shape[0], -1)

        dx = self.__backward(dout)

        dx = dx.reshape(*self.input_shape)
        return dx

    def __backward(self, dout):
        """Core backward pass on a 2-D (batch, features) gradient."""
        dbeta = dout.sum(axis=0)
        dgamma = np.sum(self.xn * dout, axis=0)

        # Walk the forward graph in reverse: xn -> (xc, std) -> var -> xc -> mu.
        dxn = self.gamma * dout
        dxc = dxn / self.std
        dstd = -np.sum((dxn * self.xc) / (self.std ** 2), axis=0)
        dvar = 0.5 * dstd / self.std
        dxc += (2.0 / self.batch_size) * self.xc * dvar
        dmu = np.sum(dxc, axis=0)
        dx = dxc - dmu / self.batch_size

        self.dgamma = dgamma
        self.dbeta = dbeta

        return dx
            

In [19]:
class Dropout:
    """Dropout layer: randomly zeroes activations while training."""

    def __init__(self, dropout_ratio=0.5):
        self.dropout_ratio = dropout_ratio
        # Boolean keep-mask drawn by the last training-mode forward pass.
        self.mask = None

    def forward(self, input_data, is_train=True):
        """Apply dropout.

        Training: draw a fresh uniform random mask and keep only the entries
        whose draw exceeds `dropout_ratio`.
        Inference: keep every entry but scale by (1 - dropout_ratio).
        """
        if not is_train:
            keep_prob = 1.0 - self.dropout_ratio
            return input_data * keep_prob

        draws = np.random.rand(*input_data.shape)
        self.mask = draws > self.dropout_ratio
        return input_data * self.mask

    def backward(self, dout):
        """Pass gradients only through the units kept in the forward pass."""
        return dout * self.mask

In [20]:
class Softmax:
    """Softmax activation combined with cross-entropy loss (final layer)."""

    def __init__(self):
        self.loss = None  # scalar loss from the last forward pass
        self.y = None     # softmax probabilities from the last forward pass
        self.t = None     # targets from the last forward pass

    def forward(self, input_data, t):
        """Compute softmax probabilities for `input_data` and return the
        cross-entropy loss against targets `t`."""
        self.t = t
        self.y = softmax(input_data)
        self.loss = cross_entropy_error(self.y, self.t)
        return self.loss

    def backward(self, dout=1):
        """Return the gradient of the loss w.r.t. the pre-softmax scores.

        `dout` is accepted for interface symmetry with other layers but is
        not used.  Targets may be one-hot (same size as the probabilities)
        or integer class indices; both give (y - onehot(t)) / batch_size.
        """
        batch_size = self.t.shape[0]

        if self.t.size == self.y.size:
            # One-hot targets: subtract directly.
            dx = (self.y - self.t) / batch_size
        else:
            # Integer labels: subtract 1 at each sample's true-class column.
            dx = self.y.copy()
            dx[np.arange(batch_size), self.t] -= 1
            dx = dx / batch_size

        return dx

In [22]:
class MyModel:
    """Fully connected classifier built from the layer classes in this notebook.

    Each hidden block is ``Layer -> [BatchNorm] -> activation -> [Dropout]``;
    the network ends with one more ``Layer`` followed by a softmax /
    cross-entropy loss layer.

    Parameters
    ----------
    input_size : int
        Number of input features.
    hidden_size_list : list of int
        Width of each hidden layer, in order.
    output_size : int
        Number of output classes.
    activation : str
        ``'relu'`` or ``'sigmoid'`` (case-insensitive).
    decay_lambda : float
        L2 weight-decay coefficient (0 disables it).
    use_dropout : bool
        Insert a Dropout layer after every hidden activation.
    dropout_ratio : float
        Ratio passed to each Dropout layer.
    use_batchnorm : bool
        Insert a BatchNormalization layer before every hidden activation.
    """

    def __init__(
                self,
                input_size,
                hidden_size_list,
                output_size,
                activation='relu',
                decay_lambda=0,
                use_dropout=False,
                dropout_ratio=0.5,
                use_batchnorm=False
                ):
        self.input_size = input_size
        self.hidden_size_list = hidden_size_list
        self.output_size = output_size
        self.hidden_layer_num = len(hidden_size_list)
        self.use_dropout = use_dropout
        self.decay_lambda = decay_lambda
        self.use_batchnorm = use_batchnorm
        self.params = {}

        activation_layer = {
            'sigmoid': Sigmoid,
            'relu': ReLU
        }

        # Normalise once so weight init and layer lookup agree on the name.
        activation = activation.lower()
        if activation not in activation_layer:
            raise ValueError(
                "unsupported activation %r; expected one of %s"
                % (activation, sorted(activation_layer))
            )

        self.__init_weight(activation)

        self.layers = OrderedDict()
        for idx in range(1, self.hidden_layer_num + 1):
            self.layers['Layer' + str(idx)] = Layer(self.params['W' + str(idx)],
                                                    self.params['b' + str(idx)])

            if self.use_batchnorm:
                self.params['gamma' + str(idx)] = np.ones(hidden_size_list[idx-1])
                self.params['beta' + str(idx)] = np.zeros(hidden_size_list[idx-1])
                # Registered in self.layers (not self.params) so that it takes
                # part in predict() and gradient().
                self.layers['BatchNorm' + str(idx)] = BatchNormalization(self.params['gamma' + str(idx)],
                                                                         self.params['beta' + str(idx)])

            self.layers['Activation_function' + str(idx)] = activation_layer[activation]()

            if self.use_dropout:
                self.layers['Dropout' + str(idx)] = Dropout(dropout_ratio)

        # last layer (not a hidden layer)
        idx = self.hidden_layer_num + 1
        self.layers['Layer' + str(idx)] = Layer(self.params['W' + str(idx)],
                                                self.params['b' + str(idx)])
        self.last_layer = Softmax()

    def __init_weight(self, activation):
        """Create W/b for every affine layer.

        Weights are Gaussian with standard deviation sqrt(2 / fan_in) for
        ReLU and sqrt(1 / fan_in) for sigmoid; biases start at zero.
        """
        all_size_list = [self.input_size] + self.hidden_size_list + [self.output_size]
        scale_numerator = {'relu': 2.0, 'sigmoid': 1.0}[activation.lower()]

        for idx in range(1, len(all_size_list)):
            fan_in, fan_out = all_size_list[idx - 1], all_size_list[idx]
            scale = np.sqrt(scale_numerator / fan_in)
            self.params['W' + str(idx)] = scale * np.random.randn(fan_in, fan_out)
            self.params['b' + str(idx)] = np.zeros(fan_out)

    def predict(self, x, is_train=False):
        """Run `x` through every layer except the loss layer and return the scores.

        `is_train` is forwarded only to Dropout and BatchNorm layers, whose
        behaviour differs between training and inference.
        """
        for key, layer in self.layers.items():
            if "Dropout" in key or "BatchNorm" in key:
                x = layer.forward(x, is_train)
            else:
                x = layer.forward(x)
        return x

    def loss(self, x, t, is_train=False):
        """Return cross-entropy loss on (x, t) plus the L2 weight-decay penalty."""
        y = self.predict(x, is_train)
        weight_decay = 0
        for idx in range(1, self.hidden_layer_num + 2):
            W = self.params['W' + str(idx)]
            weight_decay += 0.5 * self.decay_lambda * np.sum(W**2)

        return self.last_layer.forward(y, t) + weight_decay

    def accuracy(self, x, t):
        """Return the fraction of samples whose arg-max prediction equals the label.

        `t` may be integer labels (1-D) or one-hot encoded.
        """
        y = self.predict(x, is_train=False)
        y = np.argmax(y, axis=1)
        if t.ndim != 1:
            t = np.argmax(t, axis=1)
        accuracy = np.sum(y==t) / float(x.shape[0])
        return accuracy

    def gradient(self, x, t):
        """Back-propagate through the network and return parameter gradients.

        Returns a dict keyed like `self.params` ('W1', 'b1', and when batch
        normalization is enabled 'gamma1', 'beta1', ...).
        """
        # Forward pass in training mode so every layer caches what it needs.
        self.loss(x, t, is_train=True)

        dout = 1
        dout = self.last_layer.backward(dout)

        for layer in reversed(list(self.layers.values())):
            dout = layer.backward(dout)

        grads = {}
        for idx in range(1, self.hidden_layer_num + 2):
            affine = self.layers['Layer' + str(idx)]
            # The decay term is the derivative of 0.5 * lambda * ||W||^2.
            grads['W' + str(idx)] = affine.dW + self.decay_lambda * self.params['W' + str(idx)]
            grads['b' + str(idx)] = affine.db

            # The output layer has no batch-norm layer attached.
            if self.use_batchnorm and idx != self.hidden_layer_num+1:
                batchnorm = self.layers['BatchNorm' + str(idx)]
                grads['gamma' + str(idx)] = batchnorm.dgamma
                grads['beta' + str(idx)] = batchnorm.dbeta

        return grads
    