# Library Import

In [None]:
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import numpy as np
import cv2
import os

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import torch

# library that contains the Faster R-CNN model
import torchvision

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

from torch.utils.data import DataLoader, Dataset
import pandas as pd
from tqdm import tqdm
import torchvision.models as models

import torch.nn as nn
import torch.nn.functional as F
from collections import OrderedDict
from torchvision.models.detection.rpn import RegionProposalNetwork, RPNHead

# Dataset creation

In [None]:
class CustomDataset(Dataset):
    """COCO-format detection dataset yielding ``(image, target, image_id)``.

    Args:
        annotation: path to the COCO annotation JSON file.
        data_dir: directory that the ``file_name`` entries of the annotation
            are joined onto.
        transforms: optional callable invoked as
            ``transforms(image=..., bboxes=..., labels=...)`` (resize, crop,
            ToTensor, ...). It must return a mapping with ``"image"`` and
            ``"bboxes"`` entries.
    """

    def __init__(self, annotation, data_dir, transforms=None):
        super().__init__()
        self.data_dir = data_dir
        # Load the COCO annotation through the COCO API.
        self.coco = COCO(annotation)
        self.predictions = {
            "images": self.coco.dataset["images"].copy(),
            "categories": self.coco.dataset["categories"].copy(),
            "annotations": None,
        }
        self.transforms = transforms

    def __getitem__(self, index: int):
        """Load one image together with its detection target.

        Returns:
            tuple: ``(image, target, image_id)`` where ``image`` is the RGB
            image scaled to [0, 1] (or whatever ``transforms`` turns it
            into), ``target`` is a dict with ``boxes`` (float32 tensor of
            shape ``(N, 4)`` in x_min, y_min, x_max, y_max order),
            ``labels``, ``image_id``, ``area`` and ``iscrowd`` tensors, and
            ``image_id`` is the value returned by ``coco.getImgIds``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        # NOTE(review): ``index`` is used directly as the COCO image id, so
        # this assumes image ids run 0..len-1 -- confirm against the dataset.
        image_id = self.coco.getImgIds(imgIds=index)
        image_info = self.coco.loadImgs(image_id)[0]

        image_path = os.path.join(self.data_dir, image_info["file_name"])
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /= 255.0

        ann_ids = self.coco.getAnnIds(imgIds=image_info["id"])
        anns = self.coco.loadAnns(ann_ids)

        # reshape(-1, 4) keeps the array 2-D even when the image has no
        # annotations, so the column arithmetic below stays valid.
        boxes = np.array([x["bbox"] for x in anns], dtype=np.float64).reshape(-1, 4)

        # COCO stores (x_min, y_min, width, height);
        # convert to (x_min, y_min, x_max, y_max).
        boxes[:, 2] = boxes[:, 0] + boxes[:, 2]
        boxes[:, 3] = boxes[:, 1] + boxes[:, 3]

        # torchvision's Faster R-CNN treats label 0 as background,
        # so shift the class ids up by one (1..10).
        labels = np.array([x["category_id"] + 1 for x in anns])
        labels = torch.as_tensor(labels, dtype=torch.int64)

        areas = np.array([x["area"] for x in anns])
        areas = torch.as_tensor(areas, dtype=torch.float32)

        is_crowds = np.array([x["iscrowd"] for x in anns])
        is_crowds = torch.as_tensor(is_crowds, dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([index]),
            "area": areas,
            "iscrowd": is_crowds,
        }

        if self.transforms:
            sample = {"image": image, "bboxes": target["boxes"], "labels": labels}
            sample = self.transforms(**sample)
            image = sample["image"]
            # NOTE(review): labels/area/iscrowd are not re-read from the
            # transform output, so a transform that drops boxes would leave
            # them out of sync -- confirm the transforms in use never do.
            target["boxes"] = torch.tensor(
                sample["bboxes"], dtype=torch.float32
            ).reshape(-1, 4)
        else:
            # Every target value must be a tensor so callers can .to(device).
            target["boxes"] = torch.as_tensor(boxes, dtype=torch.float32)

        return image, target, image_id

    def __len__(self) -> int:
        """Return the number of images listed in the annotation file."""
        return len(self.coco.getImgIds())

In [None]:
def get_train_transform():
    """Build the training pipeline: resize to 1024x1024, random flip, to tensor.

    Boxes are declared in ``pascal_voc`` format and are paired with the
    ``labels`` field.
    """
    steps = [
        A.Resize(1024, 1024),
        A.Flip(p=0.5),
        ToTensorV2(p=1.0),
    ]
    box_options = {"format": "pascal_voc", "label_fields": ["labels"]}
    return A.Compose(steps, bbox_params=box_options)


def get_valid_transform():
    """Build the validation pipeline: tensor conversion only.

    Boxes are declared in ``pascal_voc`` format and are paired with the
    ``labels`` field.
    """
    steps = [ToTensorV2(p=1.0)]
    box_options = {"format": "pascal_voc", "label_fields": ["labels"]}
    return A.Compose(steps, bbox_params=box_options)

# Util Functions

In [None]:
class Averager:
    """Running mean over the values handed to :meth:`send`."""

    def __init__(self):
        self.current_total = 0.0
        self.iterations = 0.0

    def send(self, value):
        """Add ``value`` to the running total and count one more sample."""
        self.current_total = self.current_total + value
        self.iterations = self.iterations + 1

    @property
    def value(self):
        """Mean of everything sent since the last reset; 0 if nothing was sent."""
        if self.iterations == 0:
            return 0
        mean = 1.0 * self.current_total / self.iterations
        return mean

    def reset(self):
        """Forget all accumulated samples."""
        self.current_total, self.iterations = 0.0, 0.0


def collate_fn(batch):
    """Transpose a batch of per-sample tuples into a tuple of per-field tuples.

    ``[(a1, b1), (a2, b2)]`` becomes ``((a1, a2), (b1, b2))``.
    """
    columns = zip(*batch)
    return tuple(columns)

# Trainer

In [None]:
def train_fn(num_epochs, train_data_loader, optimizer, model, device):
    """Train ``model`` and checkpoint it whenever the mean epoch loss improves.

    Args:
        num_epochs: number of passes over ``train_data_loader``.
        train_data_loader: iterable yielding ``(images, targets, image_ids)``
            batches, where ``targets`` is a sequence of dicts of tensors.
        optimizer: optimizer that updates the model parameters.
        model: detection model that returns a dict of losses when called as
            ``model(images, targets)``.
        device: device the images and targets are moved to.

    Side effects:
        Prints the mean loss after every epoch and writes the model's
        ``state_dict`` to ``./checkpoints/faster_rcnn_torchvision_checkpoints.pth``
        each time that mean is lower than any previous epoch's.
    """
    save_path = "./checkpoints/faster_rcnn_torchvision_checkpoints.pth"
    # Start from +inf so the first epoch always produces a checkpoint,
    # however large its loss is.
    best_loss = float("inf")
    loss_hist = Averager()

    # The loop below needs the loss dict, which torchvision detection models
    # only return in training mode.
    model.train()

    for epoch in range(num_epochs):
        loss_hist.reset()

        for images, targets, _image_ids in tqdm(train_data_loader):
            # Move the batch to the target device for computation.
            images = [image.float().to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # Forward pass: the total loss is the sum of all loss components.
            loss_dict = model(images, targets)
            losses = sum(loss_dict.values())
            loss_hist.send(losses.item())

            # Backward pass and parameter update.
            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

        epoch_loss = loss_hist.value
        print(f"Epoch #{epoch+1} loss: {epoch_loss}")
        if epoch_loss < best_loss:
            os.makedirs(os.path.dirname(save_path), exist_ok=True)
            torch.save(model.state_dict(), save_path)
            best_loss = epoch_loss

#### FPN

In [None]:
class FPN(nn.Module):
    r"""Feature Pyramid Network.

    This is an implementation of paper `Feature Pyramid Networks for Object
    Detection <https://arxiv.org/abs/1612.03144>`_.

    Args:
        in_channels (List[int]): Channels of each input feature map, in the
            order the maps are passed to ``forward``. The last entry is
            treated as the top of the pyramid.
        out_channels (int): Channels of every output feature map.
        extra_level (bool): If True, append one extra output computed by a
            stride-2 3x3 convolution applied to the last lateral map.
            Default: ``True``.
        upsample_cfg (dict, optional): Keyword arguments forwarded to
            ``F.interpolate`` in the top-down path.
            Default: ``dict(mode='nearest')``.
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        extra_level=True,
        upsample_cfg=None,
    ):
        super(FPN, self).__init__()
        assert isinstance(in_channels, list)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_ins = len(in_channels)
        # Copy so later changes to the caller's dict cannot affect this module.
        self.upsample_cfg = (
            dict(mode="nearest") if upsample_cfg is None else dict(upsample_cfg)
        )
        self.backbone_end_level = self.num_ins
        self.extra_level = extra_level

        self.lateral_convs = nn.ModuleList()
        self.fpn_convs = nn.ModuleList()

        # One lateral_conv and one fpn_conv per input level:
        #   lateral_conv: 1x1 conv that maps the input to ``out_channels``
        #                 before the top-down merge.
        #   fpn_conv:     3x3 conv applied to each merged map after the
        #                 top-down pass.
        for i in range(self.backbone_end_level):
            l_conv = nn.Conv2d(
                in_channels[i], out_channels, kernel_size=1, stride=1, padding=0
            )
            fpn_conv = nn.Conv2d(
                out_channels, out_channels, kernel_size=3, stride=1, padding=1
            )

            self.lateral_convs.append(l_conv)
            self.fpn_convs.append(fpn_conv)

        self.normal_init(self.fpn_convs, 0, 0.01)
        self.normal_init(self.lateral_convs, 0, 0.01)

        if self.extra_level:
            self.extra_conv = nn.Conv2d(
                out_channels, out_channels, kernel_size=3, stride=2, padding=1
            )

            self.normal_init(self.extra_conv, 0, 0.01)

    def normal_init(self, convs, mean, stddev, truncated=False):
        """Initialise weights from N(mean, stddev) and set biases to zero.

        Args:
            convs: a single conv module or an ``nn.ModuleList`` of them.
            mean: mean of the normal distribution.
            stddev: standard deviation of the normal distribution.
            truncated: unused; kept for interface compatibility.
        """
        modules = convs if isinstance(convs, nn.ModuleList) else [convs]
        for conv in modules:
            nn.init.normal_(conv.weight, mean=mean, std=stddev)
            nn.init.zeros_(conv.bias)

    def forward(self, inputs):
        """Run the lateral, top-down and output convolutions.

        Args:
            inputs: sequence of feature maps from the backbone, one per entry
                of ``in_channels``.

        Returns:
            tuple: one output map per input, each with the spatial size of
            the corresponding input and ``out_channels`` channels, plus one
            additional map when ``extra_level`` is True.
        """
        assert len(inputs) == len(self.in_channels)

        # Lateral 1x1 convs bring every level to ``out_channels``.
        laterals = [conv(feat) for conv, feat in zip(self.lateral_convs, inputs)]

        # Top-down path: resize each level to the size of the one before it
        # and add. Out-of-place so no lateral tensor is mutated after creation.
        for i in range(len(laterals) - 1, 0, -1):
            upsampled = F.interpolate(
                laterals[i], size=laterals[i - 1].shape[2:], **self.upsample_cfg
            )
            laterals[i - 1] = laterals[i - 1] + upsampled

        # Output 3x3 convs on the merged maps.
        outs = [conv(lat) for conv, lat in zip(self.fpn_convs, laterals)]

        # Optional extra level computed from the last lateral map.
        if self.extra_level:
            outs.append(self.extra_conv(laterals[-1]))

        return tuple(outs)

#### custom backbone

In [None]:
class ResNextFPN(nn.Module):
    """ResNeXt-50 (32x4d) feature extractor followed by an FPN.

    ``forward`` returns an ``OrderedDict`` keyed ``"0"``..``"4"`` holding the
    five FPN outputs (four backbone stages plus the extra level), which is
    the mapping format torchvision detection backbones return.
    """

    def __init__(self):
        super(ResNextFPN, self).__init__()
        self.backbone = models.resnext50_32x4d(pretrained=True)
        self.fpn = FPN(
            in_channels=[256, 512, 1024, 2048], out_channels=256, extra_level=True
        )
        # torchvision detection models expect the backbone to expose the
        # channel count of its output maps; it matches the FPN out_channels.
        self.out_channels = 256

    def forward(self, x):
        """Return the FPN feature maps for image batch ``x``."""
        # Stem.
        x = self.backbone.conv1(x)
        x = self.backbone.bn1(x)
        x = self.backbone.relu(x)
        x = self.backbone.maxpool(x)

        # Collect the output of each residual stage; every stage consumes
        # the output of the previous one.
        outs = []
        outs.append(self.backbone.layer1(x))
        outs.append(self.backbone.layer2(outs[-1]))
        outs.append(self.backbone.layer3(outs[-1]))
        outs.append(self.backbone.layer4(outs[-1]))

        # Run the stage outputs through the FPN.
        fpn_outs = self.fpn(outs)

        # Match the name -> feature-map format used by torchvision backbones.
        feat_list = ["0", "1", "2", "3", "4"]
        return OrderedDict(zip(feat_list, fpn_outs))

# Main

In [None]:
def main():
    """Build the dataset, the Faster R-CNN model and the optimizer, then train."""
    # Dataset: annotation file and image root directory.
    annotation = "../../dataset/train.json"
    data_dir = "../../dataset"
    dataset = CustomDataset(annotation, data_dir, get_train_transform())
    loader = DataLoader(
        dataset,
        batch_size=4,
        shuffle=False,
        num_workers=0,
        collate_fn=collate_fn,
    )

    # Use the GPU when one is available.
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    print(device)

    # Start from torchvision's Faster R-CNN and swap in the custom backbone.
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    detector.backbone = ResNextFPN()

    # Replace the box predictor head: 10 classes + background.
    num_classes = 11
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    detector.to(device)

    # Optimise only the parameters that require gradients.
    trainable = list(filter(lambda p: p.requires_grad, detector.parameters()))
    optimizer = torch.optim.Adam(trainable, lr=0.0005, weight_decay=0.0005)

    # Training.
    num_epochs = 1
    train_fn(num_epochs, loader, optimizer, detector, device)

In [None]:
# Run training only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()