# Top_Down faster-rcnn scratch code review

# Import Library

In [74]:
import warnings
warnings.filterwarnings(action='ignore')

# default
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

# torch
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import Dataset 
from torch.utils.data import DataLoader

# torchvision
from torchvision.models import vgg16
from torchvision.ops import RoIPool
from torchvision.ops import nms

# image load
import cv2
from PIL import Image

# augumentation
import albumentations as A
from albumentations.pytorch import ToTensorV2

# util
from pathlib import Path # Path 객체
from pycocotools.coco import COCO # COCO형식 JSON 파일 손쉽게 읽기
from torchsummary import summary # model summary
from collections import namedtuple # tuple 생성

from torchnet.meter import ConfusionMeter, AverageValueMeter

# hyper-parameter set

In [75]:
data_dir = Path('../../detection/dataset') # 데이터 경로
use_drop = False   # use dropout in RoIHead

epochs=1
learning_rate = 1e-3
lr_decay = 0.1
weight_decay = 0.0005

train_load_path = None  # train시 checkpoint 경로
inf_load_path = './checkpoints/faster_rcnn_scratch_checkpoints.pth' # inference시 체크포인트 경로

# Util Functions

In [76]:
def loc2bbox(src_bbox, loc):
    """
    from src_bbox to dst bbox using loc
    Args:
        src_bbox: 소스 바운딩 박스
        loc: 델타
    Returns: dst_bbox
    """

    if src_bbox.shape[0] == 0:
        return np.zeros((0, 4), dtype=loc.dtype)

    src_bbox = src_bbox.astype(src_bbox.dtype, copy=False)

    # x_min, y_min, x_max, y_max
    src_height = src_bbox[:, 2] - src_bbox[:, 0]
    src_width = src_bbox[:, 3] - src_bbox[:, 1]
    src_ctr_y = src_bbox[:, 0] + 0.5 * src_height
    src_ctr_x = src_bbox[:, 1] + 0.5 * src_width

    dy = loc[:, 0::4]
    dx = loc[:, 1::4]
    dh = loc[:, 2::4]
    dw = loc[:, 3::4]

    ctr_y = dy * src_height[:, np.newaxis] + src_ctr_y[:, np.newaxis]
    ctr_x = dx * src_width[:, np.newaxis] + src_ctr_x[:, np.newaxis]
    h = np.exp(dh) * src_height[:, np.newaxis]
    w = np.exp(dw) * src_width[:, np.newaxis]

    dst_bbox = np.zeros(loc.shape, dtype=loc.dtype)
    dst_bbox[:, 0::4] = ctr_y - 0.5 * h
    dst_bbox[:, 1::4] = ctr_x - 0.5 * w
    dst_bbox[:, 2::4] = ctr_y + 0.5 * h
    dst_bbox[:, 3::4] = ctr_x + 0.5 * w

    return dst_bbox


def bbox2loc(src_bbox, dst_bbox):
    """
    src_bbox : 예측된 좌표값(or anchor), dst_bbox: gt 좌표값 -> loc(y, x, h, w)
    """

    # x_min, y_min, x_max, y_max
    height = src_bbox[:, 2] - src_bbox[:, 0]
    width = src_bbox[:, 3] - src_bbox[:, 1]
    ctr_y = src_bbox[:, 0] + 0.5 * height
    ctr_x = src_bbox[:, 1] + 0.5 * width

    # x_min, y_min, x_max, y_max
    base_height = dst_bbox[:, 2] - dst_bbox[:, 0]
    base_width = dst_bbox[:, 3] - dst_bbox[:, 1]
    base_ctr_y = dst_bbox[:, 0] + 0.5 * base_height
    base_ctr_x = dst_bbox[:, 1] + 0.5 * base_width

    eps = np.finfo(height.dtype).eps
    height = np.maximum(height, eps)
    width = np.maximum(width, eps)

    dy = (base_ctr_y - ctr_y) / height
    dx = (base_ctr_x - ctr_x) / width
    dh = np.log(base_height / height)
    dw = np.log(base_width / width)

    loc = np.vstack((dy, dx, dh, dw)).transpose()
    return loc


def normal_init(m, mean, stddev, truncated=False):
    """
    weight initialization
    """
    if truncated:
        m.weight.data.normal_().fmod_(2).mul_(stddev).add_(mean) 
    else:
        m.weight.data.normal_(mean, stddev)
        m.bias.data.zero_()


def get_inside_index(anchor, H, W):
    # Calc indicies of anchors which are located completely inside of the image
    # whose size is speficied.
    index_inside = np.where(
        (anchor[:, 0] >= 0) &
        (anchor[:, 1] >= 0) &
        (anchor[:, 2] <= H) &
        (anchor[:, 3] <= W)
    )[0]
    return index_inside


def unmap(data, count, index, fill=0):
    # Unmap a subset of item (data) back to the original set of items (of size count)
    if len(data.shape) == 1:
        ret = np.empty((count,), dtype=data.dtype)
        ret.fill(fill)
        ret[index] = data
    else:
        ret = np.empty((count,) + data.shape[1:], dtype=data.dtype)
        ret.fill(fill)
        ret[index, :] = data
    return ret


## util ##
def tonumpy(data):
    if isinstance(data, np.ndarray):
        return data
    if isinstance(data, torch.Tensor):
        return data.detach().cpu().numpy()

def totensor(data, cuda = True):
    if isinstance(data, np.ndarray):
        tensor = torch.from_numpy(data)
    if isinstance(data, torch.Tensor):
        tensor = data.detach()
    if cuda:
        tensor = tensor.cuda()
    return tensor

def scalar(data):
    if isinstance(data, np.ndarray):
        return data.reshape(1)[0]
    if isinstance(data, torch.Tensor):
        return data.item()
    
def bbox_iou(bbox_a, bbox_b):
    if bbox_a.shape[1] != 4 or bbox_b.shape[1] != 4:
        raise IndexError

    #bbox_a 1개와 bbox_b k개를 비교해야하므로 None을 이용해서 차원을 늘려서 연산한다.
    # top left
    tl = np.maximum(bbox_a[:, None, :2], bbox_b[:, :2])
    # bottom right
    br = np.minimum(bbox_a[:, None, 2:], bbox_b[:, 2:])

    area_i = np.prod(br - tl, axis=2) * (tl < br).all(axis=2)
    area_a = np.prod(bbox_a[:, 2:] - bbox_a[:, :2], axis=1)
    area_b = np.prod(bbox_b[:, 2:] - bbox_b[:, :2], axis=1)
    return area_i / (area_a[:, None] + area_b - area_i)

def nograd(f):
    def new_f(*args, **kwargs):
        with torch.no_grad():
            return f(*args, **kwargs)
    return new_f

# 1. Train

## 1-0. code

In [78]:
def train():
    data_dir = Path('../../detection/dataset')
    annotation = data_dir / 'train.json'
    dataset = CustomDataset(annotation, data_dir, transforms=get_train_transform())
    
    print('-' * 50)
    print('load data')
    print(f'image shape : {dataset[0][0].shape}')
    dataloader = DataLoader(dataset, 
                            batch_size=1,     # only batch_size=1 support
                            shuffle=True, 
                            pin_memory=False,
                            num_workers=0)
    
    # faster rcnn trainer 불러오기
    faster_rcnn = FasterRCNNVGG16().cuda()
    trainer = FasterRCNNTrainer(faster_rcnn).cuda()
    
    # checkpoint load
    # if train_load_path:
    #     trainer.load(train_load_path)
    #     print('load pretrained model from %s' % train_load_path)
    # lr_ = learning_rate
    # best_loss = 1000
    
    for epoch in range(epochs):
        trainer.reset_meters() # 🧡
        for ii, (img, bbox_, label_, scale) in enumerate(tqdm(dataloader)):
            img, bbox, label = img.cuda().float(), bbox_.cuda(), label_.cuda()
            
             # 💚 batch size 1로 고정
            # print(img.shape, bbox.shape, label.shape) 
            # torch.Size([1, 3, 800, 800]) torch.Size([1, 1, 4]) torch.Size([1, 1])
            
            trainer.train_step(img, bbox, label, float(scale))
            break
    
        losses = trainer.get_meter_data()
        print(f"Epoch #{epoch+1} loss: {losses}")
        if losses['total_loss'] < best_loss :
            trainer.save()
            
        if epoch == 9:
            trainer.faster_rcnn.scale_lr(lr_decay)
            lr_ = lr_ * lr_decay

        if epoch == 13: 
            break

## 1-1. Dataset

In [79]:
class CustomDataset(Dataset):
    def __init__(self, annotation, data_dir, transforms):
        super().__init__()
        self.data_dir = data_dir
        self.coco = COCO(annotation) # Experiment 1-1
        self.transforms = transforms 
        
    def __getitem__(self, index):
        # 이미지 정보 불러오기
        image_id = self.coco.getImgIds(imgIds=index)
        image_info = self.coco.loadImgs(image_id)[0]

        # 이미지 로드 
        image = cv2.imread(str(Path(self.data_dir) / image_info['file_name']))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /= 255.0

        # 어노테이션 정보 로드 
        ann_ids = self.coco.getAnnIds(imgIds=image_info['id'])
        anns = self.coco.loadAnns(ann_ids)

        # 박스 가져오기
        boxes = np.array([x['bbox'] for x in anns])

        #-- x, y, w, h -> x1, y1, x2, y2(coco -> pascal_voc)
        # boxes (x_min, y_min, x_max, y_max)
        boxes[:, 2] = boxes[:, 0] + boxes[:, 2]
        boxes[:, 3] = boxes[:, 1] + boxes[:, 3]

        # 레이블 가져오기
        labels = np.array([x['category_id'] for x in anns])
        labels = torch.as_tensor(labels, dtype=torch.int64)
        
        # transform
        sample = {
            'image': image,
            'bboxes': boxes,
            'labels': labels
        }
        sample = self.transforms(**sample)
        image = sample['image']
        bboxes = torch.tensor(sample['bboxes'], dtype=torch.float32)
        boxes = torch.tensor(sample['bboxes'], dtype=torch.float32)

        # bboxes (x_min, y_min, x_max, y_max) -> boxes (y_min, x_min, y_max, x_max)
        boxes[:, 0] = bboxes[:, 1]
        boxes[:, 1] = bboxes[:, 0]
        boxes[:, 2] = bboxes[:, 3]
        boxes[:, 3] = bboxes[:, 2]

        return image, boxes, labels, 1.0
    
    def __len__(self) -> int:
        return len(self.coco.getImgIds())

In [6]:
# Train dataset transform
def get_train_transform():
    return A.Compose([
        A.Resize(height = 800, width = 800),
        A.Flip(p=0.5),
        ToTensorV2(p=1.0)
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

# No transform
def no_transform():
    return A.Compose([
        ToTensorV2(p=1.0) # format for pytorch tensor
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

## 1-2) Faster-RCNN Trainer

### 1-2-0) code

In [73]:
LossTuple = namedtuple('LossTuple', ['rpn_loc_loss', 'rpn_cls_loss',
                                     'roi_loc_loss', 'roi_cls_loss',
                                     'total_loss'])
class FasterRCNNTrainer(nn.Module):
    def __init__(self, faster_rcnn):
        super(FasterRCNNTrainer, self).__init__()

        self.faster_rcnn = faster_rcnn
        self.rpn_sigma = 3.     # sigma for l1_smooth_loss (RPN loss)
        self.roi_sigma = 1.     # sigma for l1_smooth_loss (ROI loss)

        # target creator create gt_bbox gt_label etc as training targets. 
        self.anchor_target_creator = AnchorTargetCreator()
        self.proposal_target_creator = ProposalTargetCreator()

        self.loc_normalize_mean = faster_rcnn.loc_normalize_mean
        self.loc_normalize_std = faster_rcnn.loc_normalize_std
        self.optimizer = self.faster_rcnn.get_optimizer()

        # training 상태 보여주는 지표
        self.rpn_cm = ConfusionMeter(2) # confusion matrix for classification
        self.roi_cm = ConfusionMeter(11)  # confusion matrix for classification
        self.meters = {k: AverageValueMeter() for k in LossTuple._fields}  # average loss

    def forward(self, imgs, bboxes, labels, scale):
        # 💚 torch.Size([1, 3, 800, 800]) torch.Size([1, 1, 4]) torch.Size([1, 1])
        n = bboxes.shape[0]
        
        if n != 1:
            raise ValueError('Currently only batch size 1 is supported.')

        _, _, H, W = imgs.shape
        # 💚 (800, 800)
        img_size = (H, W)
        
        # VGG (features extractor)
        # torch.Size([1, 512, 50, 50])
        features = self.faster_rcnn.extractor(imgs)
        
        # RPN (region proposal)
        # 💚 input :  torch.Size([1, 512, 50, 50]), (800, 800), 1
        # 💚 output : torch.size([1, 22500, 4]), torch.size([1, 22500, 2]), (2000, 4) (2000,), (22500, 4)
        rpn_locs, rpn_scores, rois, roi_indices, anchor = self.faster_rcnn.rpn(features, img_size, scale)

        # Since batch size is one, convert variables to singular form
        bbox = bboxes[0] # batch 1
        label = labels[0] # batch 1
        rpn_score = rpn_scores[0]
        rpn_loc = rpn_locs[0]
        roi = rois
        
        """
        sample roi =  rpn에서 nms 거친 2000개의 roi들 중 positive/negative 비율 고려해 최종 sampling한 roi
        """
        #  💚 input : (2000, 4), (box수, 4), (box수, 1), (0., 0., 0., 0.), (0.1, 0.1, 0.2, 0.2))
        sample_roi, gt_roi_loc, gt_roi_label = self.proposal_target_creator(
            roi,
            tonumpy(bbox),
            tonumpy(label),
            self.loc_normalize_mean,
            self.loc_normalize_std)
        
        print(sample_roi.shape, gt_roi_loc.shape, gt_roi_label.shape)
        # NOTE it's all zero because now it only support for batch=1 now
        # Faster R-CNN head (prediction head)
        sample_roi_index = torch.zeros(len(sample_roi))
        roi_cls_loc, roi_score = self.faster_rcnn.head(features,sample_roi,sample_roi_index) 

        # ------------------ RPN losses -------------------#
        gt_rpn_loc, gt_rpn_label = self.anchor_target_creator(tonumpy(bbox),anchor,img_size) 
        gt_rpn_label = totensor(gt_rpn_label).long() 
        gt_rpn_loc = totensor(gt_rpn_loc) 
        
        # rpn bounding box regression loss
        rpn_loc_loss = _fast_rcnn_loc_loss(rpn_loc,gt_rpn_loc,gt_rpn_label.data,self.rpn_sigma)
        # rpn classification loss
        rpn_cls_loss = F.cross_entropy(rpn_score, gt_rpn_label.cuda(), ignore_index=-1)
        
        _gt_rpn_label = gt_rpn_label[gt_rpn_label > -1]
        _rpn_score = tonumpy(rpn_score)[tonumpy(gt_rpn_label) > -1]
        self.rpn_cm.add(totensor(_rpn_score, False), _gt_rpn_label.data.long())

        # ------------------ ROI losses (fast rcnn loss) -------------------#
        n_sample = roi_cls_loc.shape[0] 
        roi_cls_loc = roi_cls_loc.view(n_sample, -1, 4)
        roi_loc = roi_cls_loc[torch.arange(0, n_sample).long().cuda(), \
                              totensor(gt_roi_label).long()]
        gt_roi_label = totensor(gt_roi_label).long() 
        gt_roi_loc = totensor(gt_roi_loc) 

        # faster rcnn bounding box regression loss
        roi_loc_loss = _fast_rcnn_loc_loss(
            roi_loc.contiguous(),
            gt_roi_loc,
            gt_roi_label.data,
            self.roi_sigma)

        # faster rcnn classification loss
        roi_cls_loss = nn.CrossEntropyLoss()(roi_score, gt_roi_label.cuda())
        
        self.roi_cm.add(totensor(roi_score, False), gt_roi_label.data.long())

        losses = [rpn_loc_loss, rpn_cls_loss, roi_loc_loss, roi_cls_loss]
        losses = losses + [sum(losses)] # total_loss == sum(losses)

        return LossTuple(*losses)
    
    # training
    def train_step(self, imgs, bboxes, labels, scale):
        
        # 💚 torch.Size([1, 3, 800, 800]) torch.Size([1, 1, 4]) torch.Size([1, 1])
        self.optimizer.zero_grad()
        losses = self.forward(imgs, bboxes, labels, scale)
        losses.total_loss.backward()
        self.optimizer.step()
        self.update_meters(losses)
        return losses
    
    # checkpoint 만들기
    def save(self, save_optimizer=False, save_path=None):
        save_dict = dict()

        save_dict['model'] = self.faster_rcnn.state_dict()

        if save_optimizer:
            save_dict['optimizer'] = self.optimizer.state_dict()

        if save_path is None:
            save_path = './checkpoints/faster_rcnn_scratch_checkpoints.pth'

        save_dir = os.path.dirname(save_path)
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)

        torch.save(save_dict, save_path)
        return save_path
    
    # checkpoint load
    def load(self, path, load_optimizer=True, parse_opt=False, ):
        state_dict = torch.load(path)
        if 'model' in state_dict:
            self.faster_rcnn.load_state_dict(state_dict['model'])
        else:  # legacy way, for backward compatibility
            self.faster_rcnn.load_state_dict(state_dict)
            return self
        if 'optimizer' in state_dict and load_optimizer:
            self.optimizer.load_state_dict(state_dict['optimizer'])
        return self

    def update_meters(self, losses):
        loss_d = {k: scalar(v) for k, v in losses._asdict().items()}
        for key, meter in self.meters.items():
            meter.add(loss_d[key])

    def reset_meters(self):
        for key, meter in self.meters.items():
            meter.reset()
        self.roi_cm.reset()
        self.rpn_cm.reset()

    def get_meter_data(self):
        return {k: v.value()[0] for k, v in self.meters.items()}


def _smooth_l1_loss(x, t, in_weight, sigma):
    sigma2 = sigma ** 2
    diff = in_weight * (x - t)
    abs_diff = diff.abs()
    flag = (abs_diff.data < (1. / sigma2)).float()
    y = (flag * (sigma2 / 2.) * (diff ** 2) +
         (1 - flag) * (abs_diff - 0.5 / sigma2))
    return y.sum()


def _fast_rcnn_loc_loss(pred_loc, gt_loc, gt_label, sigma):
    # Localization loss 구할 때는 positive example에 대해서만 계산
    in_weight = torch.zeros(gt_loc.shape).cuda()
    in_weight[(gt_label > 0).view(-1, 1).expand_as(in_weight).cuda()] = 1
    loc_loss = _smooth_l1_loss(pred_loc, gt_loc, in_weight.detach(), sigma)
    loc_loss /= ((gt_label >= 0).sum().float())
    return loc_loss

In [64]:
train()

loading annotations into memory...
Done (t=0.07s)
creating index...
index created!
--------------------------------------------------
load data
image shape : torch.Size([3, 800, 800])


  0%|          | 0/4883 [00:00<?, ?it/s]

(128, 4) (128, 4) (128,)
check





### 1-2-1) FasterRCNNVGG16

#### 1-2-1-0) code

In [16]:
class FasterRCNNVGG16(FasterRCNN):

    feat_stride = 16  # downsample 16x for output of conv5 in vgg16

    def __init__(self, n_fg_class=10, ratios=[0.5, 1, 2], anchor_scales=[8, 16, 32] ): # n_fg_class : 배경포함 하지 않은 class 개수        
        extractor, classifier = decom_vgg16(pretrained=True)
        
        rpn = RegionProposalNetwork(
            512, 512,
            ratios=ratios,
            anchor_scales=anchor_scales,
            feat_stride=self.feat_stride,
        )

        head = VGG16RoIHead(
            n_class=n_fg_class + 1,
            roi_size=7,
            spatial_scale=(1. / self.feat_stride),
            classifier=classifier
        )
        super(FasterRCNNVGG16, self).__init__(
            extractor,
            rpn,
            head,
        )

#### 1-2-1-1) extractor, classifier (backborn)

In [9]:
def decom_vgg16(pretrained: bool):
    # the 30th layer of features is relu of conv5_3
    model = vgg16(pretrained=pretrained)
    
    features = list(model.features)[:30] # max_poll 제외
    classifier = model.classifier

    classifier = list(classifier)
    del classifier[6]
    if not use_drop:
        del classifier[5]
        del classifier[2]
    classifier = nn.Sequential(*classifier)

    # freeze top4 conv
    for layer in features[:10]:
        for p in layer.parameters():
            p.requires_grad = False

    return nn.Sequential(*features), classifier

#### 1-2-1-2) rpn(RegionProposalNetwork)

##### 1-2-1-2-0) code

In [49]:
class RegionProposalNetwork(nn.Module):
    def __init__(self, in_channels=512, mid_channels=512, ratios=[0.5, 1, 2],
                 anchor_scales=[8, 16, 32], feat_stride=16, proposal_creator_params=dict(),):
        """ 
        vgg를 통과한 feature map에서 각각 anchor box 생성
        각 anchor box별로 score를 계산해야한다.
        base anchor box의 위치를 어느만큼 바꿀지 미세조정해야한다.
        
        Args:
            ratios: 비율
            anchor_scales: 스케일
        Returns: 
        """
        super(RegionProposalNetwork, self).__init__()

        self.anchor_base = generate_anchor_base(anchor_scales=anchor_scales, ratios=ratios) # 9개의 anchorbox 생성 (9 x 4)
        self.feat_stride = feat_stride  
        self.proposal_layer = ProposalCreator(self, **proposal_creator_params)
        n_anchor = self.anchor_base.shape[0] # anchor 개수 - 9
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=mid_channels, kernel_size=3, stride=1, padding=1)
        self.score = nn.Conv2d(mid_channels, n_anchor * 2, 1, 1, 0)  # 9*2 (anchor box 수 x 2(object true, false))
        self.loc = nn.Conv2d(mid_channels, n_anchor * 4, 1, 1, 0)   # 9*4 (anchor box 수 x (x, y, w, h) 조절 값)
        
        # 0. util에 정의 초기화하는거
        normal_init(self.conv1, 0, 0.01) # weight initalizer
        normal_init(self.score, 0, 0.01) # weight initalizer
        normal_init(self.loc, 0, 0.01)   # weight initalizer

    def forward(self, x, img_size, scale=1.):
        # input : 💚 torch.Size([1, 512, 50, 50]), (800, 800), 1
        n, _, hh, ww = x.shape

        # 전체 (h*w*9)개 anchor의 좌표값 # anchor_base:(9, 4)
        # 픽셀당 모든 anchor box 9개 생성
        # output = (22500, 4)
        anchor = _enumerate_shifted_anchor(self.anchor_base, self.feat_stride, hh, ww) # torch.size([22500, 4])
        n_anchor = self.anchor_base.shape[0]
        
        # middle 한번 layer 통과
        # 💚torch.size([1, 512, 50, 50]) -> torch.Size([1, 512, 50, 50])
        middle = F.relu(self.conv1(x))
        
        # predicted bounding box offset
        # bounding_box 보정 💚 torch.Size([1, 512, 50, 50])-> torch.size([1, 36, 50, 50])
        rpn_locs = self.loc(middle) 
        # torch.size([1, 22500, 4]) # anchor랑 맞춰줌
        rpn_locs = rpn_locs.permute(0, 2, 3, 1).contiguous().view(n, -1, 4)

        # predicted scores for anchor (foreground or background)
        # 💚 torch.Size([1, 512, 50, 50]) -> torch.size([1, 18, 50, 50])
        rpn_scores = self.score(middle)  
        # torch.size([1, 50, 50, 18])
        rpn_scores = rpn_scores.permute(0, 2, 3, 1).contiguous() 
        
        # scores for foreground
        rpn_softmax_scores = F.softmax(rpn_scores.view(n, hh, ww, n_anchor, 2), dim=4) # score로 softmax torch.size([1, 50, 50, 9, 2])
        rpn_fg_scores = rpn_softmax_scores[:, :, :, :, 1].contiguous() # torch.size([1, 50, 50, 9]) # object가 존재하는거 가져옴
        rpn_fg_scores = rpn_fg_scores.view(n, -1) # torch.size([1, 22500])
        
        rpn_scores = rpn_scores.view(n, -1, 2) # torch.size([n, 50, 50, 18]) -> torch.size([1, 22500, 2]) # anchor랑 맞춰줌

        # proposal생성 (ProposalCreator)
        # 22500 -> 2000개 뽑자
        rois = list()        # proposal의 좌표값이 있는 bounding box array
        roi_indices = list() # roi에 해당하는 image 인덱스
        for i in range(n):
            # (22500, 4) loc 정보, (22500,)의  fg score, (22500, 4) anchore box, 원본이미지 크기, scale 값 = 1
            # return (2000, 4)
            roi = self.proposal_layer(rpn_locs[i].cpu().data.numpy(),rpn_fg_scores[i].cpu().data.numpy(), anchor, img_size, scale=scale)
            
            batch_index = i * np.ones((len(roi),), dtype=np.int32)
            rois.append(roi)
            roi_indices.append(batch_index)
        rois = np.concatenate(rois, axis=0)
        roi_indices = np.concatenate(roi_indices, axis=0)
        
        # torch.size([1, 22500, 4]), torch.size([1, 22500, 2]), (2000, 4) (2000,), (22500, 4)
        return rpn_locs, rpn_scores, rois, roi_indices, anchor


def _enumerate_shifted_anchor(anchor_base, feat_stride, height, width):
    # -- 원본이미지 기준

    # anchor_base는 하나의 pixel에 9개 종류의 anchor box를 나타냄
    # 이것을 enumerate시켜 전체 이미지의 pixel에 각각 9개의 anchor box를 가지게 함
    # 32x32 feature map에서는 32x32x9=9216개 anchor box가짐

    shift_y = np.arange(0, height * feat_stride, feat_stride) # (1 x height)
    shift_x = np.arange(0, width * feat_stride, feat_stride) # (1 x width)
    shift_x, shift_y = np.meshgrid(shift_x, shift_y) # (width, height)
    shift = np.stack((shift_y.ravel(), shift_x.ravel(),
                      shift_y.ravel(), shift_x.ravel()), axis=1) # ((width x height), 4)

    A = anchor_base.shape[0] # anchor 수
    K = shift.shape[0] # 총 pixel 수

    anchor = anchor_base.reshape((1, A, 4)) + \
             shift.reshape((1, K, 4)).transpose((1, 0, 2))
    anchor = anchor.reshape((K * A, 4)).astype(np.float32)
    return anchor # (22500, 4) -> (anchor box 수, (y1, x1, y2, x2))


##### 1-2-1-2-1) gnerate_anchor_base

In [11]:
def generate_anchor_base(base_size=16, ratios=[0.5, 1, 2], anchor_scales=[8, 16, 32]):
    """ 
    원본 이미지 기준 base_size = 16
    한 픽셀당 몇개의 anchor box가 생성 되나?
    하나의 픽셀에 대해서 위치 정보 없이 크기만 return
    Args:
        ratios: 비율
        anchor_scales: 스케일
    Returns: basic anchor boxes, shape=(R, 4)
        R: len(ratio) * len(anchor_scales) = anchor 개수 = 9
        4: anchor box 좌표 값
    """
    
    py = base_size / 2. # center y
    px = base_size / 2. # center x

    # (9 x 4)
    anchor_base = np.zeros((len(ratios) * len(anchor_scales), 4), dtype=np.float32) # 빈 achor box anchor_box
    
    for i in range(len(ratios)):
        for j in range(len(anchor_scales)):
            
            # HW = S^2 라고 정의햇음 논문해서
            # H = rW 라고 정의했음 논문에서
            # rW^2 = S^2
            # W^2 = S^2 / r
            # w = s * sqrt(1/ r)
            
            h = base_size * anchor_scales[j] * np.sqrt(ratios[i])       # height
            w = base_size * anchor_scales[j] * np.sqrt(1. / ratios[i])  # width

            index = i * len(anchor_scales) + j
            
            # offset of anchor box
            anchor_base[index, 0] =  py - h / 2 # y_min
            anchor_base[index, 1] =  px - w / 2 # x_min
            anchor_base[index, 2] =  py + h / 2 # y_max
            anchor_base[index, 3] =  px + w / 2 # x_max
            
    return anchor_base # (9,4)

##### 1-2-1-2-2) Proposal

In [12]:
class ProposalCreator:
    """
    RPN의 class prediction(score)정보와 box prediction(조정) 정보를 이용해서 fast rcnn에 넘길 roi를 넘겨준다.
    """
    def __init__(self, parent_model,
                 nms_thresh=0.7, # nms threshold
                 n_train_pre_nms=12000, # train시 nms 전 roi 개수 - score로 우선 이정도 필터한다.
                 n_train_post_nms=2000, # train시 nms 후 roi 개수 - nms를 거치고 한번더 필터
                 n_test_pre_nms=6000,   # test시 nms 전 roi 개수
                 n_test_post_nms=300,   # test시 nms 후 roi 개수
                 min_size=16            
                 ):
        self.parent_model = parent_model # 해당 모델이 train중인지 test중인지 나타냄
        self.nms_thresh = nms_thresh
        self.n_train_pre_nms = n_train_pre_nms
        self.n_train_post_nms = n_train_post_nms
        self.n_test_pre_nms = n_test_pre_nms
        self.n_test_post_nms = n_test_post_nms
        self.min_size = min_size # 최소 bboxes 크기

    def __call__(self, loc, score, anchor, img_size, scale=1.):   
        # (22500, 4) loc 정보, (22500,)의  fg score, (22500, 4) anchore box, 원본이미지 크기, scale 값 = 1
        if self.parent_model.training: # train중일 때
            n_pre_nms = self.n_train_pre_nms
            n_post_nms = self.n_train_post_nms
        else: # test중일 때
            n_pre_nms = self.n_test_pre_nms
            n_post_nms = self.n_test_post_nms

        # anchor의 좌표값과 predicted bounding bounding box offset(y,x,h,w)를 통해 bounding box 좌표값(y_min, x_min, y_max, x_max) 생성
        roi = loc2bbox(anchor, loc) # (22500, 4), (22500, 4) -> (22500, 4)

        # img_size인 이유 feet_stride랑 연관
        # Clip predicted boxes to image.
        roi[:, slice(0, 4, 2)] = np.clip(roi[:, slice(0, 4, 2)], 0, img_size[0])
        roi[:, slice(1, 4, 2)] = np.clip(roi[:, slice(1, 4, 2)], 0, img_size[1])

        # min_size 보다 작은 box들은 제거
        min_size = self.min_size * scale
        hs = roi[:, 2] - roi[:, 0]
        ws = roi[:, 3] - roi[:, 1]
        
        # 22500 -> 22500 > ?(18000)
        keep = np.where((hs >= min_size) & (ws >= min_size))[0]
        roi = roi[keep, :] # (18000, 4)
        score = score[keep] # (18000,)
        
        # Sort all (proposal, score) pairs by score from highest to lowest.
        # Take top pre_nms_topN 
        order = score.ravel().argsort()[::-1] # 내림차순 정렬 index를 정보
        if n_pre_nms > 0:
            order = order[:n_pre_nms]
        roi = roi[order, :] # (12000, 4)
        score = score[order] # (12000,)

        # nms 적용 # (x<12000,) index 
        keep = nms(
            torch.from_numpy(roi).cuda(),
            torch.from_numpy(score).cuda(),
            self.nms_thresh)
        
        if n_post_nms > 0:
            keep = keep[:n_post_nms]
        roi = roi[keep.cpu().numpy()] # (2000, 4)
        
        # 최종적으로 2천개만 사용한다.
        return roi 

#### 1-2-1-3) head(RoI VGG16RoIHead)

RPN에서 나온 2000개 RoI로 부터 RoI pool을 한 후에 classifier, regressor 통과

In [13]:
class VGG16RoIHead(nn.Module):
    """
    Faster R-CNN head
    RoI pool 후에 classifier, regressior 통과
    """

    def __init__(self, n_class, roi_size, spatial_scale, classifier):
        """ 
        Args:
            n_class: 배경 포함 class 수
            roi_size: 7
            spatial_scale: (1. / feat_stride),
            classifier: vgg classifier
        """
        super(VGG16RoIHead, self).__init__()

        self.classifier = classifier  
        
        # 💚 4096?
        self.cls_loc = nn.Linear(4096, n_class * 4) # bounding box regressor
        self.score = nn.Linear(4096, n_class) # Classifier

        normal_init(self.cls_loc, 0, 0.001)  # weight initialize
        normal_init(self.score, 0, 0.01)     # weight initialize

        self.n_class = n_class # 배경 포함한 class 수
        self.roi_size = roi_size # RoI-pooling 후 feature map의  높이, 너비
        self.spatial_scale = spatial_scale # roi resize scale
        self.roi = RoIPool(output_size=(self.roi_size, self.roi_size), spatial_scale=self.spatial_scale)

    def forward(self, x, rois, roi_indices):
        # in case roi_indices is  ndarray
        roi_indices = totensor(roi_indices).float()
        rois = totensor(rois).float()
        indices_and_rois = torch.cat([roi_indices[:, None], rois], dim=1)
        # NOTE: important: yx->xy
        xy_indices_and_rois = indices_and_rois[:, [0, 2, 1, 4, 3]]
        indices_and_rois =  xy_indices_and_rois.contiguous() 

        # 각 이미지 roi pooling 
        pool = self.roi(x, indices_and_rois) 
        # flatten 
        pool = pool.view(pool.size(0), -1)
        # fully connected
        fc7 = self.classifier(pool)
        # regression 
        roi_cls_locs = self.cls_loc(fc7)
        # softmax
        roi_scores = self.score(fc7)
        
        return roi_cls_locs, roi_scores

#### 1-2-1-4) FasterRCNN

In [14]:
class FasterRCNN(nn.Module):
    def __init__(self, extractor, rpn, head,
                loc_normalize_mean = (0., 0., 0., 0.),
                loc_normalize_std = (0.1, 0.1, 0.2, 0.2)):
        super(FasterRCNN, self).__init__()
        self.extractor = extractor  # extractor : vgg
        self.rpn = rpn              # rpn : region proposal network
        self.head = head            # head : RoiHead

        # mean and std
        self.loc_normalize_mean = loc_normalize_mean
        self.loc_normalize_std = loc_normalize_std
        self.use_preset()

    @property
    def n_class(self): # 최종 class 개수 (배경 포함)
        return self.head.n_class

    # 💚💙💛🧡💜
    # predict 시 사용하는 forward
    # train 시 FasterRCNNTrainer을 사용하여 FasterRcnn에 있는 extractor, rpn, head를 모듈별로 불러와서 forward
    def forward(self, x, scale=1.):
        img_size = x.shape[2:]

        h = self.extractor(x) # extractor 통과
        rpn_locs, rpn_scores, rois, roi_indices, anchor = self.rpn(h, img_size, scale) # rpn 통과
        roi_cls_locs, roi_scores = self.head(h, rois, roi_indices) # head 통과
        return roi_cls_locs, roi_scores, rois, roi_indices 

    def use_preset(self): # prediction 과정 쓰이는 threshold 정의
        self.nms_thresh = 0.3
        self.score_thresh = 0.05

    def _suppress(self, raw_cls_bbox, raw_prob):
        bbox = list()
        label = list()
        score = list()
        
        # skip cls_id = 0 because it is the background class
        for l in range(1, self.n_class):
            cls_bbox_l = raw_cls_bbox.reshape((-1, self.n_class, 4))[:, l, :]
            prob_l = raw_prob[:, l]
            mask = prob_l > self.score_thresh
            cls_bbox_l = cls_bbox_l[mask]
            prob_l = prob_l[mask]
            keep = nms(cls_bbox_l, prob_l,self.nms_thresh)
            bbox.append(cls_bbox_l[keep].cpu().numpy())
            # The labels are in [0, self.n_class - 2].
            label.append((l - 1) * np.ones((len(keep),)))
            score.append(prob_l[keep].cpu().numpy())
        
        bbox = np.concatenate(bbox, axis=0).astype(np.float32)
        label = np.concatenate(label, axis=0).astype(np.int32)
        score = np.concatenate(score, axis=0).astype(np.float32)
        return bbox, label, score

    @nograd
    def predict(self, imgs,sizes=None):
        """
        이미지에서 객체 검출
        Input : images
        Output : bboxes, labels, scores
        """
        self.eval()
        prepared_imgs = imgs
                
        bboxes = list()
        labels = list()
        scores = list()
        for img, size in zip(prepared_imgs, sizes):
            img = totensor(img[None]).float()
            scale = img.shape[3] / size[1]
            roi_cls_loc, roi_scores, rois, _ = self(img, scale=scale) # self = FasterRCNN
            # We are assuming that batch size is 1.
            roi_score = roi_scores.data
            roi_cls_loc = roi_cls_loc.data
            roi = totensor(rois) / scale

            # Convert predictions to bounding boxes in image coordinates.
            # Bounding boxes are scaled to the scale of the input images.
            mean = torch.Tensor(self.loc_normalize_mean).cuda(). repeat(self.n_class)[None]
            std = torch.Tensor(self.loc_normalize_std).cuda(). repeat(self.n_class)[None]

            roi_cls_loc = (roi_cls_loc * std + mean)
            roi_cls_loc = roi_cls_loc.view(-1, self.n_class, 4)
            roi = roi.view(-1, 1, 4).expand_as(roi_cls_loc)
            cls_bbox = loc2bbox(tonumpy(roi).reshape((-1, 4)),tonumpy(roi_cls_loc).reshape((-1, 4)))
            cls_bbox = totensor(cls_bbox)
            cls_bbox = cls_bbox.view(-1, self.n_class * 4)
            # clip bounding box
            cls_bbox[:, 0::2] = (cls_bbox[:, 0::2]).clamp(min=0, max=size[0])
            cls_bbox[:, 1::2] = (cls_bbox[:, 1::2]).clamp(min=0, max=size[1])

            prob = (F.softmax(totensor(roi_score), dim=1))

            bbox, label, score = self._suppress(cls_bbox, prob)
            bboxes.append(bbox)
            labels.append(label)
            scores.append(score)

        self.use_preset()
        self.train()
        return bboxes, labels, scores

    def get_optimizer(self):
        '''
        Optimizer 선언
        '''
        lr = learning_rate
        params = []
        for key, value in dict(self.named_parameters()).items():
            if value.requires_grad:
                if 'bias' in key:
                    params += [{'params': [value], 'lr': lr * 2, 'weight_decay': 0}]
                else:
                    params += [{'params': [value], 'lr': lr, 'weight_decay': weight_decay}]
        self.optimizer = torch.optim.SGD(params, momentum=0.9)
        return self.optimizer

    def scale_lr(self, decay=0.1):
        for param_group in self.optimizer.param_groups:
            param_group['lr'] *= decay
        return self.optimizer


### 1-2-2) AnchortargetCreator

In [15]:
class AnchorTargetCreator(object):

    def __init__(self,
                 n_sample=256,
                 pos_iou_thresh=0.7, neg_iou_thresh=0.3,
                 pos_ratio=0.5):
        self.n_sample = n_sample
        self.pos_iou_thresh = pos_iou_thresh
        self.neg_iou_thresh = neg_iou_thresh
        self.pos_ratio = pos_ratio

    def __call__(self, bbox, anchor, img_size):

        img_H, img_W = img_size

        n_anchor = len(anchor) # 9216
        inside_index = get_inside_index(anchor, img_H, img_W) # (2272,)
        anchor = anchor[inside_index] # (2272, 4)
        argmax_ious, label = self._create_label(
            inside_index, anchor, bbox)

        # compute bounding box regression targets
        loc = bbox2loc(anchor, bbox[argmax_ious]) # (2272, 4)

        # map up to original set of anchors
        label = unmap(label, n_anchor, inside_index, fill=-1) # (9216,)
        loc = unmap(loc, n_anchor, inside_index, fill=0) # (9216, 4)

        return loc, label

    def _create_label(self, inside_index, anchor, bbox):
        # label) 1 :positive, 0 : negative, -1 : dont care
        label = np.empty((len(inside_index),), dtype=np.int32)
        label.fill(-1)

        argmax_ious, max_ious, gt_argmax_ious = self._calc_ious(anchor, bbox, inside_index)

        label[max_ious < self.neg_iou_thresh] = 0 # 0.3

        # 가장 iou가 큰 것은 positive label
        label[gt_argmax_ious] = 1

        # positive label
        label[max_ious >= self.pos_iou_thresh] = 1 # 0.7

        # subsample positive labels if we have too many
        n_pos = int(self.pos_ratio * self.n_sample)
        pos_index = np.where(label == 1)[0]
        if len(pos_index) > n_pos:
            disable_index = np.random.choice(
                pos_index, size=(len(pos_index) - n_pos), replace=False)
            label[disable_index] = -1

        # subsample negative labels if we have too many
        n_neg = self.n_sample - np.sum(label == 1)
        neg_index = np.where(label == 0)[0]
        if len(neg_index) > n_neg:
            disable_index = np.random.choice(
                neg_index, size=(len(neg_index) - n_neg), replace=False)
            label[disable_index] = -1

        return argmax_ious, label

    def _calc_ious(self, anchor, bbox, inside_index):
        # ious between the anchors and the gt boxes
        ious = bbox_iou(anchor, bbox)
        argmax_ious = ious.argmax(axis=1)
        max_ious = ious[np.arange(len(inside_index)), argmax_ious]
        gt_argmax_ious = ious.argmax(axis=0)
        gt_max_ious = ious[gt_argmax_ious, np.arange(ious.shape[1])]
        gt_argmax_ious = np.where(ious == gt_max_ious)[0]

        return argmax_ious, max_ious, gt_argmax_ious

### 1-2-3) ProposalTargetCreator

RPN에서 NMS를 거친 roi들을 ground truth와의 iou를 비교  
positive / negative sampling 수행 (총 128개)  
sample roi와 gt_bbox를 이용해 bbox regression에서 regression해야할 ground truth loc값(t_x, t_y, t_w, t_h)을 구함

In [80]:
class ProposalTargetCreator:
    def __init__(self,
                 n_sample=128,
                 pos_ratio=0.25, pos_iou_thresh=0.5,
                 neg_iou_thresh_hi=0.5, neg_iou_thresh_lo=0.0
                 ):
        self.n_sample = n_sample
        self.pos_ratio = pos_ratio
        self.pos_iou_thresh = pos_iou_thresh # positive iou threshold
        self.neg_iou_thresh_hi = neg_iou_thresh_hi # negitave iou threshold = (neg_iou_thresh_hi ~ neg_iou_thresh_lo)
        self.neg_iou_thresh_lo = neg_iou_thresh_lo 

    def __call__(self, roi, bbox, label,
                 loc_normalize_mean=(0., 0., 0., 0.),
                 loc_normalize_std=(0.1, 0.1, 0.2, 0.2)):
        """
        # input : (2000, 4), (box수, 4), (box수, 1), (0., 0., 0., 0.), (0.1, 0.1, 0.2, 0.2))
        """
        n_bbox, _ = bbox.shape

        # roi = np.concatenate((roi, bbox), axis=0)
        
        pos_roi_per_image = np.round(self.n_sample * self.pos_ratio) # positive image 갯수 = 32
        iou = bbox_iou(roi, bbox) # RoI와 bounding box IoU
        
        gt_assignment = iou.argmax(axis=1)
        max_iou = iou.max(axis=1)
        gt_roi_label = label[gt_assignment] + 1 # class label [0, n_fg_class - 1] -> [1, n_fg_class].

        # positive sample 선택 (>= pos_iou_thresh IoU)
        pos_index = np.where(max_iou >= self.pos_iou_thresh)[0]
        pos_roi_per_this_image = int(min(pos_roi_per_image, pos_index.size))
        if pos_index.size > 0:
            pos_index = np.random.choice(
                pos_index, size=pos_roi_per_this_image, replace=False)

        # Negative sample 선택 [neg_iou_thresh_lo, neg_iou_thresh_hi)
        neg_index = np.where((max_iou < self.neg_iou_thresh_hi) &
                             (max_iou >= self.neg_iou_thresh_lo))[0]
        neg_roi_per_this_image = self.n_sample - pos_roi_per_this_image
        neg_roi_per_this_image = int(min(neg_roi_per_this_image,
                                         neg_index.size))
        if neg_index.size > 0:
            neg_index = np.random.choice(
                neg_index, size=neg_roi_per_this_image, replace=False)

        # The indices that we're selecting (both positive and negative).
        keep_index = np.append(pos_index, neg_index)
        gt_roi_label = gt_roi_label[keep_index]
        gt_roi_label[pos_roi_per_this_image:] = 0  # negative sample의 label = 0
        sample_roi = roi[keep_index] # (128, 4)

        # sample roi와 gt_bbox를 이용해 bbox regression에서 regression해야할 ground truth loc값(t_x, t_y, t_w, t_h) 계산
        gt_roi_loc = bbox2loc(sample_roi, bbox[gt_assignment[keep_index]]) # (128, 4)
        gt_roi_loc = ((gt_roi_loc - np.array(loc_normalize_mean, np.float32)) / np.array(loc_normalize_std, np.float32))

        return sample_roi, gt_roi_loc, gt_roi_label

In [72]:
train()

loading annotations into memory...
Done (t=0.19s)
creating index...
index created!
--------------------------------------------------
load data
image shape : torch.Size([3, 800, 800])


  0%|          | 0/4883 [00:00<?, ?it/s]

(2002, 2)
(128, 4) (128, 4) (128,)
check



