In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import optuna
from tqdm.notebook import tqdm  # tqdm.auto 대신 tqdm.notebook 사용
from optuna.visualization import plot_optimization_history, plot_param_importances
import random
import numpy as np
import json
import os
from sklearn.model_selection import KFold

In [16]:
# ========================
# 1. 시드 고정 (재현성 확보)
# ========================
def set_seed(seed=42):
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True

set_seed(42)

### 1. 데이터 변환(Transforms)에서의 난수
- 데이터 증강(transforms.RandomRotation, transforms.RandomResizedCrop, transforms.RandomAffine) 과정에서 난수를 사용하여 이미지를 무작위로 회전, 크기 조정, 이동시킵니다.

- 가우시안 노이즈(AddGaussianNoise)도 난수를 통해 추가됩니다.

→ 시드를 고정하지 않으면 매 학습 시도마다 데이터 증강 결과가 달라지고, 모델 학습 결과가 달라질 수 있습니다.

### 2. K-Fold 작업에서의 난수
- K-Fold 교차 검증에서 데이터를 무작위로 섞기(shuffle=True) 위해 난수가 필요합니다.

- random_state를 설정해 시드를 고정하면 동일한 데이터 분할을 재현할 수 있습니다.

→ 시드를 고정하지 않으면 각 fold에서 데이터셋 분할이 매번 달라지며, 실험 결과를 비교하기 어렵습니다.

### 3. 딥러닝 모델의 내부 난수
- 모델 가중치 초기화(초기값은 난수로 설정).

- 드롭아웃(Dropout)에서의 노드 선택.

- 데이터 로더에서 shuffle=True 옵션.

→ 시드를 고정하지 않으면 모델 학습의 시작 조건이 매번 달라지고, 결과가 일관되지 않을 수 있습니다.

### 요약
- 시드를 고정하는 이유는 데이터를 증강하거나 분할할 때 발생하는 난수의 일관성을 유지하여, 실험 결과의 재현성을 확보하고 신뢰성을 높이기 위해서입니다.
이 과정은 특히 딥러닝 연구나 하이퍼파라미터 튜닝(예: Optuna)에서 필수적입니다.

In [17]:
# ========================
# 2. 데이터 증강 클래스 정의 (가우시안 노이즈 추가)
# ========================
class AddGaussianNoise(object):
    def __init__(self, mean=0., std=0.1):
        self.mean = mean # 평균 mean 0 
        self.std = std # 표준 편차 standard deviation 0.1 
    
    def __call__(self, tensor):
        # 기존 텐서에 가우시안 노이즈를 더해서 리턴
        return tensor + torch.randn(tensor.size()) * self.std + self.mean
    
    def __repr__(self):
        # __repr__은 객체의 디버깅용 표현을 정의하며, repr() 함수 호출, 대화형 환경, 디버깅 상황에서 자동으로 호출됩니다. 이를 통해 객체의 내부 정보를 명확히 확인할 수 있습니다.
        # f'(mean={self.mean}, std={self.std})' Python의 f-string(포맷 문자열)을 사용하여 문자열을 동적으로 생성하는 표현식
        return self.__class__.__name__ + f'(mean={self.mean}, std={self.std})'

### __call__
- __call__은 Python에서 객체를 함수처럼 호출할 수 있도록 해주는 특별 메서드
### __repr__
- 객체의 표현(representation)을 정의하는 메서드
- 객체에 대한 명확하고 디버깅 친화적인 문자열을 반환.
- 객체의 상태나 중요한 정보를 포함하여, 개발자가 객체를 이해하는 데 도움을 줌.
- 주로 개발자 도구와 관련된 환경에서 사용.

### f-string
Python 3.6부터 지원되는 기능으로, 문자열 내에서 중괄호 {}를 사용해 변수나 표현식의 값을 삽입할 수 있습니다.

```python
f"문자열 {변수 또는 표현식}"
```

In [18]:
# ========================
# 3. 데이터 로드 및 전처리 (교차 검증 포함)
# ========================
def load_data(batch_size=64):
    transform_train = transforms.Compose([
        transforms.RandomRotation(degrees=10),
        transforms.RandomResizedCrop(
            size=28,
            scale=(0.8, 1.2),
            ratio=(0.9, 1.1)
        ),
        transforms.RandomAffine(
            degrees=0,
            translate=(0.1, 0.1)
        ),
        transforms.ToTensor(),
        AddGaussianNoise(
            mean=0.,
            std=0.1
        ),
        transforms.Normalize(
            mean=(0.5,),
            std=(0.5,)
        )
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(
            mean=(0.5,),
            std=(0.5,)
        )
    ])
    
    # 전체 학습 데이터셋 로드
    full_train_dataset = datasets.MNIST(
        root='./data',
        train=True,
        transform=transform_train,
        download=True
    )
    test_dataset = datasets.MNIST(
        root='./data',
        train=False,
        transform=transform_test,
        download=True
    )
    
    return full_train_dataset, test_dataset

### Data loading & preprocessing
- load_data 함수는 **"훈련 데이터 셋, 평가 데이터 셋"을 튜플 형태**로 반환하는 함수.

#### Data augmentation (데이터 증강) 처리 추가
- Rotation, Affine Transformation을 통해 데이터 증강하여 *overfitting* 방지, *robustness* 증대, *Generalization* 성능을 기대할 수 있다.
- AddGaussianNoise을 통해 Noise 추가

In [19]:
# ========================
# 4. 모델 정의 (LeNet5)
# ========================
class LeNet5(nn.Module):
    def __init__(self, dropout_rate=0.5):
        super(LeNet5, self).__init__()
        self.conv1 = nn.Conv2d(
            in_channels=1,
            out_channels=6,
            kernel_size=5,
            stride=1,
            padding=0
        )
        self.bn1 = nn.BatchNorm2d(
            num_features=6
        )
        self.pool1 = nn.AvgPool2d(
            kernel_size=2,
            stride=2
        )
        self.conv2 = nn.Conv2d(
            in_channels=6,
            out_channels=16,
            kernel_size=5,
            stride=1,
            padding=0
        )
        self.bn2 = nn.BatchNorm2d(
            num_features=16
        )
        self.pool2 = nn.AvgPool2d(
            kernel_size=2,
            stride=2
        )
        self.conv3 = nn.Conv2d(
            in_channels=16,
            out_channels=120,
            kernel_size=5,
            stride=1,
            padding=0
        )
        self.fc1 = nn.Linear(
            in_features=120,
            out_features=84
        )
        self.dropout = nn.Dropout(
            p=dropout_rate
        )
        self.fc2 = nn.Linear(
            in_features=84,
            out_features=10
        )

    def forward(self, x):
        x = F.pad(x, (2, 2, 2, 2))  # 입력 이미지를 32x32로 패딩
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool1(x)
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool2(x)
        x = F.relu(self.conv3(x))
        x = x.view(x.size(0), -1)  # 플래튼
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

### LeNet5 Modeling
**1 - 1st conv : 커널 사이즈 5x5**
- 입력 : 1채널의 32x32 이미지
- 출력 : 6채널의 28x28 이미지
- *stride=1*은 컨볼루션 연산 시 건너 뛰는 간격으로, 보통 모든 픽셀에 대해 컨볼루션 수행하므로 1로 설정함

**RELU 함수 적용1**

**2 - 1st pool : 커널 사이즈 2x2**
- 입력 : 6채널의 28x28 이미지
- 출력 : 6채널의 14x14 이미지

**3 - 2nd conv : 커널 사이즈 5x5**
- 입력 : 6채널의 14x14 이미지 
- 출력 : 16채널의 10x10 이미지 (padding 없이 5x5 커널을 사용했으므로 상하좌우로 2라인씩 줄어들어 10x10이 됨)
-> 6x14x14에 *16개의 5x5 커널*과 컨볼루션 연산 하면 16x10x10이 됨

**RELU 함수 적용2**

**4 - 2nd pool : 커널 사이즈 2x2**
- 입력 : 16채널의 10x10 이미지
- 출력 : 16채널의 5x5 이미지

**5 - 3rd conv : 커널 사이즈 5x5**
- 입력 : 16채널의 5x5 이미지 
- 출력 : 120채널의 1x1 이미지

**RELU 함수 적용3**

**conv 결과에 Flatten 처리**

**6 - FCL1**
- 입력 : 120 차원 벡터 [[x1, x2, x3, ... ,x120]]
- 출력 : 84 차원 벡터 [[x1, x2, x3, ... ,x84]]

**RELU 함수 적용4**

**7 - FCL2**
- 입력 : 84 차원 벡터 [[x1, x2, x3, ... ,x84]]
- 출력 : 10 차원 벡터 [[x1, x2, x3, ... ,x10]] **-> logit**

---
#### 멀티 채널에서의 컨볼루션 연산 
입력: 32×32x3
필터: 3x5×5, 총 6개.
출력: 28x28x6
하나의 필터는 입력 데이터의 모든 채널(3채널)에 대해 합성곱 연산을 수행한 후, 결과를 합산하여 하나의 피처맵 생성.
6개의 필터가 독립적으로 작동하여 총 6개의 피처맵 생성. 

---
#### Flatten 처리
- Fully Connected Layer를 통과하기 전에 Flatten 처리를 함
- x.view(x.size(0), -1)
- x는 120x1x1의 3차원 텐서이고, x.size(0)는 배치 사이즈를 의미 64
- 64x120 크기의 2차원 텐서로 평탄화 시킴

In [20]:
# ========================
# 5. 학습 함수 정의
# ========================
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=5, early_stopping_patience=5):
    # 함수 정의 시 설정한 default값 (num_epochs=5, early_stopping_patience=5)은 함수 호출 시 전달한 인자 값으로 대체됨
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device) # 모델 연산 device로 이동

    # early stopping을 위한 loss value 및 몇 번 이상 loss 개선 안될 때 조기 종료할지 카운트하는 counter 초기화
    best_val_loss = float('inf')
    patience_counter = 0

    # 최적의 epoch 만큼 학습을 반복, 반복할 때 마다 모델은 초기화되지 않고 학습이 누적됨
    for epoch in range(num_epochs):
        model.train() # 모델을 훈련 모드로 변경
        total_loss = 0 # 누적 loss 값 0으로 초기화

        for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"): # 객체 train_loader로부터 images와 labels를 차례대로 꺼내는 걸 반복
            images, labels = images.to(device), labels.to(device) # 꺼낸 images와 labels를 연산 device로 계속 보냄

            # Forward pass
            outputs = model(images) # 모델의 출력 값 
            loss = criterion(outputs, labels) # 모델의 출력 값과 정답 labels를 비교해서 loss를 계산

            # Backward pass and optimization
            optimizer.zero_grad() # 기울기값 초기화
            loss.backward() # 기울기 계산
            optimizer.step() # 계산한 기울기 이용하여 Fully connected layers 가중치 파라미터 업데이트

            total_loss += loss.item() # 계산된 loss를 python에서 계산할 수 있게 변환해서 total_loss에 누적 시킴
        
        avg_train_loss = total_loss / len(train_loader) # 전체 훈련 데이터셋의 길이로 나누어 평균 loss를 계산

        # 검증 단계
        model.eval() # 모델을 평가 모드로 변경
        total_val_loss = 0 # 누적 loss 값을 초기화
        with torch.no_grad(): # 기울기 계산 하지 않음 
            for images, labels in tqdm(val_loader, desc="Validation"): # main에서 호출했을때 val_loader는 test_dataset을 로드한 test_loader임.cross_validate에서 호출했을때는 k-fold에서의 validate data임
                images, labels = images.to(device), labels.to(device) # 검증 data에서 images와 labels를 연산 device로 보냄
                outputs = model(images) # 평가용 모델의 출력 
                loss = criterion(outputs, labels) # 정답과 비교하여 loss를 계산
                total_val_loss += loss.item() # total_val_loss에 누적 loss를 저장
        
        avg_val_loss = total_val_loss / len(val_loader) # 평가용 평균 loss를 계산 
        print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}") # 지금 몇 epoch이고, 훈련 시엔 평균 몇 loss인지, 평가 시엔 평균 몇 loss인지를 출력
        
        # 스케줄러 업데이트 (ReduceLROnPlateau의 경우)
        if isinstance(scheduler, optim.lr_scheduler.ReduceLROnPlateau): # 학습률 스케줄러(scheduler)가 ReduceLROnPlateau 클래스인지 확인
            scheduler.step(avg_val_loss) # ReduceLROnPlateau이면 평균 검증 loss 를 사용해서 스케줄러를 업데이트 해라 
        else:
            scheduler.step() # 아니라면 step() 메서드를 호출. 이 경우 학습률 스케줄러는 ReduceLROnPlateau와는 다른 방식으로 동작
        
        # 조기 종료 조건 확인
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= early_stopping_patience:
                print("Early stopping triggered")
                break

### 훈련 모델
- 현재 구현 기준으로 train_model은 2번 사용된다. 

1. 하이퍼파라미터 최적화 시 교차 검증(K-FOLD)에서 호출되어 사용된다.
2. 최적의 하이퍼파라미터를 찾은 후 해당 파라미터로 다시 모델을 학습 시킬 때 호출되어 사용된다.

In [21]:
# ========================
# 6. 평가 함수 정의
# ========================
def evaluate_model_accuracy(model, test_loader):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in tqdm(test_loader, desc="Evaluating"):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    return accuracy


### 평가 모델
- 모델의 정확도를 계산하기 위해 사용된다. 

현재 구현 기준으로는 2번 사용된다. 
1. 하이퍼파라미터 최적화 시 교차 검증(K-FOLD)에서 호출되어 사용된다.
2. 최적의 하이퍼파라미터를 찾은 후 학습이 끝난 후 최종 평가 시 호출되어 사용된다.

In [22]:
# ========================
# 7. 교차 검증 함수 정의
# ========================
def cross_validate(model_class, dataset, k=5, params=None):
    # 교차검증 함수에 필요한 인자 : 모델 이름, 전체 training dataset, 5개의 fold, 최적화가 완료된 하이퍼파리미터들
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 연산 device 결정
    kfold = KFold(n_splits=k, shuffle=True, random_state=42) # 스킷런 라이브러리로 부터 가져온 KFOLD 메서드를 이용하여 교차 검증 객체를 생성
    # shuffle=True 는 데이터를 K개의 Fold로 나누기 전에 데이터를 섞는다는 의미
    # 이미 set_seed(seed=42) 함수에서 난수 시드를 고정했지만, random_state=42를 추가로 설정하는 이유는 scikit-learn과 PyTorch의 난수 생성기가 독립적으로 동작하기 때문. 서로 독립이므로 42가 아닌 다른 수를 설정해도 됨.
    
    accuracies = [] # fold마다 나오는 accuray들을 저장할 빈 리스트를 생성 k가 5면 5개의 accuracy가 저장됨 (accuracies = [0.85, 0.88, 0.86, 0.90, 0.87])

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        print(f"Fold {fold + 1}") # fold는 0부터 시작하는 인덱스이므로 +1해서 몇 번째 fold 결과인지 출력
        # 데이터 로더 생성

        # train_idx와val_idx는 kfold로 분할된 데이터의 인덱스가 저장된 배열
        # train_idx = ([1, 2, 3, 4, 5, ... ])
        # val_idx = ([4000, 4001, 4002, ... ])
        train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
        val_subsampler = torch.utils.data.SubsetRandomSampler(val_idx)
        
        train_loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=params['batch_size'],
            sampler=train_subsampler
        )
        val_loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=params['batch_size'],
            sampler=val_subsampler
        )
        
        # 교차 검증 마다 모델과 학습률 스케줄러가 초기화 되어야 각각의 독립적인 accuracy에 대해 구할 수 있으므로 
        # 모델 초기화
        model = model_class(dropout_rate=params['dropout_rate']).to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=params['learning_rate'])
        
        # 학습률 스케줄러 초기화 (ReduceLROnPlateau)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode='min',
            patience=params['scheduler_patience'],
            factor=params['scheduler_factor'],
            verbose=False
        )
        
        # 모델 학습
        train_model(
            model,
            train_loader,
            val_loader,
            criterion,
            optimizer,
            scheduler,
            num_epochs=params['num_epochs'],
            early_stopping_patience=params['early_stopping_patience']
        )
        
        # 모델 평가
        accuracy = evaluate_model_accuracy(model, val_loader)
        accuracies.append(accuracy)
        print(f"Fold {fold + 1} Accuracy: {accuracy:.2f}%\n")
    
    average_accuracy = sum(accuracies) / len(accuracies)
    print(f"Average Accuracy over {k} folds: {average_accuracy:.2f}%")
    # 교차 검증으로 나온 K개의 accuracy의 평균을 리턴 
    return average_accuracy

### 교차 검증 (K-Fold Cross-Validation)
- 교차 검증은 데이터셋을 여러 번 나누어 학습 및 검증 과정을 반복함으로써 모델의 일반화 성능을 평가하는 방법.
- 여기서는 K-Fold 방법을 사용.


### Code 
**for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):**
- enumerate는 kfold.split(dataset)가 반복될 때 마다 그 횟수 정보를 추가하여 튜플 형태로 제공 (횟수, kfold.split(dataset)의 반환 값)
- kfold.split(dataset)는 dataset를 훈련 셋과 검증 셋으로 나누었을 때 인덱스를 튜플로 제공 (훈련 셋 인덱스, 검증 셋 인덱스)

In [23]:
# ========================
# 8. Optuna를 사용한 하이퍼파라미터 튜닝 (교차 검증 포함)
# ========================
def objective(trial):
    try:
        # 하이퍼파라미터 제안
        learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
        batch_size = trial.suggest_categorical('batch_size', [64, 128, 256])
        dropout_rate = trial.suggest_float('dropout_rate', 0.2, 0.5)
        num_epochs = trial.suggest_int('num_epochs', 5, 20)
        early_stopping_patience = trial.suggest_int('early_stopping_patience', 3, 10)
        
        # ReduceLROnPlateau의 하이퍼파라미터 제안 (학습률 스케줄러를 위한 하이퍼파라미터)
        scheduler_patience = trial.suggest_int('scheduler_patience', 2, 5) # 성능 개선이 없더라도 학습률을 감소시키기 전에 기다리는 epoch 수
        scheduler_factor = trial.suggest_float('scheduler_factor', 0.1, 0.5, step=0.1) # 학습률을 감소시킬 비율
        
        # 데이터 로드 - 하이퍼파라미터 최적화 시 훈련 셋만 필요
        full_train_dataset, _ = load_data(batch_size)
        
        # 교차 검증 수행
        params = {
            'learning_rate': learning_rate,
            'batch_size': batch_size,
            'dropout_rate': dropout_rate,
            'num_epochs': num_epochs,
            'early_stopping_patience': early_stopping_patience,
            'scheduler_patience': scheduler_patience,
            'scheduler_factor': scheduler_factor
        }
        
        # 하이퍼 파라미터 최적화 시 k-fold를 이용 
        average_accuracy = cross_validate(LeNet5, full_train_dataset, k=2, params=params) # 시간 단축을 위해 k를 5에서 2로 
        # Kfold의 평균 정확도를 리턴 
        return average_accuracy
    except Exception as e:
        print(f"An error occurred during trial: {e}")
        return 0  # 에러 발생 시 최소 정확도로 반환

ReduceLROnPlateau는 모델의 학습 중 성능 개선이 정체되었을 때, 학습률을 감소시켜 학습이 더 잘 진행되도록 돕는 스케줄러

In [24]:
# ========================
# 9. Optuna 스터디 생성 및 최적화 실행
# ========================
def run_optuna_study(n_trials=20):
    # Optuna 스터디 생성 (목표는 최대화, 즉 정확도 accuracy의 최대화)
    study = optuna.create_study(direction='maximize')
    
    # 최적화 실행
    study.optimize(objective, n_trials=n_trials)
    
    # 최적의 하이퍼파라미터 출력
    if study.best_trial is not None:
        print("Best trial:")
        trial = study.best_trial

        print(f"  Accuracy: {trial.value:.2f}%")
        print("  Params: ")
        for key, value in trial.params.items():
            print(f"    {key}: {value}")
        
        # 하이퍼파라미터 저장 
        # indent=4는 JSON 파일을 사람이 읽기 쉽도록 들여쓰기(4칸)로 저장하는 옵션
        # Context Manager 파일 읽기/쓰기 작업 - best_hyperparameters.json 파일 생성 및 저장
        with open('outputs/best_hyperparameters.json', 'w') as f:
            json.dump(trial.params, f, indent=4)
        print("하이퍼파라미터가 'outputs/best_hyperparameters.json' 파일로 저장되었습니다.")
        
        return trial.params, study
    else:
        print("No successful trials found.")
        return None, study

### optuna study의 생성
- Optuna에서 study를 create한다는 것은 하이퍼파라미터 최적화를 수행하기 위한 환경(스터디 객체)을 만드는 것

### study의 목표
- "direction = maximize"는 study의 목표가 accuracy 최대화임을 의미한다. 
- 정확도(accuracy)를 최적화하려면 maximize, 손실(loss)을 최적화하려면 minimize.

In [25]:
# ========================
# 10. 모델 저장 및 불러오기
# ========================
def save_model(model, path='outputs/best_lenet5_mnist.pth'):
    torch.save(model.state_dict(), path)
    print(f"모델이 '{path}' 파일로 저장되었습니다.")

def load_model(path='outputs/best_lenet5_mnist.pth', dropout_rate=0.5):
    model = LeNet5(dropout_rate=dropout_rate)
    model.load_state_dict(torch.load(path))
    model.eval()
    print(f"모델이 '{path}' 파일에서 불러와졌습니다.")
    return model

In [26]:
# ========================
# 11. 예측 시각화
# ========================
def visualize_predictions(model, test_loader, num_images=5):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()

    data_iter = iter(test_loader)
    images, labels = next(data_iter)
    images, labels = images[:num_images].to(device), labels[:num_images].to(device)

    outputs = model(images)
    _, preds = torch.max(outputs, 1)

    fig, axes = plt.subplots(2, num_images, figsize=(15, 6))

    for idx in range(num_images):
        # 원본 이미지
        ax = axes[0, idx]
        img = images[idx].cpu().squeeze()
        img = img * 0.5 + 0.5  # 정규화 해제
        ax.imshow(img, cmap='gray')
        ax.set_title(f'Label: {labels[idx].item()}')
        ax.axis('off')
        
        # 예측된 이미지
        ax = axes[1, idx]
        ax.imshow(img, cmap='gray')
        ax.set_title(f'Pred: {preds[idx].item()}')
        ax.axis('off')

    plt.tight_layout()
    plt.show()

In [27]:
# ========================
# 12. 메인 함수 정의
# ========================
def main():
    # Optuna 하이퍼파라미터 튜닝 실행, n_trials번 동안 학습하면서 파라미터를 최적화 함
    best_params, study = run_optuna_study(n_trials=2) # 동작 확인을 위한 시간 단축 20 -> 2로 수정
    
    if best_params is not None: # best hyperparameters를 찾았다면, 
        # 데이터 로드
        full_train_dataset, test_dataset = load_data(best_params['batch_size']) # 전체 훈련 데이터, 평가 데이터를 batch size에 맞게 로드해라.
        
        # 전체 학습 데이터 로드
        # Optuna에서 사용한 전체 학습 데이터를 다시 사용하지 않고, 전체 데이터로 다시 학습
        train_loader = torch.utils.data.DataLoader(
            full_train_dataset,
            batch_size=best_params['batch_size'],
            shuffle=True
        )
        test_loader = torch.utils.data.DataLoader(
            test_dataset,
            batch_size=best_params['batch_size'],
            shuffle=False
        )
        
        # 모델 초기화
        best_model = LeNet5(dropout_rate=best_params['dropout_rate'])
        
        # 손실 함수 및 최적화 알고리즘
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(best_model.parameters(), lr=best_params['learning_rate'])
        
        # 학습률 스케줄러 초기화 (ReduceLROnPlateau)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode='min',
            patience=best_params['scheduler_patience'],
            factor=best_params['scheduler_factor'],
            verbose=True
        )
        
        # 모델 학습
        train_model(
            best_model,
            train_loader,
            test_loader,  # 여기서는 검증 데이터를 테스트 데이터로 사용
            criterion,
            optimizer,
            scheduler,
            num_epochs=best_params['num_epochs'],
            early_stopping_patience=best_params['early_stopping_patience']
        )
        
        # 최종 평가
        final_accuracy = evaluate_model_accuracy(best_model, test_loader)
        print(f"Final Accuracy with Best Params: {final_accuracy:.2f}%")
        
        # 예측 시각화
        visualize_predictions(best_model, test_loader, num_images=5)
        
        # 모델 저장
        save_model(best_model, 'outputs/best_lenet5_mnist.pth')
        
        # Optuna 시각화
        fig1 = plot_optimization_history(study)
        fig1.write_image('outputs/optimization_history.png')
        print("Optuna 최적화 히스토리가 'outputs/optimization_history.png'로 저장되었습니다.")

        fig2 = plot_param_importances(study)
        fig2.write_image('outputs/param_importances.png')
        print("Optuna 파라미터 중요도가 'outputs/param_importances.png'로 저장되었습니다.")

    else:
        print("최적의 하이퍼파라미터를 찾지 못했습니다.")

1. 하이퍼 파라미터 최적화 (Optuna를 이용, k-fold로 분류된 데이터 활용)
2. training data / test data 로드 (최적화된 batch size)
3. LeNet5 객체 생성 (최적화된 dropout)
4. 손실 함수 및 최적화 알고리즘 객체 생성 (최적화된 하이퍼파라미터)
5. 학습률 스케줄러 객체 생성 (최적화된 스케줄러의 하이퍼 파라미터)
6. 모델 학습 (최적화된 하이퍼 파라미터)
7. 모델 평가 및 성능 계산 
8. 시각화 및 모델 저장
9. 하이퍼파라미터 시각화

In [None]:
if __name__ == "__main__":
    # 디렉토리 생성
    os.makedirs('outputs', exist_ok=True)
    
    main()