# Convolution Layers와 PyTorch
- Conv1d(1차원, Text-CNN에서 많이 사용)
- Conv2d(2차원, 이미지 분류에서 많이 사용)
- Conv3d(3차원)

# Conv2d
- Conv2d(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True, padding_mode='zeros')

# Shape 이해
- Input Tensor: (N, C, H, W)
  - N: batch 사이즈
  - C: in_channels와 일치해야함
  - H: 2D input Tensor의 높이
  - W: 2D input Tensor의 너비
- Output Tensor: (N, C, H, W)
  - N: batch 사이즈
  - C: out_channels와 일치해야함
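  - H: 출력 높이, $H_{out} = \left\lfloor \frac{H_{in} + 2 \times \text{padding}[0] - \text{dilation}[0] \times (\text{kernel\_size}[0] - 1) - 1}{\text{stride}[0]} + 1 \right\rfloor$
  - W: 출력 너비, $W_{out} = \left\lfloor \frac{W_{in} + 2 \times \text{padding}[1] - \text{dilation}[1] \times (\text{kernel\_size}[1] - 1) - 1}{\text{stride}[1]} + 1 \right\rfloor$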
    - 다양한 CNN 알고리즘 중에는 너비, 높이에서의 padding, stride를 달리할 수 있고, 이를 stride[0], stride[1]과 같은 식의 튜플 형태로 적용도 가능함

In [1]:
import torch
import torch.nn as nn

# A 3x3 kernel with padding=1 and the default stride=1 keeps the 5x5
# spatial size unchanged, so only the channel count could differ.
conv1 = nn.Conv2d(1, 1, 3, padding=1)
# torch.randn returns initialized values. The legacy torch.Tensor(sizes)
# constructor returns uninitialized memory that may contain NaN/inf garbage.
input1 = torch.randn(1, 1, 5, 5)
out1 = conv1(input1)
out1.shape  # bare expression so the notebook displays the output shape

torch.Size([1, 1, 5, 5])

# Pooling Layers
- 입력 데이터 차원에 맞추어 Max Pooling 또는 Average Pooling을 적용할 수 있음
  - MaxPool1d
  - MaxPool2d
  - MaxPool3d
  - AvgPool1d
  - AvgPool2d
  - AvgPool3d

# MaxPool2d
- MaxPool2d(kernel_size, stride=None, padding=0, dilation=1, return_indices=False, ceil_mode=False)

In [2]:
import torch
import torch.nn as nn

conv1 = nn.Conv2d(1, 1, 3, padding=1)
# torch.randn returns initialized values. The legacy torch.Tensor(sizes)
# constructor returns uninitialized memory that may contain NaN/inf garbage.
input1 = torch.randn(1, 1, 5, 5)
pool1 = nn.MaxPool2d(2)
out1 = conv1(input1)
print(out1.shape)
out2 = pool1(out1)
# With the default ceil_mode=False, a size that does not divide evenly
# (here 5 / 2) is always rounded down, so 5 -> 2.
print(out2.shape)

torch.Size([1, 1, 5, 5])
torch.Size([1, 1, 2, 2])


# 모델 정의
- Convolution Layer는 입력 데이터에 필터(커널) 적용 후, 활성화함수를 적용한 Layer를 의미함
  1. Convolution Layer는 입력 데이터에 필터(커널) 적용을 위한 전용 클래스 제공(nn.Conv2d)
  2. 이후에 활성화함수 적용(예: nn.LeakyReLU(0.1))
  3. 이후에 Batch Normalization, Dropout 등 regularization 적용(옵션)
  4. 이후에 Pooling 적용(예: nn.MaxPool2d)
- BatchNorm1d()와 BatchNorm2d()
  - BatchNorm1d(C)는 Input과 Output이 (N, C) 또는 (N, C, L)의 형태
    - N은 Batch 크기, C는 Channel, L은 Length
  - BatchNorm2d(C)는 Input과 Output이 (N, C, H, W)의 형태
    - N은 Batch 크기, C는 Channel, H는 Height, W는 Width
  - 인자로 Output Channel 수를 넣으면 되며, Conv2d()에서는 BatchNorm2d()를 사용해야함

In [3]:
# One convolution stage: convolution -> activation -> batch norm -> pooling.
conv1 = nn.Sequential(
    nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
    nn.LeakyReLU(0.1),
    nn.BatchNorm2d(32),
    nn.MaxPool2d(2)
    # input image = (?, 1, 28, 28)
    # output      = (?, 32, 28/2, 28/2)
)
# torch.randn returns initialized values. The legacy torch.Tensor(sizes)
# constructor returns uninitialized memory, and NaN/inf garbage would also
# flow into the BatchNorm statistics computed in training mode.
input1 = torch.randn(1, 1, 28, 28)
out1 = conv1(input1)
print(out1.shape)

torch.Size([1, 32, 14, 14])


In [4]:
# Three stacked convolution stages; every stage halves the spatial size
# (MaxPool2d(2)) while the channel count grows 1 -> 32 -> 64 -> 128.
conv1 = nn.Sequential(
    nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
    nn.LeakyReLU(0.1),
    nn.BatchNorm2d(32),
    nn.MaxPool2d(2),
    # output = (1, 32, 14, 14)
    nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
    nn.LeakyReLU(0.1),
    nn.BatchNorm2d(64),
    nn.MaxPool2d(2),
    # output = (1, 64, 7, 7)
    nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
    nn.LeakyReLU(0.1),
    nn.BatchNorm2d(128),
    nn.MaxPool2d(2)
    # output = (1, 128, 3, 3)
)

# torch.randn returns initialized values. The legacy torch.Tensor(sizes)
# constructor returns uninitialized memory that may contain NaN/inf garbage.
input1 = torch.randn(1, 1, 28, 28)
out1 = conv1(input1)
# Flatten everything except the batch dimension: (1, 128, 3, 3) -> (1, 1152).
out2 = out1.view(out1.size(0), -1)
print(out1.shape, out2.shape, 128*3*3)

torch.Size([1, 128, 3, 3]) torch.Size([1, 1152]) 1152


# CNN 모델 구성
1. 다음 세트로 하나의 Convolution Layer + Pooling Layer를 구성하고 여러 세트로 구축
    - 보통 Convolution Layer + Pooling Layer의 출력 채널을 늘리는 방식으로 여러 세트 구축
2. Flatten
    - 텐서.view(텐서.size(0), -1)로 Flatten
3. 여러 Fully-Connected Layer 구성
    - Flatten한 입력을 받아서 최종 Multi-Class 개수만큼 출력
    - Multi-Class일 경우, nn.LogSoftmax()로 최종 결과값 출력

In [5]:
class CNNModel(nn.Module):
    """CNN classifier: three convolution stages followed by a dense head.

    Every convolution stage applies Conv2d(3x3, stride 1, padding 1),
    LeakyReLU(0.1), BatchNorm2d and MaxPool2d(2). For an input of shape
    (N, 1, 28, 28) the feature maps become (N, 32, 14, 14), (N, 64, 7, 7)
    and finally (N, 128, 3, 3). The flattened 128*3*3 features go through
    Linear layers of width 128 and 64 to 10 outputs, and the last layer is
    LogSoftmax, so the model returns log-probabilities.
    """

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Return the four layers of one convolution stage, in order."""
        return [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.1),
            nn.BatchNorm2d(out_channels),
            nn.MaxPool2d(2),
        ]

    def __init__(self):
        super().__init__()

        # Channel widths of consecutive stages: 1 -> 32 -> 64 -> 128.
        widths = (1, 32, 64, 128)
        stage_layers = []
        for c_in, c_out in zip(widths, widths[1:]):
            stage_layers.extend(self._conv_stage(c_in, c_out))
        self.conv_layers = nn.Sequential(*stage_layers)

        # Size of one flattened sample after the last pooling: (128, 3, 3).
        flat_features = 128 * 3 * 3
        self.linear_layers = nn.Sequential(
            nn.Linear(flat_features, 128),
            nn.LeakyReLU(0.1),
            nn.BatchNorm1d(128),
            nn.Linear(128, 64),
            nn.LeakyReLU(0.1),
            nn.BatchNorm1d(64),
            nn.Linear(64, 10),
            nn.LogSoftmax(dim=-1),
        )

    def forward(self, x):
        """Return the head's output for a batch ``x`` of images."""
        features = self.conv_layers(x)
        batch_size = features.size(0)
        # Keep the batch dimension and collapse the rest for the Linear layers.
        flattened = features.view(batch_size, -1)
        return self.linear_layers(flattened)

# MNIST with CNN

In [6]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
from sklearn.model_selection import train_test_split
import numpy as np
from copy import deepcopy

In [7]:
# Load both MNIST splits from the same root directory. Each sample is
# converted from its raw format to a tensor by transforms.ToTensor().
train_rawdata = datasets.MNIST(
    root='dataset_MNIST',
    train=True,                       # True selects the training split
    transform=transforms.ToTensor(),  # raw format -> tensor
    download=True,                    # download when the data is missing
)
test_rawdata = datasets.MNIST(
    root='dataset_MNIST',
    train=False,                      # False selects the test split
    transform=transforms.ToTensor(),  # raw format -> tensor
    download=True,                    # download when the data is missing
)
print('number of training data : ', len(train_rawdata))
print('number of test data : ', len(test_rawdata))

number of training data :  60000
number of test data :  10000


In [8]:
VALIDATION_RATE = 0.2
# Stratified hold-out split performed on sample indices rather than on the
# data itself; the two label outputs are not needed and are discarded.
train_indices, val_indices, _, _ = train_test_split(
    range(len(train_rawdata)),       # index of every training sample
    train_rawdata.targets,           # ground-truth labels (y)
    stratify=train_rawdata.targets,  # keep the label distribution equal in both parts
    test_size=VALIDATION_RATE,       # here: fraction held out for validation
    random_state=42,                 # fixed seed so the split is reproducible across runs
)

In [9]:
# Index-based views of the raw training set for the train/validation parts.
train_dataset, validation_dataset = (
    Subset(train_rawdata, indices) for indices in (train_indices, val_indices)
)

In [10]:
# Report the number of samples in the train, validation and test sets.
print(*(len(split) for split in (train_dataset, validation_dataset, test_rawdata)))

48000 12000 10000


In [11]:
BATCH_SIZE = 128
# Only the training loader shuffles, so every epoch sees a new batch order.
train_batchs = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# Evaluation loaders keep a fixed order: shuffling would change which
# samples land in the smaller last batch and make the batch-averaged
# validation loss vary between otherwise identical evaluations.
va_batchs = DataLoader(validation_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_batchs = DataLoader(test_rawdata, batch_size=BATCH_SIZE, shuffle=False)

# CNNModel 객체 생성

In [12]:
# Instantiate the CNN classifier.
model = CNNModel()
# Bare expression: the notebook displays the module structure as cell output.
model

CNNModel(
  (conv_layers): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.1)
    (2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): LeakyReLU(negative_slope=0.1)
    (6): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): LeakyReLU(negative_slope=0.1)
    (10): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (linear_layers): Sequential(
    (0): Linear(in_features=1152, out_features=128, bias=True)

# loss, optimizer 설정

In [13]:
# Negative log-likelihood loss; it is applied to the output of CNNModel,
# whose last layer is LogSoftmax.
loss_func = nn.NLLLoss()
# Adam over all model parameters; no hyperparameters are passed, so the
# optimizer's defaults are used.
optimizer = torch.optim.Adam(model.parameters())

# Training & Validation

In [14]:
def train_model(model, early_stop, nb_epochs, progress_interval, *,
                train_loader=None, valid_loader=None, criterion=None, optim=None):
    """Train ``model`` with per-epoch validation, early stopping and best-weight restore.

    Args:
        model: Module to train; it is updated in place.
        early_stop: Patience in epochs. Training stops once the current epoch
            index exceeds the best epoch index by more than this value.
            A value <= 0 disables early stopping.
        nb_epochs: Maximum number of epochs to run.
        progress_interval: Print a progress line every this many epochs.
            A value <= 0 disables progress output.
        train_loader: Sized iterable of (inputs, targets) mini-batches used
            for training. Defaults to the module-level ``train_batchs``.
        valid_loader: Sized iterable of (inputs, targets) mini-batches used
            for validation. Defaults to the module-level ``va_batchs``.
        criterion: Callable ``criterion(predictions, targets)`` returning the
            loss. Defaults to the module-level ``loss_func``.
        optim: Optimizer that updates ``model``'s parameters. Defaults to the
            module-level ``optimizer``.

    Returns:
        Tuple ``(model, lowest_loss, train_losses, valid_losses)``: the model
        with the best recorded weights loaded, the lowest mean validation
        loss, and the per-epoch mean training / validation losses.
    """
    # Fall back to the notebook-level objects so existing calls keep working.
    train_loader = train_batchs if train_loader is None else train_loader
    valid_loader = va_batchs if valid_loader is None else valid_loader
    criterion = loss_func if criterion is None else criterion
    optim = optimizer if optim is None else optim

    train_losses, valid_losses = [], []
    lowest_loss = np.inf
    # Start from the current weights with "no best epoch yet" (-1), so the
    # final restore and the early-stop check are well-defined even when no
    # epoch runs or no validation loss ever improves (e.g. NaN losses).
    lowest_epoch = -1
    best_model = deepcopy(model.state_dict())

    for epoch in range(nb_epochs):
        train_loss, valid_loss = 0, 0

        # Training pass: one optimizer step per mini-batch.
        model.train()
        for x_minibatch, y_minibatch in train_loader:
            y_minibatch_pred = model(x_minibatch)
            loss = criterion(y_minibatch_pred, y_minibatch)

            optim.zero_grad()
            loss.backward()
            optim.step()
            train_loss += loss.item()

        # Epoch loss is the mean of the per-batch losses.
        train_loss = train_loss / len(train_loader)
        train_losses.append(train_loss)

        # Validation pass: evaluation mode, no gradient tracking.
        model.eval()
        with torch.no_grad():
            for x_minibatch, y_minibatch in valid_loader:
                y_minibatch_pred = model(x_minibatch)
                loss = criterion(y_minibatch_pred, y_minibatch)
                valid_loss += loss.item()

        valid_loss = valid_loss / len(valid_loader)
        valid_losses.append(valid_loss)

        if valid_losses[-1] < lowest_loss:
            # New best epoch: snapshot the weights (deepcopy, because
            # state_dict() references the live parameter tensors).
            lowest_loss = valid_losses[-1]
            lowest_epoch = epoch
            best_model = deepcopy(model.state_dict())
        elif (early_stop > 0) and lowest_epoch + early_stop < epoch:
            print("Early Stopped", epoch, "epochs")
            break

        # Guard against progress_interval == 0 (modulo by zero).
        if progress_interval > 0 and (epoch % progress_interval) == 0:
            print(train_losses[-1], valid_losses[-1], lowest_loss, lowest_epoch, epoch)

    # Restore the best weights seen (the initial ones if nothing improved).
    model.load_state_dict(best_model)
    return model, lowest_loss, train_losses, valid_losses

# 훈련 실행

In [15]:
# Training budget: up to 30 epochs, a progress line every 3 epochs, and an
# early-stopping patience of 10 epochs.
nb_epochs, progress_interval, early_stop = 30, 3, 10

model, lowest_loss, train_losses, valid_losses = train_model(
    model,
    early_stop=early_stop,
    nb_epochs=nb_epochs,
    progress_interval=progress_interval,
)

0.12998725000272193 0.05393193105354588 0.05393193105354588 0 0
0.01516358590591699 0.029974613274308913 0.029974613274308913 3 3
0.009930346643474574 0.03491437948492177 0.029974613274308913 3 6
0.005683745020534843 0.0470760349980605 0.029974613274308913 3 9
0.004945649953229198 0.0491833004057395 0.029974613274308913 3 12
Early Stopped 14 epochs
