In [None]:
import sys
lib_dir = "g:/My Drive/Storage/Github/hyuckjinkim"
sys.path.append(lib_dir)

from lib.python.graph import MatplotlibFontManager
fm = MatplotlibFontManager()
fm.set_korean_font(check=False)

In [None]:
import os
import time
import datetime
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Logic gate : 51p ~ 59p
def gate(values, gate_type):
    values = np.array(values, dtype=np.float64)

    gate_type = gate_type.upper()
    weights = np.ones_like(values) / len(values)

    assert all(v in [0,1] for v in values), "Each value must be either 0 or 1."
    assert gate_type in ['AND','OR','NAND','XOR'], "gate_type must be one of ['AND','OR','NAND','XOR']."
    
    # AND, OR, XOR 그래프 참조 : https://blog.naver.com/hyemin8670/222428861609
    if gate_type=='XOR':
        # 재귀함수 생성
        # XOR(x) = AND(NAND(x),OR(x))
        nand_output = gate(values, 'nand')
        or_output = gate(values, 'or')
        xor_output = gate([nand_output, or_output], 'and')
        return xor_output

    else:
        offset = 0.2
        bias_dict = {
            'AND' : - sum(weights[1:]) - offset,
            'OR' :  - weights[0] + offset,
            'NAND' : - sum(weights[1:]) - offset, # AND gate 활용
        }
        bias = bias_dict.get(gate_type)
        z = values @ weights + bias

        if gate_type in ['AND','OR']:
            gate_output = 1 if z>=0 else 0
        elif gate_type in ['NAND']:
            gate_output = 1 if z<0 else 0 # not AND
        return gate_output

In [None]:
# 각 Gate에서의 진리표 테스트
values_comb = [[x,y] for x in [0,1] for y in [0,1]]
# values_comb = [[x,y,z] for x in [0,1] for y in [0,1] for z in [0,1]]
gate_types = ['AND','OR','NAND','XOR']

for gate_type in gate_types:
    print(f'\n## {gate_type=}')
    for values in values_comb:
        print(f'{values=}: output={gate(values, gate_type)}')

In [None]:
# Sigmoid & Arctangent : 73p
def sigmoid(x):
    x = np.array(x, dtype=np.float64)
    return 1 / (1 + np.exp(-x))

x = np.linspace(-10,10,num=100)
y1 = sigmoid(x)
y2 = np.arctan(x)

plt.figure(figsize=(7,5))
plt.plot(x,y1,'-')
plt.plot(x,y2,'-')
plt.grid(alpha=0.3)
plt.axhline(0, color='black', linestyle='--', alpha=0.5)
plt.axvline(0, color='black', linestyle='--', alpha=0.5)
plt.xlabel('x')
plt.ylabel('f(x)')
plt.legend(['sigmoid','arctan'])
plt.show()

In [None]:
# Softmax : 94p
def softmax(x):
    x = np.array(x, dtype=np.float64)

    z = x - np.max(x, axis=-1, keepdims=True) # overflow 방지
    numerator = np.exp(z)
    denominator = np.sum(numerator, axis=-1, keepdims=True)
    softmax = numerator / denominator
    
    return softmax

x = [[199,200,200], [1,2,3]]
print(softmax(x).round(4))
print(softmax(x).sum(axis=1))

In [None]:
# Cross entropy : 115p

# y_true와 y_pred의 모든 요소를 고려하여 계산하는 방법
def cross_entropy_error(y_true, y_pred):
    delta = 1e-7

    y_true = np.array(y_true, dtype=np.float64)
    y_pred = np.array(y_pred, dtype=np.float64)
    
    cee = -np.sum(y_true * np.log(y_pred+delta))
    return cee

# y_true가 원-핫 인코딩일 경우, 정답에 해당하는 클래스의 예측 확률에만 집중해서 계산하는 방법
# (1) 계산 효율성: 정답이 아닌 클래스에 대해서도 불필요한 곱셈과 로그 계산을 줄임
# (2) 정확성: 정답 클래스에 해당하는 예측 확률만을 사용해 손실을 계산하므로, 실제로 필요한 정보를 정확하게 추출
def cross_entropy_error(y_true, y_pred):
    delta = 1e-7

    y_true = np.array(y_true, dtype=np.float64)
    y_pred = np.array(y_pred, dtype=np.float64)

    if y_pred.ndim == 1:
        y_true = y_true.reshape(1, y_true.size)
        y_pred = y_pred.reshape(1, y_pred.size)
        
    # 훈련 데이터가 원-핫 벡터라면 정답 레이블의 인덱스로 반환
    if y_true.size == y_pred.size:
        y_true = y_true.argmax(axis=1)
    else:
        y_true = y_true.astype('uint')

    # 정답 클래스에 해당하는 예측 확률만을 사용해 batch size 내 평균손실을 계산
    batch_size = y_pred.shape[0]
    cee = -np.mean(np.log(y_pred[np.arange(batch_size), y_true] + delta))

    return cee

In [None]:
y_true = [0,0,1,0,0,0,0,0,0,0]
y_pred = [0.1,0.05,0.6,0.0,0.05,0.1,0.0,0.1,0.0,0.0]
cross_entropy_error(y_true, y_pred)

In [None]:
# 수치미분값 : 123p
def numerical_diff(f,x):
    x = np.array(x, dtype=np.float64)
    h = 1e-4
    return (f(x+h) - f(x-h)) / (2*h)

# 접선기울기값 : 125p
def tangent_line(f, x):
    x = np.array(x, dtype=np.float64)
    d = numerical_diff(f, x) # f'(x) : 기울기
    # 접선의 방정식을 f'(x0) = d*x0 + y0로 표현하면, x0값이 주어질 때 이에 해당하는 y축절편의 값인 y0를 모름
    # 이 값을 유도하기위해, 기존 함수인 f(x0)=f'(x0)를 사용한다. (x0에서의 접선이므로, f(x0)값과 f'(x0)값은 같으므로)
    # 즉, y0 = f'(x0) - d*x0 = f(x0) - d*x0가 된다.
    y = f(x) - d*x

    # f'(x) = 기울기 * x값 + y절편값
    return lambda t: d*t + y

In [None]:
def f(x):
    #return x**2
    return (x-2)*(x-1)*(x+3)

x = np.arange(-5.0, 5.0, 0.1)

x0 = 2
tf = tangent_line(f, x0)
y1 = f(x)
y2 = tf(x)

plt.figure()
plt.plot(x, y1, color='black')
plt.plot(x, y2, color='green', alpha=0.5)
plt.xlabel("x")
plt.ylabel("f(x)")
plt.grid()
plt.legend(['f(x)',f'x={x0}에서의 접선방정식'])
plt.show()

In [None]:
# partial derivative : 127p
def _numerical_gradient_no_batch(f, x):
    x = np.array(x, dtype=np.float64)

    h = 1e-4
    grad = np.zeros_like(x, dtype=np.float64)

    for idx in np.ndindex(x.shape):
        # 해당 idx에만 h를 더해주거나 빼주도록 h vector를 생성
        h_vector = np.zeros_like(x)
        h_vector[idx] = h

        grad[idx] = (f(x+h_vector) - f(x-h_vector)) / (2*h)

    return grad

def numerical_gradient(f, X):
    X = np.array(X, dtype=np.float64)
    if X.ndim == 1:
        return _numerical_gradient_no_batch(f, X)
    else:
        grad = np.zeros_like(X)
        for idx, x in enumerate(X):
            grad[idx] = _numerical_gradient_no_batch(f, x)
        return grad

def f(x):
    x = np.array(x, dtype=np.float64)
    if x.ndim == 1:
        return np.sum(x**2)
    else:
        return np.sum(x**2, axis=1)

In [None]:
for x in [[3,4],[0,2],[3,0]]:
    x = np.array(x)
    print(numerical_gradient(f, x))

In [None]:
# 다른 방식의 수치적 기울기 계산 함수
def numerical_gradient_2(f, x):
    # 이 부분이 추가되지 않으면 grad 결과값의 오차가 커짐
    x = np.array(x, dtype=np.float64)

    h = 1e-4
    grad = np.zeros_like(x)
    
    it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite'])
    while not it.finished:
        idx = it.multi_index
        tmp_val = x[idx]
        
        x[idx] = tmp_val + h
        fxh1 = f(x)  # f(x+h)
        
        x[idx] = tmp_val - h
        fxh2 = f(x)  # f(x-h)
        
        grad[idx] = (fxh1 - fxh2) / (2 * h)
        x[idx] = tmp_val  # 원래 값 복원
        it.iternext()
        
    return grad

In [None]:
for x in [[3,4],[0,2],[3,0]]:
    x = np.array(x)
    print(numerical_gradient_2(f, x))

In [None]:
# gradient plot : 129p
def gradient_plot(f, minval, maxval):
    x1 = x2 = np.linspace(minval, maxval, num=20)
    X1, X2 = np.meshgrid(x1, x2)
    Y = f([X1,X2])

    # calculate the gradient
    grad = numerical_gradient(f, [X1.flatten(), X2.flatten()])

    # visualization
    fig = plt.figure(figsize=(15,7))

    # (1) 2d gradient vector field
    ax1 = fig.add_subplot(121)
    ax1.quiver(X1, X2, -grad[0], -grad[1], angles="xy", color="#666666")#,headwidth=10,scale=40,color="#444444")
    ax1.set_xlim([minval, maxval])
    ax1.set_ylim([minval, maxval])
    ax1.set_xlabel('x1')
    ax1.set_ylabel('x2')
    ax1.grid()
    ax1.set_title('2D Gradient Vector Field')

    # (2) 3d plot
    ax2 = fig.add_subplot(122, projection='3d')
    ax2.plot_surface(X1, X2, Y, cmap='viridis')
    ax2.set_xlabel('x1')
    ax2.set_ylabel('x2')
    ax2.set_zlabel('y')
    ax2.grid()
    ax2.set_title('3D Plot')
    
    plt.tight_layout()
    plt.show()

In [None]:
gradient_plot(lambda t: sum(np.array(t)**2), -2, 2)
gradient_plot(lambda t: sum(np.array(t)**3), -100, 100)

In [None]:
# gradient descent : 131p
def gradient_descent(f, init, lr=0.01, step_num=100):
    x = np.array(init, dtype=np.float64)
    x_history = []

    for i in range(step_num):
        x_history.append(x.copy())

        grad = numerical_gradient(f, x)
        x -= lr * grad

    return x, np.array(x_history)

# gradient descent plot : 132p
def gradient_descent_plot(f, init, lr, step_num, minval, maxval):
    x, x_history = gradient_descent(f, init, lr=lr, step_num=step_num)

    offset = 1.2
    x1 = x2 = np.linspace(minval*offset, maxval*offset, num=20)
    X1, X2 = np.meshgrid(x1, x2)
    # [X1,X2]로 하게되면 2개의 batch로 인식 할 수 있으므로 [[X1,X2]]로 묶어주고, squeeze를 통해 차원이 1인 축을 제거
    Y = f([[X1,X2]]).squeeze()

    # visualization
    plt.figure(figsize=(7,7))

    # (1) contour
    plt.contour(X1, X2, Y, levels=max(abs(minval),abs(maxval)), colors='gray', linestyles='dashed')

    # (2) gradient descent moving
    for i in range(len(x_history)-1):
        val_dict = {
            'X' : x_history[i, 0],
            'Y' : x_history[i, 1],
            'U' : x_history[i+1, 0] - x_history[i, 0],
            'V' : x_history[i+1, 1] - x_history[i, 1],
        }
        plt.quiver(*list(val_dict.values()), angles='xy', scale_units='xy', scale=1.1, color='blue', width=0.003, alpha=0.5)
        plt.annotate(str(i+1), [val_dict['X']-0.3, val_dict['Y']], color='blue', alpha=0.5)

    # base line
    plt.axvline(0, color='black', linestyle='-', alpha=0.8)
    plt.axhline(0, color='black', linestyle='-', alpha=0.8)
    plt.plot(x_history[:,0], x_history[:,1], 'ob', markersize=3)

    plt.xlim(minval, maxval)
    plt.ylim(minval, maxval)
    plt.xlabel("x1")
    plt.ylabel("x2")
    plt.show()

In [None]:
gradient_descent_plot(f, init=[-3,4], lr=0.1, step_num=20, minval=-6, maxval=6)

In [None]:
# simple network : 134p
# -> loss() 함수에 w를 넣지 않으면 lambda w: net.loss(x,t)에서 w 매개변수가 없어 기울기를 계산할 수 없음
class SimpleNet:
    def __init__(self, input_size, output_size):
        self.input_size = input_size    # 입력된 데이터의 개수
        self.output_size = output_size  # true label의 nunique

        np.random.seed(42)
        self.weight = np.random.randn(input_size, output_size)
        self.bias = np.zeros(output_size)

    def predict(self, x, weight=None, bias=None):
        if weight is None:
            weight = self.weight
        if bias is None:
            bias = self.bias
        z = x @ weight + bias
        return z

    def loss(self, x, y_true, weight=None, bias=None):
        if weight is None:
            weight = self.weight
        if bias is None:
            bias = self.bias
        z = self.predict(x, weight, bias)
        y_pred = softmax(z)
        loss = cross_entropy_error(y_true=y_true, y_pred=y_pred)
        return loss

In [None]:
x = [0.6,0.9]
t = [1,0,0] # true label: 0,1,2 중 2이라는 뜻의 one-hot encoding

# x = [0.5, 0.4, 0.8]
# t = [0,1]

net = SimpleNet(input_size=len(x), output_size=len(t))
print('> Weights:') # weight matrix
print(net.weight,'\n')

z = net.predict(x) # z
print(f'> z: {z} \n')

y_pred = softmax(z)
print(f'> Predicted Probability: {y_pred} \n')

prediction = np.argmax(z) # prediction label
print(f'> Predicted Label: {prediction} \n')

loss = net.loss(x, t)
print(f'> Cross Entropy Loss: {loss:.5f}')

In [None]:
# dw (weight의 기울기)

# (1) 수치적 기울기 계산
# -> loss() 함수에 w를 넣지 않으면 lambda w: net.loss(x,t)에서 w 매개변수가 없어 기울기를 계산할 수 없음
# -> 따라서, w를 매개변수로 넣어줘야함
f = lambda w: net.loss(x, t, w, None)
dW1 = numerical_gradient(f, net.weight[np.newaxis, :, :])
print(dW1)

In [None]:
# (2) 해석적 기울기 계산
# 참조 : https://davidbieber.com/snippets/2020-12-12-derivative-of-softmax-and-the-softmax-cross-entropy-loss/
def analytical_gradient(x, y_true, y_pred):
    return np.outer(x, (y_pred - y_true))

# 해석적 기울기 계산
dW2 = analytical_gradient(x, t, y_pred)
print(dW2)

In [None]:
(dW1-dW2).round(7)

In [None]:
# 2층 신경망 구현 : 137p
from tqdm import tqdm, trange
from collections import OrderedDict
from copy import deepcopy
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def identity(x):
    return x

def relu(x):
    return np.maximum(0, x)

def relu_derivative(x):
    return np.where(x>0, 1, 0)

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_derivative(x):
    return sigmoid(x) * (1 - sigmoid(x))

class MultiLayerNet:
    def __init__(self, input_size, output_size, hidden_sizes, activation=None):
        self.input_size = input_size
        self.output_size = output_size
        self.hidden_sizes = hidden_sizes

        self.node_sizes = [input_size] + hidden_sizes + [output_size]
        self.n_layers = len(self.node_sizes)-1

        if activation in ['sigmoid',None]:
            self.activation = sigmoid
            self.activation_derivative = sigmoid_derivative
        elif activation=='relu':
            self.activation = relu
            self.activation_derivative = relu_derivative

        if activation is None:
            self.weight_init()
        elif activation=='sigmoid':
            self.weight_init(method='Xavier')
        elif activation=='relu':
            self.weight_init(method='He')

    # def weight_init(self, weight_init_std=0.01, seed=42):
    #     np.random.seed(seed)
    #     self.params = {'weights': OrderedDict(), 'biases': OrderedDict()}

    #     for i in range(self.n_layers):
    #         self.params['weights'][f'w{i}'] = weight_init_std * np.random.randn(self.node_sizes[i], self.node_sizes[i+1])
    #         self.params['biases'] [f'b{i}'] = np.zeros(self.node_sizes[i+1])

    # Xavier, He, Default(torch에서 사용하는 방식)
    # > 책 205p
    # > torch 공식문서
    #   - nn.Linear: https://pytorch.org/docs/stable/generated/torch.nn.Linear.html
    #   - nn.Conv2d: https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
    def weight_init(self, method=None, seed=42):
        np.random.seed(seed)
        self.params = {'weights': OrderedDict(), 'biases': OrderedDict()}

        for i in range(self.n_layers):
            input_dim = self.node_sizes[i]
            output_dim = self.node_sizes[i+1]

            if method is None:
                limit = 0.01
            elif method == 'Xavier':
                limit = np.sqrt(1 / input_dim)   # Xavier initialization
            elif method == 'He':
                limit = np.sqrt(2 / input_dim)   # He initialization
            else:
                raise ValueError

            self.params['weights'][f'w{i}'] = np.random.randn(input_dim, output_dim) * limit
            self.params['biases'] [f'b{i}'] = np.random.randn(output_dim) * limit

    def predict(self, x, params=None, return_activations=False):
        if params is None:
            weights = deepcopy(self.params['weights'])
            biases = deepcopy(self.params['biases'])
        else:
            weights = params['weights']
            biases = params['biases']

        if return_activations:
            activations = []

        # 초기 activation은 x로 초기화
        activation = x
        
        # layer들의 선형결합
        for j, ((w_name, w_ij), (b_name, b_j)) in enumerate(zip(weights.items(), biases.items())):
            # logit (linear combination)
            logits = activation @ w_ij + b_j

            # activation function
            if j+1 != self.n_layers:
                activation = self.activation(logits)
            else:
                activation = logits
            
            if return_activations:
                activations.append(activation)

        if return_activations:
            return activation, activations
        else:
            return activation

    def loss(self, x, y_true, params=None):
        activation = self.predict(x, params)
        prob = softmax(activation)
        loss = cross_entropy_error(y_true=y_true, y_pred=prob)
        return loss

    def accuracy(self, x, y_true):
        y_pred = self.predict(x)
        y_pred = np.argmax(y_pred, axis=1)
        y_true = np.argmax(y_true, axis=1)
        
        acc = np.sum(y_pred==y_true) / len(y_pred)
        return acc

    def _compute_numerical_gradient(self, x, y_true, name, idx, what):
        h = 1e-4

        # set params
        params = {'weights': deepcopy(self.params['weights']), 'biases': deepcopy(self.params['biases'])}
        val = params[what][name][idx]

        # f(x+h)
        params[what][name][idx] = val + h
        fxh1 = self.loss(x, y_true, params)
        # f(x-h)
        params[what][name][idx] = val - h
        fxh2 = self.loss(x, y_true, params)

        # ( f(x+h) - f(x-h) ) / 2h
        return (fxh1 - fxh2) / (2*h)

    def numerical_gradient(self, x, y_true, max_workers=1):
        Executor = ThreadPoolExecutor # ThreadPoolExecutor. ProcessPoolExecutor
        params = {'weights': OrderedDict(), 'biases': OrderedDict()}
        weights = self.params['weights']
        biases = self.params['biases']

        for j, ((w_name, w_ij), (b_name, b_j)) in enumerate(zip(weights.items(), biases.items())):
            
            # (1) weight gradients
            iterable = np.ndindex(weights[w_name].shape)
            total = weights[w_name].size
            desc = f'[{j+1}/{len(weights)}] - weights'

            grad_weight = np.zeros_like(weights[w_name], dtype=np.float64)

            if max_workers==1:
                pbar = tqdm(iterable, total=total, desc=desc)
                for idx in pbar:
                    grad_weight[idx] = self._compute_numerical_gradient(x, y_true, w_name, idx, 'weights')
            else:
                with Executor(max_workers=max_workers) as executor:
                    futures = {
                        executor.submit(self._compute_numerical_gradient, x, y_true, w_name, idx, 'weights'): idx
                        for idx in iterable
                    }
                    #pbar = tqdm(as_completed(futures), total=total, desc=desc, position=0, leave=False)
                    pbar = as_completed(futures)
                    for future in pbar:
                        idx = futures[future]
                        grad_weight[idx] = future.result()

            params['weights'][w_name] = grad_weight

            # (2) bias gradients
            grad_bias = np.zeros_like(biases[b_name], dtype=np.float64)

            iterable = np.ndindex(biases[b_name].shape)
            total = biases[b_name].size
            desc = f'[{j+1}/{len(biases)}] - biases'

            if max_workers==1:
                pbar = tqdm(iterable, total=total, desc=desc)
                for idx in pbar:
                    grad_bias[idx] = self._compute_numerical_gradient(x, y_true, b_name, idx, 'biases')
            else:
                with Executor(max_workers=max_workers) as executor:
                    futures = {
                        executor.submit(self._compute_numerical_gradient, x, y_true, b_name, idx, 'biases'): idx
                        for idx in pbar
                    }
                    #pbar = tqdm(as_completed(futures), total=total, desc=desc, position=0, leave=False)
                    pbar = as_completed(futures)
                    for future in pbar:
                        idx = futures[future]
                        grad_bias[idx] = future.result()

            params['biases'][b_name] = grad_bias

        return params

In [None]:
n = 5
input_size = 50
output_size = 3
hidden_sizes = [16]

net = MultiLayerNet(input_size, output_size, hidden_sizes, activation=None)
print(f"> node sizes: {net.node_sizes}")
print(f"> weights sizes: {[v.shape for k,v in net.params['weights'].items()]}")
print(f"> bias sizes: {[v.shape for k,v in net.params['biases'].items()]}")

x = np.random.rand(n, input_size)
y_true = np.random.rand(n, output_size)

y_pred = net.predict(x)
print(f"> y_pred size: {y_pred.shape}")

loss = net.loss(x, y_true)
print(f"> loss: {loss:.4f}")

acc = net.accuracy(x, y_true)
print(f"> accuracy: {acc:.4f}")

In [None]:
# relu인 경우 출력값이 음수인 경우 0이 되므로 가중치가 0이 될 수 있음
numerical_grads = net.numerical_gradient(x, y_true, max_workers=8)
numerical_grads['weights']['w0']

In [None]:
# 미니배치 구현 : 141p

def label_onehot(label, num_classes):
    label = np.array(label)
    label = label - min(label)
    onehot = np.zeros((len(label), num_classes))
    for i, l in enumerate(label):
        onehot[i, l] = 1
    return onehot

# # HTTP 403 에러 뜸
# from dataset.mnist import load_mnist
# (X_train, y_train), (X_test, y_test) = load_mnist(normalize=True, one_hot_label=True)

import tensorflow as tf
mnist = tf.keras.datasets.mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

X_train = X_train.astype(np.float64)
X_test = X_test.astype(np.float64)

# normalize
X_train, X_test = X_train / 255.0, X_test / 255.0

# X -> flatten
X_train = X_train.reshape(X_train.shape[0], -1)
X_test  = X_test .reshape(X_test .shape[0], -1)

In [None]:
# max_workers = os.cpu_count() // 2
# iters_num = 100
# train_size = X_train.shape[0]
# batch_size = 100
# lr = 0.1

# input_size = 784
# output_size = 10
# hidden_sizes = [50]

# model = MultiLayerNet(input_size, output_size, hidden_sizes, activation=None)

# elapsed_times = []
# train_loss_list = []
# val_loss_list = []
# for i in range(iters_num):
#     start_time = time.time()

#     # get mini-batch (실제로는 모든 6만개의 데이터에 대해서 모두 가중치 업데이트를 해야 1개의 epoch로 쳐줌)
#     batch_mask = np.random.choice(train_size, batch_size)
#     X_batch, y_batch = X_train[batch_mask], y_train[batch_mask]
    
#     # calculate gradients
#     grads = model.numerical_gradient(X_batch, y_batch, max_workers=max_workers)

#     # gradient descent
#     for grad_key in grads.keys():
#         for key in grads[grad_key]:
#             model.params[grad_key][key] -= lr * grads[grad_key][key]

#     # loss
#     train_loss = model.loss(X_batch, y_batch)
#     val_loss   = model.loss(X_test, y_test)

#     train_loss_list.append(train_loss)
#     val_loss_list  .append(val_loss)

#     # progress
#     now_time = str(datetime.datetime.now())[:-7]
#     end_time = time.time()
#     elapsed = end_time-start_time
#     elapsed_times.append(elapsed)
#     total = sum(elapsed_times)
#     remaining = (iters_num-i-1) * elapsed

#     str_i = str(i+1).zfill(len(str(iters_num)))
#     progress = f'{now_time} | [{str_i}/{iters_num}] {train_loss=:.4f}, {val_loss=:.4f}, {elapsed=:.1f}s, {total=:.1f}s, {remaining=:.1f}s'
#     print(progress)

In [None]:
# 곱셈 계층 : 161p
class MulLayer:
    def __init__(self):
        self.x = None
        self.y = None
    
    def forward(self, x, y):
        self.x = x
        self.y = y
        return x * y # 순전파는 곱셈으로 반환

    def backward(self, dout):
        dx = dout * self.y # x와 y를 바꾸어서 미분값에 곱함
        dy = dout * self.x
        return dx, dy

# 덧셈 계층 : 163p
class AddLayer:
    def __init__(self):
        self.x = None
        self.y = None
    
    def forward(self, x, y):
        self.x = x
        self.y = y
        return x + y # 순전파는 덧셈으로 반환

    def backward(self, dout):
        dx = dout * 1
        dy = dout * 1
        return dx, dy # 값 그대로(1을 곱한채로) 반환

In [None]:
# 곱셈 계층 예제 : 162p
mul_apple_layer = MulLayer()
mul_tax_layer = MulLayer()

# (1) forward : input 2, output 1
apple = 100
apple_num = 2
tax = 1.1

apple_price = mul_apple_layer.forward(apple, apple_num) # (1-1) 사과가격(100) * 사과개수(2)
price = mul_tax_layer.forward(apple_price, tax)         # (1-2) 전체사과가격(200) * 세금(1.1)
print(price)

# (2) backward : input 1, output 2
dprice = 1

dapple_price, dtax = mul_tax_layer.backward(dprice)         # (2-2) 전체가격 -> 세금
dapple, dapple_num = mul_apple_layer.backward(dapple_price) # (2-1) 세금 -> 사과가격,개수
print(dapple, dapple_num, dtax)

In [None]:
# 곱셈+덧셈 계층 예제 : 164p

# (1) forward

# (1-1) 사과와 오렌지의 각각의 가격 및 개수
apple_price, apple_num = 100, 2   # 100원짜리 사과 2개
orange_price, orange_num = 150, 3 # 150원짜리 오렌지 3개

mul_apple_layer = MulLayer()
mul_orange_layer = MulLayer()

tot_apple_price = mul_apple_layer.forward(apple_price, apple_num)
tot_orange_price = mul_orange_layer.forward(orange_price, orange_num)

# (1-2) 사과와 오렌지의 전체 가격 합치기
add_apple_orange_layer = AddLayer()

tot_price = add_apple_orange_layer.forward(tot_apple_price, tot_orange_price)

# (1-3) 소비세 곱하기
tax_ratio = 1.1

mul_tax_layer = MulLayer()

final_price = mul_tax_layer.forward(tot_price, tax_ratio)

# forward result
print(f'\n> forward',
      f'\n   step1: {tot_apple_price=:.2f}, {tot_orange_price=:.2f},',
      f'\n   step2: {tot_price=:.2f},',
      f'\n   step3: {final_price=:.2f}')

# (2) backward
# - forward와는 순서가 반대로
# - forward에 input이었던 것들이 output으로, output이었던 것이 input으로

# (2-1) 전체 가격과 세금 나누기
dfinal_price = 1

dtot_price, dtax_ratio = mul_tax_layer.backward(dfinal_price)

# (2-2) 사과와 오렌지의 전체 가격 나누기
dtot_apple_price, dtot_orange_price = add_apple_orange_layer.backward(dtot_price)

# (2-3) 사과와 오렌지의 가격 및 개수 나누기
dapple_price, dapple_num = mul_apple_layer.backward(dtot_apple_price)
dorange_price, dorange_num = mul_orange_layer.backward(dtot_orange_price)

# backward result
print(f'\n> backward',
      f'\n   step1: {dtot_price=:.2f}, {dtax_ratio=:.2f},',
      f'\n   step2: {dtot_apple_price=:.2f}, {dtot_orange_price=:.2f},',
      f'\n   step3: {dapple_price=:.2f}, {dapple_num=:.2f}, {dorange_price=:.2f}, {dorange_num=:.2f}')

In [None]:
# relu 계층 구현 : 166p
class ReLU:
    def __init__(self):
        self.negative_mask = None

    def forward(self, x):
        x = np.array(x)
        self.negative_mask = (x <= 0)
        out = np.where(self.negative_mask, 0, x) # 0보다 큰 값은 값그대로, 0보다 작은 값은 0으로
        return out

    def backward(self, dout):
        dout = np.array(dout)
        dx = np.where(self.negative_mask, 0, dout) # 0보다 큰 값은 값그대로, 0보다 작은 값은 0으로 (dout 값에 relu_derivative를 곱해줌)
        return dx

In [None]:
x = np.linspace(-5,5,num=100)
activation = ReLU()
dout = activation.forward(x)
dx = activation.backward(dout)

fig = plt.figure(figsize=(10,4))
fig.add_subplot(1,2,1)
plt.plot(x, dout)
plt.grid(alpha=0.5)
plt.xlabel('x')
plt.ylabel('dout')
plt.title('x vs dout')
fig.add_subplot(1,2,2)
plt.plot(x, dx)
plt.grid(alpha=0.5)
plt.xlabel('x')
plt.ylabel('dx')
plt.title('x vs dx')
plt.suptitle(f'[{activation.__class__.__name__}]')
plt.show()

In [None]:
class LeakyReLU:
    def __init__(self, negative_slope=0.01):
        self.negative_slope = negative_slope
        self.negative_mask = None

    def forward(self, x):
        x = np.array(x)
        self.negative_mask = (x <= 0)
        out = np.where(self.negative_mask, self.negative_slope*x, x) # 0보다 큰 값은 값그대로, 0보다 작은 값은 negative slope를 곱한 값으로
        return out

    def backward(self, dout):
        dout = np.array(dout)
        dx = np.where(self.negative_mask, self.negative_slope*dout, dout) # 0보다 큰 값은 값그대로, 0보다 작은 값은 negative slope를 곱한 값으로 (dout 값에 leakyrelu_derivative를 곱해줌)
        return dx

In [None]:
x = np.linspace(-5,5,num=100)
activation = LeakyReLU(negative_slope=0.1)
dout = activation.forward(x)
dx = activation.backward(dout)

fig = plt.figure(figsize=(10,4))
fig.add_subplot(1,2,1)
plt.plot(x, dout)
plt.grid(alpha=0.5)
plt.xlabel('x')
plt.ylabel('dout')
plt.title('x vs dout')
fig.add_subplot(1,2,2)
plt.plot(x, dx)
plt.grid(alpha=0.5)
plt.xlabel('x')
plt.ylabel('dx')
plt.title('x vs dx')
plt.suptitle(f'[{activation.__class__.__name__}]')
plt.show()

In [None]:
# sigmoid 계층 구현 : 170p
class Sigmoid:
    def __init__(self):
        pass

    def forward(self, x):
        x = np.array(x)
        self.out = np.where(x>0, 1 / (1 + np.exp(-x)), np.exp(x) / (1 + np.exp(-x))) # overflow 방지
        return self.out

    def backward(self, dout):
        dout = np.array(dout)
        dx = dout * self.out * (1.0-self.out)
        return dx

In [None]:
x = np.linspace(-5,5,num=100)
activation = Sigmoid()
dout = activation.forward(x)
dx = activation.backward(dout)

fig = plt.figure(figsize=(10,4))
fig.add_subplot(1,2,1)
plt.plot(x, dout)
plt.grid(alpha=0.5)
plt.xlabel('x')
plt.ylabel('dout')
plt.title('x vs dout')
fig.add_subplot(1,2,2)
plt.plot(x, dx)
plt.grid(alpha=0.5)
plt.xlabel('x')
plt.ylabel('dx')
plt.title('x vs dx')
plt.suptitle(f'[{activation.__class__.__name__}]')
plt.show()

In [None]:
# Affine 계층 구현 : 175p
class Affine:
    def __init__(self, W, b):
        self.W = np.array(W)
        self.b = np.array(b)

        # 가중치와 편향의 gradient descent를 위해 각각의 기울기를 저장
        self.dW = None
        self.db = None

    def forward(self, X):
        self.X = np.array(X)
        self.origin_shape = self.X.shape

        # 다차원인 경우, batch에 대해서 적용하기위해 평탄화(2D로 변환) 시켜줌
        if self.X.ndim > 2:
            self.X = self.X.reshape(self.X.shape[0], -1)
        
        out = self.X @ self.W + self.b # out = XW + b
        
        return out

    def backward(self, dout):
        dout = np.array(dout)
        self.dW = self.X.T @ dout        # dL/dW = X^T * dL/dY
        self.db = np.sum(dout, axis=0)   # dL/dB = dL/dY = sum(dL_j/dY)
        dX = dout @ self.W.T             # dL/dX = dL/dY * W^T
        dX = dX.reshape(self.origin_shape)
        return dX

In [None]:
batch_size = 20
input_dim = 3
output_dim = 4

W = np.random.randn(input_dim,output_dim) # W : (input_size, output_size)
b = np.zeros(output_dim)                  # b : (output_size)
X = np.random.randn(batch_size,input_dim) # X : (batch_size, input_size)

affine = Affine(W,b)
print(affine.forward(X).shape)            # forward : (batch_size, output_size) = (batch_size, input_size) @ (input_size, output_sizse)
dout = np.ones((batch_size,output_dim))
print(affine.backward(dout).shape)        # backward : (batch_size, input_size)
print(affine.dW.shape, affine.db.shape)   # shape of dW, db = shape of W, b

In [None]:
# 참조 : https://slamwithme.oopy.io/305bb7e0-1062-4785-a82e-9e2a5debd0f4

# (1) Binary Class
#   (1-1) torch.nn.BCELoss() : Layer(affine, activation, batchnorm 등의 조합) 마지막에 sigmoid 추가 필요
#   (1-2) torch.nn.BCEWithLogitsLoss() : Layer 마지막에 sigmoid 추가 불필요 (sigmoid가 포함되어있음)
#

# (2) Multiple Class
#   : torch.nn.CrossEntropyLoss() : Layer 마지막에 softmax 추가 불필요 (softmax가 포함되어있음)

In [None]:
# Softmax with loss 계층 구현 : 179p
# -> torch.nn.CrossEntropyLoss() 구현 예제
class SoftmaxWithLoss:
    def __init__(self):
        self.y = None
        self.t = None

    def forward(self, y, t):
        self.y = softmax(np.array(y))
        self.t = np.array(t)

        # cross entropy loss
        self.loss = cross_entropy_error(y_pred=self.y, y_true=self.t)

        return self.loss

    def backward(self): # dout의 영향을 받지 않음
        # true값이 onehot인 경우
        batch_size = self.t.shape[0]
        if self.y.size == self.t.size:
            dx = (self.y - self.t)
        # true값이 정수형인 경우
        else:
            dx = self.y.copy()
            dx[np.arange(batch_size), self.t] -= 1  # 예측 확률에서 실제 정수형 레이블에 해당하는 값을 1 뺌

        # 오차를 batch size로 나눠서 역전파
        dx /= batch_size

        return dx

In [None]:
# 역전파를 이용한 MultiLayerNet 구현 : 182p
from tqdm import tqdm, trange
from collections import OrderedDict
from copy import deepcopy
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

class MultiLayerNet:
    def __init__(self, input_size, output_size, hidden_sizes, activation=None, weight_init='auto', weight_init_limit=None):
        self.input_size = input_size
        self.output_size = output_size
        self.hidden_sizes = hidden_sizes
        self.weight_init_limit = weight_init_limit

        self.node_sizes = [input_size] + hidden_sizes + [output_size]
        self.n_layers = len(self.node_sizes)-1

        # activation
        if activation is None:
            activation = 'sigmoid'
        else:
            activation = activation.lower()

        activation_dict = {
            'sigmoid' : Sigmoid(),
            'relu' : ReLU(),
        }

        self.activation = activation_dict.get(activation)

        # weight init
        if weight_init=='auto':
            if activation=='sigmoid':
                self.weight_init(method='Xavier')
            elif activation=='relu':
                self.weight_init(method='He')
        else:
            self.weight_init(method=weight_init)

        # define layers
        self.layers_backward = OrderedDict()
        for i in range(self.n_layers):
            W = self.params['weights'][f'w{i+1}']
            b = self.params['biases'] [f'b{i+1}']
            self.layers_backward[f'Affine{i+1}'] = Affine(W,b)
            if i+1 != self.n_layers:
                self.layers_backward[f'Activation{i+1}'] = deepcopy(self.activation)
        self.last_layer = SoftmaxWithLoss()

    # > 책 205p
    # > torch 공식문서
    #   - nn.Linear: https://pytorch.org/docs/stable/generated/torch.nn.Linear.html
    #   - nn.Conv2d: https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
    def weight_init(self, method=None, seed=42):
        np.random.seed(seed)
        self.params = {'weights': OrderedDict(), 'biases': OrderedDict()}

        for i in range(self.n_layers):
            input_dim = self.node_sizes[i]
            output_dim = self.node_sizes[i+1]

            if method is None:
                limit = self.weight_init_limit
            elif method == 'Xavier':
                limit = np.sqrt(1 / input_dim)   # Xavier initialization
            elif method == 'He':
                limit = np.sqrt(2 / input_dim)   # He initialization
            else:
                raise ValueError

            self.params['weights'][f'w{i+1}'] = np.random.randn(input_dim, output_dim) * limit
            self.params['biases'] [f'b{i+1}'] = np.random.randn(output_dim) * limit

    def predict_numerical_gradient(self, x, params=None, return_activations=False):
        if params is None:
            weights = deepcopy(self.params['weights'])
            biases = deepcopy(self.params['biases'])
        else:
            weights = params['weights']
            biases = params['biases']

        if return_activations:
            activations = []

        # 초기 activation은 x로 초기화
        activation = x
        
        # layer들의 선형결합
        for j, ((w_name, w_ij), (b_name, b_j)) in enumerate(zip(weights.items(), biases.items())):
            # logit (linear combination)
            logits = activation @ w_ij + b_j

            # activation function
            if j+1 != self.n_layers:
                activation = self.activation.forward(logits)
            else:
                activation = logits
            
            if return_activations:
                activations.append(activation)

        if return_activations:
            return activation, activations
        else:
            return activation

    def loss_numerical_gradient(self, x, y_true, params=None):
        activation = self.predict_numerical_gradient(x, params)
        prob = softmax(activation)
        loss = cross_entropy_error(y_true=y_true, y_pred=prob)
        return loss

    def _compute_numerical_gradient(self, x, y_true, name, idx, what):
        h = 1e-4

        # set params
        params = {'weights': deepcopy(self.params['weights']), 'biases': deepcopy(self.params['biases'])}
        val = params[what][name][idx]

        # f(x+h)
        params[what][name][idx] = val + h
        fxh1 = self.loss_numerical_gradient(x, y_true, params)
        # f(x-h)
        params[what][name][idx] = val - h
        fxh2 = self.loss_numerical_gradient(x, y_true, params)

        # ( f(x+h) - f(x-h) ) / 2h
        return (fxh1 - fxh2) / (2*h)

    def numerical_gradient(self, x, y_true, max_workers=1):
        Executor = ThreadPoolExecutor # ThreadPoolExecutor. ProcessPoolExecutor
        params = {'weights': OrderedDict(), 'biases': OrderedDict()}
        weights = self.params['weights']
        biases = self.params['biases']

        for j, ((w_name, w_ij), (b_name, b_j)) in enumerate(zip(weights.items(), biases.items())):
            
            # iterable define
            iterable_weight = np.ndindex(weights[w_name].shape)
            iterable_bias = np.ndindex(biases[b_name].shape)

            # pbar args
            total_weight = weights[w_name].size
            total_bias   = biases[b_name].size
            desc_weight = f'[{j+1}/{len(weights)}] - weights'
            desc_bias   = f'[{j+1}/{len(biases)}] - biases'

            # grad init
            grad_weight = np.zeros_like(weights[w_name], dtype=np.float64)
            grad_bias = np.zeros_like(biases[b_name], dtype=np.float64)

            if max_workers==1:
                pbar = tqdm(iterable_weight, total=total_weight, desc=desc_weight)
                for idx in pbar:
                    grad_weight[idx] = self._compute_numerical_gradient(x, y_true, w_name, idx, 'weights')

                pbar = tqdm(iterable_bias, total=total_bias, desc=desc_bias)
                for idx in pbar:
                    grad_bias[idx] = self._compute_numerical_gradient(x, y_true, b_name, idx, 'biases')

            else:
                with Executor(max_workers=max_workers) as executor:
                    futures_weight = {
                        executor.submit(self._compute_numerical_gradient, x, y_true, w_name, idx, 'weights'): idx
                        for idx in iterable_weight
                    }
                    futures_bias = {
                        executor.submit(self._compute_numerical_gradient, x, y_true, b_name, idx, 'biases'): idx
                        for idx in iterable_bias
                    }

                    pbar = tqdm(as_completed(futures_weight), total=total_weight, desc=desc_weight, position=0, leave=False)
                    # pbar = as_completed(futures_weight)
                    for future_weight in pbar:
                        idx = futures_weight[future_weight]
                        grad_weight[idx] = future_weight.result()

                    pbar = tqdm(as_completed(futures_bias), total=total_bias, desc=desc_bias, position=0, leave=False)
                    # pbar = as_completed(futures_bias)
                    for future_bias in pbar:
                        idx = futures_bias[future_bias]
                        grad_bias[idx] = future_bias.result()

            # get gradient
            params['weights'][w_name] = grad_weight
            params['biases'][b_name] = grad_bias

        return params

    def predict_backward(self, x):
        for layer in self.layers_backward.values():
            x = layer.forward(x)
        return x

    def loss_backward(self, x, y_true):
        y_pred = self.predict_backward(x)
        loss = self.last_layer.forward(y=y_pred, t=y_true)
        return loss

    def gradient(self, x, y_true):
        params = {'weights': {}, 'biases': {}}

        # 순전파
        self.loss_backward(x, y_true)

        # 역전파
        dout = self.last_layer.backward()

        for key, layer in reversed(self.layers_backward.items()):
            dout = layer.backward(dout)
            if key.startswith('Affine'):
                i = int(key.replace('Affine',''))
                params['weights'][f'w{i}'] = layer.dW
                params['biases'] [f'b{i}'] = layer.db

        return params

In [None]:
def batch_split(X, y, batch_size, shuffle=False, random_state=42):
    if shuffle:
        np.random.seed(random_state)
        shuffled_indices = np.random.permutation(len(X))
        X = X[shuffled_indices]
        y = y[shuffled_indices]

    return [
        [X[i:i + batch_size] for i in range(0, len(X), batch_size)],
        [y[i:i + batch_size] for i in range(0, len(X), batch_size)],
    ]

In [None]:
# optimizer : 191p ~ 199p

# https://github.com/WegraLee/deep-learning-from-scratch/blob/master/common/optimizer.py
# *NAG(모멘텀에서 한단계 발전한 방법)은 위 사이트 참조
__all__ = ['SGD', 'Momentum', 'AdaGrad', 'RMSProp', 'Adam']
print(f'{__all__ = }')

params_dict = {'weights':{}, 'biases':{}, 'gamma':{}, 'beta':{}}

class SGD:
    def __init__(self, lr=0.01):
        self.lr = lr

    def update(self, params, grads):
        for key1 in grads.keys():
            for key2 in grads[key1].keys():
                params[key1][key2] -= lr * grads[key1][key2]
        return params

# 기울기 방향으로 가속
class Momentum:
    def __init__(self, lr=0.01, momentum=0.9):
        self.lr = lr
        self.momentum = momentum
        self.v = None

    def update(self, params, grads):
        if self.v is None:
            self.v = params_dict
            for key1 in grads.keys():
                for key2 in grads[key1].keys():
                    self.v[key1][key2] = np.zeros_like(grads[key1][key2])

        for key1 in grads.keys():
            for key2 in grads[key1].keys():
                # 초기에는 v의 모든값이 0으로 SGD와 같지만, 업데이트됨에따라 momentum이 더해져 가속됨
                self.v[key1][key2] = (self.momentum * self.v[key1][key2]) - (lr * grads[key1][key2]) 
                params[key1][key2] += self.v[key1][key2]

        return params

# learning rate가 너무 작으면 학습이 느리고, 너무 크면 발산함
# -> 학습을 진행하면서 학습률을 점차 줄여감
class AdaGrad:
    def __init__(self, lr=0.01):
        self.lr = lr
        self.h = None
        self.eps = 1e-7

    def update(self, params, grads):
        if self.h is None:
            self.h = params_dict
            for key1 in grads.keys():
                for key2 in grads[key1].keys():
                    self.h[key1][key2] = np.zeros_like(grads[key1][key2])
        
        for key1 in grads.keys():
            for key2 in grads[key1].keys():
                self.h[key1][key2] += grads[key1][key2]**2 # 가중치 감쇠
                params[key1][key2] -= lr * grads[key1][key2] / (np.sqrt(self.h[key1][key2]) + self.eps)
                
        return params

# AdaGrad는 기울기를 제곱해서 계속 더해가므로, 무한히 학습한다면 갱신량이 0으로 수렴함
# -> 이 문제를 개선한 방법으로 RMSProp이 제안됨
class RMSProp:
    def __init__(self, lr=0.01, decay_rate=0.99):
        self.lr = lr
        self.decay_rate = decay_rate
        self.h = None
        self.eps = 1e-7

    def update(self, params, grads):
        if self.h is None:
            self.h = params_dict
            for key1 in grads.keys():
                for key2 in grads[key1].keys():
                    self.h[key1][key2] = np.zeros_like(grads[key1][key2])
        
        for key1 in grads.keys():
            for key2 in grads[key1].keys():
                self.h[key1][key2] += (self.decay_rate) * (1-self.decay_rate) * (grads[key1][key2]**2) # 가중치 감쇠
                params[key1][key2] -= lr * grads[key1][key2] / (np.sqrt(self.h[key1][key2]) + self.eps)
                
        return params

# Momentum + RMSProp
class Adam:
    def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.iter = 0
        self.m = None
        self.v = None
        self.eps = 1e-2 #1e-7

    def update(self, params, grads):
        self.iter += 1
        lr_iter = self.lr * (1.0 - self.beta2**self.iter) / (1.0 - self.beta1**self.iter)

        if self.m is None:
            self.m = params_dict
            self.v = params_dict
            for key1 in grads.keys():
                for key2 in grads[key1].keys():
                    self.m[key1][key2] = np.zeros_like(grads[key1][key2])
                    self.v[key1][key2] = np.zeros_like(grads[key1][key2])

        for key1 in grads.keys():
            for key2 in grads[key1].keys():
                #(기존 Adam 알고리즘)
                # # 모멘텀 업데이트
                # self.m[key1][key2] = (self.beta1 * self.m[key1][key2]) + ((1.0 - self.beta1) * (grads[key1][key2]))
                # self.v[key1][key2] = (self.beta2 * self.v[key1][key2]) + ((1.0 - self.beta2) * (grads[key1][key2]**2))

                # # 편향 보정
                # m_hat = self.m[key1][key2] / (1.0 - (self.beta1**self.iter))
                # v_hat = self.v[key1][key2] / (1.0 - (self.beta2**self.iter))

                #(차분방식 Adam 알고리즘)
                self.m[key1][key2] += (1.0 - self.beta1) * (grads[key1][key2] - self.m[key1][key2])
                self.v[key1][key2] += (1.0 - self.beta1) * (grads[key1][key2]**2 - self.v[key1][key2])

                # 가중치 업데이트
                print(self.v[key1][key2][:5])
                params[key1][key2] -= lr_iter * self.m[key1][key2] / (np.sqrt(self.v[key1][key2]) + self.eps)

        return params

In [None]:
batch_size = 64

input_size = 784
output_size = 10
hidden_sizes = [8]

model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='sigmoid')
X_batches, y_batches = batch_split(X_train, y_train, batch_size, shuffle=True, random_state=42)

i=0
X_batch = X_batches[i]
y_batch = y_batches[i]

# predict 확인
a = model.predict_backward(X_batch)
b = model.predict_numerical_gradient(X_batch)
print('predict diff:',np.sum((a-b)**2))

# loss 확인
a = model.loss_backward(X_batch, y_batch)
b = model.loss_numerical_gradient(X_batch, y_batch)
print('loss diff:',a-b)

# gradient 확인
grads1 = model.gradient(X_batch, y_batch)
grads2 = model.numerical_gradient(X_batch, y_batch, max_workers=1)
for key1 in grads2.keys():
    for key2 in grads2[key1].keys():
        diff = np.mean((grads1[key1][key2] - grads2[key1][key2])**2)
        print(key1,key2,diff)

(1) Custom Train

In [None]:
# epochs = 100
# batch_size = 64
# lr = 0.1

# input_size = 784
# output_size = 10
# hidden_sizes = [8]

# model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='sigmoid')
# optimizer = Adam(lr=lr) # SGD, Momentum, AdaGrad, RMSProp, Adam

# X_batches, y_batches = batch_split(X_train, y_train, batch_size, shuffle=True, random_state=42)

# elapsed_times = []
# train_loss_list = []
# val_loss_list = []
# for i in range(epochs):
#     start_time = time.time()

#     train_loss = 0
#     val_loss = 0

#     # get mini-batch (실제로는 모든 6만개의 데이터에 대해서 모두 가중치 업데이트를 해야 1개의 epoch로 쳐줌)
#     pbar_batch = tqdm(zip(X_batches, y_batches), total=len(X_batches))
#     for X_batch, y_batch in pbar_batch:
#         # calculate gradients
#         grads = model.gradient(X_batch, y_batch)

#         # gradient descent
#         model.params = optimizer.update(model.params, grads)

#         for k in range(model.n_layers):
#             W = model.params['weights'][f'w{k+1}']
#             b = model.params['biases'] [f'b{k+1}']
#             model.layers_backward[f'Affine{k+1}'] = Affine(W, b)

#         # loss
#         train_loss += model.loss_backward(X_batch, y_batch)
#         val_loss   += model.loss_backward(X_test, y_test)

#     train_loss /= len(X_batches)
#     val_loss /= len(X_batches)

#     train_loss_list.append(train_loss)
#     val_loss_list  .append(val_loss)

#     # progress
#     now_time = str(datetime.datetime.now())[:-7]
#     end_time = time.time()
#     elapsed = end_time-start_time
#     elapsed_times.append(elapsed)
#     total = sum(elapsed_times)
#     remaining = (epochs-i-1) * elapsed

#     str_i = str(i+1).zfill(len(str(epochs)))
#     progress = f'{now_time} | [{str_i}/{epochs}] {train_loss=:.4f}, {val_loss=:.4f}, {elapsed=:.1f}s, {total=:.1f}s, {remaining=:.1f}s'
#     print(progress)

(2) Tensorflow

In [None]:
# import tensorflow as tf
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Dense, BatchNormalization, ReLU, Dropout
# from tensorflow.keras import backend as K
# from tensorflow.keras.optimizers import Adam
# from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
# from tensorflow.keras.regularizers import l2

# def f1_score(y_true, y_pred):
#     y_pred = K.round(y_pred)
    
#     tp = K.sum(K.cast(y_true * y_pred, 'float'), axis=0)
#     fp = K.sum(K.cast((1 - y_true) * y_pred, 'float'), axis=0)
#     fn = K.sum(K.cast(y_true * (1 - y_pred), 'float'), axis=0)

#     precision = tp / (tp + fp + K.epsilon())
#     recall = tp / (tp + fn + K.epsilon())
    
#     f1 = 2 * (precision * recall) / (precision + recall + K.epsilon())
#     return K.mean(f1)

# # label -> onehot encoding
# num_classes = len(np.unique(y_train))
# y_train_onehot, y_test_onehot = label_onehot(y_train, num_classes), label_onehot(y_test, num_classes)

# with tf.device('/cpu:0'):
#     optimizer = Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999)

#     model = Sequential()
#     model.add(Dense(units=64, input_dim=784, activation='relu'))
#     model.add(Dense(units=32, input_dim=64, activation='relu'))
#     model.add(Dense(units=10, activation='softmax'))
#     model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy',f1_score])
#     model.summary()

#     # model = Sequential()
#     # model.add(Dense(units=64, input_dim=784)) # (1) Dense
#     # model.add(BatchNormalization())           # (2) Batch Normalization
#     # model.add(ReLU())                         # (3) Activation
#     # model.add(Dropout(0.5))                   # (4) Dropout
#     # model.add(Dense(units=32))
#     # model.add(BatchNormalization())
#     # model.add(ReLU())
#     # model.add(Dropout(0.5))
#     # model.add(Dense(units=10, activation='softmax'))
#     # model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy',f1_score])
#     # model.summary()

#     early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
#     # checkpoint = ModelCheckpoint('best_model.h5', save_best_only=True)

#     history = model.fit(
#         tf.convert_to_tensor(X_train),
#         tf.convert_to_tensor(y_train_onehot),
#         validation_data=(tf.convert_to_tensor(X_test), tf.convert_to_tensor(y_test_onehot)),
#         epochs=50,
#         batch_size=64,
#         callbacks=[early_stopping],
#     )

(3) Pytorch (pytorch lightning)

In [None]:
# import pytorch_lightning as pl
# import torch
# from torch import nn
# from torch.utils.data import Dataset, DataLoader
# import torchmetrics
# from pytorch_lightning.callbacks import EarlyStopping, TQDMProgressBar, RichProgressBar

# class SimpleModel(pl.LightningModule):
#     def __init__(self, input_dim, output_dim):
#         super().__init__()
#         self.layer = nn.Sequential(
#             nn.Linear(input_dim, 64),
#             nn.ReLU(),
#             nn.Linear(64, 32),
#             nn.ReLU(),
#             nn.Linear(32, output_dim)
#         )
#         self.criterion = nn.CrossEntropyLoss()
#         self.optimizer = torch.optim.AdamW(self.parameters(), lr=0.001)
#         self.f1 = torchmetrics.F1Score(task='multiclass', num_classes=output_dim, average='weighted')

#         self.prog_bar = False
#         self._callback_init()

#     def _callback_init(self):
#         self.callbacks = {
#             'train': {'loss':[], 'f1':[]},
#             'val'  : {'loss':[], 'f1':[]},
#         }

#     def forward(self, x):
#         return self.layer(x)

#     def configure_optimizers(self):
#         return self.optimizer

#     def training_step(self, batch, batch_idx):
#         x, y = batch
#         y_hat = self(x)

#         # loss
#         loss = self.criterion(y_hat, y)

#         # f1 score
#         preds = torch.argmax(y_hat, dim=1)
#         f1 = self.f1(preds, y)
        
#         self.log("train_loss", loss, prog_bar=self.prog_bar)
#         self.log("train_f1", f1, prog_bar=self.prog_bar)

#         self.callbacks['train']['loss'].append(loss.item())
#         self.callbacks['train']['f1'].append(f1.item())

#         return loss

#     def validation_step(self, batch, batch_idx):
#         x, y = batch
#         y_hat = self(x)

#         # loss
#         loss = self.criterion(y_hat, y)

#         # f1 score
#         preds = torch.argmax(y_hat, dim=1)
#         f1 = self.f1(preds, y)
        
#         self.log("val_loss", loss, prog_bar=self.prog_bar)
#         self.log("val_f1", f1, prog_bar=self.prog_bar)

#         self.callbacks['val']['loss'].append(loss.item())
#         self.callbacks['val']['f1'].append(f1.item())
        
#         return loss

#     # def on_train_epoch_end(self):
#     #     # 한 epoch이 끝나면 모든 배치의 손실을 출력
#     #     train_loss = np.mean(self.callbacks['train']['loss'])
#     #     val_loss   = np.mean(self.callbacks['val']['loss'])
#     #     train_f1   = np.mean(self.callbacks['train']['f1'])
#     #     val_f1     = np.mean(self.callbacks['val']['f1'])

#     #     # Early Stopping 콜백을 확인
#     #     for callback in self.trainer.callbacks:
#     #         if isinstance(callback, pl.callbacks.EarlyStopping):
#     #             mark = '*' if callback.wait_count==0 else ' '
#     #             es_info = '' if callback.wait_count==0 else f', early_stopping: {callback.wait_count}/{callback.patience}'

#     #     print(f'\r{mark}{train_loss=:.4f}, {val_loss=:.4f}, {train_f1=:.4f}, {val_f1=:.4f}{es_info}')
#     #     self._callback_init()

# class CustomDataset(Dataset):
#     def __init__(self, X, y):
#         self.X = torch.tensor(X, dtype=torch.float32)
#         self.y = torch.tensor(y, dtype=torch.long)

#     def __len__(self):
#         return len(self.X)

#     def __getitem__(self, idx):
#         return self.X[idx], self.y[idx]

# # prepare dataset
# train_dataset = CustomDataset(X_train, y_train)
# val_dataset = CustomDataset(X_test, y_test)

# train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
# val_loader = DataLoader(val_dataset, batch_size=64)

# # define model
# model = SimpleModel(input_dim=X_train.shape[1], output_dim=len(np.unique(y_train)))

# # callbacks
# early_stopping = EarlyStopping(
#     monitor='val_loss',   # 모니터링할 메트릭 ('val_loss'를 모니터링)
#     patience=5,           # 성능이 개선되지 않아도 몇 번의 epoch을 더 진행할지 설정
#     verbose=False,        # 학습 중지 메시지를 출력하지 않음
#     mode='min'            # 모니터링하는 값이 작을수록 좋은 경우 'min', 클수록 좋은 경우 'max'
# )
# # progress_bar = TQDMProgressBar(refresh_rate=10, leave=True)
# # progress_bar = RichProgressBar(leave=True)

# # traning
# trainer = pl.Trainer(
#     max_epochs=50,
#     callbacks=[early_stopping],
#     default_root_dir='G:\My Drive\Storage\Github\hyuckjinkim\.logs',
# )
# trainer.fit(model, train_loader, val_loader)

# # tensorboard --logdir="G:\My Drive\Storage\Github\hyuckjinkim\.logs\lightning_logs"

In [None]:
# 가중치 초기값에 따른 활성화 값의 분포 확인 : 204p
batch_size = 1024

input_size = 784
output_size = 10
hidden_sizes = [256,256,256,256,256]

X_batches, y_batches = batch_split(X_train, y_train, batch_size, shuffle=True, random_state=42)

i=0
X_batch, y_batch = X_batches[i], y_batches[i]

In [None]:
print('> Activation Function: Sigmoid')

# (1) N(0,1) + sigmoid
model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='sigmoid', weight_init=None, weight_init_limit=1)
_, activations = model.predict_numerical_gradient(X_batch, return_activations=True)

fig = plt.figure(figsize=(15,3))
for i,activation in enumerate(activations):
    fig.add_subplot(1, len(activations), i+1)
    plt.hist(activation.flatten(), bins=30, range=(0,1))
    plt.grid(alpha=0.5)
    if i+1 == len(activations):
        plt.title(f'{i+1}-layer\n(softmax)')
    else:
        plt.title(f'{i+1}-layer\n(sigmoid)')
plt.suptitle('(1) N(0,1)', fontsize=20)
plt.tight_layout()
plt.show()

# (2) N(0,0.01) + sigmoid
model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='sigmoid', weight_init=None, weight_init_limit=0.01)
_, activations = model.predict_numerical_gradient(X_batch, return_activations=True)

fig = plt.figure(figsize=(15,3))
for i,activation in enumerate(activations):
    fig.add_subplot(1, len(activations), i+1)
    plt.hist(activation.flatten(), bins=30, range=(0,1))
    plt.grid(alpha=0.5)
    if i+1 == len(activations):
        plt.title(f'{i+1}-layer\n(softmax)')
    else:
        plt.title(f'{i+1}-layer\n(sigmoid)')
plt.suptitle('(2) N(0,0.01)', fontsize=20)
plt.tight_layout()
plt.show()

# (3) Xavier + sigmoid
model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='sigmoid', weight_init='Xavier')
_, activations = model.predict_numerical_gradient(X_batch, return_activations=True)

fig = plt.figure(figsize=(15,3))
for i,activation in enumerate(activations):
    fig.add_subplot(1, len(activations), i+1)
    plt.hist(activation.flatten(), bins=30, range=(0,1))
    plt.grid(alpha=0.5)
    if i+1 == len(activations):
        plt.title(f'{i+1}-layer\n(softmax)')
    else:
        plt.title(f'{i+1}-layer\n(sigmoid)')
plt.suptitle('(3) Xavier', fontsize=20)
plt.tight_layout()
plt.show()

# (4) He + sigmoid
model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='sigmoid', weight_init='He')
_, activations = model.predict_numerical_gradient(X_batch, return_activations=True)

fig = plt.figure(figsize=(15,3))
for i,activation in enumerate(activations):
    fig.add_subplot(1, len(activations), i+1)
    plt.hist(activation.flatten(), bins=30, range=(0,1))
    plt.grid(alpha=0.5)
    if i+1 == len(activations):
        plt.title(f'{i+1}-layer\n(softmax)')
    else:
        plt.title(f'{i+1}-layer\n(sigmoid)')
plt.suptitle('(4) He', fontsize=20)
plt.tight_layout()
plt.show()

In [None]:
print('> Activation Function: ReLU')

# (1) N(0,1) + ReLU
model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='relu', weight_init=None, weight_init_limit=1)
_, activations = model.predict_numerical_gradient(X_batch, return_activations=True)

fig = plt.figure(figsize=(15,3))
for i,activation in enumerate(activations):
    fig.add_subplot(1, len(activations), i+1)
    plt.hist(activation.flatten(), bins=30, range=(0,1))
    plt.grid(alpha=0.5)
    if i+1 == len(activations):
        plt.title(f'{i+1}-layer\n(softmax)')
    else:
        plt.title(f'{i+1}-layer\n(relu)')
plt.suptitle('(1) N(0,1)', fontsize=20)
plt.tight_layout()
plt.show()

# (2) N(0,0.01) + ReLU
model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='relu', weight_init=None, weight_init_limit=0.01)
_, activations = model.predict_numerical_gradient(X_batch, return_activations=True)

fig = plt.figure(figsize=(15,3))
for i,activation in enumerate(activations):
    fig.add_subplot(1, len(activations), i+1)
    plt.hist(activation.flatten(), bins=30, range=(0,1))
    plt.grid(alpha=0.5)
    if i+1 == len(activations):
        plt.title(f'{i+1}-layer\n(softmax)')
    else:
        plt.title(f'{i+1}-layer\n(relu)')
plt.suptitle('(2) N(0,0.01)', fontsize=20)
plt.tight_layout()
plt.show()

# (3) Xavier + ReLU
model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='relu', weight_init='Xavier')
_, activations = model.predict_numerical_gradient(X_batch, return_activations=True)

fig = plt.figure(figsize=(15,3))
for i,activation in enumerate(activations):
    fig.add_subplot(1, len(activations), i+1)
    plt.hist(activation.flatten(), bins=30, range=(0,1))
    plt.grid(alpha=0.5)
    if i+1 == len(activations):
        plt.title(f'{i+1}-layer\n(softmax)')
    else:
        plt.title(f'{i+1}-layer\n(relu)')
plt.suptitle('(3) Xavier', fontsize=20)
plt.tight_layout()
plt.show()

# (4) He + ReLU
model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='relu', weight_init='He')
_, activations = model.predict_numerical_gradient(X_batch, return_activations=True)

fig = plt.figure(figsize=(15,3))
for i,activation in enumerate(activations):
    fig.add_subplot(1, len(activations), i+1)
    plt.hist(activation.flatten(), bins=30, range=(0,1))
    plt.grid(alpha=0.5)
    if i+1 == len(activations):
        plt.title(f'{i+1}-layer\n(softmax)')
    else:
        plt.title(f'{i+1}-layer\n(relu)')
plt.suptitle('(4) He', fontsize=20)
plt.tight_layout()
plt.show()

In [None]:
# batch normalization : 212p
# -> https://github.com/WegraLee/deep-learning-from-scratch/blob/3e4f6368b17ee4159c820197426eefb0b1315a9b/common/layers.py#L114
class BatchNormalization:
    def __init__(self, gamma, beta, momentum=0.9, running_mean=None, running_var=None, eps=1e-6):
        self.gamma = gamma
        self.beta = beta
        self.momentum = momentum
        self.eps = eps

        # 합성곱 계층은 4차원, 완전연결 계층은 2차원
        self.input_shape = None

        # 시험할 때 사용할 평균과 분산
        self.running_mean = running_mean
        self.running_var = running_var

        # backward 시에 사용할 중간 데이터
        self.batch_size = None
        self.xc = None
        self.std = None
        self.dgamma = None
        self.dbeta = None

    def __reshape(self, x):
        if x.ndim != 2:
            # (batch_size, channel, height, width)
            # - channel: RGB(3), Gray(1)
            # - height, width : height, width of image (64x64 or 256x256)
            N, C, H, W = x.shape 
            x = x.reshape(N, -1)
        return x

    def __forward(self, x, train_flg):
        if self.running_mean is None:
            # N: batch size, D: input size(feature size)
            N, D = x.shape
            self.running_mean = np.zeros(D)
            self.running_var = np.zeros(D)

        if train_flg:
            mu = np.mean(x, axis=0)
            xc = x - mu                  # centered
            var = np.mean(xc**2, axis=0) # (mu, xc, xn)에 대한 정보를 얻기위해서 np.std를 사용하지 않고 분산공식으로 계산
            std = np.sqrt(var + self.eps)
            xn = xc / std                # normalized

            self.batch_size = x.shape[0]
            self.xc = xc
            self.xn = xn
            self.std = std

            self.running_mean = self.momentum * self.running_mean + (1-self.momentum) * mu
            self.running_var  = self.momentum * self.running_var  + (1-self.momentum) * var
        else:
            xc = x - self.running_mean
            xn = xc / np.sqrt(self.running_var + self.eps)

        # (x-mu)/std의 inverse transform = scale * normalized + shift
        out = self.gamma * xn + self.beta

        return out

    def forward(self, x, train_flg):
        self.input_shape = x.shape

        x = self.__reshape(x)
        out = self.__forward(x, train_flg)
        out = out.reshape(*self.input_shape)

        return out
        
    def __backward(self, dout):
        # backward by Chain Rule
        # - (역전파 그림 참조) https://velog.io/@clayryu328/밑바닥부터-시작하는-딥러닝-13-배치정규화
        #   -> (4번, 6번에서 참조하면 좋음)

        # (1) y = gamma * xn + beta
        dbeta = np.sum(dout, axis=0)            #-> dL/dbeta = dL/dy * dy/dbeta = dL/dy * 1 = dL/dy
        dgamma = np.sum(dout * self.xn, axis=0) #-> dL/dgamma = dL/dy * dy/dgamma = dL/dy * xn
        # print(dgamma[:3])
        dxn = dout * self.gamma                 #-> dL/dxn = dL/dy * dy/dxn = dL/dy * gamma

        # (2) xn = xc / std
        dstd = - np.sum(dxn * self.xc / (self.std**2), axis=0) #-> dL/dstd = dL/dxn * dxn/dstd = dL/dxn * xc * -(1/std)**2
        
        # (3) std = var**(1/2)
        dvar = dstd * 0.5 / self.std #dL/dvar = dL/dstd * dstd/dvar = dL/dstd * (1/2) * var**(-1/2) = dL/dstd * (1/2) * (1/std)

        # (4) var = 1/n * sum(xc**2)
        #  두개의 xc가 합류를 하기 때문에 더해준다
        #  (4-1) xn = xc / std           -> dL/dxc = dL/dxn * dxn/dxc = dL/dxn * (1/std)
        #  (4-2) var = 1/n * sum(xc**2)  -> dL/dxc = dL/dvar * dvar/dxc = dL/dvar * (2/n) * xc
        dxc = (dxn / self.std) + (dvar * (2.0/self.batch_size) * self.xc)

        # (5) xc = x - mu
        dmu = np.sum(dxc, axis=0) # dL/dmu = dL/dxc * dxc/dmu = dL/dxc * 1 = dL/dxc

        # (6) xc와 mu의 평균이 합류를 하기 때문에 더해준다
        #  (6-1) xc = x - mu        -> dL/dx = dL/dxc * dxc/dx = dL/dxc * 1 = dL/dxc
        #  (6-2) mu = 1/N * sum(x)  -> dL/dx = dL/dmu * dmu/dx = dL/dmu * (1/N)
        dx = dxc - (dmu / self.batch_size)

        self.dgamma = dgamma
        self.dbeta = dbeta

        return dx

    def backward(self, dout):
        dout = self.__reshape(dout)
        dout = self.__backward(dout)
        dout = dout.reshape(*self.input_shape)
        return dout

In [None]:
# dropout : 219p
class Dropout:
    def __init__(self, dropout_rate=0.5):
        self.dropout_rate = dropout_rate
        self.mask = None

    def forward(self, x, train_flg=True):
        if train_flg:
            self.mask = np.random.rand(*x.shape) > self.dropout_rate # dropout_rate보다 높은건(삭제안할거) True, 낮은건(삭제할거) False
            return x * self.mask # dropout_rate보다 높은건 그대로 보내고, 낮은건 0으로 없앰
        else:
            return x * (1.0 - self.dropout_rate) # 모든 뉴런을 활성화하되, 훈련 시의 비활성화된 뉴런을 고려하여 값을 "보정"해야함. 이때, 1 - dropout_rate로 보정하는 것이 일반적임.
                                                 # dropout_rate 만큼 감소되었다고 치고 보정함.

    def backward(self, dout):
        return dout * self.mask

In [None]:
from tqdm import tqdm, trange
from collections import OrderedDict
from copy import deepcopy
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

class MultiLayerNet:
    def __init__(self, input_size, output_size, hidden_sizes, activation=None, weight_init='auto',
                 dropout=None, weight_decay_lambda=0, random_state=42, batchnorm=False):
        assert str(activation).lower() in ['none','sigmoid','relu','leakyrelu'], "activation must be one of ['sigmoid','relu','leakyrelu'] or None."
        assert str(weight_init).lower() in ['auto','xavier','he','none'], "weight_init must be one of ['auto','Xavier','He',None]."
        assert (dropout is None) or (0<=dropout<=1), "dropout must be between 0 and 1 or None."

        self.input_size = input_size
        self.output_size = output_size
        self.hidden_sizes = hidden_sizes
        self.dropout = dropout
        self.weight_decay_lambda = weight_decay_lambda
        self.batchnorm = batchnorm

        self.node_sizes = [input_size] + hidden_sizes + [output_size]
        self.n_layers = len(self.node_sizes)-1

        # activation
        self.activation = self.__activation_init(activation)

        # weight init
        self.__weight_init(weight_init, activation, random_state)

        # define layers
        self.__layers_init()

    def __activation_init(self, activation):
        if str(activation).lower()=='none':
            activation = 'sigmoid'
        else:
            activation = str(activation).lower()

        activation_dict = {
            'sigmoid' : Sigmoid(),
            'relu' : ReLU(),
            'leakyrelu' : LeakyReLU(),
        }
        return activation_dict.get(activation)

    # > 책 205p
    # > torch 공식문서
    #   - nn.Linear: https://pytorch.org/docs/stable/generated/torch.nn.Linear.html
    #   - nn.Conv2d: https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
    def __weight_init(self, method, activation, seed=42):
        method = str(method).lower()
        if method == 'auto':
            if activation == 'sigmoid':
                method = 'xavier'
            elif activation in ['relu','leakyrelu']:
                method = 'he'
            else:
                method = 'none'

        np.random.seed(seed)
        self.params = {'weights': OrderedDict(), 'biases': OrderedDict(), 'gamma': OrderedDict(), 'beta': OrderedDict()}

        for i in range(self.n_layers):
            input_dim = self.node_sizes[i]
            output_dim = self.node_sizes[i+1]

            if method == 'none':
                scale = 0.01
            elif method == 'xavier':
                scale = np.sqrt(1 / input_dim)   # Xavier initialization
            elif method == 'he':
                scale = np.sqrt(2 / input_dim)   # He initialization
            else:
                raise ValueError

            self.params['weights'][f'w{i+1}'] = scale * np.random.randn(input_dim, output_dim)
            self.params['biases'] [f'b{i+1}'] = scale * np.random.randn(output_dim)

    def __layers_init(self):
        self.layers = OrderedDict()
        for i in range(self.n_layers):
            is_last = (i+1 == self.n_layers)
            input_dim = self.node_sizes[i]
            output_dim = self.node_sizes[i+1]

            # Affine
            W = self.params['weights'][f'w{i+1}']
            b = self.params['biases'] [f'b{i+1}']
            self.layers[f'Affine{i+1}'] = Affine(W,b)

            # Batch Normalization
            if (self.batchnorm) and (not is_last):
                self.params['gamma'][f'g{i+1}'] = np.ones(output_dim)
                self.params['beta'] [f'b{i+1}'] = np.zeros(output_dim)
                self.layers[f'BatchNorm{i+1}'] = BatchNormalization(self.params['gamma'][f'g{i+1}'], self.params['beta'] [f'b{i+1}'])

            # Activation
            if not is_last:
                self.layers[f'Activation{i+1}'] = deepcopy(self.activation)

            # Dropout
            if (self.dropout is not None) and (not is_last):
                self.layers[f'Dropout{i+1}'] = Dropout(self.dropout)

        self.last_layer = SoftmaxWithLoss()

    def predict(self, x, train_flg=False):
        for key, layer in self.layers.items():
            if ("dropout" in key.lower()) or ("batchnorm" in key.lower()):
                x = layer.forward(x, train_flg)
            else:
                x = layer.forward(x)
        return x

    def loss(self, x, y_true, train_flg=False):
        logits = self.predict(x, train_flg)

        # weight decay (L2 정규화) : 217p
        # -> https://github.com/WegraLee/deep-learning-from-scratch/blob/3e4f6368b17ee4159c820197426eefb0b1315a9b/common/multi_layer_net_extend.py#L91
        # -> 큰 가중치에 대해서 그에 상응하는 큰 페널티를 부과하여 오버피팅을 억제
        weight_decay = 0
        for i in range(self.n_layers):
            W = self.params['weights'][f'w{i+1}']
            weight_decay += 0.5 * self.weight_decay_lambda * np.sum(W**2)

        loss = self.last_layer.forward(y=logits, t=y_true) + weight_decay
        
        return loss

    def accuracy(self, x, y_true):
        logits = self.predict(x, train_flg=False)
        y_pred = np.argmax(logits, axis=1)
        if y_true.ndim != 1:
            y_true = np.argmax(y_true, axis=1)

        acc = np.sum(y_true==y_pred) / float(x.shape[0])
        
        return acc

    def gradient(self, x, y_true):
        # 순전파
        self.loss(x, y_true, train_flg=True)

        # 역전파
        dout = self.last_layer.backward()
        for key, layer in reversed(self.layers.items()):
            dout = layer.backward(dout)

        # params 저장
        params = {'weights': OrderedDict(), 'biases': OrderedDict(), 'gamma': OrderedDict(), 'beta': OrderedDict()}
        for i in range(self.n_layers):
            is_last = (i+1 == self.n_layers)
            params['weights'][f'w{i+1}'] = self.layers[f'Affine{i+1}'].dW + (self.weight_decay_lambda * self.params['weights'][f'w{i+1}'])
            params['biases'] [f'b{i+1}'] = self.layers[f'Affine{i+1}'].db
            if self.batchnorm and (not is_last):
                params['gamma'][f'g{i+1}'] = self.layers[f'BatchNorm{i+1}'].dgamma
                params['beta'] [f'b{i+1}'] = self.layers[f'BatchNorm{i+1}'].dbeta

        return params

In [None]:
epochs = 100
batch_size = 64
lr = 0.01

input_size = 784
output_size = 10
hidden_sizes = [8]

model = MultiLayerNet(input_size, output_size, hidden_sizes, activation='LeakyReLU', dropout=0.5, weight_decay_lambda=1e-5, batchnorm=True)
optimizer = SGD(lr=lr) # SGD, Momentum, AdaGrad, RMSProp, Adam

X_batches, y_batches = batch_split(X_train, y_train, batch_size, shuffle=True, random_state=42)

elapsed_times = []
for i in range(epochs):
    start_time = time.time()

    train_loss = 0
    val_loss = 0

    train_acc = 0
    val_acc = 0

    # get mini-batch (실제로는 모든 6만개의 데이터에 대해서 모두 가중치 업데이트를 해야 1개의 epoch로 쳐줌)
    pbar_batch = tqdm(zip(X_batches, y_batches), total=len(X_batches))
    for X_batch, y_batch in pbar_batch:
        # calculate gradients
        grads = model.gradient(X_batch, y_batch)

        # gradient descent
        model.params = optimizer.update(model.params, grads)

        for k in range(model.n_layers):
            W = model.params['weights'][f'w{k+1}']
            b = model.params['biases'] [f'b{k+1}']
            model.layers[f'Affine{k+1}'] = Affine(W, b)

        # loss
        train_loss += model.loss(X_batch, y_batch, train_flg=False)
        val_loss   += model.loss(X_test , y_test , train_flg=False)

        # acc
        train_acc += model.accuracy(X_batch, y_batch)
        val_acc   += model.accuracy(X_test , y_test)

    # calculate loss
    train_loss /= len(X_batches)
    val_loss /= len(X_batches)

    # calculate accuracy
    train_acc /= len(X_batches)
    val_acc /= len(X_batches)

    # progress
    now_time = str(datetime.datetime.now())[:-7]
    end_time = time.time()
    elapsed = end_time-start_time
    elapsed_times.append(elapsed)
    total = sum(elapsed_times)
    remaining = (epochs-i-1) * elapsed

    str_i = str(i+1).zfill(len(str(epochs)))
    progress = f'{now_time} | [{str_i}/{epochs}] {train_loss=:.4f}, {val_loss=:.4f}, {train_acc=:.4f}, {val_acc=:.4f}, {elapsed=:.1f}s, {total=:.1f}s, {remaining=:.1f}s'
    print(progress)