# 3. 파이토치로 구현한 CNN
CNN은 Convolutional Neural Network의 약자로, 이미지 처리에 특화된 신경망입니다.
CNN은 이미지의 특징을 추출하는 컨볼루션층(합성곱)과 풀링층, 그리고 분류를 수행하는 완전연결층으로 구성되어 있습니다.
컨볼루션층에서 이미지의 특징을을 추출하고 풀링 계층은 그 필터를 거친 여러 특징중 중요한 특징들을 골라냅니다.
덜 중요한 특징은 버림으로써 이미지의 차원을 감소시켜 비용을 낮춥니다.
이런 층들로 만들어진 CNN이 궁극적으로 하는 것은 이미지에서 특징을 추출하는 필터를 학습시키는 것입니다.
이번에는 파이토치로 CNN을 구현하고, Fashion MNIST 데이터셋을 분류하는 실습을 진행해보겠습니다.
## 3-1. Fashion MNIST 데이터셋 분류하기

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms

USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")

EPOCHS = 40
BATCH_SIZE = 64
LEARNING_RATE = 0.01

train_loader = torch.utils.data.DataLoader(
    datasets.FashionMNIST('./.data',
                          train=True,
                          download=True,
                          transform=transforms.Compose([
                              transforms.ToTensor(),
                              transforms.Normalize((0.1307,), (0.3081,))
                          ])),
    batch_size=BATCH_SIZE,
    shuffle=True)
test_loader = torch.utils.data.DataLoader(
    datasets.FashionMNIST('./.data',
                          train=False,
                          transform=transforms.Compose([
                              transforms.ToTensor(),
                              transforms.Normalize((0.1307,), (0.3081,))
                          ])),
    batch_size=BATCH_SIZE,
    shuffle=True)

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        ## 첫번째 파라미터는 입력 채널의 수이다. Fashion MNIST는 흑백 이미지이므로 1이다.
        ## 두번째 파라미터는 출력 채널의 수이다. 10으로 설정했다. (10개의 필터를 사용한다는 의미)
        ## stride, padding은 default 값으로 설정.
        ## kernel_size(필터의 크기)는 5*5로 설정했다.
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10) ## 분류할 클래스 갯수인 10개로 출력 설정

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        ## 커널사이즈가 2인것은 2*2 윈도우 크기로 맥스 풀링 하겠다는 것
        x = F.relu(F.max_pool2d(self.drop(self.conv2(x)), 2))
#        print('x사이즈 : ',x.size()) => x사이즈 :  torch.Size([64, 20, 4, 4])
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x)
        x = self.fc2(x)
        ## x사이즈 : 64*10 => 배치사이즈가 64이고, 10개의 클래스로 분류한다는 의미
        return x

model = CNN().to(DEVICE)
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=0.5)

def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output, target)
        loss.backward()
        optimizer.step()

        if batch_idx % 200 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

def evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)

            test_loss += F.cross_entropy(output, target,
                                         reduction='sum').item()

            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    test_accuracy = 100. * correct / len(test_loader.dataset)
    return test_loss, test_accuracy

for epoch in range(1, EPOCHS + 1):
    train(model, train_loader, optimizer, epoch)
    test_loss, test_accuracy = evaluate(model, test_loader)

    print('[{}] Test Loss: {:.4f}, Accuracy: {:.2f}%'.format(
        epoch, test_loss, test_accuracy))


[1] Test Loss: 0.7764, Accuracy: 71.08%
[2] Test Loss: 0.6970, Accuracy: 73.44%
[3] Test Loss: 0.6219, Accuracy: 77.01%
[4] Test Loss: 0.5775, Accuracy: 79.25%
[5] Test Loss: 0.5503, Accuracy: 79.83%
[6] Test Loss: 0.5407, Accuracy: 79.87%
[7] Test Loss: 0.5167, Accuracy: 81.47%
[8] Test Loss: 0.5078, Accuracy: 81.84%
[9] Test Loss: 0.4975, Accuracy: 82.13%
[10] Test Loss: 0.4786, Accuracy: 82.73%
[11] Test Loss: 0.4665, Accuracy: 83.08%
[12] Test Loss: 0.4638, Accuracy: 83.65%
[13] Test Loss: 0.4521, Accuracy: 84.00%
[14] Test Loss: 0.4566, Accuracy: 83.92%
[15] Test Loss: 0.4421, Accuracy: 84.25%
[16] Test Loss: 0.4414, Accuracy: 84.41%
[17] Test Loss: 0.4307, Accuracy: 84.60%
[18] Test Loss: 0.4225, Accuracy: 84.68%
[19] Test Loss: 0.4209, Accuracy: 84.65%
[20] Test Loss: 0.4263, Accuracy: 84.63%
[21] Test Loss: 0.4189, Accuracy: 84.96%
[22] Test Loss: 0.4127, Accuracy: 85.09%
[23] Test Loss: 0.4138, Accuracy: 85.06%
[24] Test Loss: 0.4100, Accuracy: 85.37%
[25] Test Loss: 0.4092, A

## 3-2. ResNet으로 컬러 데이터셋에 적용

In [10]:
EPOCHS = 30
BATCH_SIZE = 128

train_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10('./.data',
                        train=True,
                        download=True,
                        transform=transforms.Compose([
                            transforms.RandomCrop(32, padding=4),
                            ## 데이터 증강 방법중 패딩이 추가된 이미지에서 32*32 크기로 랜덤하게 잘라내는 RandomCrop
                            ## 채널수는 그대로 가져감 (색상 3개를 쓰는 데이터이므로 채널수는 3 => 32*32*3)
                            transforms.RandomHorizontalFlip(),
                            transforms.ToTensor(),
                            transforms.Normalize((0.5, 0.5, 0.5),
                                                (0.5, 0.5, 0.5))
                        ])),
    batch_size=BATCH_SIZE,shuffle=True)
test_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10('./.data',
                        train=False,
                        transform=transforms.Compose([
                            transforms.ToTensor(),
                            transforms.Normalize((0.5, 0.5, 0.5),
                                                (0.5, 0.5, 0.5))
                        ])),
    batch_size=BATCH_SIZE,shuffle=True)

## ResNet 내의 기본 블록을 정의 (이건 ResNet 과정 중의 하나이므로 전체 모델 프로세스의 일부라고 생각하고 이해하면 됨.)
class BasicBlock(nn.Module):
    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()

        self.conv1 = nn.Conv2d(in_planes, planes,kernel_size=3,stride=stride,padding=1,bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        ## 출력 채널들을 정규화하는 레이어
        self.conv2 = nn.Conv2d(planes, planes,kernel_size=3,stride=stride,padding=1,bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, planes,kernel_size=1,stride=stride,bias=False),
                nn.BatchNorm2d(planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out
    
## ResNet 모델을 정의 (코드 흐름 이해안되면 책 보기 - 172~177)
class ResNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 16

        self.conv1 = nn.Conv2d(3, 16, kernel_size=3,stride=1,padding=1,bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(16, 2, stride=1)
        self.layer2 = self._make_layer(32, 2, stride=2)
        self.layer3 = self._make_layer(64, 2, stride=2)
        self.linear = nn.Linear(64, num_classes)

    def _make_layer(self, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(BasicBlock(self.in_planes, planes, stride))
            self.in_planes = planes
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.avg_pool2d(out, 8)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

model = ResNet().to(DEVICE)
optimizer = optim.SGD(model.parameters(), lr=0.1,
                      momentum=0.9, weight_decay=0.0005)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)
## 스케줄러는 학습률을 동적으로 조절하는 역할을 한다. (학습률을 조절하는 방법은 여러가지가 있음)
## StepLR은 매 step_size마다 learning rate에 gamma를 곱해준다.
## 예를 들어, step_size=50, gamma=0.1이면 50번째 epoch마다 learning rate에 0.1을 곱해준다.

def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output, target)
        loss.backward()
        optimizer.step()

        if batch_idx % 200 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

def evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)

            test_loss += F.cross_entropy(output, target,
                                         reduction='sum').item()

            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    test_accuracy = 100. * correct / len(test_loader.dataset)
    return test_loss, test_accuracy

print(model)

Files already downloaded and verified
ResNet(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
    (1): BasicBlock(
      (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2

In [11]:
for epoch in range(1,EPOCHS+1):
    scheduler.step() ## 에폭마다 호출해줌
    train(model, train_loader, optimizer, epoch)
    test_loss, test_accuracy = evaluate(model, test_loader)
    print('[{}] Test Loss: {:.4f}, Accuracy: {:.2f}%'.format(
        epoch, test_loss, test_accuracy))



RuntimeError: The size of tensor a (8) must match the size of tensor b (16) at non-singleton dimension 3