In [1]:
!git clone https://github.com/Rope-player/pytorch_advanced.git

Cloning into 'pytorch_advanced'...
remote: Enumerating objects: 548, done.[K
remote: Counting objects: 100% (174/174), done.[K
remote: Compressing objects: 100% (173/173), done.[K
remote: Total 548 (delta 5), reused 162 (delta 0), pack-reused 374[K
Receiving objects: 100% (548/548), 50.13 MiB | 38.00 MiB/s, done.
Resolving deltas: 100% (43/43), done.


In [2]:
%cd "pytorch_advanced"

/content/pytorch_advanced


In [3]:
%cd "2_objectdetection"

/content/pytorch_advanced/2_objectdetection


In [4]:
%ls

2-2-3_Dataset_DataLoader.ipynb    2-8_SSD_inference.ipynb
2-4-5_SSD_model_forward.ipynb     [0m[01;34mdata[0m/
2-6_loss_function.ipynb           make_folders_and_data_downloads.ipynb
2-7_SSD_training.ipynb            [01;34mutils[0m/
2-8_SSD_inference_appendix.ipynb


In [5]:
import os
import urllib.request
import zipfile
import tarfile

In [6]:
data_dir = "./data/"
if not os.path.exists(data_dir):
    os.mkdir(data_dir)

In [7]:
weights_dir = "./weights/"
if not os.path.exists(weights_dir):
    os.mkdir(weights_dir)

In [8]:
url = "http://host.robots.ox.ac.uk/pascal/VOC/voc2012/VOCtrainval_11-May-2012.tar"
target_path = os.path.join(data_dir, "VOCtrainval_11-May-2012.tar") 

if not os.path.exists(target_path):
    urllib.request.urlretrieve(url, target_path)
    
    tar = tarfile.TarFile(target_path)
    tar.extractall(data_dir)
    tar.close()

In [9]:
url = "https://s3.amazonaws.com/amdegroot-models/vgg16_reducedfc.pth"
target_path = os.path.join(weights_dir, "vgg16_reducedfc.pth") 

if not os.path.exists(target_path):
    urllib.request.urlretrieve(url, target_path)

In [10]:
url = "https://s3.amazonaws.com/amdegroot-models/ssd300_mAP_77.43_v2.pth"
target_path = os.path.join(weights_dir, "ssd300_mAP_77.43_v2.pth") 

if not os.path.exists(target_path):
    urllib.request.urlretrieve(url, target_path)

In [11]:
from math import sqrt
from itertools import product

import pandas as pd
import torch
from torch.autograd import Function
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init

In [12]:
%ls

2-2-3_Dataset_DataLoader.ipynb  2-8_SSD_inference_appendix.ipynb       [0m[01;34mutils[0m/
2-4-5_SSD_model_forward.ipynb   2-8_SSD_inference.ipynb                [01;34mweights[0m/
2-6_loss_function.ipynb         [01;34mdata[0m/
2-7_SSD_training.ipynb          make_folders_and_data_downloads.ipynb


# 네트워크 모델 구현

SSD에서는 다양한 크기의 DBox(Default Box)를 준비해야 함.

SSD의 네트워크 모듈은 네 개의 모듈로 구성 됨.

## SSD 네트워크 모델의 개요

SSD의 주요 서브 네트워크는 vgg, extra, loc, conf 네 가지임.

1. ***vgg:*** 먼저 vgg(VGG16 기반)에서 화상을 입력 받음. 이후 10회의 합성곱을 받고 L2Norm 층에서 크기를 정규화한 후 변수 `source1`로 정함. 그리고 다시 5회의 합성곱을 받으며 이는 `source2`로 함. 그리고 이 출력을 extra 모듈에 입력
2. ***extra:*** 여깃는 8회의 합성곱을 실시하는데, 2회 마다 출력치를 source3 ~ 6으로 함. 지금 까지 만든 소스들은 특징맵 크기가 다른데 각각의 맵 마다 감지하고자 하는 물체가 다름. 이것이 다양한 물체를 감지할 수 있는 이유임. 다만 합성곱의 횟수에 차이가 있는 만큼 큰 크기는 작은 물체를 감지하는 정밀도가 낮음.
3. ***loc:*** 이곳에서는 각 소스에 1회씩 합성곱을 실시하여 8732개의 DBox 오프셋 정보를 얻어냄.
4. ***conf:*** 마지막으로 이곳에서 또 각 소스에 합성곱을 실시해 8732개의 DBox에 대한 20가지 + 배경 21가지 클래스의 신뢰도를 출력.


DBox가 8732개인 이유는 이 DBox를 오프셋 정보로 변형시키면 38 \* 38 + 19 \* 19 + 10 \* 10 + 5 \* 5 + 3 \* 3 + 1 * 1 = 1940개의 DBox를 마련하게 됨. 그리고 각 특징맵에 대하여 `source1, 5, 6`에는 네 개씩, `source2, 3, 4`에는 여섯개씩 준비해야 함. 

따라서,

38 \* 38 \*4 + 19 \* 19 \* 6 + 10 \* 10 \* 6 + 5 \* 5 \* 6 + 3 \* 3 \* 4 + 1 * 1 \* 4 = 8732

가 되는것임.

### vgg 모듈 구현

각 합성곱 층의 채널 수와 최대 풀링 층의 정보를 구성 변수로 하여 `cfg = [64, 64, 'M', ... ]`으로 하고 그 요소 값에 따라 유닛을 작성.

'M'은 최대 풀링 층을, 'MC'는 ceil 모드의 최대 풀링 층임. ceil(천장 함수) 모드로 하면 소수점을 올림

`inplace`는 ReLu에 대한 입력을 메모리 상에 유지할 것인지 혹은 입력을 재작성하여 출력으로 바꾼 후 메모리 상에 유지하지 않을 것인지 나타냄. `inplace` 변수가 TRUE면 입력을 재작성 함.

In [13]:
# 34층에 걸친  VGG모듈 작성

def make_vgg():
  layers = []
  in_channels = 3

  # VGG모듈에서 사용하는 합성곱 층이나 최대 풀링 채널 수
  cfg = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'MC', 512, 512, 512, 'M', 512, 512, 512]

  for v in cfg:
    if v == 'M':
      layers += [nn.MaxPool2d(kernel_size=2, stride=2)]

    elif v == 'MC':
      layers += [nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)]

    else:
      conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
      layers += [conv2d, nn.ReLU(inplace=True)]
      in_channels = v


  pool5 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
  conv6 = nn.Conv2d(512, 1024, kernel_size=3, padding=6, dilation=6)
  conv7 = nn.Conv2d(1024, 1024, kernel_size=1)
  layers += [pool5, conv6, nn.ReLU(inplace=True), conv7, nn.ReLU(inplace=True)]
  return nn.ModuleList(layers)

# 동작 확인
vgg_test = make_vgg()
print(vgg_test)

ModuleList(
  (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (1): ReLU(inplace=True)
  (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (3): ReLU(inplace=True)
  (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (6): ReLU(inplace=True)
  (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (8): ReLU(inplace=True)
  (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (11): ReLU(inplace=True)
  (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (13): ReLU(inplace=True)
  (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (15): ReLU(inplace=True)
  (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=True)
  (17): Conv2d(256, 512, kernel_siz

### extra 모듈 구현

VGG 모듈을 구현하는 `make_extras` 함수 작성. 합성곱 층을 총 8유닛 나열함.

In [14]:
# 8층에 걸친 extras 모듈.


def make_extras():
  layers = []
  in_channels = 1024  # VGG 모듈에서 출력된 extra에 입력되는 화상의 수

  # extra 모듈의 합성곱 층 채널 수를 설정하는 구성
  cfg = [256, 512, 128, 256, 128, 256, 128, 256]

  layers += [nn.Conv2d(in_channels, cfg[0], kernel_size=(1))]
  layers += [nn.Conv2d(cfg[0], cfg[1], kernel_size=(3), stride=2, padding=1)]
  layers += [nn.Conv2d(cfg[1], cfg[2], kernel_size=(1))]
  layers += [nn.Conv2d(cfg[2], cfg[3], kernel_size=(3), stride=2, padding=1)]
  layers += [nn.Conv2d(cfg[3], cfg[4], kernel_size=(1))]
  layers += [nn.Conv2d(cfg[4], cfg[5], kernel_size=(3))]
  layers += [nn.Conv2d(cfg[5], cfg[6], kernel_size=(1))]
  layers += [nn.Conv2d(cfg[6], cfg[7], kernel_size=(3))]

  # 활성화 함수의 ReLu는 이번에는 SSD 모듈의 순전파에서 준비하고 extra 모듈에서는 준비하지 않음.
  return nn.ModuleList(layers)

# 동작 확인
extras_test = make_extras()
print(extras_test)

ModuleList(
  (0): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1))
  (1): Conv2d(256, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (2): Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1))
  (3): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (4): Conv2d(256, 128, kernel_size=(1, 1), stride=(1, 1))
  (5): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1))
  (6): Conv2d(256, 128, kernel_size=(1, 1), stride=(1, 1))
  (7): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1))
)


### loc 및 conf 모듈 구현

loc 과 conf를 구현하는 `make_loc_conf` 작성. 각각 6개의 합성곱층을 준비함. loc과 conf 모듈은 각각 여섯개의 합성곱 층을 준비하여 하나의 모듈로 함. (여섯개의 합성곱 층을 앞에서 뒤로 순전파 하지는 않음)

vgg와 extras 모듈에서 꺼낸 변수 source1 ~ 6에 여섯개의 합성곱층이 각각 대응해서 한 번씩 계산 됨.

각 소스로 사용하는 DBox 수를 `bbox_aspect_num` 인수로 설정.

In [15]:
# loc_layers  : 디폴드 박스의 오프셋 출력
# conf_layers : 디폴트 박스의 각 클래스 신뢰도 'confidence'를 출력.
def make_loc_conf(num_classes=21, bbox_aspect_num=[4, 6, 6, 6, 4, 4]):

  loc_layers  = []
  conf_layers = []

  # VGG 22층, conv4_3(source 1)의 합성곱 층
  loc_layers  += [nn.Conv2d(512, bbox_aspect_num[0] * 4, kernel_size=3, padding=1)]
  conf_layers += [nn.Conv2d(512, bbox_aspect_num[0] * num_classes, kernel_size=3, padding=1)]

  # VGG 최종층(source 2)의 합성곱 층
  loc_layers  += [nn.Conv2d(1024, bbox_aspect_num[1] * 4, kernel_size=3, padding=1)]
  conf_layers += [nn.Conv2d(1024, bbox_aspect_num[1] * num_classes, kernel_size=3, padding=1)]

  # extra(source 3)의 합성곱 층
  loc_layers  += [nn.Conv2d(512, bbox_aspect_num[2] * 4, kernel_size=3, padding=1)]
  conf_layers += [nn.Conv2d(512, bbox_aspect_num[2] * num_classes, kernel_size=3, padding=1)]

  # extra(source 4)의 합성곱 층
  loc_layers  += [nn.Conv2d(256, bbox_aspect_num[3] * 4, kernel_size=3, padding=1)]
  conf_layers += [nn.Conv2d(256, bbox_aspect_num[3] * num_classes, kernel_size=3, padding=1)]

  # extra(source 5)의 합성곱 층
  loc_layers  += [nn.Conv2d(256, bbox_aspect_num[4] * 4, kernel_size=3, padding=1)]
  conf_layers += [nn.Conv2d(256, bbox_aspect_num[4] * num_classes, kernel_size=3, padding=1)]

  # extra(source 6)의 합성곱 층
  loc_layers  += [nn.Conv2d(256, bbox_aspect_num[5] * 4, kernel_size=3, padding=1)]
  conf_layers += [nn.Conv2d(256, bbox_aspect_num[5] * num_classes, kernel_size=3, padding=1)]

  return nn.ModuleList(loc_layers), nn.ModuleList(conf_layers)

# 동작 확인
loc_test, conf_test = make_loc_conf()
print(loc_test)
print(conf_test)

ModuleList(
  (0): Conv2d(512, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (1): Conv2d(1024, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (2): Conv2d(512, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (3): Conv2d(256, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (4): Conv2d(256, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (5): Conv2d(256, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
)
ModuleList(
  (0): Conv2d(512, 84, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (1): Conv2d(1024, 126, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (2): Conv2d(512, 126, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (3): Conv2d(256, 126, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (4): Conv2d(256, 84, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (5): Conv2d(256, 84, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
)


### L2Norm 층 구현

`conv4_3`에 적용하는 L2Norm층 구현. 

L2Norm층은 특징맵의 통계적 특징이 채널마다 다른 점을 정규화 해줌.

In [16]:
# conv4_3의 출력을 scale = 20의 L2Norm으로 정규화하는 층

class L2Norm(nn.Module):
  def __init__(self, input_channels=512, scale=20):
    super(L2Norm, self).__init__()  # 부모 클래스의 새성자 실행
    self.weight = nn.Parameter(torch.Tensor(input_channels))
    self.scale = scale  # 계수 weight의 초기값으로 설정할 값
    self.reset_parameters()  # 파라미터 초기화
    self.eps = 1e-10

  # 결합 파라미터의 scale 크기 값으로 초기화 실행
  def reset_parameters(self):
    init.constant_(self.weight, self.scale)  # weight값이 모두 scale(=20)이 됨.

  def forward(self, x):
    # 38 * 38의 특징에 대해 512 채널에 걸쳐 제곱합의 루트를 구함.
    # 38 * 38개의 값을 사용하여 각 특징량을 정규화한 후 계수를 곱하여 계산하는 층

    # 각 채널의 38 * 38개의 특징의 채널 방향 제곱합을 계산하고 루트를 구해 나누어 정규화.
    # norm의 텐서 사이즈는 torch.Size([batch_num, 1, 38, 38])
    norm = x.pow(2).sum(dim=1, keepdim=True).sqrt()+self.eps
    x = torch.div(x, norm)

    # 계수를 곱함. 계수는 채널마다 하나로 512의 계수를 가짐.
    # self.weight의 텐서 사이즈는 torch.Size([512]).
    # torch.Size([batch_num, 512, 38, 38])까지 변형함.
    weights = self.weight.unsqueeze(0).unsqueeze(2).unsqueeze(3).expand_as(x)
    out = weights * x

    return out

### 디폴트 박스 구현

마지막으로, 8732개의 디폴트 박스를 준비하는 클래스를 작성. source 1 ~ 6까지 크기가 서로 다른 특징맵에 대해 각각 4 또는 6종류의 DBox를 만듦. DBox는 네 종류 설정일 때 `작은 정사각형`, `큰 정사각형`, `1:2 비율의 직사각형`, `2:1 비율의 직사각형`임. (여섯 종류면 3:1 과 1:3 직사각형을 준비)

아래 코드의 `for i, j in product(...)` 문은 조합을 꺼내는 명령임. 이 조합의 취득을 활용하여 DBox의 중심 좌표를 작성함.

In [17]:
# 디폴트 박스를 출력하는 클래스

class DBox(object):
  def __init__(self, cfg):
    super(DBox, self).__init__()

    self.image_size = cfg['input_size']  # 화상 크기 300
    # [38, 19, ...] 각 source의 특징맵 크기
    self.feature_maps = cfg['feature_maps']
    self.num_priors = len(cfg["feature_maps"])  # source의 개수 = 6
    self.steps = cfg['steps']                   # [8, 16, ...] DBox의 픽셀 크기

    self.min_sizes = cfg['min_sizes']
        # [30, 60, ...] 작은 정사각형의 DBox 픽셀 크기(면적)
    self.max_sizes = cfg['max_sizes']
        # [60, 111, ...] 큰 정사각형의 DBox 픽셀 크기(면적)
    self.aspect_ratios = cfg['aspect_ratios']
        # 정사각형의 DBox 화면비(종횡비)

  def make_dbox_list(self):
    mean = []
    for k, f in enumerate(self.feature_maps):
      for i, j in product(range(f), repeat=2):
        # 특징량의 화상 크기
        # 300 / 'steps': [8, 16, 32, 64, 100, 300],
        f_k = self.image_size / self.steps[k]

        # DBox의 중심좌표 x, y. 0 ~ 1로 정규화 되어있음.
        cx = (j + 0.5) / f_k
        cy = (i + 0.5) / f_k

        # 화면비 1의 작은 DBox [cx, cy, width, height]
        # 'min_sizes': [30, 60, 111, 162, 213, 264]
        s_k = self.min_sizes[k]/self.image_size
        mean += [cx, cy, s_k, s_k]

        # 화면비 1의 큰 DBox [cx,cy, width, height]
        # 'max_sizes': [60, 111, 162, 213, 264, 315]
        s_k_prime = sqrt(s_k * (self.max_sizes[k]/self.image_size))
        mean += [cx, cy, s_k_prime, s_k_prime]

        # 그 외 화면비의 DBox [cx, cy, width, height]
        for ar in self.aspect_ratios[k]:
          mean += [cx, cy, s_k*sqrt(ar), s_k/sqrt(ar)]
          mean += [cx, cy, s_k/sqrt(ar), s_k*sqrt(ar)]

    # DBox를 텐서로 변환. torch.Size([8732, 4])
    output = torch.Tensor(mean).view(-1, 4)

    # DBox가 화상 밖으로 돌출되는 것을 막기 위해 크기를 최소 0, 최대 1로 함.
    output.clamp_(max=1, min=0)

    return output

In [18]:
# 동작 확인

# SSD300 설정
ssd_cfg = {
  'num_classes': 21,                          # 배경클래스를 포함한 총 클래스 수
  'input_size': 300,                          # 화상의 입력 크기
  'bbox_aspect_num': [4, 6, 6, 6, 4, 4],      # 출력할 Box 화면비 종류
  'feature_maps': [38, 19, 10, 5, 3, 1],      # 각 source 화상 크기
  'steps': [8, 16, 32, 64, 100, 300],         # DBOX 크기 설정
  'min_sizes': [30, 60, 111, 162, 213, 264],  # DBOX 크기 설정
  'max_sizes': [60, 111, 162, 213, 264, 315], #  BOX 크기 설정
  'aspect_ratios': [[2], [2, 3], [2, 3], [2, 3], [2], [2]],
}

# DBox 작성
dbox = DBox(ssd_cfg)
dbox_list = dbox.make_dbox_list()

# DBox 출력 확인
pd.DataFrame(dbox_list.numpy())

Unnamed: 0,0,1,2,3
0,0.013333,0.013333,0.100000,0.100000
1,0.013333,0.013333,0.141421,0.141421
2,0.013333,0.013333,0.141421,0.070711
3,0.013333,0.013333,0.070711,0.141421
4,0.040000,0.013333,0.100000,0.100000
...,...,...,...,...
8727,0.833333,0.833333,0.502046,1.000000
8728,0.500000,0.500000,0.880000,0.880000
8729,0.500000,0.500000,0.961249,0.961249
8730,0.500000,0.500000,1.000000,0.622254


## SSD 클래스 구현

파이토치 네트워크 층 클래스인 nn.Module 상속.

SSD 클래스는 훈련할 때와 추론할 때 다르게 동작함. 추론할 때는 Detect 클래스를 사용.

In [19]:
class SSD(nn.Module):

  def __init__(self, phase, cfg):
    super(SSD, self).__init__()

    self.phase = phase  # train or inference 지정
    self.num_classes = cfg["num_classes"] # 클래스 수 21

    # SSD 네트워크 작성
    self.vgg = make_vgg()
    self.extras = make_extras()
    self.L2Norm = L2Norm()
    self.loc, self.conf = make_loc_conf(cfg["num_classes"], cfg["bbox_aspect_num"])

    # DBox 작성
    dbox = DBox(cfg)
    self.dbox_list = dbox.make_dbox_list()

    # 추론시 Detect 클래스 준비
    if phase == 'inference':
      self.detect = Detect()

# 동작 확인
ssd_test = SSD(phase="train", cfg=ssd_cfg)
print(ssd_test)

SSD(
  (vgg): ModuleList(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, cei

# 순전파 함수 구현

물체감지는 더 복잡하게 순전파 처리를 함. 여기서 Non-Maximum Suppression을 사용할 것임.

## decode 함수 구현

SSD 추론 시에는 순전파 끝에 Detect 클래스를 적용함. 여기서 사용하는 decode 함수와 nm_supression 함수를 구현.

decode 함수는 DBox = (***cx_d,cy_d,w_d,h_d***)와 SSD 모델에서 구한 오프셋 정보 loc = (***Δcx, Δcy, Δw, Δh***)를 사용하여 BBox의 좌표를 생성.

cx = cx_d + 0.1Δcx * w_d

cy = cy_d + 0.1Δcy * h_d

w = w_d * exp(0.2Δw)

h = h_d * exp(0.2Δh) 

위 식을 구현하여 BBox 좌표 표시 형식을 (Δcx, Δcy, Δw, Δh) 에서 (xmin, ymin, xmax, ymax)로 변환

In [21]:
def decode(loc, dbox_list):
  # 오프셋 정보로 DBox를 BBox로 전환
  # Parameters
  #   loc: [8732, 4]       - SSD 모델로 추론하는 오프셋 정보
  #   dbox_list: [8732, 4] - DBox 정보
  # 
  # Returns
  #   boxes : [xmin, ymin, xmax, ymax] - BBox 정보

  # DBox는 [cx, cy, w, h] 로 저장됨.
  # loc는  [Δcx, Δcy, Δw, Δh] 로 저장됨.
  boxes = torch.cat((
    dbox_list[:, :2] + loc[:, :2] * 0.1 * dbox_list[:, 2:],
    dbox_list[:, 2:] * torch.exp(loc[:, 2:] * 0.2)), dim=1)
  # boxes 크기는 torch.Size([8732, 4])가 됨.

  # BBox의 좌표를 [cx, cy, w, h]에서 [xmin, ymin, xmax, ymax]으로 변경.
  boxes[:, :2] -= boxes[:, 2:] / 2  # 좌표 (xmin, ymin)로 변환
  boxes[:, 2:] += boxes[:, :2]      # 좌표 (xmax, ymax)로 변환

  return boxes

## Non-Maximum Suppression 실시 함수 구현

**Non-Maximum Suppression:** 8732개의 DBox로 물체를 감지하면 화상속 동일한 물체에 다른 BBox가 조금 다르게 복수 피팅 될 때가 있음. 이러한 BBox의 중복을 삭제하고 하나의 물체에 하나의 BBox만 남기는 처리를 말함.

`scores` 인수는 SSD 모델에서 각 DBox의 신뢰도를 구할 때 일정 값 이상의 신뢰도를 가진 DBox의 신뢰도 conf 값임. 물체 클래스마다 Non-Maximum Suppression을 실행하기 위해 `scores` 인수의 텐서 크기는 '신뢰도 임계값을 넘은 DBox 수'임.

In [22]:
def nm_suppression(boxes, scores, overlap=0.45, top_k=200):
  # Parameters
  #   boxes  : [신뢰도 임계값(0.01)을 넘은 BBox 수, 4] - BBox 정보
  #   scores : [신뢰도 임계값(0.01)을 넘은 BBox 수]    - conf 정보
  #
  # Returns
  #   keep : 리스트 - conf의 내림차순으로 nms를 통과한 index 저장
  #   count : int - nms를 통과한 BBox 수

  # return 모형 작성
  count = 0
  keep = scores.new(scores.size(0)).zero_().long()
  # keep : torch.Size([신뢰도 임계값을 넘은 BBox 수]), 요소는 전부 0

  # 각 BBox 면적의 area 계산
  x1 = boxes[:, 0]
  y1 = boxes[:, 1]
  x2 = boxes[:, 2]
  y2 = boxes[:, 3]
  area = torch.mul(x2 - x1, y2 - y1)

  # boxes 복사. 나중에 BBox 중복도(IOU) 계산 시 모형으로 준비
  tmp_x1 = boxes.new()
  tmp_y1 = boxes.new()
  tmp_x2 = boxes.new()
  tmp_y2 = boxes.new()
  tmp_w = boxes.new()
  tmp_h = boxes.new()

  # score를 오름차순으로 나열
  v, idx = scores.sort(0)

  # 상위 top_k개(200개)의 BBox index를 꺼낸다. (200개가 다 있지 않을 수도 있음.)
  idx = idx[-top_k:]

  # idx의 요소 수가 0이 아닌 한, 루프한다.
  while idx.numel() > 0:
    i = idx[-1]  # conf의 최대 index를 i로

    # keep의 끝에 conf 최대 index 저장
    # 이 index의 BBox와 크게 겹치는 BBox 삭제
    keep[count] = i
    count += 1

    # 마지막 BBox는 루프를 빠져나옴
    if idx.size(0) == 1:
      break

    # 현재 conf 최대의 index를 keep에 저장 했으므로 idx를 하나 감소시킨다.
    idx = idx[:-1]

    # 지금부터 keep에 저장한 BBox와 크게 겹치는 BBox를 추출하여 삭제
    # 하나 감소시킨 idx까지의 BBox를 out으로 지정한 변수 작성.
    torch.index_select(x1, 0, idx, out=tmp_x1)
    torch.index_select(y1, 0, idx, out=tmp_y1)
    torch.index_select(x2, 0, idx, out=tmp_x2)
    torch.index_select(y2, 0, idx, out=tmp_y2)

    # 모든 BBox를 현재 Bbox = index가 i 로 겹치는 값까지로 설정(clamp)
    tmp_x1 = torch.clamp(tmp_x1, min=x1[i])
    tmp_y1 = torch.clamp(tmp_y1, min=y1[i])
    tmp_x2 = torch.clamp(tmp_x2, max=x2[i])
    tmp_y2 = torch.clamp(tmp_y2, max=y2[i])

    # w와 h의 텐서 크기를 index 하나 줄인 것으로 한다.
    tmp_w.resize_as_(tmp_x2)
    tmp_h.resize_as_(tmp_y2)

    # clamp한 상태에서 Bbox의 폭과 높이를 구한다.
    tmp_w = tmp_x2 - tmp_x1
    tmp_h = tmp_y2 - tmp_y1

    # 폭이나 높이가 음수인 것은 0으로.
    tmp_w = torch.clamp(tmp_w, min=0.0)
    tmp_h = torch.clamp(tmp_h, min=0.0)

    # clamp된 상태의 면적을 구함.
    inter = tmp_w*tmp_h

    # IoU = Intersect 부분 / (area(a) + area(b) - intersect) 부분 계산
    rem_areas = torch.index_select(area, 0, idx)  # 각 BBox 원래 면적
    union = (rem_areas - inter) + area[i]         # 두 구역 합(OR) 면적
    IoU = inter/union

    idx = idx[IoU.le(overlap)] # le는 Less than or Equal to 처리를 하는 연산. IoU가 overlap 보다 큰 idx는 처음 선택한 keep에 저장한 idx와 동일한 물체에 BBox를 둘러싸고 있어 삭제.

  # while에서 빠져나옴.
  return keep, count

## Detect 클래스 구현

추론 시 마지막에 Detect 클래스를 적용. `(batch_num, 21, 200, 5)`의 출력 텐서를 만듦. 

(`미니 배치 번호 차원`, `각 클래스의 인덱스를 나타내는 차원`, `신뢰도 상위 200개의 BBox중 몇 번째인지를 나타내는 차원`, `다섯가지 BBox 정보`)

- **다섯 가지 BBox 정보:** (conf, xmin, ymin, width, height)

Detect 클래스 입력요소 3개 

1. loc:  (batch_num, 8732, 4)  - 오프셋 정보
2. conf: (batch_num, 8732, 21) - 신뢰도, 소프트맥스 함수로 규격화
3. 디폴트 박스 정보: (8732, 4)

Detect 클래스는 `torch.autograd.Function`을 상속함.

순전파 함수 foward: 크게 세 단꼐로 구성
1. decode 함수를 사용하여 DBox의 정보와 오프셋 정보를 BBox로 변환
2. conf가 임계값 이상인 BBox 추출.
3. nm_suppression 함수를 실시하여 중복된 BBox 삭제

In [23]:
class Detect(Function):
  def __init__(self, conf_thresh=0.01, top_k=200, nms_thresh=0.45):
    self.softmax = nn.Softmax(dim=-1)  # conf를 소프트맥수 함수로 정규화 하는 준비
    self.conf_thresh = conf_thresh     # conf가 conf_threh = 0.01보다 높은 DBox만 취급
    self.top_k = top_k                 # conf가 높은 top_k개를 nm_supression으로 계산하여 사용하는 top_k = 200
    self.nms_thresh = nms_thresh       # nm_suppression으로 IOU가 nms_thresh = 0.45 보다 크면 동일한 물체의 BBox로 간주.

  def forward(self, loc_data, conf_data, dbox_list):
    # 순전파 계산 실행
    # Parameters
    #   loc_data: [batch_num, 8732, 4]
    #   conf_data: [batch_num, 8732, num_classes]
    #   dbox_list: [8732, 4]
    # Returns
    #   output : torch.Size([batch_num, 21, 200, 5]) - (batch_num 클래스, conf의 top200, BBox 정보)


    # 각 크기 취득
    num_batch = loc_data.size(0)    # 미니 배치 크기
    num_dbox = loc_data.size(1)     # DBox의 수 = 8732
    num_classes = conf_data.size(2) # 클래스 수 = 21

    # conf는 소프트맥스를 적용하여 정규화
    conf_data = self.softmax(conf_data)

    # 츨력 형식을 작성. 텐서 크기 [미니 배치 수, 21, 200, 5]
    output = torch.zeros(num_batch, num_classes, self.top_k, 5)

    # conf_data 순서를 [batch_num, 8732, num_classes] 에서 [batch_num, num_classes, 8732]로 변경
    conf_preds = conf_data.transpose(2, 1)

    # 미니 배치마다 루프
    for i in range(num_batch):
      # 1. loc와 DBox로 수정한 BBox (xmin, ymin, xmax, ymax]를 구함.
      decoded_boxes = decode(loc_data[i], dbox_list)

      # conf의 복사본 저장
      conf_scores = conf_preds[i].clone()

      # 화상 클래스별 루프(배경 클래스의 index인 0은 계산하지 않고 index = 1 부터)
      for cl in range(1, num_classes):
        # 2. conf의 임계값을 넘은 BBox를 꺼냄
        # conf의임계값을 넘고 있는지 마스크를 작성하여 임계값을 넘은 conf의 인덱스를 c_mask로 취득
        c_mask = conf_scores[cl].gt(self.conf_thresh)
        # gt는 Greater than을 의미, gt로 임계값이 넘으면 i, 이하는 0
        # conf_scores: torch.Size([21, 8732])
        # c_mask: torch.Size([8732])

        # scores는 torch.Size([임계값을 넘은 BBox 수])
        scores = conf_scores[cl][c_mask]

        # 임계값을 넘은 conf가 없는 경우, 즉 scores=[]는 아무것도 하지 않음.
        if scores.nelement() == 0:  # nelement로 요소 수의 합계를 구함.
          continue

        # c_mask를 decoded_boxes에 적용할 수 있도록 크기 변경.
        l_mask = c_mask.unsqueeze(1).expand_as(decoded_boxes)
        # l_mask: torch.Size([8732, 4])

        # l_mask를 decoded_boxes로 적용
        boxes = decoded_boxes[l_mask].view(-1, 4)
        # decoded_boxes[l_mask]로 1차원이 되므로 view에서 (임계값을 넘은 BBox의 수, 4) 크기로 바꾼다.

        # 3. Non-Maximum Suppression을 실시하여 중복되는 BBox 제거
        ids, count = nm_suppression(
          boxes, scores, self.nms_thresh, self.top_k)
        # ids:   conf의 내림차순으로 Non-Maximum Suppression을 통과한 index 저장.
        # count: Non-Maximum Suppression을 통과한 BBox의 수

        # output에 Non-Maximum Suppression을 뺀 결과 저장
        output[i, cl, :count] = torch.cat((scores[ids[:count]].unsqueeze(1), boxes[ids[:count]]), 1)

    return output  # torch.Size([1, 21, 200 ,5])

# SSD 모델 구현

마지막으로 SSD 모델을 구현.

In [24]:
class SSD(nn.Module):
  def __init__(self, phase, cfg):
    super(SSD, self).__init__()

    self.phase = phase  # train or inference 지정
    self.num_classes = cfg["num_classes"]  # 클래스 수 = 21

    # SSD 네트워크
    self.vgg = make_vgg()
    self.extras = make_extras()
    self.L2Norm = L2Norm()
    self.loc, self.conf = make_loc_conf(
      cfg["num_classes"], cfg["bbox_aspect_num"])

    # DBox 작성
    dbox = DBox(cfg)
    self.dbox_list = dbox.make_dbox_list()

    # 추론시 Detect 클래스 준비
    if phase == 'inference':
      self.detect = Detect()

  
  def forward(self, x):
    sources = list()  # loc와 conf에 입력 source1 ~ 6 저장
    loc = list()      # loc 의 출력 저장
    conf = list()     # conf의 출력 저장

    # vgg의 conv4_3까지 계산
    for k in range(23):
      x = self.vgg[k](x)

    # conv4_3의 출력을 L2Norm에 입력. source1을 작성하여 sources에 추가
    source1 = self.L2Norm(x)
    sources.append(source1)

    # vgg를 마지막까지 계산하여 source2를 작성하고 sources에 추가
    for k in range(23, len(self.vgg)):
      x = self.vgg[k](x)

    sources.append(x)

    # extras의conv와 ReLU 계산
    # source3 ~ 6을、sources에 추가
    for k, v in enumerate(self.extras):
      x = F.relu(v(x), inplace=True)
      if k % 2 == 1:  # conv→ReLU→cov→ReLU를 하여 source에 삽입
        sources.append(x)

    # source1 ~ 6에 각각 대응하는 합성곱을 1회씩 적용
    # zip으로 for 루프의 여러 리스트 요소 획득
    # source1 ~ 6까지 있으므로 6회의 루프 실시

    for (x, l, c) in zip(sources, self.loc, self.conf):
      # Permuteは要素の順番を入れ替え
      loc.append(l(x).permute(0, 2, 3, 1).contiguous())
      conf.append(c(x).permute(0, 2, 3, 1).contiguous())
      # l(x)와 c(x)으로 합성곱 실행
      # l(x)와 c(x)의 출력 크기는 [batch_num, 4 * 화면비의 종류 수, featuremap 높이, featuremap폭]
      # source에 따라 화면비의 종류가 다르며 번거로워 순서를 바꾸어 조정
      # permute로 요소 순서를 다음과 같이 교체
      # [minibatch 수, featuremap 수, featuremap 수,4 * 화면비의 종류 수]へ
      # torch.contiguous()은 메모리 상에 연속적으로 요소를 배치하는 명령
      # 이후 view 함수 적용
      # view를 수행하므로 대상의 변수가 메모리 상에 연속적으로 배치되어야 한다.

    # loc와 conf의 모양 변경
    # loc 크기는 torch.Size([batch_num, 34928]), conf의 크기는 torch.Size([batch_num, 183372])가 됨.
    loc = torch.cat([o.view(o.size(0), -1) for o in loc], 1)
    conf = torch.cat([o.view(o.size(0), -1) for o in conf], 1)

    # loc와 conf의 모양 조정
    # loc의 크기는  torch.Size([batch_num, 8732, 4])
    # conf의 크기는 torch.Size([batch_num, 8732, 21])
    loc = loc.view(loc.size(0), -1, 4)
    conf = conf.view(conf.size(0), -1, self.num_classes)

    # 마지막으로 출력
    output = (loc, conf, self.dbox_list)

    if self.phase == "inference":  # 추론 시
      # Detect 클래스의 forward 실행
      # 반환 값의 크기는 torch.Size([batch_num, 21, 200, 5])
      return self.detect(output[0], output[1], output[2])

    else:  # 학습 시
      return output
      # 반환 값은(loc, conf, dbox_list)의 튜플