In [1]:
# implemented and written by Yeoreum Lee in AI HnV Lab @ Sahmyook University in 2023
__author__ = 'leeyeoreum02'

In [2]:
from typing import Tuple, Callable

import numpy as np
import pandas as pd

### 1. 데이터 불러오기

메인 교재와 서브 강의만을 활용하여 pandas로 csv 파일에 저장되어 있는 MNIST 데이터셋을 불러오시오. (구글링 금지)

In [3]:
train_data = pd.read_csv('data/mnist_train.csv')
test_data = pd.read_csv('data/mnist_test.csv')

print(train_data.shape, test_data.shape)
train_data.head()

(60000, 785) (10000, 785)


Unnamed: 0,label,1x1,1x2,1x3,1x4,1x5,1x6,1x7,1x8,1x9,...,28x19,28x20,28x21,28x22,28x23,28x24,28x25,28x26,28x27,28x28
0,5,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,9,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [4]:
# shuffle
train_data = train_data.sample(frac=1)

print(train_data.shape, test_data.shape)
train_data.head()

(60000, 785) (10000, 785)


Unnamed: 0,label,1x1,1x2,1x3,1x4,1x5,1x6,1x7,1x8,1x9,...,28x19,28x20,28x21,28x22,28x23,28x24,28x25,28x26,28x27,28x28
13341,8,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
48698,2,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
31206,8,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
37930,7,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
40323,7,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


### 2. 데이터 나누기(split)

메인 교재와 서브 강의만을 이용하여 data를 feature와 정답(label)로 나누는 코드를 구현하시오. (구글링 금지)


In [5]:
def split_data(train_data: pd.DataFrame, test_data: pd.DataFrame) -> Tuple[np.ndarray]:
    test_x_data = np.array(test_data.iloc[:, 1:])
    test_t_data = np.array(test_data.iloc[:, 0])
    train_x_data = np.array(train_data.iloc[:, 1:])
    train_t_data = np.array(train_data.iloc[:, 0])
    
    return train_x_data, train_t_data, test_x_data, test_t_data


train_x_data, train_t_data, test_x_data, test_t_data = split_data(train_data, test_data)
print(train_x_data.shape, train_t_data.shape, test_x_data.shape, test_t_data.shape,)

(60000, 784) (60000,) (10000, 784) (10000,)


### 3. 데이터 전처리(preprocessing)

a. 메인 교재와 서브 강의를 활용하여 train data를 normalize하시오. (구글링 가능)

$$normalize(X) = \frac {X - min} {max - min} \space (max - min \neq 0)$$

b. 메인 교재와 서브 강의를 활용하여 test data를 one-hot encoding하시오. (구글링 가능)

In [6]:
def preprocess(train_x_data: np.ndarray, train_t_data: np.ndarray, test_x_data: np.ndarray, test_t_data: np.ndarray) -> Tuple[np.ndarray]:
    train_x_data = np.array(train_x_data) / 255.0  # 0~1 min-max scaling (normalization)
    train_x_data = train_x_data * 0.99 + 0.01  # smoothing
    
    test_x_data = np.array(test_x_data) / 255.0  # 0~1 min-max scaling (normalization)
    test_x_data = test_x_data * 0.99 + 0.01  # smoothing

    num_classes = 10  # 0~9
    train_t_data_onehot = np.zeros((train_t_data.shape[0], num_classes), dtype=np.float32) + 0.01  # one-hot encoding (vectorization) + smoothing
    for i in range(len(train_t_data_onehot)):
        train_t_data_onehot[i, train_t_data[i]] = 0.99  # smoothing

    test_t_data_onehot = np.zeros((test_t_data.shape[0], num_classes), dtype=np.float32) + 0.01  # one-hot encoding (vectorization) + smoothing
    for i in range(len(test_t_data_onehot)):
        test_t_data_onehot[i, test_t_data[i]] = 0.99  # smoothing
    
    return train_x_data, train_t_data_onehot, test_x_data, test_t_data_onehot

    
train_x_data, train_t_data_onehot, test_x_data, test_t_data_onehot = preprocess(train_x_data, train_t_data, test_x_data, test_t_data)
print(train_t_data_onehot[:5])
print(train_x_data.shape, train_t_data_onehot.shape, test_x_data.shape, test_t_data_onehot.shape)

[[0.01 0.01 0.01 0.01 0.01 0.01 0.01 0.01 0.99 0.01]
 [0.01 0.01 0.99 0.01 0.01 0.01 0.01 0.01 0.01 0.01]
 [0.01 0.01 0.01 0.01 0.01 0.01 0.01 0.01 0.99 0.01]
 [0.01 0.01 0.01 0.01 0.01 0.01 0.01 0.99 0.01 0.01]
 [0.01 0.01 0.01 0.01 0.01 0.01 0.01 0.99 0.01 0.01]]
(60000, 784) (60000, 10) (10000, 784) (10000, 10)


### 4. 활성 함수(activation function)

메인 교재와 서브 강의만을 이용하여 활성 함수 중 하나인 softmax 함수를 구현하시오. (구글링 금지)

$$\sigma(\boldsymbol{z})_{i} = \frac {e^{z_{i}}} {\sum_{i=1} ^K e^{z_{i}}}$$

In [7]:
def sigmoid(x: np.ndarray) -> np.ndarray:
    return 1 / (1 + np.exp(-x))

def softmax(z: np.ndarray) -> np.ndarray:
    return np.exp(z) / np.sum(np.exp(z))

### 5. 신경망(neural network) 모델

메인 교재와 서브 강의만을 이용하여 신경망(neural network) 모델을 구현하시오. (구글링 금지)

$$f(W^{(2)}, b^{(2)})(\boldsymbol{x}) = sigmoid(W^{(2)}\boldsymbol{x} + b^{(2)})$$
$$g(W^{(3)}, b^{(3)})(\boldsymbol{x}) = \sigma(W^{(3)}\boldsymbol{x} + b^{(3)})$$
$$
\begin{matrix}
y &=& h(W^{(2)}, b^{(2)}, W^{(3)}, b^{(3)})(\boldsymbol{x}) \\
  &=& (g(W^{(3)}, b^{(3)}) \circ f(W^{(2)}, b^{(2)}))(\boldsymbol{x}) \\
  &=& g(W^{(3)}, b^{(3)})(f(W^{(2)}, b^{(2)})(\boldsymbol{x})) \\
  &=& \sigma(W^{(3)}sigmoid(W^{(2)}\boldsymbol{x} + b^{(2)}) + b^{(3)})
\end{matrix}
$$

In [8]:
class NeuralNetwork:
    def __init__(self, n_input: int, n_output: int, n_hidden: int = 128) -> None:
        self.W2 = np.random.randn(n_input, n_hidden)  # he initialization
        self.b2 = np.random.randn(n_hidden)
        self.W3 = np.random.randn(n_hidden, n_output)
        self.b3 = np.random.randn(n_output)
        
    def forward(self, x: np.ndarray) -> np.ndarray:
        a1 = x
        z2 = a1 @ self.W2 + self.b2
        a2 = sigmoid(z2)
        z3 = a2 @ self.W3 + self.b3
        y = a3 = softmax(z3)
        return y

    def __call__(self, x: np.ndarray) -> np.ndarray:
        return self.forward(x)
    
    
model = NeuralNetwork(n_input=784, n_output=10)

### 6. 오차 함수 (error function, loss function)

주어진 수식만을 이용하여 CE(cross entropy) 오차 함수를 구현하시오. (구글링, 메인 교재 참고, 서브 강의 참고 금지)
- delta는 log의 진수 조건을 만족하기 위해 필요하므로 반드시 사용
- N은 데이터 개수 (행 개수)
- $y$는 정답(label) $\hat{y}$은 예측값(prediction)

$$CE = -\sum_{i=1} ^N (\boldsymbol{y_{i}} \cdot \log \boldsymbol{\hat{y_{i}}} + (1 - \boldsymbol{y_{i}}) \cdot \log (1 - \boldsymbol{\hat{y_{i}}}))$$

In [9]:
def cross_entropy_loss(y_data: np.ndarray, t_data: np.ndarray) -> np.ndarray:
    delta = 1e-4
    return -np.sum(t_data * np.log(y_data + delta) + (1 - t_data) * np.log((1 - y_data) + delta))

### 7. 편미분 함수 (partial numerical derivative)

- n은 input node 개수, m은 hidden node 개수, o는 output node 개수

$$J(W^{(2)}) = \frac{dE} {dW^{(2)}} = \begin{pmatrix} 
\frac{\partial E} {\partial W^{(2)}_{11}} & \cdots & \frac {\partial E} {\partial W^{(2)}_{m1}} \\
\vdots & \ddots & \vdots \\ 
\frac {\partial E} {\partial W^{(2)}_{1n}} & \cdots & \frac {\partial E} {\partial W^{(2)}_{mn}} \end{pmatrix}$$

$$J(W^{(3)}) = \frac{dE} {dW^{(3)}} = \begin{pmatrix} 
\frac{\partial E} {\partial W^{(3)}_{11}} & \cdots & \frac {\partial E} {\partial W^{(3)}_{o1}} \\
\vdots & \ddots & \vdots \\ 
\frac {\partial E} {\partial W^{(3)}_{1m}} & \cdots & \frac {\partial E} {\partial W^{(3)}_{om}} \end{pmatrix}$$

$$J(\boldsymbol{b^{(2)}}) = \frac{dE} {d\boldsymbol{b^{(2)}}} = \begin{pmatrix} \frac{\partial E} {\partial b^{(2)}_{1}} \frac{\partial E} {\partial b^{(2)}_{2}} \end{pmatrix}$$

$$J(\boldsymbol{b^{(3)}}) = \frac{dE} {d\boldsymbol{b^{(3)}}} = \begin{pmatrix} \frac{\partial E} {\partial b^{(3)}_{1}} \frac{\partial E} {\partial b^{(3)}_{2}} \end{pmatrix}$$

In [10]:
def numerical_derivative(f: Callable, x: np.ndarray) -> np.ndarray:
    h = 1e-4
    grad = np.zeros_like(x)
    
    it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite'])
    
    while not it.finished:
        idx = it.multi_index
        
        temp = x[idx]
        x[idx] = float(temp) + h
        fx1 = f(x)
        
        x[idx] = temp - h
        fx2 = f(x)
        grad[idx] = (fx1 - fx2) / (2 * h)
        
        x[idx] = temp
        it.iternext()
        
    return grad

### 8-1. 모델 학습 (train)

메인 교재와 서브 강의만을 이용하여 다음 순서를 가지는 학습 코드를 구현하시오. (구글링 금지)
- 한꺼번에 다 학습 -> batch(뭉텅이) gradient descent

1. 모델 순전파 (forward)
2. 오차 계산 (loss)
3. 모델 파라미터(가중치 + 편향) 별 오차 함수의 편미분값 계산 (numerical derivative)
4. 가중치(weight), 편향(bias) 갱신 (경사 하강법, gradient descent)

In [11]:
def train_batch() -> None:
    lr = 1e-2
    
    for epoch in range(10):
        y_data = model(train_x_data)
        loss = cross_entropy_loss(y_data, train_t_data_onehot)

        f = lambda W2, b2: sigmoid(train_x_data @ W2 + b2)  # f(W2, b2)(x) = sigmoid(W2x + b2)
        h = lambda W2, b2, W3, b3: softmax(f(W2, b2) @ W3 + b3)  # y = h(W2, b2, W3, b3)(x) = g(W3, b3)(f(W2, b2)(x)) = softmax(W3(sigmoid(W3x + b2)) + b3)
        
        E_w2 = lambda W2: cross_entropy_loss(h(W2, model.b2, model.W3, model.b3), train_t_data_onehot)
        E_b2 = lambda b2: cross_entropy_loss(h(model.W2, b2, model.W3, model.b3), train_t_data_onehot)
        E_w3 = lambda W3: cross_entropy_loss(h(model.W2, model.b2, W3, model.b3), train_t_data_onehot)
        E_b3 = lambda b3: cross_entropy_loss(h(model.W2, model.b2, model.W3, b3), train_t_data_onehot)
        
        model.W2 -= lr * numerical_derivative(E_w2, model.W2)
        model.b2 -= lr * numerical_derivative(E_b2, model.b2)
        model.W3 -= lr * numerical_derivative(E_w3, model.W3)
        model.b3 -= lr * numerical_derivative(E_b3, model.b3)

        print(f'Epoch: {epoch}, loss {loss}')


train_batch()

KeyboardInterrupt: 

### 8-2. 모델 학습 (train)

메인 교재와 서브 강의만을 이용하여 다음 순서를 가지는 학습 코드를 구현하시오. (구글링 금지)
- mini-batch 단위로 학습 -> mini-batch gradient descent
- 만약 batch size가 1인 경우 -> stocastic gradient descent
- 메모리가 부족해 한꺼번에 다 안들어갈 경우 위의 방식들을 사용
- 여기선 메모리가 부족하진 않지만 loss 변화가 너무 늦게 출력되니까 사용

1. 모델 순전파 (forward)
2. 오차 계산 (loss)
3. 모델 파라미터(가중치 + 편향) 별 오차 함수의 편미분값 계산 (numerical derivative)
4. 가중치(weight), 편향(bias) 갱신 (경사 하강법, gradient descent)

In [12]:
def train_mini_batch() -> None:
    lr = 1e-3
    batch_size = 1
    
    for epoch in range(10):
        for step in range(len(train_x_data) // batch_size):
            x_batch = train_x_data[step*batch_size:(step+1)*batch_size]
            t_batch = train_t_data_onehot[step*batch_size:(step+1)*batch_size]
            
            y_data = model(x_batch)
            loss = cross_entropy_loss(y_data, t_batch)

            f = lambda W2, b2: sigmoid(x_batch @ W2 + b2)  # f(W2, b2)(x) = sigmoid(W2x + b2)
            h = lambda W2, b2, W3, b3: softmax(f(W2, b2) @ W3 + b3)  # y = h(W2, b2, W3, b3)(x) = g(W3, b3)(f(W2, b2)(x)) = softmax(W3(sigmoid(W3x + b2)) + b3)
            
            E_w2 = lambda W2: cross_entropy_loss(h(W2, model.b2, model.W3, model.b3), t_batch)
            E_b2 = lambda b2: cross_entropy_loss(h(model.W2, b2, model.W3, model.b3), t_batch)
            E_w3 = lambda W3: cross_entropy_loss(h(model.W2, model.b2, W3, model.b3), t_batch)
            E_b3 = lambda b3: cross_entropy_loss(h(model.W2, model.b2, model.W3, b3), t_batch)
            
            model.W2 -= lr * numerical_derivative(E_w2, model.W2)
            model.b2 -= lr * numerical_derivative(E_b2, model.b2)
            model.W3 -= lr * numerical_derivative(E_w3, model.W3)
            model.b3 -= lr * numerical_derivative(E_b3, model.b3)

            if step % 1 == 0:
                print(f'Epoch: {epoch}, step: {step}, loss {loss}')
                
                
train_mini_batch()

Epoch: 0, step: 0, loss 15.511046708496744
Epoch: 0, step: 1, loss 14.832540182267893
Epoch: 0, step: 2, loss 14.159318050945908
Epoch: 0, step: 3, loss 16.963974420043456
Epoch: 0, step: 4, loss 15.270109633053956
Epoch: 0, step: 5, loss 11.896099195846023
Epoch: 0, step: 6, loss 12.174195559059925
Epoch: 0, step: 7, loss 17.550386223579913
Epoch: 0, step: 8, loss 18.25124486926252
Epoch: 0, step: 9, loss 12.393886603924248
Epoch: 0, step: 10, loss 10.784711657983447
Epoch: 0, step: 11, loss 10.039738444914
Epoch: 0, step: 12, loss 11.053194409087952
Epoch: 0, step: 13, loss 13.270621767406599


KeyboardInterrupt: 