In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split

import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
from datetime import datetime

def set_seed(seed):
    # Python random 모듈의 시드 고정
    random.seed(seed)
    
    # NumPy의 시드 고정
    np.random.seed(seed)
    
    # PyTorch의 시드 고정 (CPU 연산)
    torch.manual_seed(seed)
    
    # PyTorch의 시드 고정 (CUDA 연산)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # 여러 개의 GPU를 사용하는 경우
    
    # 재현성을 위해 일부 설정 비활성화
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# 사용 예시
#set_seed(42)

# STEP 0. GPU 유무에 따른 Device 설정

In [2]:
if torch.cuda.is_available():
    print("GPU가 사용 가능합니다. 현재 사용중인 GPU 수:", torch.cuda.device_count())
    print("현재 디바이스 이름:", torch.cuda.get_device_name(torch.cuda.current_device()))
else:
    print("GPU를 사용할 수 없습니다. CPU를 사용합니다.")

GPU가 사용 가능합니다. 현재 사용중인 GPU 수: 1
현재 디바이스 이름: NVIDIA GeForce RTX 2070 SUPER


In [3]:
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

print("device : {}".format(DEVICE))

device : cuda


# STEP 1.  데이터 로드 및 train/validation/test 데이터설정

In [4]:
# 1. 데이터 전처리: ToTensor 변환을 적용합니다.
train_transform = transforms.Compose([
    transforms.ToTensor()
])

test_transform = transforms.Compose([
    transforms.ToTensor()
])

# 2. CIFAR-10 데이터셋 다운로드 및 로드
train_dataset = torchvision.datasets.CIFAR10(root="./DataSet", train=True, download=True, transform=train_transform)
test_dataset = torchvision.datasets.CIFAR10(root="./DataSet", train=False, download=True, transform=test_transform)

print(train_dataset)

Files already downloaded and verified
Files already downloaded and verified
Dataset CIFAR10
    Number of datapoints: 50000
    Root location: ./DataSet
    Split: Train
    StandardTransform
Transform: Compose(
               ToTensor()
           )


In [5]:
# 3. 학습 데이터셋을 훈련용과 검증용으로 분할
train_dataset_size = int(len(train_dataset)*0.85)
validation_dataset_size = len(train_dataset) - train_dataset_size

train_dataset, validation_dataset = random_split(train_dataset,[train_dataset_size, validation_dataset_size])

print("train 데이터셋 크기 : {}".format(len(train_dataset)))
print("validation 데이터셋 크기 : {}".format(len(validation_dataset)))
print("test 데이터셋 크기 : {}".format(len(test_dataset)))

train 데이터셋 크기 : 42500
validation 데이터셋 크기 : 7500
test 데이터셋 크기 : 10000


In [6]:
batchSize = 256

# 학습 데이터 로더 생성
# batch_size: 한 번에 불러올 데이터 수 (예, 64개)
# shuffle: 학습 시 데이터 순서를 무작위로 섞어 모델의 일반화 성능을 높임
# num_workers: 데이터를 불러올 때 사용할 subprocess 수 (환경에 따라 조정)
train_loader = DataLoader(train_dataset, batch_size=batchSize, shuffle=True, num_workers=8)

# 검증 데이터 로더 생성
# 평가 시에는 데이터 순서를 섞을 필요가 없으므로 shuffle=False로 설정
validation_loader = DataLoader(validation_dataset, batch_size=batchSize, shuffle=False, num_workers=8)

# 테스트 데이터 로더 생성 (검증과 동일한 설정)
test_loader = DataLoader(test_dataset, batch_size=batchSize, shuffle=False, num_workers=8)

# STEP 2. 모델 구성

In [7]:
import torch.nn as nn

# CNN 모델 클래스 정의 (nn.Module을 상속받음)
class Model(nn.Module):
    def __init__(self):
        super().__init__()

        # 1. 첫 번째 합성곱(Convolution) 계층
        # 입력 채널: 3 (RGB 이미지), 출력 채널: 32, 커널 크기: 3x3, 패딩: 1 (출력 크기를 유지)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        
        # 2. 두 번째 합성곱 계층
        # 입력 채널: 32 (앞 계층의 출력), 출력 채널: 64, 커널 크기: 3x3, 패딩: 1
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)

        # 3. 최대 풀링 계층
        # 2x2 영역에서 최대값을 취해 이미지의 크기를 절반으로 줄임 (stride=2)
        self.pooling = nn.MaxPool2d(kernel_size=2, stride=2)

        # 4. 완전 연결(fully-connected, fc) 계층
        # 첫 번째 fc 계층: 평탄화(flatten)된 8*8*64 차원을 128 차원으로 변환
        self.fc1 = nn.Linear(8 * 8 * 64, 128)
        # 두 번째 fc 계층: 128 차원을 10개의 클래스에 대응하는 출력으로 변환
        self.fc2 = nn.Linear(128, 10)

        # 5. 드롭아웃 계층
        # dropout25: 25%의 확률로 일부 뉴런을 임의로 꺼서 과적합을 방지
        self.dropout25 = nn.Dropout(p=0.25)
        # dropout50: 50%의 확률로 일부 뉴런을 임의로 꺼서 과적합을 방지
        self.dropout50 = nn.Dropout(p=0.5)

    # 순전파(forward) 함수 정의: 입력 x가 모델을 통과하는 과정을 기술
    def forward(self, x):
        # 첫 번째 합성곱 계층 통과
        x = self.conv1(x)
        # ReLU 활성화 함수 적용하여 음수 값은 0으로 만듦
        x = torch.relu(x)
        # 최대 풀링 계층 적용하여 특징 맵 크기를 줄임
        x = self.pooling(x)
        # 25% 드롭아웃 적용하여 일부 뉴런을 임의로 끔
        x = self.dropout25(x)

        # 두 번째 합성곱 계층 통과
        x = self.conv2(x)
        # ReLU 활성화 함수 적용
        x = torch.relu(x)
        # 다시 최대 풀링 계층 적용 (출력 크기 더 줄어듦)
        x = self.pooling(x)
        # 또 다시 25% 드롭아웃 적용
        x = self.dropout25(x)

        # 다차원 텐서를 1차원으로 평탄화(flatten)
        # -1은 배치 크기를 자동으로 맞춰줌 (여기서 8x8x64는 풀링 후의 차원)
        x = x.view(-1, 8 * 8 * 64)

        # 첫 번째 완전 연결 계층 통과
        x = self.fc1(x)
        # ReLU 활성화 함수 적용
        x = torch.relu(x)
        # 50% 드롭아웃 적용 (더 강한 과적합 방지)
        x = self.dropout50(x)

        # 두 번째 완전 연결 계층 (출력 계층) 전에도 50% 드롭아웃 적용
        logits = self.dropout50(x)

        # 최종 출력 (각 클래스에 대한 점수 반환)
        return logits

# 모델 객체 생성 후, 지정한 DEVICE("cuda" 또는 "cpu")로 이동
model = Model().to(DEVICE)

# 분류 문제에서 많이 사용하는 손실 함수 (교차 엔트로피 손실 함수) 정의
loss_function = nn.CrossEntropyLoss()

# Adam 옵티마이저 정의 (모델 파라미터 업데이트, 학습률은 0.001로 설정)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# 모델, 손실 함수, 옵티마이저 정보를 출력
print("---------------------------------------------------------------------")
print("model : {}".format(model))
print("---------------------------------------------------------------------")
print("loss_function : {}".format(loss_function))
print("---------------------------------------------------------------------")
print("optimizer : {}".format(optimizer))
print("---------------------------------------------------------------------")

---------------------------------------------------------------------
model : Model(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pooling): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=4096, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=10, bias=True)
  (dropout25): Dropout(p=0.25, inplace=False)
  (dropout50): Dropout(p=0.5, inplace=False)
)
---------------------------------------------------------------------
loss_function : CrossEntropyLoss()
---------------------------------------------------------------------
optimizer : Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.001
    maximize: False
    weight_decay: 0
)
-------------------------------------------------

# STEP 3. train/evaluate 구성

In [8]:
def model_train(dataloader, model, loss_function, optimizer):
    # 1. 모델을 학습 모드로 전환합니다.
    #    - model.train()을 호출하면 dropout이나 batch normalization과 같은 계층들이 학습 모드에 맞게 동작합니다.
    model.train()

    # 2. 학습 과정 동안 누적할 변수들을 초기화합니다.
    #    - train_loss_sum: 모든 배치(batch)의 손실(loss) 값을 누적할 변수 (float)
    #    - train_correct: 모든 배치에서 예측이 맞은 샘플 수를 누적할 변수 (int)
    #    - train_total: 전체 샘플 수를 누적할 변수 (int)
    train_loss_sum = train_correct = train_total = 0

    # 3. 전체 배치의 수를 계산합니다.
    #    - dataloader는 DataLoader 객체이며, len(dataloader)는 전체 배치의 개수 (int)를 반환합니다.
    total_train_batch = len(dataloader)

    # 4. DataLoader를 통해 배치 단위로 데이터를 순회합니다.
    #    - images: 한 배치에 포함된 이미지 텐서, 보통 shape는 [batch_size, 채널, 높이, 너비] (예: [64, 3, 32, 32])
    #    - labels: 해당 이미지들의 정답 레이블 텐서, 보통 shape는 [batch_size] (예: [64])
    for images, labels in dataloader:

        # 5. 현재 배치의 데이터를 지정한 DEVICE(GPU 또는 CPU)로 이동시킵니다.
        #    - x_train: images 데이터를 DEVICE로 이동 (동일한 shape, 데이터 타입은 torch.Tensor)
        #    - y_train: labels 데이터를 DEVICE로 이동 (동일한 shape, 데이터 타입은 torch.Tensor)
        x_train = images.to(DEVICE)
        y_train = labels.to(DEVICE)

        # 6. 모델에 입력 데이터를 넣어 예측 결과를 구합니다.
        #    - outputs: 모델의 출력 텐서, 보통 shape는 [batch_size, 클래스 수] (예: [64, 10])
        outputs = model(x_train)

        # 7. 모델의 출력과 실제 정답을 비교하여 손실(loss)를 계산합니다.
        #    - loss_function: 주로 nn.CrossEntropyLoss() 등 분류 손실 함수를 사용
        #    - loss: 현재 배치의 손실 값 (torch.Tensor, 스칼라 형태)
        loss = loss_function(outputs, y_train)

        # 8. 옵티마이저의 기울기(gradient)를 0으로 초기화합니다.
        #    - 기울기가 누적되어 이전 배치의 정보가 남는 것을 방지합니다.
        optimizer.zero_grad()

        # 9. 역전파(backward propagation)를 수행하여 기울기를 계산합니다.
        loss.backward()

        # 10. 옵티마이저를 통해 모델 파라미터를 업데이트합니다.
        optimizer.step()

        # 11. 현재 배치의 손실 값을 train_loss_sum에 누적합니다.
        #     - loss.item()은 손실 텐서에서 파이썬 스칼라 값을 추출합니다.
        train_loss_sum += loss.item()

        # 12. 현재 배치의 총 샘플 수를 train_total에 누적합니다.
        #     - y_train.size(0)은 배치의 첫 번째 차원(샘플 수)를 반환합니다.
        train_total += y_train.size(0)

        # 13. 현재 배치에서 맞춘 샘플 수를 계산하여 train_correct에 누적합니다.
        #     - torch.argmax(outputs, 1): 각 샘플별로 예측 결과(outputs) 중 가장 큰 값을 가진 인덱스(예측 클래스)를 반환
        #     - (torch.argmax(outputs, 1) == y_train): 예측한 클래스와 실제 라벨이 같은지를 비교하여 Boolean 텐서를 만듦
        #     - .sum(): True(맞은 샘플)의 개수를 셈 (Tensor형태)
        #     - .item(): 그 결과를 파이썬 스칼라 값으로 변환
        train_correct += ((torch.argmax(outputs, 1) == y_train)).sum().item()

    # 14. 평균 손실(train_avg_loss)을 계산합니다.
    #     - train_loss_sum: 모든 배치에서의 손실 합계 (float)
    #     - total_train_batch: 전체 배치 수 (int)
    #     - 두 값을 나누어 배치당 평균 손실을 구함
    train_avg_loss = train_loss_sum / total_train_batch

    # 15. 평균 정확도(train_avg_accuracy)를 계산합니다.
    #     - train_correct: 맞은 샘플의 총 개수 (int)
    #     - train_total: 전체 샘플 수 (int)
    #     - (train_correct / train_total)은 정확도를 소수점으로 나타내며, 여기에 100을 곱해 백분율로 표현합니다.
    train_avg_accuracy = 100 * train_correct / train_total

    # 16. 최종적으로 배치 평균 손실과 전체 정확도를 튜플 형태로 반환합니다.
    return (train_avg_loss, train_avg_accuracy)

In [9]:
def model_evaluate(dataloader, model, loss_function, optimizer):
    # 1. 모델을 평가 모드로 전환합니다.
    #    - model.eval()은 드롭아웃, 배치 정규화 등 특정 계층들이 평가용으로 동작하도록 설정합니다.
    model.eval()

    # 2. 평가 시에는 그래디언트(gradient)를 계산하지 않도록 설정합니다.
    #    - with torch.no_grad() 블록 안에서는 backward 연산이 수행되지 않아 메모리 사용량과 연산 속도를 최적화할 수 있습니다.
    with torch.no_grad():

        # 3. 평가 동안 누적할 변수를 초기화합니다.
        #    - val_loss_sum: 각 배치(batch)에서 계산된 손실(loss) 값의 총합 (float)
        #    - val_correct: 맞게 예측한 샘플 수를 누적 (int)
        #    - val_total: 전체 평가에 사용된 샘플의 총 개수 (int)
        val_loss_sum = val_correct = val_total = 0

        # 4. 전체 평가 배치(batch)의 개수를 계산합니다.
        #    - dataloader의 길이는 전체 배치의 수를 의미합니다.
        total_val_batch = len(dataloader)

        # 5. DataLoader를 이용해 평가 데이터를 배치 단위로 순회합니다.
        #    - images: 각 배치에 포함된 이미지 텐서 (일반적으로 shape: [batch_size, 채널, 높이, 너비], 예: [64, 3, 32, 32])
        #    - labels: 각 이미지에 해당하는 정답 레이블 텐서 (일반적으로 shape: [batch_size], 예: [64])
        for images, labels in dataloader:

            # 6. 현재 배치의 데이터를 지정한 DEVICE(GPU 또는 CPU)로 이동합니다.
            #    - x_val: images 데이터를 DEVICE로 이동 (동일한 shape, torch.Tensor)
            #    - y_val: labels 데이터를 DEVICE로 이동 (동일한 shape, torch.Tensor)
            x_val = images.to(DEVICE)
            y_val = labels.to(DEVICE)

            # 7. 모델에 평가 데이터(x_val)를 입력하여 예측 결과(outputs)를 얻습니다.
            #    - outputs: 모델의 출력 텐서 (일반적으로 shape: [batch_size, 클래스 수], 예: [64, 10])
            outputs = model(x_val)

            # 8. 모델의 출력(outputs)과 실제 정답(y_val)을 비교하여 손실(loss)을 계산합니다.
            #    - loss_function은 주로 nn.CrossEntropyLoss() 등 분류 문제에서 사용되는 손실 함수입니다.
            loss = loss_function(outputs, y_val)

            # 9. 현재 배치의 손실 값을 스칼라 값으로 변환하여 누적합니다.
            #    - loss.item()은 텐서(loss)를 파이썬 스칼라 값으로 변환합니다.
            val_loss_sum += loss.item()

            # 10. 현재 배치의 총 샘플 수를 누적합니다.
            #     - y_val.size(0)은 현재 배치의 샘플 수(배치 크기)를 반환합니다.
            val_total += y_val.size(0)

            # 11. 현재 배치에서 맞춘 샘플 수를 계산하여 누적합니다.
            #     - torch.argmax(outputs, 1)은 각 샘플별로 가장 높은 점수를 가진 인덱스(예측 클래스)를 반환합니다.
            #     - (torch.argmax(outputs, 1) == y_val)는 예측 클래스와 실제 레이블을 비교하여 Boolean 텐서를 생성합니다.
            #     - .sum()을 통해 True(맞은 샘플)의 개수를 계산하고, .item()으로 파이썬 스칼라 값으로 변환합니다.
            val_correct += ((torch.argmax(outputs, 1) == y_val)).sum().item()

        # 12. 평가 전체 배치에 대한 평균 손실(val_avg_loss)을 계산합니다.
        #     - 전체 손실의 합(val_loss_sum)을 전체 배치 수(total_val_batch)로 나눕니다.
        val_avg_loss = val_loss_sum / total_val_batch

        # 13. 전체 평가 데이터에 대한 평균 정확도(val_avg_accuracy)를 계산합니다.
        #     - 전체 맞은 샘플 수(val_correct)를 전체 샘플 수(val_total)로 나눈 후, 100을 곱해 백분율(%)로 표현합니다.
        val_avg_accuracy = 100 * val_correct / val_total

    # 14. 최종적으로 평가 모드에서 계산된 평균 손실과 평균 정확도를 튜플 형태로 반환합니다.
    return (val_avg_loss, val_avg_accuracy)


In [10]:
def model_test(dataloader, model): # model_eval과 동일함

    model.eval()

    with torch.no_grad():

        test_loss_sum = test_correct = test_total = 0

        total_test_batch = len(dataloader)

        for images, labels in dataloader:

            x_test = images.to(DEVICE)
            y_test = labels.to(DEVICE)

            outputs = model(x_test)
            loss = loss_function(outputs, y_test)

            test_loss_sum += loss.item()

            test_total += y_test.size(0)
            test_correct += ((torch.argmax(outputs, 1)==y_test)).sum().item()

        test_avg_loss = test_loss_sum / total_test_batch
        test_avg_accuracy = 100*test_correct / test_total

        print('accuracy:', test_avg_accuracy)
        print('loss:', test_avg_loss)

# STEP 4. 학습 수행 및 결과 분석

In [None]:
from datetime import datetime  # datetime 모듈을 사용하여 시간 측정을 수행합니다.

# 학습 및 평가 결과를 저장할 리스트들 (각 epoch마다 손실과 정확도를 기록)
train_loss_list = []       # 각 epoch의 평균 학습 손실 (float) 저장
train_accuracy_list = []   # 각 epoch의 평균 학습 정확도 (float) 저장

val_loss_list = []         # 각 epoch의 평균 검증 손실 (float) 저장
val_accuracy_list = []     # 각 epoch의 평균 검증 정확도 (float) 저장

# 전체 학습 시작 시간 기록 (전체 코드가 실행되는 시작 시각)
start_time = datetime.now()

EPOCHS = 30  # 학습할 epoch(반복) 수, 정수형

for epoch in range(EPOCHS):
    # -----------------------------------------------
    # 각 epoch 시작 시각을 기록합니다.
    epoch_start = datetime.now()  # epoch 시작 시간 (datetime 객체)
    # -----------------------------------------------

    # =================== model train =================== #
    # train_loader: DataLoader 객체, 배치 단위의 학습 데이터를 제공합니다.
    # model_train 함수는 학습 데이터를 이용해 모델 파라미터를 업데이트하고,
    # 평균 손실(train_avg_loss)과 평균 정확도(train_avg_accuracy)를 반환합니다.
    train_avg_loss, train_avg_accuracy = model_train(train_loader, model, loss_function, optimizer)

    
    # 반환된 손실과 정확도를 리스트에 저장합니다.
    train_loss_list.append(train_avg_loss)
    train_accuracy_list.append(train_avg_accuracy)
    # ==================================================== #

    # =================== model evaluation =================== #
    # validation_loader: DataLoader 객체, 배치 단위의 검증 데이터를 제공합니다.
    # model_evaluate 함수는 검증 데이터를 이용해 모델의 성능(평균 손실과 정확도)을 평가합니다.
    val_avg_loss, val_avg_accuracy = model_evaluate(validation_loader, model, loss_function, optimizer)
    
    # 반환된 검증 손실과 정확도를 리스트에 저장합니다.
    val_loss_list.append(val_avg_loss)
    val_accuracy_list.append(val_avg_accuracy)
    # ======================================================= #

    # -----------------------------------------------
    # 각 epoch 종료 시각을 기록하고, 해당 epoch에 걸린 시간을 계산합니다.
    epoch_end = datetime.now()           # epoch 종료 시간 (datetime 객체)
    epoch_elapsed = epoch_end - epoch_start  # epoch 소요 시간 (timedelta 객체)
    epoch_seconds = epoch_elapsed.total_seconds() # 소요 시간을 초 단위의 float 값으로 변환
    # -----------------------------------------------

    # 현재 epoch의 결과와 소요 시간을 출력합니다.
    print('epoch:', '%02d' % (epoch + 1),
          'epoch_time =', '{:.2f} sec '.format(epoch_seconds),
          'train_loss =', '{:.2f} '.format(train_avg_loss),
          'train_acc =', '{:.2f} '.format(train_avg_accuracy),
          'val_loss =', '{:.2f} '.format(val_avg_loss),
          'val_acc =', '{:.2f} '.format(val_avg_accuracy)
          )

# -----------------------------------------------
# 전체 학습 종료 시각을 기록하고 전체 소요 시간을 계산합니다.
end_time = datetime.now()  # 전체 종료 시간
total_elapsed_time = end_time - start_time  # 전체 실행 시간 (timedelta 객체)
# -----------------------------------------------

print('Total elapsed time => ', total_elapsed_time)


epoch: 01 epoch_time = 77.28 sec  train_loss = 4.38  train_acc = 15.18  val_loss = 4.16  val_acc = 34.77 
epoch: 02 epoch_time = 76.92 sec  train_loss = 4.29  train_acc = 18.15  val_loss = 4.09  val_acc = 41.75 
epoch: 03 epoch_time = 76.27 sec  train_loss = 4.25  train_acc = 19.22  val_loss = 4.06  val_acc = 44.68 
epoch: 04 epoch_time = 76.29 sec  train_loss = 4.23  train_acc = 19.80  val_loss = 4.08  val_acc = 49.69 
epoch: 05 epoch_time = 76.46 sec  train_loss = 4.21  train_acc = 20.18  val_loss = 4.08  val_acc = 50.35 
epoch: 06 epoch_time = 76.31 sec  train_loss = 4.20  train_acc = 20.42  val_loss = 3.96  val_acc = 53.27 
epoch: 07 epoch_time = 76.46 sec  train_loss = 4.18  train_acc = 20.74  val_loss = 3.89  val_acc = 52.36 
epoch: 08 epoch_time = 76.49 sec  train_loss = 4.19  train_acc = 20.82  val_loss = 4.01  val_acc = 55.19 
epoch: 09 epoch_time = 76.26 sec  train_loss = 4.17  train_acc = 20.80  val_loss = 3.95  val_acc = 55.05 
epoch: 10 epoch_time = 76.30 sec  train_loss =