# CIFAR-10 데이터 학습 및 Test

In [1]:
import torch
import torch.nn as nn # 신경망 모듈을 생성하는 데 필요한 다양한 클래스와 함수를 포함한 PyTorch의 신경망 라이브러리
import torch.optim as optim # 다양한 최적화 알고리즘을 제공하는 PyTorch의 최적화 라이브러리

import torchvision.datasets as datasets # 여러 가지 유명한 데이터셋을 쉽게 불러올 수 있도록 도와주는 torchvision의 데이터셋 라이브러리
import torchvision.transforms as transforms # 데이터 전처리 및 변환을 도와주는 torchvision의 변환 라이브러리

import time
import matplotlib.pyplot as plt

In [2]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

## 데이터 전처리

In [3]:
batch_size = 128
learning_rate = 0.01
epochs=10

In [4]:
standardization = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.4913, 0.4821, 0.4465], std = [0.2470, 0.2434, 0.2615]) # 채널별 평균 및 표준편차

])

train_dataset = datasets.CIFAR10(
    root='./data/CIFAR_10',  # CIFAR-10 데이터셋이 저장될 경로를 지정
    train=True,  # 학습용(train) 데이터셋을 로드할지 여부를 설정 (True면 학습용 데이터, False면 테스트용 데이터)
    download=True,  # 지정된 경로에 데이터셋이 없으면 인터넷에서 자동으로 다운로드하여 저장
    transform=standardization  # 데이터를 로드할 때 적용할 변환(예: 정규화 등)을 지정
)

test_dataset = datasets.CIFAR10(
    root='./data/CIFAR_10',
    train=False,
    download=True,
    transform=standardization
)

train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,  # 앞서 정의한 학습용 데이터셋을 로드
    batch_size=batch_size,  # 각 미니배치의 크기를 지정 (한 번에 네트워크로 전달할 샘플 수)
    shuffle=True,  # 데이터를 로드할 때 섞을지 여부를 지정 (True면 에포크마다 데이터가 랜덤하게 섞임)
    drop_last=True  # 데이터셋이 배치 사이즈로 나누어 떨어지지 않을 때 마지막 남은 샘플들을 버릴지 여부 (True면 버림)
)

test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
    drop_last=True
)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/CIFAR_10/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:13<00:00, 12787266.45it/s]


Extracting ./data/CIFAR_10/cifar-10-python.tar.gz to ./data/CIFAR_10
Files already downloaded and verified


In [5]:
batch_train, batch_labels = next(iter(train_loader))  # train_loader에서 첫 번째 배치를 추출하여 batch_train과 batch_labels에 각각 이미지와 라벨을 저장
batch_test, batch_test_labels = next(iter(test_loader))

print(batch_train.shape) # batch_size, channel, height, width
print(batch_labels.shape)

# next(iter())는 파이썬의 반복자를 이용하여 데이터 로더에서 다음 배치를 가져오는 역할을 함
# train_loader와 test_loader는 배치 단위로 데이터를 제공하는 반복자(iterator) 역할을 하며,
# iter(train_loader)는 train_loader 객체에서 반복자를 생성하고 next() 함수는 이 반복자에서 다음 배치를 반환

torch.Size([128, 3, 32, 32])
torch.Size([128])


## 기초 모델 생성
1. BatchNormalization : BatchNorm1d
2. Activation Function : ReLU
3. Initialization : He Initialization



In [6]:
class cifar10_dnn(nn.Module):
    def __init__(self):
        super(cifar10_dnn, self).__init__() # 부모 클래스(nn.Module)의 생성자를 호출하여 nn.Module의 속성들을 이 클래스에서 사용할 수 있도록 함

        # 은닉층 및 출력층 정의
        # 여러 레이어를 순차적으로 쌓아 하나의 블록처럼 처리할 수 있도록 도와줌
        self.layer1 = nn.Sequential(
            nn.Linear(3 * 32 * 32, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
        )
        self.layer2 = nn.Sequential(
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
        )
        self.layer3 = nn.Sequential(
            nn.Linear(512, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
        )
        self.layer4 = nn.Sequential(
            nn.Linear(128, 32),
            nn.BatchNorm1d(32),
            nn.ReLU(),
        )
        self.layer5 = nn.Sequential(
            nn.Linear(32, 10),
        )

    # 신경망의 순전파 (forward propagation) 정의
    def forward(self, x):
        x = x.view(batch_size, -1)
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        return out

In [7]:
# 신경망의 각 레이어에 대한 초기화 작업 수행
def weight_init(m):
    if isinstance(m, nn.Linear): # 주어진 레이어 m이 nn.Linear 인스턴스인지 확인
        nn.init.kaiming_uniform_(m.weight.data) # nn.Linear 레이어의 가중치에 대해 He 초기화 적용

In [8]:
model = cifar10_dnn() # cifar10_dnn 클래스의 인스턴스를 생성하여 모델 정의
model.apply(weight_init) # 모델의 모든 레이어에 weight_init 함수에서 정의한 가중치 초기화 적용

cifar10_dnn(
  (layer1): Sequential(
    (0): Linear(in_features=3072, out_features=1024, bias=True)
    (1): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer2): Sequential(
    (0): Linear(in_features=1024, out_features=512, bias=True)
    (1): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer3): Sequential(
    (0): Linear(in_features=512, out_features=128, bias=True)
    (1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer4): Sequential(
    (0): Linear(in_features=128, out_features=32, bias=True)
    (1): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer5): Sequential(
    (0): Linear(in_features=32, out_features=10, bias=True)
  )
)

## 모델 테스트

In [9]:
temp_images = batch_train
test_prediction = model(temp_images) # 임시 입력 데이터로 모델 예측 수행

print(temp_images.shape) # batch_size, channel, height, width
print(test_prediction.shape)

torch.Size([128, 3, 32, 32])
torch.Size([128, 10])


In [10]:
model.to(device) # 모델을 지정된 장치(GPU)로 이동

cifar10_dnn(
  (layer1): Sequential(
    (0): Linear(in_features=3072, out_features=1024, bias=True)
    (1): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer2): Sequential(
    (0): Linear(in_features=1024, out_features=512, bias=True)
    (1): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer3): Sequential(
    (0): Linear(in_features=512, out_features=128, bias=True)
    (1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer4): Sequential(
    (0): Linear(in_features=128, out_features=32, bias=True)
    (1): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer5): Sequential(
    (0): Linear(in_features=32, out_features=10, bias=True)
  )
)

In [11]:
optimizer = optim.Adam(model.parameters(), lr = learning_rate) # 모델의 학습 가능한 파라미터를 반환하며 이를 Adam 옵티마이저에 전달
loss_func = nn.CrossEntropyLoss() # 분류 문제에서 많이 사용되며, 모델의 출력과 실제 레이블 간의 차이를 계산하여 손실 값을 반환

In [12]:
loss_arrs = [] # 손실 값들을 저장할 리스트 초기화

# 모델 학습을 위한 함수
def fit(model, train_loader, epochs, optimizer, loss_func):
    for epoch in range(epochs):
        start = time.time()
        avg_loss = 0
        total_batch = len(train_dataset) // batch_size # 전체 배치 수 계산

        # 학습 데이터 로더에서 배치를 하나씩 가져와 모델 학습
        for num, (images, labels) in enumerate(train_loader):
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images) # 모델에 이미지를 입력하여 출력(예측값) 산출
            loss = loss_func(outputs, labels) # 출력값과 실제 라벨 간의 손실 계산

            optimizer.zero_grad() # optimizer의 모든 가중치에 대해 변화도를 0으로 초기화
            loss.backward() # 손실에 대한 역전파(gradient)를 계산하여 각 가중치에 대한 변화도 계산
            optimizer.step() # 계산된 변화도를 바탕으로 optimizer가 가중치 업데이트

            avg_loss += loss / total_batch # 현재 배치에서의 손실 값을 평균 손실에 합산
            loss_arrs.append(avg_loss)
        print(f'[Epoch: {epoch+1}/{epochs}]  loss={avg_loss:.4f},  time={time.time()-start:.2f}')
    print('Learning Finished !')

In [13]:
fit(model, train_loader, epochs, optimizer, loss_func)

[Epoch: 1/10]  loss=1.6913,  time=15.47
[Epoch: 2/10]  loss=1.4545,  time=14.94
[Epoch: 3/10]  loss=1.3389,  time=13.87
[Epoch: 4/10]  loss=1.2497,  time=14.04
[Epoch: 5/10]  loss=1.1806,  time=14.10
[Epoch: 6/10]  loss=1.1056,  time=13.35
[Epoch: 7/10]  loss=1.0439,  time=13.13
[Epoch: 8/10]  loss=0.9836,  time=13.95
[Epoch: 9/10]  loss=0.9169,  time=13.72
[Epoch: 10/10]  loss=0.8540,  time=13.25
Learning Finished !


In [14]:
# 모델 평가를 위한 함수
def test(model, test_loader, epochs):
    start = time.time()
    test_acc = 0 # 전체 평가 정확도를 저장할 변수 초기화

    # 평가 데이터 로더에서 배치를 하나씩 가져와 모델 평가
    for num, (images, labels) in enumerate(test_loader):
        test_images = images.to(device)
        test_labels = labels.to(device)

        test_outputs = model(test_images) # 모델에 평가 이미지를 입력하여 출력(예측값) 산출

        test_batch_acc = ((test_outputs.argmax(dim=1) == test_labels).float().mean()) # acc = 맞춘 개수 / 배치사이즈
        # 모델의 예측에서 가장 높은 확률을 가진 클래스 선택한 뒤, 실제 라벨과 일치하는지 확인해 배치별 정확도 산출
        test_acc += test_batch_acc / len(test_loader) # acc / total_Iteration
        # 배치별 정확도를 누적하여 평균 정확도 산출
    print(f'test_acc: {test_acc}, takes {time.time()-start:.2f} secs')

In [15]:
test(model, test_loader, epochs)

test_acc: 0.5434696078300476, takes 2.53 secs


---