In [150]:
# MNIST 분류 모델 97% 이상 달성

# Trainer 클래스와 콜백을 활용하여, MNIST 손글씨 숫자 데이터셋을 분류하는 MLP 모델 구축.
# 1. torchvision.dataset.MINIST를 활용하여 데이터셋과 데이터로더를 준비. ok
# 2. Trainer 객체 생성. ok
#    CheckpointCallback, EarlyStoppingCallback, LoggingCallback 사용
#    CheckpointCallback : 모델 체크포인트 저장 ok
#    EarlyStoppingCallback : 검증 손실이 일정 epoch 동안 향상되지 않으면 학습 조기 종료 ok
#    LoggingCallback : 학습 로그 출력 (epoch, loss) ok
# 3. 분류 97% 이상 달성. (lr, parmateter 조정) ok
# 4. 모든 랜덤 시드 (random, numpy, torch)를 고정했을 때, 항상 동일한 결과가 나와야 함. (재현성) ok
#    테스트 정확도 +-0.2%
# 5. 실행 후, 학습 로그가 정상적으로 출력되고 가장 좋은 성능의 모델의 체크포인프 파일(.pth)가 실제로 생성되어야 함.

In [151]:
# 뭔 보안 인증 문제로 인증 검증을 비활성화 하는 코드가 필요하다고 함.
import ssl
import time, os
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from typing import Tuple, List, Optional
from dataclasses import dataclass, field

# MNIST 데이터셋 준비
from torchvision import datasets, transforms

# SSL 인증서 검증 비활성화
ssl._create_default_https_context = ssl._create_unverified_context

In [152]:
# 모델의 기본 구조 정의
@dataclass
class TrainingConfig:
    model = None
    epochs : int = 30
    seed: int = 42
    learning_rate : float = 1e-4
    batch_size = 64
    hidden_layers : List[int] = field(default_factory = lambda : [128, 64])
    use_mixed_precision: bool = True

    def __post_init__(self):
        if self.learning_rate <= 0:
            raise ValueError("Learning rate must be positive.")
        if self.epochs <= 0:
            raise ValueError("Number of epochs must be positive.")
        if self.batch_size <= 0:
            raise ValueError("Batch size must be positive.")


In [153]:
# 2.
# Callback을 활용하여 Trainer 객체를 정의.
# Callback 기본 구조 정의
class BaseCallback:
    def on_train_begin(self, trainer): pass
    def on_epoch_begin(self, trainer): pass
    def on_batch_end(self, trainer): pass
    def on_epoch_end(self, trainer): pass
    def on_train_end(self, trainer): pass
    def checkpoint(self, trainer): pass # on_epoch_end 시점에 이전보다 성능이 좋으면 .pth 파일로 모델 저장.
    def earlystoppint(self, trainer): pass # 일정 횟수 이상 성능 향상이 없으면 학습 조기 종료.

In [154]:
# 간단 MLP 구축
class MLP(nn.Module):
    def __init__(self, in_layer, h_layer, out_num) :
        super().__init__()
        self.flatten_size = in_layer * in_layer
        self.in_layer = nn.Linear(self.flatten_size, h_layer[0])
        self.h_layer = nn.Linear(h_layer[0], h_layer[1])
        self.out_layer = nn.Linear(h_layer[1], out_num)

    def forward(self, x):
        # vector화
        x = x.view(-1, self.flatten_size)
        # 순전파
        x = F.relu(self.in_layer(x))
        x = F.relu(self.h_layer(x))
        return self.out_layer(x)

In [155]:
# Trainer 클래스 정의
class Trainer:
    def __init__(self, model, train_loader, test_loader, optimizer, loss_fn, 
                 # optim.lr_scheduler._LRScheduler -> learning rate 스케줄러. 학습 도중에 학습률을 동적으로 조정하는 데 사용됨.
                 scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None,
                 callbacks: Optional[List[BaseCallback]] = None,
                 device: str = "cuda"):
        
        self.model = model
        self.train_loader = train_loader
        # self.val_loader = val_loader
        self.test_loader = test_loader
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.scheduler = scheduler
        self.callbacks = callbacks if callbacks else []
        self.device = device
        # state 변수에는 학습의 상태를 지속적으로 업데이트 함.
        self.state = {}

    # 콜백 메서드를 실행하는 함수
    def _run_callbacks(self, event_name : str):
        for callback in self.callbacks:
            # print(f"callback {event_name}")
            # getattr 함수 : 객체의 속성, 메서드를 동적으로 가져올 때 사용함.
            # 여기에서는 콜백의 각 이벤트 메서드를 호출하는 용도로 사용됨.
            # 즉, 콜백에서만 사용되는 함수는 아님.
            getattr(callback, event_name)(self)

    # 학습을 수행하는 파트
    def fit(self, num_epochs: int, config : TrainingConfig):
        self._run_callbacks("on_train_begin") # 학습 시작 시점에 콜백 불러서 state 초기화
        self.state['config'] = config
        # 모델 학습 시작
        for i in range(num_epochs):
            self.state["epoch"] = i + 1
            self._run_callbacks("on_epoch_begin")

            # 학습/검증 루프 
            for idx, data in enumerate(self.train_loader) :  # 배치 학습 루프
                input, output = data
                # TODO: 모델 삽입
                # 순전파
                x = self.model(input)
                loss = self.loss_fn(x, output)

                # 역전파
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                r_loss = self.state.get("running_loss")
                self.state['cur_loss'] = loss.item()
                if r_loss is None:
                    self.state["running_loss"] = loss.item()
                else : 
                    self.state["running_loss"] += loss.item()
                    
                # self._run_callbacks("on_batch_end")

            self._run_callbacks("on_epoch_end")
            
            # 지난 loss 와 현재 loss의 차이가 별로 없을 경우, no improve +1
            loss_dif = abs(self.state.get('cur_loss') - self.state.get('prev_loss'))
            # print(loss_dif)
            if loss_dif <= 1e-4:
                print("no improve loss")
                self.state['no_improve_epochs'] += 1
            self.state['prev_loss'] = self.state.get('cur_loss')

            self.test()

            self._run_callbacks("earlystoppint")
            
        self.state['cur_model_state'] = None # TODO: 현재 모델의 상태 저장
        self._run_callbacks("checkpoint") # 매 epoch 마다 체크포인트 저장
        self._run_callbacks("on_train_end")

    # 테스트를 하는 함수
    def test(self):
        correct = 0
        total = 0

        with torch.no_grad():
            for idx, data in enumerate(self.test_loader):
                input, output = data
                result = self.model(input)
                # 각 예측값 중, 가장 높은 것을 답으로 선택
                _, pred = torch.max(result.data, 1) 
                total += output.size(0)
                correct += (pred == output).sum().item() # 맞춘 갯수만큼 더해줌
                
        acc = 100 * correct / total

        abs_acc = abs(self.state.get('cur_acc') - acc)
        print(f"[debug] {abs_acc}")
        if  abs_acc <= 1e-5:
            print("[debug] no_improve_epochs + 1")
            imp = self.state.get('no_improve_epochs')
            self.state['no_improve_epochs'] = int(imp) + 1

        if self.state.get('best_acc') < self.state.get('cur_acc') :
            self.state['no_improve_epochs'] = 0 # acc 가 오르면 no improve 초기화
            print("[debug] cur acc > best acc")
            self.state['best_acc'] = acc
            self.state['cur_acc'] = acc
        else:
            self.state['cur_acc'] = acc
        
        print(f"accuracy : {acc:.4f}%")
        return acc

In [156]:
class LoggingCallBack(BaseCallback):        
    def on_train_begin(self, trainer : Trainer) :
        # print("=== Training started ===")
        trainer.state['config'] = None
        trainer.state['epoch'] = 0 # epoch를 담을 state
        trainer.state['epoch_start_time'] = 0
        trainer.state['epoch_end_time'] = 0
        trainer.state['batch'] = 0 # batch 순번을 담을 state
        trainer.state['stop'] = False # 학습 종료 여부
        trainer.state['running_loss'] = None # running중의 loss를 기록
        trainer.state['cur_loss'] = 0 # 현재 loss
        trainer.state['prev_loss'] = 0 # 저번 loss
        trainer.state['best_acc'] = 0 # 가장 좋은 성능의 acc
        trainer.state['cur_acc'] = 0 # 현재 성능의 acc
        trainer.state['best_metric'] = None # 가장 좋은 성능을 기록
        trainer.state['cur_model_state'] = None # 현재 모델의 파라미터
        trainer.state['best_model_state'] = None # 가장 성능 좋은 모델의 파라미터
        trainer.state['no_improve_epochs'] = 0 # 성능 향상이 없는 epoch 수

    def on_epoch_begin(self, trainer) : 
        # print(f"=== Epoch {trainer.state.get('epoch')} started ===")
        trainer.state['epoch_start_time'] = time.time()

    def on_batch_end(self, trainer) : 
        print(f"=== Batch {trainer.state.get('batch')} processed ===")

    def on_epoch_end(self, trainer): 
        trainer.state['epoch_end_time'] = time.time()
        epoch = trainer.state.get('epoch')
        epochs = trainer.state.get('config').epochs
        loss = trainer.state.get('cur_loss')
        spend_time = trainer.state.get('epoch_end_time') - trainer.state.get('epoch_start_time')
        print(f"=== Epoch {epoch}/{epochs} | loss {loss:.4f} | epoch time {spend_time:.4f} s ===")

    def on_train_end(self, trainer):
        if trainer.state.get('stop') == True :
            print("=== Training ended ===")

    # on_epoch_end 시점에 이전보다 성능이 좋으면 .pth 파일로 모델 저장.
    def checkpoint(self, trainer) : 
        cur_model_state = trainer.state.get('cur_model_state')
        prev_model_state = trainer.state.get('best_model_state')
        conf = trainer.state.get('config')

        daytime = time.strftime('%Y%m%d_%H%M%S')

        column = ["code", "model", "epoch", "seed", "learning_rate", "batch_size", "hidden_layers", "use_mixed_precision"]
        value = [[f"{daytime}", conf.model, conf.epochs, conf.seed, conf.learning_rate, conf.batch_size, conf.hidden_layers, conf.use_mixed_precision]]
        df = pd.DataFrame(value, columns = column)

        if prev_model_state is None or cur_model_state['accuracy'] > prev_model_state['accuracy'] :
            trainer.state['best_model_state'] = cur_model_state # 가장 성능 좋은 모델의 파라미터를 교체
            print("New best model found, saving checkpoint...")
            
            torch.save(cur_model_state, f"best_model_{daytime}.pht")

            if not os.path.exists("config.csv"):
                df.to_csv("config.csv", index = False)
            else:
                df_config = pd.read_csv("config.csv")
                df_config = pd.concat([df_config, df], ignore_index = True)
                df_config.to_csv("config.csv", index = False)

    def earlystoppint(self, trainer): # 일정 횟수 이상 성능 향상이 없으면 학습 조기 종료.
        if trainer.state.get('no_improve_epochs') >= 3 :
            print("Early stopping triggered.")
            trainer.state['stop'] = True

In [159]:
m_conf = TrainingConfig()
m_conf.learning_rate = 1e-2
callback = LoggingCallBack()

torch.manual_seed(m_conf.seed) # seed 고정해서 dataloader의 random shuffle을 고정시킴

# normalized 된 MNIST 데이터셋을 Tensor 형태로 변환하여 불러옴.
# print("=== Load Train Data ===")
train_data = datasets.MNIST('./train_data', train = True, download = True,
                      transform = transforms.Compose([
                          transforms.ToTensor(), 
                          transforms.Normalize(mean = (0.5,), std = (0.5,))]))
# print("=== Load Test Data ===")
test_data = datasets.MNIST('./test_data', train = False, download = True,
                      transform = transforms.Compose([
                          transforms.ToTensor(), 
                          transforms.Normalize(mean = (0.5,), std = (0.5,))]))

# 데이터 로더에 데이터셋을 담아서 정의
train_loader = DataLoader(dataset = train_data, batch_size = m_conf.batch_size, shuffle = True)
test_loader = DataLoader(dataset = test_data, batch_size = m_conf.batch_size, shuffle = True)

model = MLP(28, m_conf.hidden_layers, m_conf.epochs)
optim = torch.optim.Adam(model.parameters(), m_conf.learning_rate)
loss_f = nn.CrossEntropyLoss()

trainer = Trainer(model, train_loader, test_loader, optim, loss_f, None, [callback])
trainer.fit(m_conf.epochs, m_conf)

=== Epoch 1/30 | loss 0.2361 | epoch time 4.0339 s ===
[debug] 91.99
accuracy : 91.9900%
=== Epoch 2/30 | loss 0.1948 | epoch time 3.9429 s ===
[debug] 2.410000000000011
[debug] cur acc > best acc
accuracy : 94.4000%
=== Epoch 3/30 | loss 0.2759 | epoch time 3.8907 s ===
[debug] 1.3000000000000114
accuracy : 93.1000%
=== Epoch 4/30 | loss 0.1522 | epoch time 3.8248 s ===
[debug] 1.7800000000000011
accuracy : 94.8800%
=== Epoch 5/30 | loss 0.0307 | epoch time 4.2618 s ===
[debug] 0.20000000000000284
[debug] cur acc > best acc
accuracy : 95.0800%
=== Epoch 6/30 | loss 0.1891 | epoch time 4.0600 s ===
[debug] 1.289999999999992
accuracy : 93.7900%
=== Epoch 7/30 | loss 0.0164 | epoch time 4.7943 s ===
[debug] 0.8399999999999892
accuracy : 94.6300%
=== Epoch 8/30 | loss 0.1068 | epoch time 4.5302 s ===
[debug] 0.6899999999999977
accuracy : 93.9400%
=== Epoch 9/30 | loss 0.2247 | epoch time 4.1515 s ===
[debug] 1.0600000000000023
accuracy : 95.0000%
=== Epoch 10/30 | loss 0.4303 | epoch time