# 성능 고도화 실습

In [1]:
import random

import numpy as np


SEED = 4321
random.seed(SEED)
np.random.seed(SEED)

## MNIST 데이터셋 가져오는 함수 만들기

In [2]:
import gzip
import pickle
from pathlib import Path
from urllib.request import urlretrieve


class MNISTDataset:
    DATASET_DIR = Path("./data/MNIST")  # 다운받을 data directory 먼저 초기화

    URL_MNIST = "https://ossci-datasets.s3.amazonaws.com/mnist/"

    KEY_FILES = {
        "train_img": "train-images-idx3-ubyte.gz",
        "train_label": "train-labels-idx1-ubyte.gz",
        "test_img": "t10k-images-idx3-ubyte.gz",
        "test_label": "t10k-labels-idx1-ubyte.gz",
    }

    DATASET_PATH = DATASET_DIR / "mnist.pkl"

    def __init__(self):
        self.DATASET_DIR.mkdir(parents=True, exist_ok=True)

    def load(
        self,
    ) -> tuple[
        np.ndarray[np.ndarray[np.float32]],
        np.ndarray[np.ndarray[np.int8]],
        np.ndarray[np.ndarray[np.float32]],
        np.ndarray[np.ndarray[np.int8]],
    ]:
        """
        :return: 학습 이미지, 학습 레이블, 테스트 이미지, 테스트 레이블 순으로 ndarray 로 반환
        """
        if not self.DATASET_PATH.exists():
            self._init_dataset()

        with self.DATASET_PATH.open("rb") as f:
            dataset = pickle.load(f)
        return dataset["train_img"], dataset["train_label"], dataset["test_img"], dataset["test_label"]

    def _init_dataset(self):
        self._download_mnist_dataset()

        dataset = {
            "train_img": self._convert_img_to_ndarray(self.DATASET_DIR / self.KEY_FILES["train_img"]),
            "train_label": self._convert_label_gz_to_ndarray(self.DATASET_DIR / self.KEY_FILES["train_label"]),
            "test_img": self._convert_img_to_ndarray(self.DATASET_DIR / self.KEY_FILES["test_img"]),
            "test_label": self._convert_label_gz_to_ndarray(self.DATASET_DIR / self.KEY_FILES["test_label"]),
        }

        with self.DATASET_PATH.open(mode="wb") as f:
            pickle.dump(dataset, f, -1)

    def _download_mnist_dataset(self):
        filelist = self.DATASET_DIR.glob("*")

        for filename in self.KEY_FILES.values():
            if filename not in filelist:
                filepath = self.DATASET_DIR / filename
                urlretrieve(self.URL_MNIST + filename, filepath)
                print("Downloaded", filename, "to", filepath)

    @staticmethod
    def _convert_img_to_ndarray(img_zip_path: Path) -> np.ndarray[np.ndarray[np.float32]]:
        with gzip.open(img_zip_path) as f:
            pixels = np.frombuffer(f.read(), "B", offset=16)
        return pixels.reshape(-1, 28 * 28).astype(np.float32) / 255.0

    def _convert_label_gz_to_ndarray(self, label_zip_path: Path) -> np.ndarray[np.ndarray[np.int8]]:
        with gzip.open(label_zip_path) as f:
            labels = np.frombuffer(f.read(), "B", offset=8)
        return self._encoding_as_onehot(labels)

    @staticmethod
    def _encoding_as_onehot(int_labels: np.ndarray[np.int8]) -> np.ndarray[np.ndarray[np.int8]]:
        num_rows = len(int_labels)
        num_cols = int_labels.max() + 1

        onehot = np.zeros((num_rows, num_cols), dtype=np.uint8)
        onehot[np.arange(num_rows), int_labels] = 1
        return onehot

In [3]:
mnist_dataset = MNISTDataset()
mnist_dataset.load()

(array([[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]], shape=(60000, 784), dtype=float32),
 array([[0, 0, 0, ..., 0, 0, 0],
        [1, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        ...,
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 1, 0]], shape=(60000, 10), dtype=uint8),
 array([[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]], shape=(10000, 784), dtype=float32),
 array([[0, 0, 0, ..., 1, 0, 0],
        [0, 0, 1, ..., 0, 0, 0],
        [0, 1, 0, ..., 0, 0, 0],
        ...,
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [

In [4]:
x_train, y_train, x_test, y_test = mnist_dataset.load()

In [5]:
# train data 와 valid data 만들기
indices = np.arange(x_train.shape[0])
np.random.shuffle(indices)  # 랜덤하게 index 섞기

valid_indices = indices[:10000]
train_indices = indices[10000:]

x_valid, y_valid = x_train[valid_indices], y_train[valid_indices]
x_train, y_train = x_train[train_indices], y_train[train_indices]

print("학습 데이터:", len(x_train), "검증 데이터:", len(x_valid), "평가 데이터:", len(x_test))

학습 데이터: 50000 검증 데이터: 10000 평가 데이터: 10000


## 활성화 함수

- Sigmoid

In [6]:
class SigmoidLayer:
    def __init__(self):
        self.out: np.ndarray | None = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        self.out = 1.0 / (1.0 + np.exp(-x))
        return self.out

    def backward(self, dout: np.ndarray) -> np.ndarray:
        return dout * (1.0 - self.out) * self.out  # dx

- ReLU

In [None]:
class ReLULayer:
    def __init__(self):
        self.mask: np.ndarray | None = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        self.mask = x <= 0

        out = x.copy()
        out[self.mask] = 0
        return out

    def backward(self, dout: np.ndarray) -> np.ndarray:
        dout[self.mask] = 0
        return dout

## 완전 연결 레이어(Fully Connected Layer)

In [7]:
class FCLayer:
    def __init__(self, w, b):
        self.w = w
        self.b = b
        self.x: np.ndarray | None = None
        self.original_x_shape: tuple | None = None
        self.dw: np.ndarray | None = None
        self.db: np.ndarray | None = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        self.original_x_shape = x.shape
        self.x = x.reshape(x.shape[0], -1)
        return np.dot(self.x, self.w) + self.b

    def backward(self, dout: np.ndarray) -> np.ndarray:
        dx = np.dot(dout, self.w.T)
        self.dw = np.dot(self.x.T, dout)
        self.db = np.sum(dout, axis=0)
        return dx.reshape(*self.original_x_shape)

## 소프트 맥스 레이어(Softmax Layer)

### Softmax 함수에 대한 설명

- Softmax 는 다중 클래스 분류 문제에서 널리 사용되는 활성화 함수이다.
- 이 함수는 여러 개의 실수값을 입력받아 각 값을 0~1 사이의 확률값으로 변환하며, 모든 출력값의 합은 1이 된다.
- 이렇게 변환된 값은 각 클래스에 속할 확률로 해석할 수 있다.


### Softmax의 수학적 표현

$$
\text{Softmax}(x_i) = \frac{e^{x_i}}{\sum_{j=1}^{n} e^{x_j}}
$$

여기서 $x_i$는 입력 벡터의 $i$번째 요소이고, $n$은 입력 벡터의 차원이다.

In [None]:
class SoftmaxLayer:
    def __init__(self):
        self.loss = None  # 손실값을 저장할 변수
        self.y_true = None  # 실제 레이블(정답) 저장 변수
        self.y_pred = None  # 예측값 저장 변수

    def forward(self, x: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        self.y_true = self._softmax(x)  # x를 소프트맥스 함수에 통과시켜 확률 분포로 변환
        self.y_pred = y_pred  # 예측값 저장
        # 교차 엔트로피 오차 계산
        self.loss = self._cross_entropy_error(self.y_true, self.y_pred)
        # 계산된 손실 반환
        return self.loss

    def backward(self, _=1):
        # 미니배치 크기 계산
        batch_size = self.y_pred.shape[0]

        # y_pred 와 y_true의 크기가 같은 경우 (확률 분포 vs 확률 분포)
        if self.y_pred.size == self.y_true.size:
            # 소프트맥스 함수와 교차 엔트로피 오차를 사용할 때의 역전파는 (y_true - y_pred)로 간단하게 계산됨
            dx = (self.y_true - self.y_pred) / batch_size
        # y_pred 가 레이블 인덱스인 경우 (원-핫 인코딩 vs 인덱스)
        else:
            # y_true를 복사
            dx = self.y_true.copy()
            # 예측한 클래스의 값에서 1을 빼줌
            dx[np.arange(batch_size), self.y_pred] -= 1
            # 배치 크기로 나누어 정규화
            dx = dx / batch_size
        return dx

    @staticmethod
    def _softmax(x: np.ndarray) -> np.ndarray:
        # 2차원 입력인 경우 (미니배치)
        if x.ndim == 2:
            # x를 전치하여 각 샘플이 열 벡터가 되도록 함
            x = x.T
            # 수치 안정성을 위해 최대값을 빼줌
            x = x - np.max(x, axis=0)
            # 소프트맥스 계산: exp(x) / sum(exp(x))
            y = np.exp(x) / np.sum(np.exp(x), axis=0)
            # 다시 원래 형태로 전치하여 반환
            return y.T

        # 1차원 입력인 경우 (단일 샘플)
        # 수치 안정성을 위해 최대값을 빼줌
        x = x - np.max(x)
        # 소프트맥스 계산 및 반환
        return np.exp(x) / np.sum(np.exp(x))

    @staticmethod
    def _cross_entropy_error(y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        # 1차원 입력을 2차원으로 변환
        if y_true.ndim == 1:
            y_pred = y_pred.reshape(1, y_pred.size)
            y_true = y_true.reshape(1, y_true.size)

        # y_pred가 확률 분포인 경우 인덱스로 변환
        if y_pred.size == y_true.size:
            y_pred = y_pred.argmax(axis=1)

        # 미니배치 크기 계산
        batch_size = y_true.shape[0]
        # 교차 엔트로피 오차 계산: -log(y[정답 레이블])의 평균
        # 1e-7을 더하는 이유는 log(0)를 방지하기 위함
        return -np.sum(np.log(y_true[np.arange(batch_size), y_pred] + 1e-7)) / batch_size

## 옵티마이저(Optimizer) 구현해보기

### SGD Optimizer 구현해보기

In [None]:
class SGD:
    def __init__(self, learning_rate: float):
        self.learning_rate = learning_rate

    def update(self, params: dict, grads: dict):
        for key in params:
            params[key] = params[key] + self.learning_rate * grads[key]

### Momentum 구현해보기

> Momentum 의 의미
>
> "Momentum"은 물리학에서 유래한 용어로 **"운동량"** 또는 **"탄력"** 을 의미한다. 물리학에서 운동량은 질량과 속도의 곱으로, 물체가 한번 움직이기 시작하면 계속해서 그 방향으로 움직이려는 경향을 설명한다.

**Momentum 최적화 알고리즘의 원리**

- 기본적인 경사 하강법(Gradient Descent)은 현재 지점에서의 기울기만을 고려하여 파라미터를 업데이트한다.
- 반면, Momentum 최적화 알고리즘은 이전 업데이트 방향을 "기억"하고 현재 그래디언트와 결합하여 파라미터를 업데이트한다.


**수식**

- 일반적인 경사 하강법
    ```
    θ = θ - η∇J(θ)
    ```
- Momentum
    ```
    v = γv - η∇J(θ)
    θ = θ + v
    ```
- 여기서,
    - θ: 모델 파라미터
    - η (eta): 학습률(learning rate)
    - ∇J(θ): 비용 함수의 그래디언트
    - v: 속도(velocity) 벡터
    - γ (gamma): 모멘텀 계수 (일반적으로 0.9 정도 사용)

**Momentum 최적화의 직관적 이해**

Momentum 최적화는 공이 언덕을 굴러 내려가는 것과 유사하게 생각할 수 있다.

1. **관성**: 공이 한 방향으로 움직이기 시작하면, 그 방향으로 계속 움직이려는 경향이 있다.
2. **가속**: 같은 방향의 그래디언트가 계속되면 속도가 누적되어 더 빨리 움직인다.
3. **감속**: 그래디언트 방향이 바뀌더라도 이전 속도 때문에 천천히 방향을 전환한다.

이러한 특성 덕분에 Momentum 최적화는 다음과 같은 상황에서 특히 유용하다.
- 비용 함수의 표면이 골짜기처럼 길고 좁은 형태일 때
- 그래디언트가 자주 방향을 바꾸는 불안정한 학습 과정에서
- 지역 최소값이 많은 복잡한 비용 함수를 최적화할 때


In [None]:
class Momentum:
    def __init__(self, learning_rate: float, momentum: float):
        """
        Momentum 최적화 알고리즘 초기화

        매개변수:
            learning_rate: 학습률 - 그래디언트 스텝의 크기를 조절
            momentum: 모멘텀 계수 - 이전 업데이트의 영향력을 결정 (일반적으로 0.9 사용)
        """
        self.learning_rate = learning_rate  # 학습률 설정 (그래디언트 업데이트 크기 조절)
        self.momentum = momentum  # 모멘텀 계수 설정 (이전 속도의 유지 정도)
        self.v = None  # 속도 벡터 초기화 (처음에는 None)

    def update(self, params: dict, grads: dict):
        """
        Momentum 방식으로 파라미터 업데이트

        매개변수:
            params: 업데이트할 신경망 파라미터 딕셔너리 (예: {'W1': 가중치1, 'b1': 편향1, ...})
            grads: 각 파라미터에 대한 그래디언트 딕셔너리 (예: {'W1': dW1, 'b1': db1, ...})
        """
        # 첫 번째 호출 시 속도(v)를 파라미터와 동일한 형상의 0으로 초기화
        # 속도 벡터는 각 파라미터마다 별도로 유지됨
        if self.v is None:
            self.v = {}
            for key, val in params.items():
                # 각 파라미터와 동일한 크기의 0 배열 생성
                self.v[key] = np.zeros_like(val)

        # 모든 파라미터에 대해 업데이트 수행
        for key in params:
            # 속도 벡터 업데이트: v = momentum * v - learning_rate * gradient
            # 1. 이전 속도에 모멘텀 계수를 곱해 일정 비율 유지 (관성)
            # 2. 현재 그래디언트에 학습률을 곱한 후 빼줌 (그래디언트 방향으로의 이동)
            self.v[key] = self.momentum * self.v[key] - self.learning_rate * grads[key]

            # 파라미터 업데이트: params = params + v
            # 계산된 속도 벡터를 파라미터에 더함
            # 이는 기존 그래디언트 하강법의 params -= learning_rate * grads[key]와 다름
            params[key] += self.v[key]

### Nesterov Optimizer 구현해보기

**Nesterov 가속 경사 하강법 (NAG) 수식**

- 일반 경사 하강법 (GD)
    - 일반적인 경사 하강법에서는 파라미터 $\theta$를 다음과 같이 업데이트한다.

    $$\theta_{t+1} = \theta_t - \eta \nabla J(\theta_t)$$

    여기서,
    - $\theta_t$: t 시점의 파라미터
    - $\eta$: 학습률(learning rate)
    - $\nabla J(\theta_t)$: $\theta_t$에서의 비용 함수의 그래디언트

- 모멘텀 방식 (Momentum)
    - 모멘텀 방식에서는 속도 벡터 v를 도입하여 다음과 같이 업데이트한다.

    $$ v_{t+1} = \gamma v_t - \eta \nabla J(\theta_t) \theta_{t+1} = \theta_t + v_{t+1} $$

    여기서,
    - $v_t$: t 시점의 속도 벡터
    - $\gamma$: 모멘텀 계수 (일반적으로 0.9 정도)

- Nesterov 가속 경사 하강법 (NAG)
    - Nesterov 가속 경사 하강법은 모멘텀 방식을 개선한 것으로, 그래디언트를 계산하는 위치에 차이가 있다.

    $$\tilde{\theta}_t = \theta_t + \gamma v_t $$


**알고리즘**

1. 현재 파라미터와 이전 속도를 사용하여 미리 파라미터의 예상 위치 계산
    ```
    θ_ahead = θ + γ*v
    ```
2. 예상 위치에서의 그래디언트를 계산
3. 속도 업데이트
    ```
    v = γ*v - η*∇J(θ_ahead)
    ```
4. 파라미터 업데이트
    ```
    θ = θ + v
    ```

여기서,
- θ: 모델 파라미터
- v: 속도 벡터
- γ (gamma): 모멘텀 계수
- η (eta): 학습률
- ∇J(θ_ahead): 예상 위치에서의 그래디언트


**Nesterov 가속 경사 하강법의 동작 원리**
- Nesterov 가속 경사 하강법은 기본 모멘텀 방식의 개선된 버전이다.
- 주요 차이점은 그래디언트를 계산하는 지점이다.

**기본 모멘텀 vs Nesterov**

- 기본 모멘텀
    - 현재 위치에서 그래디언트 계산
    - 이전 속도와 현재 그래디언트를 사용하여 다음 위치로 이동

- Nesterov 가속 경사 하강법:
    - 모멘텀을 적용한 후의 예상 위치(lookahead)에서 그래디언트 계산
    - 이 그래디언트를 사용하여 속도 업데이트 및 파라미터 업데이트

> 직관적 이해
>
> Nesterov 방식은 마치 공이 언덕을 내려가는 과정에서 "앞을 내다보는" 것과 유사하다.
> 1. 현재 속도(모멘텀)로 인해 공이 어디로 갈지 예측
> 2. 그 예측된 위치에서의 기울기를 확인
> 3. 그 기울기에 따라 실제 움직임을 조정
> 이러한 "내다보기" 전략은 최적화 과정에서 더 빠른 수렴과 향상된 성능을 제공한다.


**Nesterov 의 장점**
- 더 빠른 수렴: 일반 모멘텀보다 더 빠르게 최소값에 도달하는 경우가 많다.
- 반응성 향상: 목표에 가까워질 때 더 빠르게 감속할 수 있다.
- 이론적 보장: 특정 함수 클래스에 대해 최적의 수렴 속도를 제공한다.

In [None]:
class Nesterov:
    def __init__(self, learning_rate: float, momentum: float = 0.9):
        """
        Nesterov 가속 경사 하강법 최적화 알고리즘 초기화

        매개변수:
            learning_rate: 학습률 - 그래디언트 스텝의 크기를 조절
            momentum: 모멘텀 계수 - 이전 업데이트의 영향력을 결정 (일반적으로 0.9 사용)
        """
        self.learning_rate = learning_rate  # 학습률 설정
        self.momentum = momentum  # 모멘텀 계수 설정
        self.v = None  # 속도 벡터 초기화

    def update(self, params: dict, grads: dict):
        """
        Nesterov 방식으로 파라미터 업데이트

        매개변수:
            params: 업데이트할 신경망 파라미터 딕셔너리 (예: {'W1': 가중치1, 'b1': 편향1, ...})
            grads: 각 파라미터에 대한 그래디언트 딕셔너리 (예: {'W1': dW1, 'b1': db1, ...})
        """
        # 첫 번째 호출 시 속도(v)를 파라미터와 동일한 형상의 0으로 초기화
        if self.v is None:
            self.v = {}
            for key, val in params.items():
                self.v[key] = np.zeros_like(val)

        # 모든 파라미터에 대해 업데이트 수행
        for key in params:
            # 기존 속도를 저장
            v_prev = self.v[key].copy()

            # 모멘텀을 적용한 속도 업데이트
            self.v[key] = self.momentum * self.v[key] - self.learning_rate * grads[key]

            # Nesterov 수정: 현재 속도에 모멘텀과 이전 속도 변화를 더함
            # 이는 "lookahead" 개념을 구현
            params[key] += -self.momentum * v_prev + (1 + self.momentum) * self.v[key]

### AdaGrad 구현해보기

#### AdaGrad(Adaptive Gradient) 알고리즘 설명

AdaGrad는 학습 과정에서 각 매개변수에 대해 적응적인 학습률을 적용하는 최적화 알고리즘이다.

##### 핵심 아이디어
- 자주 업데이트되는 매개변수(기울기가 작은 매개변수)에는 작은 학습률 적용
- 드물게 업데이트되는 매개변수(기울기가 큰 매개변수)에는 큰 학습률 적용
- 이를 통해 희소 데이터(sparse data)나 기울기가 크게 다른 특성에서도 효과적으로 학습 가능

##### 수학적 표현
1. 기본 업데이트 규칙: $\theta_{t+1} = \theta_t - \eta \cdot g_t$
    여기서,
    - $\theta_t$: t 시점의 매개변수
    - $\eta$: 기본 학습률
    - $g_t$: t 시점의 기울기

2. AdaGrad 업데이트 규칙: $\theta_{t+1} = \theta_t - \frac{\eta}{\sqrt{G_t + \epsilon}} \cdot g_t$
    여기서,
    - $G_t = \sum_{i=1}^{t} g_i^2$: 기울기의 제곱의 누적합
    - $\epsilon$: 분모가 0이 되는 것을 방지하기 위한 작은 상수 (보통 1e-7 ~ 1e-8)

##### 특징
1. **장점**
    - 각 매개변수마다 학습률을 자동으로 조정
    - 희소 데이터에 효과적
    - 초기 학습률 선택에 덜 민감함

2. **단점**
    - 학습이 진행될수록 G_t가 계속 증가하여 학습률이 너무 작아질 수 있음
    - 이로 인해 긴 학습 과정에서는 학습이 일찍 중단될 수 있음
    - 이러한 단점을 보완하기 위해 RMSProp, Adam 등의 알고리즘이 개발됨

AdaGrad는 특히 자연어 처리와 같이 희소 특성이 많은 문제에서 좋은 성능을 보이며, 전통적인 SGD보다 더 안정적인 학습이 가능하다.

In [None]:
class AdaGrad:
    def __init__(self, learning_rate: float):
        """AdaGrad 옵티마이저 초기화

        Args:
            learning_rate (float): 학습률(η). 기본 스텝 크기를 결정하는 하이퍼파라미터
        """
        self.learning_rate = learning_rate
        self.h = None  # 기울기 제곱 누적값을 저장할 딕셔너리 (처음에는 None으로 초기화)

    def update(self, params: dict, grads: dict):
        """파라미터 업데이트 수행

        Args:
            params (dict): 신경망의 파라미터 (W1, b1, W2, b2, ...)
            grads (dict): 각 파라미터에 대한 기울기 (dW1, db1, dW2, db2, ...)
        """
        # 최초 호출 시 h 딕셔너리 초기화
        if self.h is None:
            self.h = {}
            for key, val in params.items():
                # 각 파라미터와 같은 형상의 0 행렬로 초기화
                # 이 값은 기울기 제곱의 누적합을 저장할 변수
                self.h[key] = np.zeros_like(val)

        # 각 파라미터에 대해 업데이트 수행
        for key in params:
            # AdaGrad의 핵심: 기울기 제곱을 누적
            # h_t = h_{t-1} + g_t^2 수식 구현 부분
            # 이전 스텝까지의 기울기 제곱 누적값에 현재 기울기의 제곱을 더함
            self.h[key] += grads[key] * grads[key]

            # 파라미터 업데이트: θ_{t+1} = θ_t - η * g_t / sqrt(h_t + ε)
            # 학습률을 기울기 제곱의 누적 합의 제곱근으로 나누어 파라미터 갱신
            # 이는 자주 변화하는 파라미터(h값이 큰)는 더 작은 스텝으로,
            # 드물게 변화하는 파라미터(h값이 작은)는 더 큰 스텝으로 업데이트하는 효과
            params[key] -= self.learning_rate * grads[key] / (np.sqrt(self.h[key]) + 1e-7)
            # 1e-7은 0으로 나누는 것을 방지하기 위한 작은 값(epsilon)
            # 특히 학습 초기에 h값이 매우 작을 때 수치적 안정성 확보

### RMSProp 구현해보기

In [None]:
# TODO

### Adam 구현해보기

In [None]:
# TODO

## 평가 지표 구현하기

- 정확도를 함수로 구현

In [None]:
def accuracy(y_true: np.ndarray, y_pred: np.ndarray, batch_size) -> float:
    y_pred = np.argmax(y_pred, axis=1)

    if y_pred.ndim != 1:
        y_pred = np.argmax(y_pred, axis=1)

    assert y_pred.shape == y_true.shape

    # 실제 라벨과 예측 라벨이 일치하는 개수를 계산하여 정확도 반환
    return np.sum(y_true == y_pred) / float(batch_size)

## 드롭아웃(dropout) 구현하기

### Dropout이란?

- Dropout은 신경망의 과적합(overfitting)을 방지하기 위한 정규화(regularization) 기법
- 학습 과정에서 무작위로 일부 뉴런을 비활성화(꺼버림)하여 네트워크가 특정 뉴런에 과도하게 의존하는 것을 방지

### 평가 시 스케일 조정 이유
- 평가(테스트) 시에는 Dropout을 적용하지 않는다.
- 대신, `x * (1.0 - self.dropout_ratio)`와 같이 스케일을 조정한다.
- 이유는 다음과 같다.
    1. **기대값 일치(Expectation Matching)**
        - 학습 시 평균적으로 `(1 - dropout_ratio)` 비율의 뉴런만 활성화
        - 테스트 시에는 모든 뉴런을 사용하므로, 학습 시와 테스트 시의 출력 크기가 달라진다.
        - 이를 보정하기 위해 테스트 시 출력에 `(1 - dropout_ratio)`를 곱해 스케일을 조정한다.

    2. **분산 유지**
        - 이 스케일 조정은 학습 시와 테스트 시의 출력 분포를 일치시켜 준다.
        - 스케일 조정 없이 모든 뉴런을 활성화하면 출력 신호가 학습 시보다 훨씬 커지게 된다.

- 이 방식을 "Inverted Dropout"이라고 하며, 현대 딥러닝 프레임워크들에서 주로 사용하는 방식이다.

In [None]:
class Dropout:
    def __init__(self, dropout_ratio: float = 0.5):
        # 드롭아웃할 뉴런의 비율
        # e.g. 0.5 는 50% 의 뉴련을 무작위 꺼버림
        self.dropout_ratio = dropout_ratio

        # mask: 드롭아웃할 뉴련ㅇ르 결정하는 불리언 마스크
        self.mask: np.ndarray | None = None  # 오직 학습 시에만 사용

    def forward(self, x: np.ndarray, is_train: bool = True) -> np.ndarray:
        if is_train:  # 학습시
            # 1. 입력 데이터 x와 동일한 형태의 0 ~ 1 사이 균일 분포 난수 배열을 생성
            # 2. dropout_ratio 보다 큰 값만 True 로 설정
            self.mask = np.random.rand(*x.shape) > self.dropout_ratio

            # mask 를 사용해서 일부 뉴련을 꺼버림
            return x * self.mask
        # 평가시
        # Dropout 의 비율만큼 스케일 조정하여 출력
        return x * (1.0 - self.dropout_ratio)

    def backward(self, dout: np.ndarray) -> np.ndarray:
        # mask 를 사용하여, 순전파 때 꺼진 뉴런은 그래디언트도 전달되지 않게 함.
        return dout * self.mask

## 가중치 초기화

1. **학습 안정성**: 그래디언트 소실이나 폭발 문제를 방지
2. **수렴 속도**: 더 빠른 학습 수렴을 가능하게 함
3. **성능**: 최종 모델의 성능에 큰 영향을 미침

### Xavier 초기화 (Glorot 초기화)

- Xavier 초기화는 2010년 Xavier Glorot와 Yoshua Bengio가 제안한 방법
- 주로 시그모이드(sigmoid)나 하이퍼볼릭 탄젠트(tanh) 같은 활성화 함수에 적합
- 이 방법은 입력층과 출력층의 노드 수를 고려하여 가중치를 초기화함으로써 그래디언트 소실(vanishing gradient)이나 폭발(exploding gradient) 문제를 완화

- **목표**: 각 레이어의 활성화 값들이 적절한 분산을 유지하도록 함
- **수식**: 가중치 ~ N(0, sqrt(1/n_in))
    - n_in: 이전 레이어의 노드 수

- **특징**: 시그모이드나 탄젠트 하이퍼볼릭 활성화 함수에 적합


### He 초기화

- He 초기화는 2015년 Kaiming He가 제안한 방법으로, ReLU와 같은 활성화 함수에 최적화되어 있음
- Xavier 초기화를 기반으로 하지만, ReLU가 가진 비선형성의 특성을 고려하여 수정된 방법
- **목표**: ReLU 활성화 함수의 특성을 고려하여 분산 유지
- **수식**: 가중치 ~ N(0, sqrt(2/n_in))
    - n_in: 이전 레이어의 노드 수

- **특징**: ReLU 함수는 음수 입력을 0으로 만들기 때문에 분산이 절반으로 줄어드는 것을 보정

In [8]:
class SimpleNet:
    def __init__(self):
        self.input_size = 28 * 28
        self.hidden_size_list = [128, 64]
        self.output_size = 10
        self.weight_init_std = 0.01
        self.params = {}

    # ... 중략 ...

    def __initialize_weights(self, weight_init_std: float | str):
        """
        신경망의 가중치를 초기화하는 함수

        매개변수:
            weight_init_std: 가중치 초기화 방법을 지정
                - float 값: 주어진 표준편차로 가중치 초기화
                - "relu" 또는 "he": He 초기화 방법 사용 (ReLU 활성화 함수에 최적화)
                - "sigmoid" 또는 "xavier": Xavier 초기화 방법 사용 (Sigmoid 활성화 함수에 최적화)
        """
        # 모든 레이어의 크기(노드 수)를 포함하는 리스트 생성
        # 입력층, 은닉층들, 출력층을 순서대로 포함
        all_size_list = [self.input_size] + self.hidden_size_list + [self.output_size]

        # 각 레이어의 가중치와 편향 초기화
        for idx in range(1, len(all_size_list)):
            # 기본적으로 주어진 weight_init_std 값을 스케일로 사용
            scale = weight_init_std

            if str(weight_init_std).lower() in ("relu", "he"):
                # He 초기화: ReLU 활성화 함수에 최적화
                # 이전 레이어의 노드 수를 사용하여 스케일 계산
                # 이론적 근거: ReLU는 입력의 절반을 0으로 만들기 때문에 분산을 2배로 조정
                scale = np.sqrt(2.0 / all_size_list[idx - 1])
            elif str(weight_init_std).lower() in ("sigmoid", "xavier"):
                # Xavier 초기화: Sigmoid나 tanh 활성화 함수에 최적화
                # 이전 레이어의 노드 수를 사용하여 스케일 계산
                # 목표: 각 레이어의 활성화 값 분산을 일정하게 유지
                scale = np.sqrt(1.0 / all_size_list[idx - 1])

            # 계산된 스케일을 사용하여 가중치 초기화
            # np.random.randn(): 평균 0, 표준편차 1인 정규분포 난수 생성
            # 행렬 크기: (이전 레이어 노드 수) x (현재 레이어 노드 수)
            self.params[f"w{idx}"] = scale * np.random.randn(all_size_list[idx - 1], all_size_list[idx])

            # 편향은 모두 0으로 초기화
            # 현재 레이어의 노드 수만큼의 길이를 가진 0 벡터
            self.params[f"b{idx}"] = np.zeros(all_size_list[idx])

## 배치 정규화(Batch Normalization) 구현

- 배치 정규화는 2015년 Sergey Ioffe 와 Christian Szegedy 가 제안한 기법으로,
- 딥러닝 네트워크의 각 레이어에서 입력 데이터의 분포를 정규화하는 방법
- 이 기법은 내부 공변량 변화(Internal Covariate Shift) 문제를 해결하기 위해 고안되었다.

### 배치 정규화의 주요 장점

1. **학습 속도 향상**: 더 높은 학습률을 사용할 수 있어 학습 속도가 빨라진다.
2. **안정적인 학습**: 가중치 초기화에 덜 민감하게 만들어 더 안정적인 학습이 가능하다.
3. **과적합 방지**: 약한 정규화 효과가 있어 과적합을 방지하는 데 도움이 된다.
4. **그래디언트 소실/폭발 완화**: 그래디언트 흐름을 개선하여 깊은 네트워크 학습을 용이하게 한다.

### 배치 정규화를 사용하면 좋은 경우

1. **깊은 신경망**: 레이어가 많은 깊은 모델에서 특히 효과적이다.
2. **학습 데이터가 다양한 분포**: 입력 데이터의 분포가 다양할 때 안정적인 학습에 도움이 된다.
3. **높은 학습률 사용**: 빠른 학습을 위해 높은 학습률을 사용하고 싶을 때 유용하다.
4. **초기화에 민감한 모델**: 가중치 초기화 방법에 민감한 모델의 안정성을 높이는 데 도움이 된다.

### 배치 정규화의 작동 원리 요약

1. **정규화**: 입력 데이터의 평균을 0, 분산을 1로 정규화
2. **스케일 및 이동**: 감마(γ)와 베타(β) 파라미터를 사용하여 정규화된 값을 변환
3. **이동 평균 및 분산**: 학습 중에는 배치별 통계를 사용하고, 테스트 중에는 이동 평균 통계를 사용

In [None]:
class BatchNormalization:
    def __init__(self, gamma, beta, momentum, running_mean=None, running_var=None):
        """
        배치 정규화 레이어 초기화

        매개변수:
            gamma: 정규화된 데이터를 스케일링하는 학습 가능한 파라미터
            beta: 정규화된 데이터에 더해지는 학습 가능한 파라미터 (이동 파라미터)
            momentum: 실행 중 평균과 분산 업데이트에 사용되는 모멘텀 계수 (보통 0.9, 0.99 등의 값)
            running_mean: 테스트 시 사용할 평균 값 (None이면 초기화됨)
            running_var: 테스트 시 사용할 분산 값 (None이면 초기화됨)
        """
        # 학습 가능한 스케일 및 시프트 파라미터
        self.gamma = gamma  # 스케일링 파라미터 (학습 가능)
        self.beta = beta  # 이동 파라미터 (학습 가능)

        # 평균 및 분산의 이동 평균을 계산할 때 사용되는 모멘텀 값
        # 큰 값(0.9~0.99)일수록 이전 배치들의 통계에 더 많은 가중치를 둠
        self.momentum = momentum

        # 테스트 시에 사용할 실행 중 평균 및 분산
        # 학습 중에 계속 업데이트되며, 테스트 시에는 고정된 값으로 사용됨
        self.running_mean = running_mean
        self.running_var = running_var

        # 입력 데이터의 형태를 저장 (역전파 시 원래 형태로 복원하기 위함)
        self.input_shape = None

        # 역전파 계산에 필요한 중간 값들을 저장하는 변수들
        self.batch_size = None  # 현재 배치 크기
        self.xc = None  # 평균을 뺀 입력 데이터 (x - mean)
        self.xn = None  # 정규화된 데이터
        self.std = None  # 표준편차
        self.dgamma = None  # gamma에 대한 그래디언트
        self.dbeta = None  # beta에 대한 그래디언트

    def forward(self, x: np.ndarray, is_train: bool = True) -> np.ndarray:
        """
        배치 정규화 순전파

        매개변수:
            x: 입력 데이터, 형태는 (배치 크기, 특성 수) 또는 (배치 크기, 채널 수, 높이, 너비)
            is_train: 학습 모드 여부 (True: 학습 모드, False: 테스트 모드)

        반환값:
            정규화된 출력 데이터
        """
        self.input_shape = x.shape
        # 4D 텐서인 경우 2D로 변경 (합성곱 레이어에서 사용 시)
        if x.ndim == 4:  # CNN의 경우
            n, c, h, w = x.shape
            x = x.reshape(n, -1)  # (n, c*h*w) 형태로 변경

        # running_mean 및 running_var가 초기화되지 않았으면 초기화
        if self.running_mean is None:
            _, d = x.shape  # d: 특성 수
            self.running_mean = np.zeros(d)  # 평균 초기값: 0
            self.running_var = np.ones(d)  # 분산 초기값: 1

        # 학습 모드
        if is_train:
            # 현재 배치의 평균 계산
            mu = x.mean(axis=0)
            # 평균을 뺀 값 계산 (중앙 집중화)
            xc = x - mu
            # 현재 배치의 분산 계산
            var = np.mean(xc**2, axis=0)
            # 수치 안정성을 위해 작은 값(1e-7)을 더해 표준편차 계산
            std = np.sqrt(var + 1e-7)
            # 정규화 (평균 0, 분산 1로 변환)
            xn = xc / std

            # 역전파 계산을 위한 중간 값들 저장
            self.batch_size = x.shape[0]
            self.xc = xc
            self.xn = xn
            self.std = std

            # 이동 평균과 이동 분산 업데이트 (테스트 시 사용)
            # momentum * 이전_값 + (1-momentum) * 현재_값
            self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * mu
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * var
        else:
            # 테스트 모드: 저장된 이동 평균과 이동 분산 사용
            xc = x - self.running_mean
            xn = xc / np.sqrt(self.running_var + 1e-7)

        # 정규화된 값에 학습 가능한 파라미터 적용: gamma * xn + beta
        # gamma: 스케일링, beta: 이동(shift)
        return self.gamma * xn + self.beta

    def backward(self, dout: np.ndarray) -> np.ndarray:
        """
        배치 정규화 역전파

        매개변수:
            dout: 출력에 대한 그래디언트

        반환값:
            입력에 대한 그래디언트
        """
        # 4D 텐서인 경우 2D로 변경 (합성곱 레이어에서 사용 시)
        if dout.ndim == 4:
            n, c, h, w = dout.shape
            dout = dout.reshape(n, -1)

        # gamma에 대한 그래디언트 계산
        # dL/dgamma = Σ(dL/dout * xn)
        self.dgamma = np.sum(self.xn * dout, axis=0)

        # beta에 대한 그래디언트 계산
        # dL/dbeta = Σ(dL/dout)
        self.dbeta = np.sum(dout, axis=0)

        # 정규화된 값(xn)에 대한 그래디언트 계산
        # dL/dxn = dL/dout * gamma
        dxn = self.gamma * dout

        # 중앙 집중화된 값(xc)에 대한 그래디언트 계산
        # dL/dxc = dL/dxn * (1/std)
        dxc = dxn / self.std

        # 표준편차에 대한 그래디언트 계산
        # dL/dstd = Σ(dL/dxn * xc * (-1/std²))
        dstd = -np.sum((dxn * self.xc) / (self.std * self.std), axis=0)

        # 분산에 대한 그래디언트 계산
        # dL/dvar = dL/dstd * (0.5/std)
        dvar = 0.5 * dstd / self.std

        # 중앙 집중화된 값(xc)에 대한 추가 그래디언트 (분산 계산에서 오는 부분)
        # dL/dxc += dL/dvar * (2*xc/batch_size)
        dxc += (2.0 / self.batch_size) * self.xc * dvar

        # 평균에 대한 그래디언트 계산
        # dL/dmu = Σ(dL/dxc * (-1))
        dmu = np.sum(dxc, axis=0)

        # 입력에 대한 최종 그래디언트 계산
        # dL/dx = dL/dxc - dL/dmu/batch_size
        dx = dxc - dmu / self.batch_size

        # 원래 입력 형태로 복원하여 반환
        return dx.reshape(*self.input_shape)

## 전체 신경망 구현하기

In [None]:
class Net:
    # TODO
    def __init__(
        self,
        input_size,
        hidden_size_list,
        output_size,
        use_dropout=False,
        dropout_rate=0.0,
        use_batch_norm=False,
        activation="relu",
        weight_init_std="relu",
        weight_decay_rate=0.0,
    ):
        self.input_size = input_size  # 입력 크기(예: 이미지의 픽셀수)
        self.output_size = output_size  # 출력 크기(예: 분류할 클래스 수)
        self.hidden_size_list = hidden_size_list  # 은닉층의 뉴런 수 리스트
        self.hidden_layer_num = len(self.hidden_size_list)  # 은닉층의 개수
        self.use_dropout = use_dropout  # dropout 사용 여부
        self.dropout_rate = dropout_rate  # dropout 비율
        self.use_batch_norm = use_batch_norm  # 배치 정규화 사용 여부
        self.weight_decay_rate = weight_decay_rate  # 가중치 감쇠

        # 신경망의 가중치
        self.params = {}

        # 가중치 추기화
        self.__init_weight(weight_init_std)

        # 활성화 함수 map
        activation_layer = {"sigmoid": SigmoidLayer, "relu": ReLULayer}

        # 신경망의 레이어
        self.layers = {}

        # 은닉층 생성
        for idx in range(1, self.hidden_layer_num + 1):
            # fully connected layer
            self.layers[f"FC{idx}"] = FCLayer(
                self.params[f"w{idx}"],
                self.params[f"b{idx}"],
            )

            if self.use_batch_norm:  # 배치 정규화 사용 여부에 따른 레이어 추가
                pass

            self.layers[f"Act{idx}"] = activation_layer[activation]()

            if self.use_dropout:  # 드랍아웃 사용 여부에 따른 레이어 추가
                pass

        # 출력층
        postfix = self.hidden_layer_num + 1
        self.layers[f"FC{postfix}"] = FCLayer(
            self.params[f"w{postfix}"],
            self.params[f"b{postfix}"],
        )

        self.last_layer = SoftmaxLayer()

    def __init_weight(self, weight_init_std):
        pass

## Trainer 클래스를 활용하여 딥러닝 파이프라인 만들기