In [138]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

import plotly.graph_objects as go

import random
import pandas as pd
import numpy as np

!pip install torchinfo
from torchinfo import summary



In [139]:
# Random Seed 고정 (학습 반복 시행 시에도 동일한 결과가 나오도록)

seed = 20250302

random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

**1. 데이터셋 로딩 및 데이터 분석**

In [140]:
# 데이터셋 로딩

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])

train_dataset = torchvision.datasets.MNIST(root='./data',
                                           train=True,
                                           transform=transform,
                                           download=True)

test_dataset = torchvision.datasets.MNIST(root='./data',
                                          train=False,
                                          transform=transform,
                                          download=True)


In [141]:
# 시간 절약을 위해, 학습 데이터에서 랜덤하게 일부 샘플만 추출

from torch.utils.data import Subset, DataLoader

NUM_TRAIN_SAMPLES = 9000
BATCH_SIZE = 32

subset_indices = random.sample(range(len(train_dataset)), NUM_TRAIN_SAMPLES)
train_subset = Subset(train_dataset, subset_indices)

train_loader = DataLoader(train_subset,
                          batch_size=BATCH_SIZE,
                          shuffle=True)

# 테스트 데이터셋은 학습 대상이 아니므로 그대로 이용
test_loader = DataLoader(test_dataset,
                         batch_size=BATCH_SIZE,
                         shuffle=False)

In [142]:
# 클래스 불균형 분석

# 학습 데이터
train_labels = torch.tensor([train_subset.dataset.targets[i] for i in subset_indices])
train_class_counts = torch.bincount(train_labels)
print(train_class_counts)

NUM_CLASSES = len(train_class_counts)

tensor([ 832, 1034,  893,  961,  884,  834,  889,  946,  859,  868])


In [143]:
train_class_percentage = np.array(train_class_counts) * 100.0 / sum(train_class_counts)

train_y_distrib = pd.DataFrame({'class': list(range(NUM_CLASSES)),
                                'count': train_class_counts,
                                'percentage (%)': train_class_percentage})

train_y_distrib

Unnamed: 0,class,count,percentage (%)
0,0,832,9.244445
1,1,1034,11.488889
2,2,893,9.922222
3,3,961,10.677778
4,4,884,9.822222
5,5,834,9.266667
6,6,889,9.877778
7,7,946,10.511111
8,8,859,9.544445
9,9,868,9.644445


In [144]:
# 테스트 데이터
test_labels = test_loader.dataset.targets
test_class_counts = torch.bincount(test_labels)
print(test_class_counts)

tensor([ 980, 1135, 1032, 1010,  982,  892,  958, 1028,  974, 1009])


In [145]:
test_class_percentage = np.array(test_class_counts) * 100.0 / sum(test_class_counts)

test_y_distrib = pd.DataFrame({'class': list(range(NUM_CLASSES)),
                               'count': test_class_counts,
                               'percentage (%)': test_class_percentage})

test_y_distrib

Unnamed: 0,class,count,percentage (%)
0,0,980,9.8
1,1,1135,11.35
2,2,1032,10.32
3,3,1010,10.1
4,4,982,9.82
5,5,892,8.92
6,6,958,9.58
7,7,1028,10.28
8,8,974,9.74
9,9,1009,10.09


**2. CNN 모델 정의**

In [146]:
# CNN 모델 정의

class CNN(nn.Module):

    def __init__(self):
        super(CNN, self).__init__()

        # Conv
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.ReLU()
        )
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3),
            nn.ReLU()
        )
        self.pool2 = nn.MaxPool2d(2, 2)
        self.conv3 = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=3),
            nn.ReLU()
        )

        # Fully Connected
        self.fc1 = nn.Sequential(
            nn.Linear(64 * 4 * 4, 64),
            nn.Sigmoid()
        )
        self.fc_final = nn.Sequential(
            nn.Linear(64, 10),
            nn.Softmax()  # Classification Task 의 Output Layer 이므로 Softmax 고정
        )

    def forward(self, x):

        # Conv
        x = self.conv1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x)
        x = self.conv3(x)

        x = x.view(-1, 64 * 4 * 4)

        # Fully Connected
        x = self.fc1(x)
        x = self.fc_final(x)

        return x

In [147]:
# 모델 구조 출력

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)

print(summary(model, input_size=(BATCH_SIZE, 1, 28, 28)))

Layer (type:depth-idx)                   Output Shape              Param #
CNN                                      [32, 10]                  --
├─Sequential: 1-1                        [32, 32, 28, 28]          --
│    └─Conv2d: 2-1                       [32, 32, 28, 28]          320
│    └─ReLU: 2-2                         [32, 32, 28, 28]          --
├─MaxPool2d: 1-2                         [32, 32, 14, 14]          --
├─Sequential: 1-3                        [32, 64, 12, 12]          --
│    └─Conv2d: 2-3                       [32, 64, 12, 12]          18,496
│    └─ReLU: 2-4                         [32, 64, 12, 12]          --
├─MaxPool2d: 1-4                         [32, 64, 6, 6]            --
├─Sequential: 1-5                        [32, 64, 4, 4]            --
│    └─Conv2d: 2-5                       [32, 64, 4, 4]            36,928
│    └─ReLU: 2-6                         [32, 64, 4, 4]            --
├─Sequential: 1-6                        [32, 64]                  --
│    └


Implicit dimension choice for softmax has been deprecated. Change the call to include dim=X as an argument.



**3. 데이터셋 분리**

* Train Data -> Train Data + Valid Data (epoch) + Valid Data (하이퍼파라미터 최적화)

In [148]:
# 데이터셋 분리

from torch.utils.data import random_split

# 샘플 수
num_train = 2000
num_valid_epoch = 2000
num_valid_hpo = 5000

assert NUM_TRAIN_SAMPLES == num_train + num_valid_epoch + num_valid_hpo

# 데이터셋 분리
train_dataset, valid_epoch_dataset, valid_hpo_dataset =\
    random_split(train_subset, [num_train, num_valid_epoch, num_valid_hpo])

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_epoch_loader = DataLoader(valid_epoch_dataset, batch_size=BATCH_SIZE, shuffle=False)
valid_hpo_loader = DataLoader(valid_hpo_dataset, batch_size=BATCH_SIZE, shuffle=False)

**4. 하이퍼파라미터 최적화 학습 실시 함수**

* 하이퍼파라미터 최적화 라이브러리는 Optuna 사용
* 하이퍼파라미터 탐색 100 회 실시
* 하이퍼파라미터 목록
  * Learning Rate Scheduler 의 종류
    * Multiplicative
    * Exponential
    * Step
    * Multi-Step
    * Reduce-LR-On-Plateau
    * Cosine-Annealing
    * Cosine-Annealing-Warm-Restarts
    * Cyclic (```triangle```, ```triangle2```, ```exp_range``` 3가지)
  * Learning Rate
    * 5e-5 ~ 1e-2


In [149]:
MAX_EPOCHS = 65536
EARLY_STOPPING_ROUNDS = 10  # Early Stopping Patience (epochs)
TRIAL_COUNT = 100           # HPO trial count

In [150]:
from sklearn.metrics import accuracy_score
from copy import deepcopy

In [151]:
# Optuna 설정

!pip install optuna
import optuna
import logging

optuna.logging.set_verbosity(logging.WARNING)



In [152]:
# 모델 학습 실시

# args :
# - model           : 학습할 모델
# - train_loader    : Training Data Loader
# - train_loss_list : 각 epoch 에서의 train loss 기록

# returns :
# - train_loss : 모델의 Train Loss

def run_train(model, train_loader, train_loss_list):
    model.train()
    train_loss = 0.0
    cnt = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        # train 실시
        model.optimizer.zero_grad()
        outputs = model(images)

        loss = nn.CrossEntropyLoss()(outputs, labels)
        loss.backward()
        model.optimizer.step()

        train_loss += loss.item()
        cnt += 1

    train_loss_list.append(train_loss / len(train_loader))
    return train_loss_list[-1]

In [153]:
# 모델 validation 실시

# args :
# - model        : validation 할 모델
# - valid_loader : Validation Data Loader
# - during_train : 모델 학습 중이면 True, 그렇지 않으면 False

# returns :
# - accuracy : 모델의 validation 정확도

def run_validation(model, valid_loader, during_train=True):
    model.eval()
    correct, total = 0, 0

    with torch.no_grad():
        for images, labels in valid_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Accuracy 계산
        accuracy = correct / total

        if during_train and model.scheduler is not None:
            if model.scheduler.__class__.__name__ == 'ReduceLROnPlateau':
                model.scheduler.step(accuracy)
            else:
                model.scheduler.step()

    return accuracy

In [154]:
# 모델 학습 및 validation 전체 프로세스

# args :
# - model              : 학습할 모델
# - train_loader       : Training Data Loader
# - valid_epoch_loader : 각 epoch 마다 validation 할 Valid Data Loader
# - valid_hpo_loader   : 최종적으로 해당 하이퍼파라미터 조합에 대한 Valid Data Loader
# - verbose            : 학습 중 프로세스 출력 여부

# returns :
# - final_acc        : 해당 하이퍼파라미터 조합에 대한 최종 Accuracy (valid_acc 이 가장 높았던 epoch 의 모델로 측정)
# - best_epoch_model : valid_acc 이 가장 높았던 epoch 에서 생성된 모델

def run_model_common(model, train_loader, valid_epoch_loader, valid_hpo_loader,
                     verbose=False):

    train_loss_list = []       # train loss
    valid_acc_list = []        # valid accuracy
    max_valid_acc = 0.0        # max validation accuracy
    best_valid_acc_epoch = -1  # valid_acc 이 가장 높았던 epoch
    best_epoch_model = None    # valid_acc 이 가장 높았던 epoch 의 모델

    # 1. 학습 실시
    for epoch in range(MAX_EPOCHS):

        # 1-1. train model
        train_loss = run_train(model, train_loader, train_loss_list)

        # 1-2. validate model (with EPOCH VALID SET)
        epoch_acc = run_validation(model, valid_epoch_loader)
        valid_acc_list.append(epoch_acc)

        # 1-3. Early Stopping 처리 (overfitting 방지)
        if epoch_acc > max_valid_acc:
            max_valid_acc = epoch_acc
            best_valid_acc_epoch = epoch

            best_epoch_model = CNN().to(device)
            best_epoch_model.load_state_dict(model.state_dict())

            if verbose:
                print('best model updated')

        if epoch - best_valid_acc_epoch >= EARLY_STOPPING_ROUNDS:
            break

        # 1-4. 결과 출력
        if verbose:
            if model.scheduler is not None:
                epoch_lr = model.scheduler.get_last_lr()[0]
                print(f"Epoch {epoch+1}, Loss: {train_loss:.4f}, Learning Rate: {epoch_lr:.6f}, Accuracy: {epoch_acc:.4f}")
            else:
                print(f"Epoch {epoch+1}, Loss: {train_loss:.4f}, Accuracy: {epoch_acc:.4f}")

    # check best-epoch model correctly loaded
    checked_acc = run_validation(best_epoch_model,
                                 valid_epoch_loader,
                                 during_train=False)

    if verbose:
        print(f"Best Epoch: {best_valid_acc_epoch}, Best Valid Acc: {max_valid_acc}")
        print(f"Valid Acc (with Epoch valid set) on Loaded Best Model: {checked_acc}")

    assert abs(max_valid_acc - checked_acc) < 1e-8

    # 2. validate best-epoch model (with HPO VALID SET)
    final_acc = run_validation(best_epoch_model,
                               valid_hpo_loader,
                               during_train=False)

    if verbose:
        print(f"Final Acc (with HPO valid set) on Loaded Best Model: {final_acc}")

    return final_acc, best_epoch_model

In [155]:
print(device)

cuda


**4-1. 실험 실시**

In [156]:
hpo_best_acc = 0              # 모든 Hyper-param 조합의 HPO Valid set 정확도 중 가장 높은 것
best_hyperparam_set = None    # HPO Valid set 정확도가 가장 높은 Hyper-param 조합
best_hyperparam_model = None  # best_hyperparam_set 의 Hyper-param 조합으로 학습된 모델

In [157]:
# Scheduler 생성

# args:
# - scheduler_name : name of scheduler to create
# - optimizer      : optimizer for the scheduler
# - init_lr        : initial learning rate

def create_scheduler(scheduler_name, optimizer, init_lr):

    if scheduler_name == 'multiplicative':
        return optim.lr_scheduler.MultiplicativeLR(optimizer=optimizer,
                                                   lr_lambda=lambda epoch: 0.98 ** epoch)

    elif scheduler_name == 'exponential':
        return optim.lr_scheduler.ExponentialLR(optimizer=optimizer,
                                                gamma=0.95)

    elif scheduler_name == 'step':
        return optim.lr_scheduler.StepLR(optimizer=optimizer,
                                         step_size=10,
                                         gamma=0.5)

    elif scheduler_name == 'multistep':
        return optim.lr_scheduler.MultiStepLR(optimizer=optimizer,
                                              milestones=[15, 25, 30, 35, 45],
                                              gamma=0.5)

    elif scheduler_name == 'reduce_lr_on_plateau':
        return optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                                    mode='max',
                                                    patience=5,
                                                    factor=0.25)  # max = Accuracy 가 더 이상 증가하지 않을 때

    elif scheduler_name == 'cosine_annealing':
        return optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer,
                                                    T_max=10,
                                                    eta_min=0)

    elif scheduler_name == 'cosine_annealing_warm_restarts':
        return optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer=optimizer,
                                                              T_0=5,
                                                              T_mult=2,
                                                              eta_min=1e-5)

    elif scheduler_name.startswith('cyclic_'):
        return optim.lr_scheduler.CyclicLR(optimizer=optimizer,
                                           base_lr=1e-5,
                                           step_size_up=5,
                                           max_lr=init_lr,
                                           gamma=0.92,  # for only 'exp_range' mode
                                           mode=scheduler_name[7:])

    elif scheduler_name == 'lambda':
        return optim.lr_scheduler.LambdaLR(optimizer=optimizer,
                                           lr_lambda=lambda epoch: 0.95 ** epoch)

    else:
        raise Exception(f'Error: wrong scheduler name {scheduler_name}')

In [158]:
trial_count = 0  # 1st ~ 10th trial 에만 학습 중 정보 출력

def objective(trial):
    global hpo_best_acc, best_hyperparam_set, best_hyperparam_model, trial_count

    # hyper-params
    scheduler_list = ['none', 'multiplicative', 'exponential', 'step',
                      'multistep', 'reduce_lr_on_plateau', 'cosine_annealing',
                      'cosine_annealing_warm_restarts', 'cyclic_triangular',
                      'cyclic_triangular2', 'cyclic_exp_range']

    params = {
        'scheduler_name': trial.suggest_categorical('scheduler_name', scheduler_list),
        'learning_rate': trial.suggest_float('learning_rate', 0.00005, 0.01, log=True)
    }

    # define and run model
    model = CNN().to(device)
    model.optimizer = torch.optim.AdamW(model.parameters(),
                                        lr=params['learning_rate'])

    if params['scheduler_name'] != 'none':
        model.scheduler = create_scheduler(scheduler_name=params['scheduler_name'],
                                          optimizer=model.optimizer,
                                          init_lr=params['learning_rate'])
    else:
        model.scheduler = None

    final_acc, best_epoch_model = run_model_common(model,
                                                   train_loader,
                                                   valid_epoch_loader,
                                                   valid_hpo_loader,
                                                   verbose=(trial_count < 10))

    trial_count += 1

    # global best model 갱신
    if final_acc > hpo_best_acc:
        hpo_best_acc = final_acc
        best_hyperparam_set = params

        best_hyperparam_model = CNN().to(device)
        best_hyperparam_model.load_state_dict(best_epoch_model.state_dict())

        print(f'best_hyperparam_model updated with Accuracy={hpo_best_acc:.4f}')

    print(f"Params: {params}, Accuracy: {final_acc:.4f}")
    return final_acc

In [159]:
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=TRIAL_COUNT)


Implicit dimension choice for softmax has been deprecated. Change the call to include dim=X as an argument.



best model updated
Epoch 1, Loss: 2.2814, Learning Rate: 0.000248, Accuracy: 0.5250
best model updated
Epoch 2, Loss: 2.1185, Learning Rate: 0.000230, Accuracy: 0.7175
best model updated
Epoch 3, Loss: 2.0102, Learning Rate: 0.000202, Accuracy: 0.7355
best model updated
Epoch 4, Loss: 1.9403, Learning Rate: 0.000167, Accuracy: 0.7615
best model updated
Epoch 5, Loss: 1.8904, Learning Rate: 0.000127, Accuracy: 0.8055
best model updated
Epoch 6, Loss: 1.8514, Learning Rate: 0.000088, Accuracy: 0.8230
best model updated
Epoch 7, Loss: 1.8238, Learning Rate: 0.000052, Accuracy: 0.8380
best model updated
Epoch 8, Loss: 1.8054, Learning Rate: 0.000024, Accuracy: 0.8440
best model updated
Epoch 9, Loss: 1.7950, Learning Rate: 0.000006, Accuracy: 0.8490
Epoch 10, Loss: 1.7909, Learning Rate: 0.000000, Accuracy: 0.8475
Epoch 11, Loss: 1.7892, Learning Rate: 0.000006, Accuracy: 0.8475
Epoch 12, Loss: 1.7902, Learning Rate: 0.000024, Accuracy: 0.8480
best model updated
Epoch 13, Loss: 1.7872, Lea

In [160]:
# Test Dataset 성능 평가

print(f'best hyper-param: {best_hyperparam_set}, best acc: {hpo_best_acc}')

best hyper-param: {'scheduler_name': 'cosine_annealing', 'learning_rate': 0.0036353756479507514}, best acc: 0.9728


In [161]:
# best_hyperparam_model 이 정상적으로 load 되었는지 최종 확인

checked_hpo_acc = run_validation(best_hyperparam_model,
                                 valid_hpo_loader,
                                 during_train=False)

print(f"Valid Acc (with HPO valid set) on Best Hyper-param Model: {checked_hpo_acc}")

assert abs(hpo_best_acc - checked_hpo_acc) < 1e-8

Valid Acc (with HPO valid set) on Best Hyper-param Model: 0.9728


In [162]:
# 테스트셋에 대한 최종 정확도

hpo_final_acc = run_validation(best_hyperparam_model,
                               test_loader,
                               during_train=False)

print(f'Final HPO Acc (with test set) : {hpo_final_acc}')

Final HPO Acc (with test set) : 0.9749


**5. HPO 성능 결과 확인**

In [163]:
from optuna.visualization import plot_optimization_history

In [168]:
# HPO 추이

fig = plot_optimization_history(study)
fig.update_layout(width=1000,
                  height=650,
                  yaxis_title='Accuracy (HPO valid set)')
fig.show()

In [169]:
fig.update_layout(yaxis=dict(range=[0.95, 0.975]))
fig.show()

**6. 각 Hyperparameter 값에 따른 성능 분포 확인**

In [170]:
# trial DataFrame 가져오기

trials_df = study.trials_dataframe()

In [171]:
trials_df

Unnamed: 0,number,value,datetime_start,datetime_complete,duration,params_learning_rate,params_scheduler_name,state
0,0,0.9620,2025-03-02 10:22:49.487744,2025-03-02 10:23:58.644510,0 days 00:01:09.156766,0.000255,cosine_annealing,COMPLETE
1,1,0.1004,2025-03-02 10:23:58.644645,2025-03-02 10:24:13.146090,0 days 00:00:14.501445,0.009336,multistep,COMPLETE
2,2,0.7744,2025-03-02 10:24:13.146213,2025-03-02 10:25:35.080329,0 days 00:01:21.934116,0.000095,step,COMPLETE
3,3,0.9690,2025-03-02 10:25:35.080547,2025-03-02 10:26:13.802914,0 days 00:00:38.722367,0.002423,cyclic_triangular,COMPLETE
4,4,0.9682,2025-03-02 10:26:13.803041,2025-03-02 10:27:10.978033,0 days 00:00:57.174992,0.000191,none,COMPLETE
...,...,...,...,...,...,...,...,...
95,95,0.9674,2025-03-02 11:30:13.792198,2025-03-02 11:30:44.838037,0 days 00:00:31.045839,0.000751,none,COMPLETE
96,96,0.9624,2025-03-02 11:30:44.838197,2025-03-02 11:31:23.037550,0 days 00:00:38.199353,0.000474,cyclic_triangular,COMPLETE
97,97,0.9720,2025-03-02 11:31:23.037699,2025-03-02 11:32:20.744740,0 days 00:00:57.707041,0.001969,cosine_annealing_warm_restarts,COMPLETE
98,98,0.9664,2025-03-02 11:32:20.744928,2025-03-02 11:33:22.269690,0 days 00:01:01.524762,0.000355,cosine_annealing_warm_restarts,COMPLETE


In [191]:
# 각 Learning Rate Scheduler 별 최고 정확도

import plotly.express as px

max_accuracy_by_scheduler = trials_df.groupby(by=['params_scheduler_name'], as_index=False)['value'].max()

fig = px.bar(max_accuracy_by_scheduler,
             x='params_scheduler_name',
             y='value',
             color='params_scheduler_name',
             title='Max Accuracy by Learning Rate Scheduler')

fig.update_layout(width=900, height=550,
                  yaxis=dict(range=[0.967, 0.973]),
                  yaxis_title='Accuracy')
fig.show()

In [192]:
# 각 Learning Rate Scheduler 별 정확도 분석

fig = px.scatter(trials_df,
                 x="params_learning_rate",
                 y="value",
                 color="params_scheduler_name",
                 color_discrete_sequence=px.colors.qualitative.Alphabet,
                 title="Accuracy Distribution by Learning Rate Scheduler")

fig.update_layout(width=900, height=550,
                  yaxis=dict(range=[0.95, 0.975]),
                  yaxis_title='Accuracy')

fig.update_traces(marker=dict(size=10))

fig.show()