# Dataset and DataLoader

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

PyTorch에서는 데이터를 효율적으로 관리하고 모델에 전달하기 위해 `Dataset` 과 `DataLoader` 를 사용합니다

## Dataset 클래스
``torch.utils.data.Dataset``는 데이터셋을 정의할때 사용하는 추상 클래스(abstract class)입니다.

<b>사용자 정의 데이터셋 (Custom dataset)</b>을 정의하려면 이 클래스를 상속하고 다음 세 가지 메서드를 override해야 합니다:

- `__init__` : 생성자
- `__len__`을 구현하여 `len(dataset)` 호출 시 데이터셋의 크기 (전체 샘플 수)를 반환하도록 합니다.
- `__getitem__` 을 구현하여 `dataset[i]` 호출 시 `i`번째 샘플 `(input, target)`을 반환하도록 합니다.

아래는 텐서 `X` 와 `y`로 부터 사용자 정의 데이터셋 클래스 `CustomDatasetExample`를 만드는 코드입니다.

In [None]:
class CustomDatasetExample(Dataset):
    def __init__ (self , X, y):
        self.X = X
        self.y = y
        assert len(X) == len(y)

    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, index):
        return self.X[index], self.y[index]

In [None]:
data_x = torch.rand([10, 3], dtype=torch.float32) #example tensor dataset with 3 feature and 10 examples
data_y = torch.arange(10) # target y

example_dataset = CustomDatasetExample(data_x, data_y)
print(f"len(dataset): {len(example_dataset)}")
print(f"4-th example: {example_dataset[3]}\n")
for X, y in example_dataset:
    print(f"X: {X},\ty: {y}")


<mark>실습</mark> `CustomImageDataset`을 완성하세요

CSV파일(`metadata.csv`)로 부터 이미지 경로(`image_path`)와 라벨(`label`: cat or dog)정보를 읽어 `(image, target)` 튜플을 반환하는 `CustomImageDataset`을 구현해봅니다

- `__init__`
  - `root_dir`: 이미지와 CSV파일이 저장된 경로입니다
  - `metadata_filename` : 메타데이터(라벨 및 이미지 경로)가 저장된 CSV 파일 이름입니다.
- `__len__` : 데이터셋의 크기 (전체 샘플 수)를 반환합니다
- `__getitem__` : `idx`번째 example `(image, target)`을 반환합니다
  - `self.root_dir`과 CSV파일의 "image_path" 컬럼의 값을 이용하여 이미지 경로를 얻을 수 있습니다.
  - `image`는 이미지를 [`PIL.Image.open`](https://pillow.readthedocs.io/en/stable/reference/Image.html) 함수를 이용해 불러온 뒤 반환합니다.
  - `target`은 문자열("cat", "dog")을 정수(`0`, `1`)로 매핑하여 반환합니다. `self.class_to_idx` 딕셔너리를 사용하세요

In [None]:
pd.read_csv('resources/cat_dog_images/metadata.csv')

In [None]:
from PIL import Image
import os

class CustomImageDataset(Dataset):
    def __init__(self, root_dir, metadata_filename, transform=None, target_transform=None):
        self.metadata_df = pd.read_csv(os.path.join(root_dir, metadata_filename))
        self.root_dir = root_dir

        class_names = self.metadata_df["label"].drop_duplicates().sort_values().tolist()
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(class_names)}

        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        ##### YOUR CODE START #####  

        ##### YOUR CODE END #####

    def __getitem__(self, idx):
        ##### YOUR CODE START #####  




        ##### YOUR CODE END #####

        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            target = self.target_transform(target)

        return image, target

In [None]:
transform = transforms.Compose([
    transforms.ToTensor(),
])

custom_dataset = CustomImageDataset(root_dir = 'resources/cat_dog_images', 
                                   metadata_filename = "metadata.csv",
                                   transform = transform)

for i in range(len(custom_dataset)):
    image, target = custom_dataset[i]
    print(f"{i+1}-th example: X.shape = {image.shape}, label = {target}")

읽어온 이미지 (PIL object)를 `torch.tensor`로 변환하기 위해서는 [`torchvision.transforms.ToTensor()`](https://pytorch.org/vision/main/generated/torchvision.transforms.ToTensor.html)를 이용할 수 있습니다.

이 함수는 `PIL Image` 또는 `numpy.ndarray` 객체를 `torch.FloatTensor`로 변환해줍니다
- 변환된 텐서는 `(C, H, W)`의 shape을 가집니다.
- 변환된 텐서는 `[0.0, 1.0]`사이의 값을 가집니다.

`transforms.ToTensor()`의 `__call__` 메직메서드를 호출하며 PIL image를 전달하면 결과값을 텐서로 반환받습니다.

In [None]:
image = Image.open("resources/cat_dog_images/images_001/dog-01.jpg")
print("type(image): ", type(image))

image_tensor = transforms.ToTensor()(image)
print("image_tensor.shape: ", image_tensor.shape)
print("\nimage_tensor: ", image_tensor)

In [None]:
def visualize_samples(dataset, cols=8, rows=5):
    figure = plt.figure(figsize=(12, 6))
    for i in range(len(dataset)):
        img, label = dataset[i]
        figure.add_subplot(rows, cols, i+1)
        plt.title(label)
        plt.axis("off")
        plt.imshow(img.numpy().transpose((1, 2, 0)))
    plt.show()

In [None]:
visualize_samples(custom_dataset, rows = 2 , cols = 3)

## DataLoader
`DataLoader`는 `Dataset`을 배치 단위(mini-batch) 묶어주며, `shuffle`, 병렬 처리(`num_workers`) 등을 지원합니다

하지만 우리가 정의한 `CustomImageDataset`은 모든 샘플의 이미지 크기가 같지 않기 때문에 아래와 같이 dataloader를 이용해 mini-batch로 쌓으려고 하면 오류가 생깁니다.

In [None]:
train_dataloader = DataLoader(dataset= custom_dataset, batch_size=6, shuffle=True, num_workers=1)

for X, y in train_dataloader:
    print(X.shape, y)

`CustomImageDataset`에서 이미지들을 고정된 사이즈로 resize한 뒤 리턴함으로써 이 문제를 해결할 수 있습니다. 이를 위해 `transforms.Resize()`를 `transform`에 추가해 줍니다.
- [`transforms.Resize()`](https://pytorch.org/vision/main/generated/torchvision.transforms.Resize.html#torchvision.transforms.Resize)는 이미지를 주어진 크기로 resize해줍니다.

이 외에도 `torchvision.transforms`에 다양한 이미지 전처리 함수가 구현되어있으며, 자세한 내용은 [docs](https://pytorch.org/vision/main/transforms.html)을 참고하기 바랍니다.

In [None]:
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

custom_datset_train = CustomImageDataset(root_dir = 'resources/cat_dog_images', 
                                         metadata_filename = "metadata.csv",
                                         transform = train_transform)

visualize_samples(custom_datset_train, rows = 2 , cols = 3)

In [None]:
train_dataloader = DataLoader(dataset = custom_datset_train, batch_size=4, shuffle=True, num_workers=1)
print("--- First iteration ---")
for X, y in train_dataloader:
    print("X.shape: ", X.shape, "\ty:", y)

print("\n--- Second iteration ---")
for X, y in train_dataloader:   # shuffle = True이므로 매번 다른 순서로 example들을 가져옵니다.
    print("X.shape: ", X.shape, "\ty:", y)

`shuffle = True`이므로 DataLoader를 순회할 때 마다 이미지 순서가 뒤바뀌는 것을 확인할 수 있습니다.

# Multi-layer Perceptron (MLP) with one hidden layer

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from planar_utils import plot_decision_boundary, load_planar_dataset

In [None]:
X, y = load_planar_dataset()
print(f"X.shape = {X.shape}, y.shape = {y.shape}")
plt.scatter(X[:, 0], X[:, 1], c=y, s=40, cmap=plt.cm.Spectral);

위와 같이 "꽃 모양(flower shape)" 데이터를 분류하는 모델을 구현해 보겠습니다.
- `X`: `m = 400`개의 example과 `d = 2`개의 feature를 가진 입력 데이터. Shape = `(400, 2)` 
- `y`: target (0 = red, 1 = blue)

## 1st trial: Logistic Regression

In [None]:
import sklearn.linear_model

clf = sklearn.linear_model.LogisticRegressionCV(); # Train the logistic regression classifier
clf.fit(X, y.flatten())

# Plot the decision boundary for logistic regression
plot_decision_boundary(lambda x: clf.predict(x), X, y)
plt.title("Logistic Regression")

# Print accuracy
y_pred = clf.predict(X)
accuracy = np.mean(y_pred == y.squeeze()) * 100
print(f"Accuracy of logistic regression: {accuracy:.2f} %")

- 위와 같이 로지스틱 회귀 또는 선형 모델은 데이터를 선형으로만 나눌 수 었이 좋은 성능을 기대하기 어렵습니다.
- MLP 신경망은 비선형을 학습할 수 있기 때문에 더 좋은 성능을 기대할 수 있습니다!

## 2-layer Multi-layer Perceptron
<div style="background-color: white; width: fit-content; padding: 10px;">
    <img src="resources/MLP_2-layer.png" style="width:400px;">
</div>


**Notation 정리**:
- 위첨자 $[l]$ : $l$번째 layer를 표현
- 위첨자 $(i)$ : $i$번째 샘플(example)을 표현 (예: $x^{(i)}$ 는 $i^{th}$ training example을 의미)
- 아래첨자 $i$ : 벡터의 $i$ 번째 원소 (예: $a^{[1]}_i$은 $\mathbf{a}^{[1]}$의 $i$번째 원소).


### Forward pass
입력 벡터 $ \mathbf{x} \in \mathbb{R}^{d} $ 가 주어졌을때, forward pass는 아래와 같이 주어집니다.
$$
\mathbf{z}^{[1]} = \mathbf{W}^{[1]} \mathbf{x} + \mathbf{b}^{[1]} \in \mathbb{R}^{h}
\tag{1}
$$

$$
\mathbf{a}^{[1]} = \mathrm{ReLU}(\mathbf{z}^{[1]})
\tag{2}
$$

$$
\mathbf{z}^{[2]} = \mathbf{W}^{[2]} \mathbf{a}^{[1]} + \mathbf{b}^{[2]} \in \mathbb{R}^{C}
\tag{3}
$$

$$
\hat{\mathbf{y}} = \mathbf{a}^{[2]} = \mathrm{softmax}(\mathbf{z}^{[2]})
\tag{4}
$$

where:
- $d$: 입력 차원 수 (`in_dim`)
- $h$: 은닉층의 뉴런 수 (`hidden_dim`)
- $C$: 출력 클래스 수 (`out_dim`)
- $ \mathbf{W}^{[1]} \in \mathbb{R}^{h \times d} $, $ \mathbf{b}^{[1]} \in \mathbb{R}^{h} $: weights and bias for layer 1  
- $ \mathbf{W}^{[2]} \in \mathbb{R}^{C \times h} $, $ \mathbf{b}^{[2]} \in \mathbb{R}^{C} $: weights and bias for layer 2



### Activations functions
비선형성을 도입하기 위해 각 은닉층 뒤에 ReLU 함수를 적용합니다.

<img src="resources/activation_functions.png" style="width:500px;">

<mark>실습</mark> 위의 forawd pass를 참고하여, 은닉층이 1개인 `TwoLayerMLP`를 완성하세요
- [`nn.Linear()`](https://pytorch.org/docs/stable/generated/torch.nn.Linear.html)
- [`nn.ReLU()`](https://pytorch.org/docs/stable/generated/torch.nn.ReLU.html): 비선형 활성화 함수(Non-linear activations)입니다. 선형 변환 후에 적용하여 모델에 **비선형성**을 부여하며, 이를 통해 신명망이 복잡한 함수와 데이터 분포를 학습할수 있도록 도와줍니다.
- `softmax`는 적용하지 마세요. (`nn.CrossEntropyLoss()`는 내부적으로 softmax와 cross-entropy를 동시에 처리해줍니다.)


In [None]:
import torch
from torch import nn

class TwoLayerMLP(nn.Module):
    def __init__(self, in_dim, hidden_dim, out_dim):
        super().__init__()
        
        ##### YOUR CODE START #####  



        ##### YOUR CODE END #####

    def forward(self, x):
        ##### YOUR CODE START #####



        ##### YOUR CODE END #####

        return logits

모델의 Forward pass를 수행하려면, 초기화된 `model` 객체에 input data를 `()` 연산자를 통해 전달하면 됩니다. 이때 `nn.Module`에 정의된 `__call__` 매직메서드가 호출되며, 이 메서드는 역전파를 위한 필요한 연산들과 함께 `forward`를 호출해줍니다.

직접 ``model.forward()``를 호출하지 마세요!

In [None]:
model = TwoLayerMLP(in_dim = 10, hidden_dim = 20, out_dim = 2)  # initalize model
print(model)

X = torch.rand(16, 10) # dummy data with batch_size = 16
logits = model(X)
print(f"\nX.shape = {X.shape}, logits.shape = {logits.shape}\n")

In [None]:
for name, param in model.named_parameters():
    print(f"[Parameters] {name}\t| Size: {param.size()}")

모델의 출력 `logits` 값은 각 클래스에 대한 <b>비정규화된 점수(un-normalized scores)</b>를 나타냅니다.

`logits`값으로 부터 예측된 클래스 인덱스 `y_pred`를 계산하기 위해서는 아래와 같이 `nn.Softmax`와 `argmax`를 이용할 수 있습니다.

이때 `nn.Softmax`를 이용한 확률 계산은 loss를 계산하거나 확률 분포를 얻을때에는 필요하지만, 단순히 가장 가능성이 높은 클래스를 선택하는 데에는 필수적이지 않으므로 생략할 수 있습니다.

In [None]:
# logits has shape (batch_size, num_classes)
pred_prob = nn.Softmax(dim = 1)(logits) # Predicted probability for each class
print(f"Predicted probabilities (shape = {pred_prob.shape}):\n", pred_prob)

y_pred = pred_prob.argmax(axis = 1)
print(f"\nPredicted classes: {y_pred}")

## Loss Function
다중 클래스 분류에서 가장 널리 사용되는 Categorical Cross-Entropy Loss를 사용합니다.
$$
\mathcal{L} = - \frac{1}{m} \sum_{i=1}^{m} \sum_{j=1}^{C} y_j^{(i)} \log \left( \hat{y}_j^{(i)} \right)
\tag{5}
$$
- $m$ : 전체 학습 데이터 수 (Number of training examples)
- $C$ : 클래스 개수
- $ \mathbf{y}^{(i)} \in \{0, 1\}^C$: $i$번째 샘플의 one-hot encoded true label
- $ \hat{\mathbf{y}}^{(i)} \in [0, 1]^C $ : softmax를 통해 계산된 predicted probability vector

`loss.backward()`를 호출하면 자동으로 모든 학습 가능한 파라미터들에 대한 손실함수의 미분이 계산됩니다: 
$$\frac{\partial{\mathcal{L}}}{\partial{\mathbf{W}^{[1]}}}, 
\frac{\partial{\mathcal{L}}}{\partial{\mathbf{b}^{[1]}}}, 
\frac{\partial{\mathcal{L}}}{\partial{\mathbf{W}^{[2]}}}, 
\frac{\partial{\mathcal{L}}}{\partial{\mathbf{b}^{[2]}}}$$

<mark>실습</mark> 아래 함수 `train_flower_main`를 잘 읽어보고 딥러닝 학습의 전체 흐름을 다시 한번 복습해보세요

In [None]:
import torch
from torch import nn

def train_flower_main(hidden_dim, learning_rate, num_epochs):
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Load the dataset
    X_np, y_np = load_planar_dataset()
    X_tensor = torch.from_numpy(X_np).float().to(device)           # shape: (m, d)
    y_tensor = torch.from_numpy(y_np).long().squeeze().to(device)  # shape: (m)
    
    # Initialize the model
    in_dim = X_tensor.shape[1]          # Number of features
    out_dim = len(y_tensor.unique())    # binary classification
    model = TwoLayerMLP(in_dim, hidden_dim, out_dim).to(device)

    # Define loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr = learning_rate)

    # Training loop
    model.train()
    for epoch in range(num_epochs):
        # Forward pass
        logits = model(X_tensor)
        loss = criterion(logits, y_tensor)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (epoch + 1) % 1000 == 0: # Print loss every 1000 epochs
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

    # Evaluation
    model.eval()
    y_pred = predict(model, X_tensor, device = device)
    accuracy = (y_pred == y_tensor).float().mean().item() * 100
    print(f'Training Accuracy : {accuracy:.2f}%')

    # Plot decision boundary
    plot_decision_boundary(lambda x: predict(model, x).cpu().numpy(), X_np, y_np.squeeze())
    plt.title(f"Decision Boundary for MLP with {hidden_dim} hidden units")
    plt.show()


def predict(model, X, device = "cpu"):
    model.to(device)

    with torch.no_grad():
        if isinstance(X, np.ndarray):
            X_tensor = torch.from_numpy(X).float().to(device)
        else:
            X_tensor = X.to(device)

        logits = model(X_tensor)
        probs = torch.softmax(logits, dim=1)
        preds = torch.argmax(probs, dim=1)
    return preds


<mark>실습</mark> 은닉층 크기에 따른 모델 성능
- 다양한 `hidden_dim` 값(1, 2, 4, 8, ...)에 대해 모델을 학습시키고, 결정 경계(Decision Boundary)의 형태와 학습 정확도(Training Accuracy)의 변화를 관찰해보세요
- `Training Accuracy > 87%`를 달성해보세요

In [None]:
train_flower_main(hidden_dim = 1, learning_rate = 0.1, num_epochs = 10000)

### 결과 해석
- 은닉 유닛 수(`hidden_dim`)가 많아질수록 모델의 <b>표현력(복잡도)</b>이 올라가며, 더 복잡한 형태의 결정 경계를 학습할 수 있습니다.
- 하지만 모델이 너무 크면, 훈련 데이터에 과도하게 맞추는 과적합(overfitting) 현상이 발생합니다. 이 경우, 모델은 훈련 데이터에는 성능이 높지만 새로운 데이터(test set)에 대해서는 오히려 성능이 저하됩니다. 
- 이는 모델이 훈련 데이터의 **유의미한 패턴뿐만 아니라 노이즈까지 암기**해버렸기 때문입니다. 즉, 데이터를 일반화(generalize)해서 이해하지 못하고 **암기(memoriziation)**를 중심으로 학습된 상태라고 볼 수 있습니다.

# Image Classification with MLP
이번에는 다층 퍼셉트론(MLP)을 활용하여 <b>이미지 분류(Image Classification)</b>를 수행해봅니다.  

여기서는 은닉층이 2개인 **3-Layer MLP**를 구성합니다.

## Forward Pass 구성

1. Flatten: 3차원 입력 이미지 (1, 28, 28)를 크기가 $ d = 1 \times 28 \times 28 = 784$인 1차원 텐서로 변환합니다.

2. Layer 1 (Input → Hidden1)
    $$
    \mathbf{z}^{[1]} = \mathbf{W}^{[1]} \mathbf{x} + \mathbf{b}^{[1]} \in \mathbb{R}^{h_1}
    $$

    $$
    \mathbf{a}^{[1]} = \mathrm{ReLU}(\mathbf{z}^{[1]})
    $$

   - $d$: 입력 차원 수 (`in_dim`)
   - $h_1$: 첫번째 은닉층의 뉴런 수
   - $\mathbf{W}^{[1]} \in \mathbb{R}^{h_1 \times d}$, $\mathbf{b}^{[1]} \in \mathbb{R}^{h_1}$: weights and bias for layer 1  


3. Layer 2 (Hidden1 → Hidden2)
    $$
    \mathbf{z}^{[2]} = \mathbf{W}^{[2]} \mathbf{a}^{[1]} + \mathbf{b}^{[2]} \in \mathbb{R}^{h_2}
    $$

    $$
    \mathbf{a}^{[2]} = \mathrm{ReLU}(\mathbf{z}^{[2]})
    $$
    - $h_2$: 두번째 은닉층의 뉴런 수
    - $\mathbf{W}^{[2]} \in \mathbb{R}^{h_2 \times h_1}$, $\mathbf{b}^{[2]} \in \mathbb{R}^{h_2}$: weights and bias for layer 2

4. Layer 3 (Hidden2 → Output)

    $$
    \mathbf{z}^{[3]} = \mathbf{W}^{[3]} \mathbf{a}^{[2]} + \mathbf{b}^{[3]} \in \mathbb{R}^{C}
    $$

    $$
    \hat{\mathbf{y}} = \mathbf{a}^{[3]} = \mathrm{softmax}(\mathbf{z}^{[3]})
    $$

    where:
    - $C$: 출력 클래스 수 (`out_dim`)
    - $\mathbf{W}^{[3]} \in \mathbb{R}^{C \times h_2}$, $\mathbf{b}^{[3]} \in \mathbb{R}^{C}$: weights and bias for layer 3

## Parameter, input and output shapes
Batch size를 $B$라고 했을때,
| Layer             | Input Shape     | Weight Shape        | Bias Shape     | Output Shape    |
|------------------|------------------|----------------------|----------------|------------------|
| Flatten           | $(B, 1, 28, 28)$ | -                    | -              | $(B, 784)$         |
| Layer 1           | $(B, 784)$         | $(h_1, 784)$            | $(h_1,)$          | $(B, h_1)$          |
| Layer 2           | $(B, h_1)$          |$(h_2, h_1)$             | $(h_2,)$          | $(B, h_2)$          |
| Layer 3           | $(B, h_2)$          | $(C, h_2)$              | $(C,)$           | $(B, C)$           |

참고: batch 처리를 위해 PyTorch에서는 아래와 같은 수식에 따라 계산됩니다.
```python
z1 = x @ W1.T + b1  # x: (B, d), W1: (h1, d)
```

In [None]:
import os, time, shutil

import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import wandb

<mark>실습</mark> MultiLayerPerceptron
위 수식을 만족하는 3-layer MLP 모델 `MultiLayerPerceptron`을 완성하세요.
- 여기서 $h_1 = h_2 = $`hidden_dim` 입니다
- [`nn.Sequential`](https://pytorch.org/docs/stable/generated/torch.nn.Sequential.html): 순서가 있는 모듈들의 컨테이너(container)로, 데이터는 컨테이너 내의 모든 모듈들을 정의된것과 같은 순서로 통과합니다.
- [`nn.Flatten`](https://pytorch.org/docs/stable/generated/torch.nn.Flatten.html)
- [`nn.Linear`](https://pytorch.org/docs/stable/generated/torch.nn.Linear.html)
- [`nn.ReLU`](https://pytorch.org/docs/stable/generated/torch.nn.ReLU.html)

In [None]:
class MultiLayerPerceptron(nn.Module):
    def __init__(self, in_dim, hidden_dim, out_dim):
        super().__init__()

        ##### YOUR CODE START #####  
        # Use nn.Sequential() to stack layers together.




        ##### YOUR CODE END #####

    def forward(self, x):
        ##### YOUR CODE START #####


        ##### YOUR CODE END #####

        return logits

In [None]:
# Test forward pass
model = MultiLayerPerceptron(in_dim = 1*28*28, hidden_dim = 512, out_dim = 10)
print(model)

X = torch.rand(16, 1, 28, 28) # dummy data for testing with batch_size 16
logits = model(X) 

print("\nlogits.shape: ", logits.shape)

<mark>실습</mark> 앞서 정의한 `model = MultiLayerPerceptron(in_dim = 28*28, hidden_dim = 512, out_dim = 10)`의 파라미터 수를 직접 손으로 계산하여 숫자로 기입해보세요 (숫자 계산식으로 입력해도 괜찮으나 파이썬 변수를 사용하지 마세요)

 - Weight $\mathbf{W}^{[1]}$의 파라미터 수는 얼마인가요?
 - Bias $\mathbf{b}^{[1]}$의 파라미터 수는 얼마인가요?
 - Weight $\mathbf{W}^{[2]}$의 파라미터 수는 얼마인가요?
 - Bias $\mathbf{b}^{[2]}$의 파라미터 수는 얼마인가요?
 - Weight $\mathbf{W}^{[3]}$의 파라미터 수는 얼마인가요?
 - Bias $\mathbf{b}^{[3]}$의 파라미터 수는 얼마인가요?

In [None]:
num_params_W1 = ...  # TODO: number of parameters in W1
num_params_b1 = ...  # TODO: number of parameters in b1
num_params_W2 = ...  # TODO: number of parameters in W2
num_params_b2 = ...  # TODO: number of parameters in b2
num_params_W3 = ...  # TODO: number of parameters in W3
num_params_b3 = ...  # TODO: number of parameters in b3

In [None]:
assert sum(p.numel() for p in model.parameters() if p.requires_grad) == (num_params_W1 + num_params_b1 + num_params_W2 + num_params_b2 + num_params_W3 + num_params_b3), "❌ 계산한 파라미터 수가 실제 모델과 일치하지 않습니다."
print('\033[92mAll tests passed!')

## WandB로 학습 추적하기

딥러닝 모델을 학습할 때, 손실값(loss)이나 정확도(accuracy) 등의 지표를 일일이 `list`로 저장하고 관리하는 것은 매우 번거로운 작업입니다.

앞으로는 이러한 지표들을 WandB라는 도구를 통해 자동으로 기록하고 시각화하겠습니다.

WandB를 사용하면 실험 결과를 웹 브라우저에서 직관적으로 확인하고, 여러 실험 결과들 손쉽게 비교할 수 있습니다.

<img src="resources/WandB.png" style="width:800px;">

<mark>실습</mark> [WandB](https://kr.wandb.ai/)에 회원가입을 한 후, 터미널(terminal)에서 다음 명령어를 통해 로그인하세요

```bash
$ wandb login YOUR_API_KEY
```

`YOUR_API_KEY`는 회원가입 후 발급받은 개인 API 키를 입력합니다.


## Training an MLP Model with WandB
아래 함수들은 지난 실습에서 작성한 학습코드를 다음의 측면에서 보다 효율적이고 확장 가능하게 개선한 것입니다:
- `tqdm`을 이용학 학습 진행 상황 및 시간 추적
- `save_checkpoint`와 `load_checkpoint`를 이용하여 모델을 저장하고 불러오는 기능 추가
- `AverageMeter` 클래스를 통한 성능 지표의 누적 평균 계산
- `config` 객체를 함수 외부에서 전달하여 하이퍼파라미터 튜닝을 유연하게 수행

<mark>실습</mark> 아래 코드들을 자세히 리뷰해보고, 적절한 위치에 wandb 로그 기능들을 추가해주세요. 다음의 metric들을 WandB에서 추적할 것입니다.
 - `epoch`
 - `Train Loss`
 - `Train Accuracy`
 - `Test Loss`
 - `Test Accuracy`

In [None]:
def load_MNIST_datasets(data_root_dir):
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])
    
    train_dataset = datasets.MNIST(
        root=data_root_dir, train=True, download=True, 
        transform=transform
    )
    test_dataset = datasets.MNIST(
        root=data_root_dir, train=False, download=True, 
        transform=transform
    )

    return train_dataset, test_dataset

def create_dataloaders(train_dataset, test_dataset, device, batch_size, num_worker):
    kwargs = {}
    if device.startswith("cuda"):
        kwargs.update({
            'pin_memory': True,
        })

    train_dataloader = DataLoader(dataset = train_dataset, batch_size=batch_size, 
                                  shuffle=True, num_workers=num_worker, **kwargs)
    test_dataloader = DataLoader(dataset = test_dataset, batch_size=batch_size, 
                                 shuffle=False, num_workers=num_worker, **kwargs)
    
    return train_dataloader, test_dataloader

`train_one_epoch` 함수와 `evaluate_one_epoch`함수를 수정하여 아래의 지표들을 **WandB를 통해 추적**하세요:
 - `epoch`
 - `Train Loss`
 - `Train Accuracy`
 - `Test Loss`
 - `Test Accuracy`

WandB에 지표를 기록하려면 다음과 같은 형태로 `wandb.log()` 함수를 사용하세요:

```python
wandb.log({
    "지표1 이름": 값1,
    "지표2 이름": 값2,
    ...
})

In [None]:
def train_one_epoch(model, device, dataloader, criterion, optimizer, epoch):
    """ train for one epoch """
    loss_meter = AverageMeter('Loss', '.4e')
    accuracy_meter = AverageMeter('Accuracy', '6.2f')
    data_time = AverageMeter('Data_Time', '6.3f') # Time for data loading
    batch_time = AverageMeter('Batch_Time', '6.3f') # time for mini-batch train
    metrics_list = [loss_meter, accuracy_meter, data_time, batch_time, ]
    
    model.train() # switch to train mode

    end = time.time()

    progress_bar = tqdm(dataloader, desc=f'Training Epoch {epoch + 1}', total=len(dataloader))
    for images, target in progress_bar:
        data_time.update(time.time() - end)

        images = images.to(device, non_blocking=True)
        target = target.to(device, non_blocking=True)

        output = model(images)
        loss = criterion(output, target)

        accuracy = compute_accuracy(output, target)
        loss_meter.update(loss.item(), images.shape[0])
        accuracy_meter.update(accuracy, images.shape[0])

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_time.update(time.time() - end)
        progress_bar.set_postfix(avg_metrics = ", ".join([str(x) for x in metrics_list]))
        end = time.time()
    progress_bar.close()

    ##### YOUR CODE START #####
    # wandb log variables: epoch, loss_meter.avg, accuracy_meter.avg with names "epoch", "Train Loss", "Train Accuracy"


    ##### YOUR CODE END #####

def evaluate_one_epoch(model, device, dataloader, criterion, epoch = 0, use_wandb = True):
    loss_meter = AverageMeter('Loss', '.4e')
    accuracy_meter = AverageMeter('Accuracy', '6.2f')
    metrics_list = [loss_meter, accuracy_meter]

    model.eval() # switch to evaluate mode

    with torch.no_grad():
        progress_bar = tqdm(dataloader, desc='Validation/Test', total=len(dataloader))
        for images, target in progress_bar:
            images = images.to(device, non_blocking=True)
            target = target.to(device, non_blocking=True)

            output = model(images)
            loss = criterion(output, target)

            accuracy = compute_accuracy(output, target)
            loss_meter.update(loss.item(), images.shape[0])
            accuracy_meter.update(accuracy, images.shape[0])

            progress_bar.set_postfix(avg_metrics = ", ".join([str(x) for x in metrics_list]))
        progress_bar.close()

    if use_wandb:
        ##### YOUR CODE START #####
        # wandb log variables: epoch, loss_meter.avg, accuracy_meter.avg with names "epoch", "Test Loss", "Test Accuracy"


        ##### YOUR CODE END #####

    return accuracy_meter.avg

class AverageMeter(object):
    """Tracks and updates the running average of a metric."""
    def __init__(self, metric_name , format_spec = '.4f'):
        self.metric_name = metric_name 
        self.format_spec = format_spec
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        return f"{self.metric_name}: {format(self.avg, self.format_spec)} (n={self.count})"
    

def compute_accuracy(output, target):
    """
    Computes the top-1 classification accuracy .

    Args:
        output (torch.Tensor): Model outputs (logits or probabilities), shape (batch_size, num_classes)
        target (torch.Tensor): Ground truth labels, shape (batch_size,)

    Returns:
        float: Accuracy as a percentage.
    """
    with torch.no_grad():
        pred = output.argmax(dim=1)
        accuracy = (pred == target).float().mean().item() * 100.0
    return accuracy   

`train_model` 함수의 시작과 끝에 각각 `wandb.init()`과 `wandb.finish()`를 적절히 추가하여, 학습 과정의 주요 지표들을 WandB를 통해 추적할 수 있도록 설정하세요.

- `wandb.init()`은 실험을 시작하며 프로젝트 이름, 실험 이름, hyperparameter 등 다양한 설정(config)을 전달합니다.
- `wandb.finish()`는 실험을 종료하고 로그를 정리합니다.
- `wandb.init()`와 `wandb.finish()` 사이에 `wandb.log()`를 추가하여 학습 손실과 정확도 등의 다양한 지표들을 손쉽게 기록할 수 있습니다.


예시:
``` python
wandb.init(
    project = "YOUR_PROJECT_NAME",
    name = "YOUR_EXPERIMENT_NAME",
    config = {
        "learning_rate": 0.02,
        "architecture": "MLP",
        "dataset": "MNIST",
        "epochs": 20,
    }
)

for epoch in range(num_epochs):
    # your training and evaluation code
    wandb.log({"loss": loss, "accuracy": accuracy})

wandb.finish()
```

In [None]:
def main():
    """ 전체 학습 및 평과를 위한 최상위 진입점(main entry point) """

    config = {
        "mode": "train",  # Options: "train", "eval"
        "device": "cuda" if torch.cuda.is_available() else "cpu",

        ## data and preprocessing settings
        "data_root_dir": '/datasets',
        "num_workers": 4,

        ## Training Hyperparams
        "batch_size": 128,
        "learning_rate": 1e-2,
        "num_epochs": 20,

        ## checkpoints
        "checkpoint_path": "checkpoints/checkpoint.pth",    # Path to save the most recent checkpoint
        "best_model_path": "checkpoints/best_model.pth",    # Path to save the best model checkpoint
        "checkpoint_save_interval": 10,                     # Save a checkpoint every N epochs
        "resume_training": None,    # Options: "latest", "best", or None

        ## WandB logging
        "wandb_project_name": "MNIST-experiments",
        "wandb_experiment_name" : "MLP_L3_H512",
        "model_architecture": "MLP",
        "dataset_name": "MNIST"
    }

    ## Model architecture hyperparameters
    in_dim = 28*28
    hidden_dim = 512

    train_dataset, test_dataset = load_MNIST_datasets(config["data_root_dir"])
    num_classes = len(train_dataset.classes)

    model = MultiLayerPerceptron(in_dim=in_dim, hidden_dim=hidden_dim, out_dim=num_classes)

    print(f"Using {config['device']} device")

    if config["mode"] == "train":
        train_model(model, train_dataset, test_dataset, config)
    elif config["mode"] == "eval":
        load_and_evaluate_model(model, test_dataset, config)
    else:
        raise ValueError(f"Unknown mode: {config['mode']}")


def train_model(model, train_dataset, test_dataset, config):
    device = config["device"]

    ##### YOUR CODE START #####
    # Initialize WandB experiment
    #  - 프로젝트 이름(project)은 config["wandb_project_name"]에서 가져옵니다.
    #  - 실험 이름(name)은 config["wandb_experiment_name"]에서 가져옵니다.
    #  - 실험 설정(config)은 config 전체를 그대로 전달합니다.


    ##### YOUR CODE END #####

    train_dataloader, test_dataloader = create_dataloaders(train_dataset, test_dataset, device, 
                                                           batch_size = config["batch_size"], 
                                                           num_worker = config["num_workers"])

    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr = config["learning_rate"])

    start_epoch = 0
    best_accuracy = 0

    if config["resume_training"]:
        load_checkpoint_path = config["best_model_path"] if config["resume_training"] == "best" else config["checkpoint_path"]
        start_epoch, best_accuracy = load_checkpoint(load_checkpoint_path, model, optimizer, device)

    for epoch in range(start_epoch, config["num_epochs"]):
        train_one_epoch(model, device, train_dataloader, criterion, optimizer, epoch)
        test_accuracy = evaluate_one_epoch(model, device, test_dataloader, criterion, epoch)

        ## save checkpoint
        if (epoch + 1) % config["checkpoint_save_interval"] == 0 or (epoch + 1) == config["num_epochs"]: 
            is_best = test_accuracy > best_accuracy
            best_accuracy = max(test_accuracy, best_accuracy)
            save_checkpoint(config["checkpoint_path"], model, optimizer, epoch, best_accuracy, is_best, config["best_model_path"])

    ##### YOUR CODE START #####
    # WandB 실험을 종료합니다.

    ##### YOUR CODE END #####

    return best_accuracy


def save_checkpoint(filepath, model, optimizer, epoch, best_accurcay, is_best, best_model_path):
    save_dir = os.path.split(filepath)[0]
    os.makedirs(save_dir, exist_ok=True)

    state = {
        'state_dict': model.state_dict(),
        'optimizer': optimizer.state_dict(),
        'epoch': epoch + 1,
        'best_accurcay': best_accurcay,
    }
    
    torch.save(state, filepath)
    if is_best:
        shutil.copyfile(filepath, best_model_path)


def load_checkpoint(filepath, model, optimizer, device):
    if os.path.isfile(filepath):
        checkpoint = torch.load(filepath, map_location=device)
        model.load_state_dict(checkpoint['state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        start_epoch = checkpoint['epoch']
        best_accurcay = checkpoint['best_accurcay']
        print(f"=> loaded checkpoint '{filepath}' (epoch {start_epoch})")
        return start_epoch, best_accurcay
    else:
        print(f"=> no checkpoint found at '{filepath}'")
        return 0, 0
    

def load_and_evaluate_model(model, test_dataset, config):
    """ Load model checkpoint from config["best_model_path"] and evaulate the model """

    device = config["device"]

    test_dataloader = DataLoader(dataset = test_dataset, batch_size=config["batch_size"], 
                                  shuffle=False, num_workers=config["num_workers"])
    
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr = config["learning_rate"])  # Dummy optimizer to satisfy checkpoint loader
    _, _ = load_checkpoint(config["best_model_path"], model, optimizer, device)

    test_accuracy = evaluate_one_epoch(model, device, test_dataloader, criterion, use_wandb = False)
    print(f"Test-set Accuracy: {test_accuracy:.2f}%")

    return test_accuracy

학습을 수행한 뒤 W&B 홈페이지를 통해 결과를 살펴보세요.

In [None]:
main()

<mark>실습</mark> `.gitignore` 파일 수정

위 코드를 따라 학습을 진행하면 다음과 같은 폴더들이 자동으로 생성됩니다:

1. `wandb/`: WandB를 통해 실험을 추적하면, 로그 파일과 메타데이터가 이 폴더에 저장됩니다.

2. `checkpoints/` : 학습 도중 저장되는 모델 체크포인트 파일들이 이 폴더에 저장됩니다.

이 두 폴더는 용량이 크고 변경내용 추적이 불필요하기 때문에 **GitHub 저장소에 업로드할 필요가 없습니다.**

`.gitignore` 파일을 적절히 수정하여 `wandb/`폴더와 `checkpoints/`가 git 추적 대상에서 제외되도록 설정하세요.

<mark>주의</mark> 수정이 완료된 `.gitignore`파일도 함께 `git push`하는 것을 잊지 마세요


-----

(참고) shell 환경변수를 설정하여 wandb를 끌수도 있습니다.

```python
os.environ['WANDB_DISABLED'] = 'true'
main()
```

## BetterMLP
<mark>실습</mark> 이미지 분류 정확도를 높이기 위해 더 나은 MLP모델 `BetterMLP`를 설계해보세요. 

주어진 `BetterMLP` 클래스의 네트워크 구조를 자유롭게 구성하여, **test accuracy를 최대한 향상**시키는 것이 목표입니다.

 - Convolutional Layer나 `torchvision.models`의 사전학습(pre-trained) 모델을 사용하는 것은 <u>허용되지 않습니다<u>
 - 오직 Multi-layer Perceptron (MLP) 구조만을 이용하여 `test set accuracy > 95.0%`를 달성하세요
 - 해당 정확도를 달성한 모델의 <mark>checkpoint 파일을 github에 함께 push하세요</mark>. (아래 코드에 따라 `submitted_checkpoints/` 폴더에 checkpoint가 저장됩니다.)

In [None]:
class BetterMLP(nn.Module):
    
    def __init__(self, in_dim, out_dim):
        super().__init__()

        ##### YOUR CODE START #####  


        ##### YOUR CODE END #####

    def forward(self, x):
        ##### YOUR CODE START #####


        ##### YOUR CODE END #####

        return logits

In [None]:
# Test forward pass
model = BetterMLP(in_dim = 1*28*28, out_dim = 10)
print(model)

X = torch.rand(16, 1, 28, 28) # dummy data for testing with batch_size 16
logits = model(X) 

print("\nlogits.shape: ", logits.shape)

<mark>주의</mark> `main_BetterMLP()`함수와 `config` 값을 수정하는것은 <u>허용되지 않습니다</u>

즉 batch_size, learning_rate, num_epochs 등과 같은 학습 관련 하이퍼파라미터 튜닝을 수행하지 <u>않고<u> 오직 모델 구조(`BetterMLP`) 만 변경하여 최고의 성능을 달성해보세요.

In [None]:
config = {
    "mode": "train",  # Options: "train", "eval"
    "device": "cuda" if torch.cuda.is_available() else "cpu",

    ## data and preprocessing settings
    "data_root_dir": '/datasets',
    "num_workers": 4,

    ## Training Hyperparams
    "batch_size": 128,
    "learning_rate": 1e-2,
    "num_epochs": 20,

    ## checkpoints
    "checkpoint_path": "submitted_checkpoints/checkpoint.pth",    # Path to save the most recent checkpoint
    "best_model_path": "submitted_checkpoints/best_model.pth",    # Path to save the best model checkpoint
    "checkpoint_save_interval": 10,                     # Save a checkpoint every N epochs
    "resume_training": None,    # Options: "latest", "best", or None

    ## WandB logging
    "wandb_project_name": "MNIST-experiments",
    "wandb_experiment_name" : "BetterMLP",
    "model_architecture": "MLP",
    "dataset_name": "MNIST"
}

def main_BetterMLP(config):
    train_dataset, test_dataset = load_MNIST_datasets(config["data_root_dir"])
    num_classes = len(train_dataset.classes)

    model = BetterMLP(in_dim=1*28*28, out_dim=num_classes)

    print(f"Using {config['device']} device")

    if config["mode"] == "train":
        test_accuracy = train_model(model, train_dataset, test_dataset, config)
    elif config["mode"] == "eval":
        test_accuracy = load_and_evaluate_model(model, test_dataset, config)
    else:
        raise ValueError(f"Unknown mode: {config['mode']}")
    
    return test_accuracy

In [None]:
main_BetterMLP(config)

<mark>주의</mark> 실습 과제를 제출하기 전 아래 코드를 통해 저장된 checkpoint가 `test set accuracy > 95.0%`의 성능을 달성했는지 다시한번 확인해보세요.

In [None]:
config["mode"] = "eval"
main_BetterMLP(config)