<a href="https://colab.research.google.com/github/SurinSeong/deep_learning/blob/main/ResNet_architecture.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ResNet-18 구조 파악 (Pytorch)

In [None]:
## 7x7 conv layer로 시작
# ResNet에 적합한 초기화 방법 : He initialization ==> 가중치 초기화
## residual block을 깊게 쌓아 깊은 층을 가지도록 설계
# 3x3 conv layer 사용 ==> 파라미터 수가 급격하게 증가하지 않도록 설계
# block의 색이 변할 때마다 공간 해상도는 절반으로 감소하고 (strides=2), 채널의 수는 두배씩 증가
## 하나의 FC layer로 최종 결과 값을 출력

* 시작부분
    * 7x7, 64, stride 2
    * 3x3 max pool, stride 2

In [None]:
# 기본 ResNet 50층
def resnet50(pretrained=False, progress=True, **kwargs):
    return _resnet('resnet50', Bottleneck, [3, 4, 6, 3], pretrained, progress, **kwargs)

In [None]:
def _resnet(arch, block, layers, pretrained, progress, **kwargs):
    r"""
    - pretrained: pretrained된 모델 가중치를 불러오기 (saved by caffe)
    - arch: ResNet모델 이름
    - block: 어떤 block 형태 사용할지 ("Basic or Bottleneck")
    - layers: 해당 block이 몇번 사용되는지를 list형태로 넘겨주는 부분
    """
    model = ResNet(block, layers, **kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(model_urls[arch], progress=progress)
        model.load_state_dict(state_dict)
    return model

## Convolution Layer

In [None]:
def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    r"""
    3x3 convolution with padding
    - in_planes: in_channels
    - out_channels: out_channels
    - bias=False: BatchNorm에 bias가 포함되어 있으므로, conv2d는 bias=False로 설정.
    """
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=dilation, groups=groups, bias=False, dilation=dilation)

def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

## Bottleneck

In [None]:
class Bottleneck(nn.Module):
    # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
    # while original implementation places the stride at the first 1x1 convolution(self.conv1)
    # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
    # This variant is also known as ResNet V1.5 and improves accuracy according to
    # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.

    expansion = 4 # 블록 내에서 차원을 증가시키는 3번째 conv layer에서의 확장계수

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super(Bottleneck, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        # ResNext나 WideResNet의 경우 사용
        width = int(planes * (base_width / 64.)) * groups

        # Bottleneck Block의 구조
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation) # conv2에서 downsample
        self.bn2 = norm_layer(width)
        self.conv3 = conv1x1(width, planes * self.expansion)
        self.bn3 = norm_layer(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x
        # 1x1 convolution layer
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        # 3x3 convolution layer
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        # 1x1 convolution layer
        out = self.conv3(out)
        out = self.bn3(out)
        # skip connection
        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

## ResNet class

In [None]:
class ResNet(nn.Module):
    # __init__에서 처음 conv1층과 마지막층(pooing과 fully connected) 이외에는 _make_layer함수로 모델의 제일 큰 단위의 층을 생성 및 정의
    def __init__(self, block, layers, num_classes=1000, zero_init_residual=False,
                 groups=1, width_per_group=64, replace_stride_with_dilation=None,
                 norm_layer=None):
        super(ResNet, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer
        # default values
        self.inplanes = 64 # input feature map
        self.dilation = 1
        # stride를 dilation으로 대체할지 선택
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None "
                             "or a 3-element tuple, got {}".format(replace_stride_with_dilation))
        self.groups = groups
        self.base_width = width_per_group

        r"""
        - 처음 입력에 적용되는 self.conv1과 self.bn1, self.relu는 모든 ResNet에서 동일
        - 3: 입력으로 RGB 이미지를 사용하기 때문에 convolution layer에 들어오는 input의 channel 수는 3
        """
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        r"""
        - 아래부터 block 형태와 갯수가 ResNet층마다 변화
        - self.layer1 ~ 4: 필터의 개수는 각 block들을 거치면서 증가(64->128->256->512)
        - self.avgpool: 모든 block을 거친 후에는 Adaptive AvgPool2d를 적용하여 (n, 512, 1, 1)의 텐서로
        - self.fc: 이후 fc layer를 연결
        """
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, # 여기서부터 downsampling적용
                                       dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2,
                                       dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2,
                                       dilate=replace_stride_with_dilation[2])
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)

    # _make_layer함수는 논문의 conv2_X, conv3_x, conv4_X, conv5_x을 구현하며 각 층에 해당하는 block을 갯수에 맞게 생성 및 연결시켜주는 역할
    def _make_layer(self, block, planes, blocks, stride=1, dilate=False):
        r"""
        convolution layer 생성 함수
        - block: block종류 지정
        - planes: feature map size (input shape)
        - blocks: layers[0]와 같이, 해당 블록이 몇개 생성돼야하는지, 블록의 갯수 (layer 반복해서 쌓는 개수)
        - stride와 dilate은 고정
        """
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1

        # the number of filters is doubled: self.inplanes와 planes 사이즈를 맞춰주기 위한 projection shortcut
        # the feature map size is halved: stride=2로 downsampling
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        # 블록 내 시작 layer, downsampling 필요
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * block.expansion # inplanes 업데이트
        # 동일 블록 반복
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                base_width=self.base_width, dilation=self.dilation,
                                norm_layer=norm_layer))

        return nn.Sequential(*layers)

    # forward함수로 모델에 대한 feedforward를 진행
    def _forward_impl(self, x):
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    def forward(self, x):
        return self._forward_impl(x)

---

# Transfer-Learning

In [None]:
# torchvision 관련 라이브러리 로드
from torchvision import utils, datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

## 데이터 사용 준비
* 사용할 데이터셋 : STL10
    * Image Classification의 벤치마크로 10개의 라벨을 가진 데이터 셋
    * torchvision에서는 5000개의 train 데이터와 8000개의 test로 구성되어있음.

In [None]:
# 경로 설정
import os

os.mkdir('./train')
os.mkdir('./test')

In [None]:
# train, test 데이터 다운 (datasets.STL10 메소드 다운)
train_dataset = datasets.STL10('./train', split='train', download=True, transform=transforms.ToTensor())
test_dataset = datasets.STL10('./test', split='test', download=True, transform=transforms.ToTensor())

Downloading http://ai.stanford.edu/~acoates/stl10/stl10_binary.tar.gz to ./train/stl10_binary.tar.gz


100%|██████████| 2640397119/2640397119 [02:12<00:00, 19937300.83it/s]


Extracting ./train/stl10_binary.tar.gz to ./train
Downloading http://ai.stanford.edu/~acoates/stl10/stl10_binary.tar.gz to ./test/stl10_binary.tar.gz


100%|██████████| 2640397119/2640397119 [01:24<00:00, 31111430.21it/s]


Extracting ./test/stl10_binary.tar.gz to ./test


* 다운 받은 이미지에 대해 스케일링 과정이 필요함.
    * transform을 활용해 이미지 크기를 고정하고, normalization을 진행
* 주어진 데이터셋의 이미지는 RGB 3개의 채널로 구성되어 있음.

In [None]:
# 채널별 mean값과 std값을 계산한 후, transform을 정의
import numpy as np

# 채널별 mean 계산
def get_mean(dataset):
    meanRGB = [np.mean(image.numpy(), axis=(1, 2)) for image, _ in dataset] # 각 image를 가져오고, _는 라벨을 무시하겠다는 의미 / image.numpy() : image는 pytorch의 Tensor 객체, pytorch 텐서를 numpy 배열로 변환하겠다는 의미 / axis : 높이(H)와 너비(W)에 대해 연산을 수행
    meanR = np.mean([m[0] for m in meanRGB])
    meanG = np.mean([m[1] for m in meanRGB])
    meanB = np.mean([m[2] for m in meanRGB])
    return [meanR, meanG, meanB]

# 채널별 std 계산
def get_std(dataset):
    stdRGB = [np.std(image.numpy(), axis=(1, 2)) for image, _ in dataset]
    stdR = np.mean([s[0] for s in stdRGB])
    stdG = np.mean([s[1] for s in stdRGB])
    stdB = np.mean([s[2] for s in stdRGB])
    return [stdR, stdG, stdB]

* augmentation 방법

In [None]:
# transforms.Compose 메소드로 transform(변형) 단계를 묶어서 진행
# 이미지의 크기를 임의로 128으로 고정한 후, 정규화하는 과정만 진행

train_transforms = transforms.Compose([transforms.Resize((128, 128)),
                                       transforms.ToTensor(),
                                       transforms.Normalize(get_mean(train_dataset), get_std(train_dataset))])

test_transforms = transforms.Compose([transforms.Resize((128, 128)),
                                       transforms.ToTensor(),
                                       transforms.Normalize(get_mean(test_dataset), get_std(test_dataset))])

# transform 정의
train_dataset.transform = train_transforms
test_dataset.transform = test_transforms

* dataloader 정의

In [None]:
# dataloader 정의
train_dataloader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

## Training
* 모델 학습을 위해 pretrained된 resnet50 모델을 사용
    * 해당 resnet 모델은 사전 학습된 모델로, 이미지 분류 문제를 해결할 수 있도록 규모가 큰 데이터(ImageNet)로 미리 학습된 모델이다.

In [None]:
import torch
from torchvision import models

# 학습 환경 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# true 옵션으로 사전 학습된 모델을 로드
model = models.resnet50(pretrained=True).to(device)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 120MB/s]


* torchsummary의 summary 메소드로 모델을 요약해서 확인 가능
* 사용할 데이터셋은 RGB 3개의 channel로 구성

In [None]:
# 모델의 layer별 파라미터 개수 확인
from torchsummary import summary

summary(model, (3, 128, 128))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 64, 64]           9,408
       BatchNorm2d-2           [-1, 64, 64, 64]             128
              ReLU-3           [-1, 64, 64, 64]               0
         MaxPool2d-4           [-1, 64, 32, 32]               0
            Conv2d-5           [-1, 64, 32, 32]           4,096
       BatchNorm2d-6           [-1, 64, 32, 32]             128
              ReLU-7           [-1, 64, 32, 32]               0
            Conv2d-8           [-1, 64, 32, 32]          36,864
       BatchNorm2d-9           [-1, 64, 32, 32]             128
             ReLU-10           [-1, 64, 32, 32]               0
           Conv2d-11          [-1, 256, 32, 32]          16,384
      BatchNorm2d-12          [-1, 256, 32, 32]             512
           Conv2d-13          [-1, 256, 32, 32]          16,384
      BatchNorm2d-14          [-1, 256,

## 모델학습을 위한 함수 정의
* 가장 간단한 형태의 train 컨테이너를 구성

In [None]:
import torch
import torch.nn as nn
from torch import optim

* 전이학습을 위해 필요한 파라미터 정의
    * lr = 0.0001
    * 많은 epoch의 학습을 진행하 때에는 스케쥴러를 사용하면 용이하지만, 가장 간단한 train 컨테이너를 구성해 다루지 않음.
    * optimizer='adam'
    * loss function='crossentropy' (학습 목적에 따라 다양하게 구성 가능)
    * num_epochs(학습횟수)=5

In [None]:
# 파라미터 설정 ==> 수정 예정

lr = 0.0001
num_epochs = 5
optimizer = optim.Adam(model.parameters(), lr=lr)
loss_function = nn.CrossEntropyLoss().to(device)

In [None]:
# 파라미터 저장
params = \
 {'num_epochs':num_epochs,
  'optimizer':optimizer,
  'loss_function':loss_function,
  'train_dataloader':train_dataloader,
  'test_dataloader':test_dataloader,
  'device':device}

In [None]:
# 훈련 함수
def train(model, params):
    # 손실함수
    loss_function=params['loss_function']
    # 훈련 데이터 로더
    train_dataloader=params['train_dataloader']
    # 테스트 데이터 로더
    test_dataloader=params['test_dataloader']
    # 학습환경
    device=params['device']

    for epoch in range(0, num_epochs):
        for i, data in enumerate(train_dataloader, 0):
            # train dataloader로 불러온 데이터에서 이미지와 라벨을 분리한다.
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)

            # 이전 batch에서 계산된 가중치를 초기화
            optimizer.zero_grad()

            # forward + back propagation 연산 (순전파, 역전파)
            # 사전학습된 모델에 input 넣어서 output 출력
            outputs = model(inputs)
            train_loss = loss_function(outputs, labels)
            train_loss.backward()
            optimizer.step()

        # test accuracy 계산
        total = 0
        correct = 0
        accuracy = []

        for i, data in enumerate(test_dataloader, 0):
            # test dataloader로 불러온 데이터에서 이미지와 라벨을 분리한다.
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)

            # 결과값 연산
            outputs = model(inputs)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            test_loss = loss_function(outputs, labels).item()
            accuracy.append(100 * correct/total)

        # 학습 결과 출력
        print('Epoch: %d/%d, Train loss: %.6f, Test loss: %.6f, Accuracy: %.2f' %(epoch+1, num_epochs, train_loss.item(), test_loss, 100*correct/total))

In [None]:
train(model, params)

Epoch: 1/5, Train loss: 0.927852, Test loss: 0.438244, Accuracy: 90.28
Epoch: 2/5, Train loss: 1.955365, Test loss: 0.535371, Accuracy: 90.34
Epoch: 3/5, Train loss: 0.607415, Test loss: 0.387795, Accuracy: 90.04


KeyboardInterrupt: 