In [None]:
%matplotlib inline

import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.utils import get_file
from tensorflow.keras.utils import plot_model

# Seed NumPy's and TensorFlow's global random generators with the same fixed
# value so that a fresh-kernel run is repeatable.
np.random.seed(10)
tf.random.set_seed(10)

## SSD 모델 구성하기

### 모델 구성하기

In [None]:
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Softmax
from functools import partial

# Rebind the name Conv2D to a partial that pre-fills a 3x3 kernel, ReLU
# activation and 'same' padding. Keyword arguments supplied at a call site
# (e.g. activation=None or strides=(2,2)) override or extend these defaults.
Conv2D = partial(Conv2D, kernel_size=(3,3), 
                 activation='relu', padding='same')

In [None]:
# Build the SSD-style network graph: a small convolutional backbone followed
# by one classification head and one localization head per source layer.
num_classes = 10 + 1 # 0~9 + Background
# Number of prior boxes predicted per feature-map cell.
# NOTE(review): presumably has to equal the number of anchor aspect ratios
# configured for the Anchors object -- confirm against the anchor setup cell.
num_priors = 5
# Base filter count; the blocks below use 1x, 2x, 4x and 8x this value.
num_units = 16

# Height and width are left unspecified (None); only the 3 channels are fixed.
inputs = Input(shape=(None,None,3)) 

# BLOCK 1
# Three convolutions at full resolution (no strides), each followed by
# batch normalization.
conv1_1 = Conv2D(num_units, name='conv1_1')(inputs)
norm1_1 = BatchNormalization(name='norm1_1')(conv1_1)
conv1_2 = Conv2D(num_units, name='conv1_2')(norm1_1)
norm1_2 = BatchNormalization(name='norm1_2')(conv1_2)
conv1_3 = Conv2D(num_units, name='conv1_3')(norm1_2)
norm1_3 = BatchNormalization(name='norm1_3')(conv1_3)

# BLOCK 2
# The second convolution uses strides=(2,2): cumulative downsampling x2.
conv2_1 = Conv2D(num_units * 2, name='conv2_1')(norm1_3)
norm2_1 = BatchNormalization(name='norm2_1')(conv2_1)
conv2_2 = Conv2D(num_units * 2, strides=(2,2),name='conv2_2')(norm2_1)
norm2_2 = BatchNormalization(name='norm2_2')(conv2_2)

# BLOCK 3
# Second stride-2 convolution: cumulative downsampling x4 at norm3_2.
conv3_1 = Conv2D(num_units * 4, name='conv3_1')(norm2_2)
norm3_1 = BatchNormalization(name='norm3_1')(conv3_1)
conv3_2 = Conv2D(num_units * 4, strides=(2,2), name='conv3_2')(norm3_1)
norm3_2 = BatchNormalization(name='norm3_2')(conv3_2)

# BLOCK 4
# Third stride-2 convolution: cumulative downsampling x8 at norm4_2.
conv4_1 = Conv2D(num_units * 8, name='conv4_1')(norm3_2)
norm4_1 = BatchNormalization(name='norm4_1')(conv4_1)
conv4_2 = Conv2D(num_units * 8, strides=(2,2), name='conv4_2')(norm4_1)
norm4_2 = BatchNormalization(name='norm4_2')(conv4_2)

# Block 5
# Fourth stride-2 convolution: cumulative downsampling x16 at norm5_2.
conv5_1 = Conv2D(num_units * 8, name='conv5_1')(norm4_2)
norm5_1 = BatchNormalization(name='norm5_1')(conv5_1)
conv5_2 = Conv2D(num_units * 8, strides=(2,2), name='conv5_2')(norm5_1)
norm5_2 = BatchNormalization(name='norm5_2')(conv5_2)


# Attach prediction heads to the x4, x8 and x16 feature maps.
heads = []
source_layers = [norm3_2, norm4_2, norm5_2]
for idx, source_layer in enumerate(source_layers):
    # Classification
    # The convolution emits num_priors * num_classes channels per cell with no
    # activation; the reshape turns them into one row of num_classes logits
    # per prior box, and the softmax normalizes each row over the class axis.
    clf = Conv2D(num_priors * num_classes, activation=None,
                 name=f'clf_head{idx}_logit')(source_layer)
    clf = Reshape((-1, num_classes),
                  name=f'clf_head{idx}_reshape')(clf)
    clf = Softmax(axis=-1, name=f'clf_head{idx}')(clf)

    # Localization
    # The convolution emits num_priors * 4 linear outputs per cell, reshaped
    # to one row of 4 values per prior box.
    loc = Conv2D(num_priors * 4, activation=None,
                 name=f'loc_head{idx}')(source_layer)
    loc = Reshape((-1,4),
                  name=f'loc_head{idx}_reshape')(loc)
    
    
    # Each row becomes [class probabilities (num_classes), box values (4)].
    head = Concatenate(axis=-1, name=f'head{idx}')([clf, loc])
    heads.append(head)
    
# Join the rows of all heads along axis 1, in source_layers order.
predictions = Concatenate(axis=1, name='predictions')(heads)

In [None]:
from tensorflow.keras.models import Model
# Wrap the graph from `inputs` to `predictions` in a Keras Model and print
# its layer-by-layer summary.
model = Model(inputs, predictions)

model.summary()

In [None]:
from tensorflow.keras.utils import plot_model

# Render the model graph as a diagram; as the last expression of the cell the
# returned object is displayed by the notebook.
plot_model(model)

### Anchor 구성하기

In [None]:
class Anchors:
    """
    Anchor (default box) configuration.

    Each (scale, stride) pair is combined with every entry of ``ratios`` to
    produce one box shape; ``generate`` then tiles those shapes over a grid
    whose spacing is the stride.

    Parameters
    ----------
    strides : sequence of int
        Grid spacing for each level.
    scales : sequence of number
        Base box size for each level; paired element-wise with ``strides``.
    ratios : sequence of (float, float)
        (width multiplier, height multiplier) pairs applied to every scale.
    """
    # Class-level placeholder kept for backward compatibility; every instance
    # replaces it with its own table in setup().
    bbox_df = pd.DataFrame()

    def __init__(self, strides, scales, ratios):
        self.strides = strides
        self.scales = scales
        self.ratios = ratios
        self.setup()

    def generate(self, image_shape):
        """
        Build the anchors (default boxes) for an image of the given shape.

        Parameters
        ----------
        image_shape : sequence
            Only the first two entries (height, width) are used.

        Returns
        -------
        numpy.ndarray
            Integer array of shape (num_anchors, 4) whose rows are
            (center_x, center_y, width, height). Rows are grouped by stride in
            ascending order; within a stride they run over grid rows, then
            grid columns, then box shapes in ``bbox_df`` order. An empty
            configuration yields an array of shape (0, 4).
        """
        height, width = image_shape[:2]
        multi_boxes = []
        for stride, group in self.bbox_df.groupby('stride'):
            stride = int(stride)
            # The grid depends only on the stride, so compute it once per
            # group instead of once per box shape.
            ys, xs = np.mgrid[0:height:stride, 0:width:stride]
            center_xs = stride // 2 + xs
            center_ys = stride // 2 + ys

            boxes = [
                np.stack((center_xs, center_ys,
                          np.full_like(xs, box_width),
                          np.full_like(ys, box_height)),
                         axis=-1)
                for box_width, box_height in zip(group['w'], group['h'])
            ]
            # (grid_h, grid_w, num_shapes, 4) -> (grid_h*grid_w*num_shapes, 4)
            boxes = np.stack(boxes, axis=2)
            multi_boxes.append(np.reshape(boxes, (-1, 4)))

        if not multi_boxes:
            # np.concatenate rejects an empty list; return an empty result.
            return np.empty((0, 4), dtype=int)
        return np.concatenate(multi_boxes, axis=0)

    def setup(self):
        """
        Build ``self.bbox_df``, an integer table with columns
        ('stride', 'w', 'h') and a 1-based index, holding one row per
        (scale/stride pair, ratio) combination. Widths and heights are
        ``scale * multiplier`` rounded with ``np.round``.
        """
        # Collect all rows first and build the frame once; the builtin int
        # replaces the np.int alias that was removed from NumPy.
        records = [
            (int(stride),
             int(np.round(scale * ratio[0])),
             int(np.round(scale * ratio[1])))
            for scale, stride in zip(self.scales, self.strides)
            for ratio in self.ratios
        ]
        bbox_df = pd.DataFrame(records, columns=['stride', 'w', 'h'])
        bbox_df = bbox_df.astype(int)
        # Keep the 1-based row labels the table has always exposed.
        bbox_df.index = pd.RangeIndex(1, len(bbox_df) + 1)
        self.bbox_df = bbox_df

In [None]:
# Anchor configuration: strides and scales are paired element-wise, and every
# pair is combined with each (width multiplier, height multiplier) below.
# The trailing "ratio" comments give width / height of the resulting box.
strides = [4, 8, 16]
scales = [10, 20, 30]
ratios = [(1,1),     # ratio : 1.
          (0.5,1.5), # ratio : 0.33
          (0.8,1.2), # ratio : 0.67
          (1.2,0.8), # ratio : 1.5
          (1.4,1.4)  # ratio : 1
         ] 

# Instantiating Anchors immediately builds its table of box shapes.
anchors = Anchors(strides, scales, ratios)

## 모델 학습하기

우리는 이전 시간에 학습할 모델과 데이터를 준비했습니다. 이제 남은 질문은 "모델을 어떻게 학습시킬 것인가?"입니다. 그중 가장 핵심은 바로 Loss 함수 설계입니다.

### Loss 구성하기

우리가 학습해야 하는 것은 위치를 추론하는 Regressor와 사물을 분류하는 Classifier입니다. Regressor는 SmoothL1이라 불리는 Loss로 학습시키고, Classifier는 분류 모델에서 주로 사용하는 Cross-Entropy Loss를 이용합니다. Regressor는 당연하게도 Ground truth와 매칭된(Matched) prior box에 한해서만 학습해야 합니다. SmoothL1은 MAE와 MSE를 합쳐놓은 형태로, 수식은 아래와 같습니다.

$$
\mathrm{smooth}_{L1}(x) = \begin{cases}
0.5x^2, & \text{if } |x| < 1\\
|x| - 0.5, & \text{otherwise}
\end{cases}
$$


Confidence Loss를 계산할 때, 중요한 문제가 하나 있습니다. 바로 Class Imbalance 문제입니다. 영상에서 대부분의 영역은 Background에 해당하고, 우리가 원하는 Foreground에 매칭된 Prior box는 극히 일부분에 불과합니다. 이 때문에 Easy Negative Sample, 즉 Background의 수가 압도적으로 많아 이들에 대한 Loss의 합이 지나치게 커지고, 실제로 학습하고자 하는 Foreground에 대한 학습은 잘 이루어지지 않게 됩니다. 이를 방지하기 위해, Negative Sample 중 Loss가 가장 큰 것들만 추출하여 학습하도록 합니다(Hard Negative Mining). 아래 구현에서는 Negative Sample의 수를 Positive Sample 수의 `pos_neg_ratio`배 이하로 제한합니다.

In [None]:
def SSDLoss(alpha=1., pos_neg_ratio=3.):
    """
    Create an SSD loss function for use with Keras.

    Parameters
    ----------
    alpha : float
        Weight of the localization term in the total loss.
    pos_neg_ratio : float
        Upper bound on (negatives used) / (positives) in the classification
        term.

    Returns
    -------
    callable
        ``ssd_loss(y_true, y_pred)`` returning a scalar tensor. Both inputs
        carry, on their last axis, ``num_classes`` class columns followed by
        4 localization columns; the last class column marks background.
    """
    def ssd_loss(y_true, y_pred):
        tiny = K.epsilon()
        num_classes = tf.shape(y_true)[2] - 4

        # Merge the leading axes so every row is one box, then separate the
        # class columns from the 4 localization columns.
        true_rows = tf.reshape(y_true, [-1, num_classes + 4])
        pred_rows = tf.reshape(y_pred, [-1, num_classes + 4])
        true_cls, true_box = tf.split(true_rows, [num_classes, 4], axis=-1)
        pred_cls, pred_box = tf.split(pred_rows, [num_classes, 4], axis=-1)

        # Background indicator is the last class column; everything else is
        # treated as foreground.
        background = true_cls[:, -1]
        foreground = 1 - background
        n_fg = tf.reduce_sum(foreground)
        # Use at most pos_neg_ratio negatives per positive.
        n_bg = tf.minimum(pos_neg_ratio * n_fg, tf.reduce_sum(background))

        # Cross-entropy per box; predictions are clipped away from 0 and 1
        # before taking the logarithm.
        clipped = K.clip(pred_cls, tiny, 1. - tiny)
        xent = -tf.reduce_sum(true_cls * tf.math.log(clipped), axis=-1)
        fg_term = tf.reduce_sum(xent * foreground) / (n_fg + tiny)
        # Hard negatives: keep only the n_bg largest background losses.
        hardest = tf.nn.top_k(xent * background,
                              k=tf.cast(n_bg, tf.int32)).values
        bg_term = tf.reduce_sum(hardest) / (n_bg + tiny)
        classification = fg_term + bg_term

        # Smooth L1 per coordinate: quadratic below 1, linear otherwise.
        delta = true_box - pred_box
        abs_delta = tf.math.abs(delta)
        smooth = tf.where(tf.less(abs_delta, 1.0),
                          0.5 * delta ** 2,
                          abs_delta - 0.5)
        per_box = tf.reduce_sum(smooth, axis=-1)
        # Only foreground boxes contribute to the localization term.
        localization = tf.reduce_sum(per_box * foreground) / (n_fg + tiny)

        return classification + alpha * localization
    return ssd_loss

### 데이터 구성하기


#### - IOU 계산하기

In [None]:
def calculate_iou(gt_boxes, pr_boxes):
    """
    Pairwise intersection-over-union between two sets of boxes.

    Both inputs are 2-D arrays whose rows are (cx, cy, w, h). The result has
    one row per ground-truth box and one column per prior box. A small
    constant (1e-5) is added to the union to avoid division by zero.
    """
    # Broadcast layout: ground truth along rows, priors along columns.
    gt = gt_boxes[:, None]
    pr = pr_boxes[None, :]

    def span(center, size):
        # (center, size) -> (min edge, max edge)
        return center - size / 2, center + size / 2

    gt_left, gt_right = span(gt[..., 0], gt[..., 2])
    gt_top, gt_bottom = span(gt[..., 1], gt[..., 3])
    pr_left, pr_right = span(pr[..., 0], pr[..., 2])
    pr_top, pr_bottom = span(pr[..., 1], pr[..., 3])

    # Width and height of the overlapping rectangle (0 when disjoint).
    overlap_w = np.maximum(
        0, np.minimum(gt_right, pr_right) - np.maximum(gt_left, pr_left))
    overlap_h = np.maximum(
        0, np.minimum(gt_bottom, pr_bottom) - np.maximum(gt_top, pr_top))
    intersection = overlap_w * overlap_h

    gt_area = gt[..., 2] * gt[..., 3]
    pr_area = pr[..., 2] * pr[..., 3]
    union = (gt_area + pr_area) - intersection

    return intersection / (union + 1e-5)

#### - Classification Network의 정답 구성하기

In [None]:
from tensorflow.keras.utils import to_categorical

def convert_classification_gt_to_model_form(
    gt_labels, pr_boxes, match_indices, num_classes=10):
    """
    Build the one-hot classification target for every prior box.

    Parameters
    ----------
    gt_labels : array
        Class label of each ground-truth box.
    pr_boxes : array
        Prior boxes; only their count is used.
    match_indices : array of shape (num_matches, 2)
        Rows of (ground-truth index, prior index) pairs.
    num_classes : int, optional
        Number of foreground classes (default 10). The background class is
        encoded as index ``num_classes``.

    Returns
    -------
    array of shape (num_priors, num_classes + 1)
        One-hot rows; unmatched priors are assigned to background. When a
        prior appears in several matches, the last assignment wins.
    """
    num_anchors = len(pr_boxes)
    gt_match_indices = match_indices[:, 0]
    pr_match_indices = match_indices[:, 1]

    # Start with every prior labelled as background, then overwrite matches.
    y_true_clf = np.full((num_anchors,), num_classes)
    y_true_clf[pr_match_indices] = gt_labels[gt_match_indices]
    return to_categorical(y_true_clf, num_classes=num_classes + 1)

#### - Localization Network의 정답 구성하기

In [None]:
def convert_localization_gt_to_model_form(
    gt_boxes, pr_boxes, match_indices):
    """
    Build the localization target for every prior box.

    For each (ground-truth index, prior index) row of ``match_indices`` the
    target is the offset of the ground-truth box relative to the prior:
    center differences divided by the prior's width/height, and the log of
    the width/height ratios. Priors without a match keep an all-zero row.
    Boxes are rows of (cx, cy, w, h).
    """
    prior_ids = match_indices[:, 1]
    matched_gt = gt_boxes[match_indices[:, 0]]
    matched_pr = pr_boxes[prior_ids]

    # Center offsets, normalized by the prior's size.
    offset_cx = (matched_gt[:, 0] - matched_pr[:, 0]) / matched_pr[:, 2]
    offset_cy = (matched_gt[:, 1] - matched_pr[:, 1]) / matched_pr[:, 3]
    # Size offsets as log-ratios.
    offset_w = np.log(matched_gt[:, 2] / matched_pr[:, 2])
    offset_h = np.log(matched_gt[:, 3] / matched_pr[:, 3])

    targets = np.zeros((len(pr_boxes), 4))
    targets[prior_ids] = np.stack(
        [offset_cx, offset_cy, offset_w, offset_h], axis=1)
    return targets

#### - 데이터 가져오기

In [None]:
# Fetch the MNIST-detection archive through Keras' get_file helper and open
# it with NumPy.
fpath = get_file("mnist_detection.npz",
"https://pai-datasets.s3.ap-northeast-2.amazonaws.com/alai-deeplearning/mnist_detection.npz")
data = np.load(fpath)

# Pull the four arrays stored in the archive into named variables.
train_images = data['train_images']
train_labels = data['train_labels']
test_images = data['test_images']
test_labels = data['test_labels']

#### - 입력값 정규화하기

In [None]:
# Normalize the value range to 0.~1. by dividing by 255
# (assumes train_images holds 0-255 intensities -- TODO confirm).
train_X = train_images / 255.

#### - 라벨 폼 변환하기

In [None]:
# Convert the raw annotations into the per-anchor target tensor train_Y.
num_data = 10000
iou_threshold = 0.5
# 11 one-hot class columns (10 digits + background) followed by 4 box offsets.
label_width = 15

# Anchors depend only on the image size, so generate them once.
image_shape = train_images.shape[1:]
pr_boxes = anchors.generate(image_shape)
num_anchors = len(pr_boxes)

# NOTE(review): train_Y covers the first `num_data` images only -- confirm
# that this equals len(train_X) before both are passed to model.fit.
train_Y = np.zeros((num_data, num_anchors, label_width))

# Column 0 of train_labels is the image index, columns 1-4 the box and the
# last column the class label.
image_ids = train_labels[:, 0]
for i in tqdm(range(num_data)):
    rows = train_labels[image_ids == i]
    gt_boxes = rows[:, 1:5]
    gt_labels = rows[:, -1]

    # A (ground truth, prior) pair is a match when its IoU reaches the
    # threshold.
    iou = calculate_iou(gt_boxes, pr_boxes)
    match_indices = np.argwhere(iou >= iou_threshold)

    y_true_clf = convert_classification_gt_to_model_form(
        gt_labels, pr_boxes, match_indices)
    y_true_loc = convert_localization_gt_to_model_form(
        gt_boxes, pr_boxes, match_indices)

    y_true = np.concatenate([y_true_clf, y_true_loc], axis=-1)
    train_Y[i] = y_true

### 모델 학습하기

In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint

# Compile with Adam (learning rate 1e-3) and the SSD loss, weighting the
# localization term by 1.0 and allowing up to 3 negatives per positive.
optimizer = Adam(1e-3)
loss_fn = SSDLoss(alpha=1.0, pos_neg_ratio=3.)
model.compile(optimizer=optimizer, loss=loss_fn)

In [None]:
# Train for 50 epochs with mini-batches of 64, holding out 10% of the data
# for validation.
# NOTE(review): train_X and train_Y must have the same number of samples --
# confirm against the cell that builds train_Y.
model.fit(train_X, train_Y, 
          validation_split=0.1,
          batch_size=64, epochs=50)