# Library Import

In [42]:
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import numpy as np
import cv2
import os

import albumentations as A
from albumentations.pytorch import ToTensorV2

import torch
# faster rcnn model이 포함된 library
import torchvision

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

from torch.utils.data import DataLoader, Dataset
import pandas as pd
import random
from tqdm import tqdm


from crops.custom_transform import BBoxSafeRandomCrop

# Dataset 생성

In [43]:
class CustomDataset(Dataset):
    '''
      data_dir: data가 존재하는 폴더 경로
      transforms: data transform (resize, crop, Totensor, etc,,,)
    '''

    def __init__(self, annotation, data_dir, transforms=None, mosaic=False):
        super().__init__()
        self.data_dir = data_dir
        self.coco = COCO(annotation)
        self.predictions = {
            "images": self.coco.dataset["images"].copy(),
            "categories": self.coco.dataset["categories"].copy(),
            "annotations": None
        }
        self.transforms = transforms
        self.mosaic = mosaic

    def __getitem__(self, index: int):
        if self.mosaic and random.random() <= 0.5:
            self.transforms = A.Compose([
                ToTensorV2(p=1.0)
            ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})
            return self.load_mosaic(index)
        else:
            return self.load_image_target(index)

    def load_image_target(self, index):
        image_id = self.coco.getImgIds(imgIds=index)

        image_info = self.coco.loadImgs(image_id)[0]

        image = cv2.imread(os.path.join(self.data_dir, image_info['file_name']))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /= 255.0

        ann_ids = self.coco.getAnnIds(imgIds=image_info['id'])
        anns = self.coco.loadAnns(ann_ids)

        boxes = np.array([x['bbox'] for x in anns])

        # boxes (x_min, y_min, x_max, y_max)
        boxes[:, 2] = boxes[:, 0] + boxes[:, 2]
        boxes[:, 3] = boxes[:, 1] + boxes[:, 3]

        # torchvision faster_rcnn은 label=0을 background로 취급
        # torchvision 사용하는 경우 class_id를 1~10으로 수정 필요
        labels = np.array([x['category_id'] for x in anns])
        labels = torch.as_tensor(labels, dtype=torch.int64)

        # areas = np.array([x['area'] for x in anns])
        # areas = torch.as_tensor(areas, dtype=torch.float32)

        # is_crowds = np.array([x['iscrowd'] for x in anns])
        # is_crowds = torch.as_tensor(is_crowds, dtype=torch.int64)
        target = {'boxes': boxes, 'labels': labels, 'image_id': torch.tensor([index])}

        # transform
        if self.transforms :
            sample = {
                'image': image,
                'bboxes': target['boxes'],
                'labels': labels
            }
            sample = self.transforms(**sample)
            image = sample['image']
            target['boxes'] = torch.tensor(sample['bboxes'], dtype=torch.float32)
        return image, target, image_id

    def load_mosaic(self, index):
        # Mosaic에서 사용할 4장의 이미지를 선택
        indices = [index] + [random.randint(0, len(self.coco.getImgIds())+1) for _ in range(3)]
        images = []
        final_boxes = []
        labels = []

        s = 1024  # 기본 크기 (전체 Mosaic 이미지 크기)
        mosaic_img = np.zeros((s, s, 3), dtype=np.float32)
        xc, yc = np.random.randint(s * 0.25, s * 0.75, (2,))

        # Load images
        for i, idx in enumerate(indices):
            image, target, _ = self.load_image_target(idx)
            h, w = image.shape[:2]

            if not isinstance(image, np.ndarray):
                image = image.cpu().numpy().transpose(1,2,0)

            # 각각의 이미지를 중앙 기준으로 배치
            if i == 0:  # top-left
                x1a, y1a, x2a, y2a = 0, 0, xc, yc
                x1b, y1b, x2b, y2b = s - xc, s - yc, s, s
            elif i == 1:  # top-right
                x1a, y1a, x2a, y2a = xc, 0, s, yc
                x1b, y1b, x2b, y2b = 0, s - yc, s - xc, s
            elif i == 2:  # bottom-left
                x1a, y1a, x2a, y2a = 0, yc, xc, s
                x1b, y1b, x2b, y2b = s - xc, 0, s, s-yc
            elif i == 3:  # bottom-right
                x1a, y1a, x2a, y2a = xc, yc, s, s
                x1b, y1b, x2b, y2b = 0, 0, s-xc, s-yc

            offset_x = x1a - x1b
            offset_y = y1a - y1b

            boxes = target['boxes']
            boxes[:, 0] += offset_x
            boxes[:, 1] += offset_y
            boxes[:, 2] += offset_x
            boxes[:, 3] += offset_y

            # 이미지 배치
            mosaic_img[y1a:y2a, x1a:x2a] = image[y1b:y2b, x1b:x2b]
            final_boxes.append(boxes)
            labels.append(target['labels'])

        # 합친 이미지 생성
        final_boxes = np.vstack(final_boxes)
        final_boxes[:, 0:] = np.clip(final_boxes[:, 0:], 0, s).astype(np.int32)

        final_labels = np.concatenate(labels, axis=0)
        label_names = [self.coco.loadCats(x)[0]['name'] for x in final_labels.tolist()]

        # (xmin = 0 & xmax =0) | (ymin = 0 & ymax=0) 제외
        delete_list = []
        for i in range(len(final_boxes)):
            if final_boxes[i][0] == final_boxes[i][2] or final_boxes[i][1] == final_boxes[i][3]:
                delete_list.append(i)

        final_boxes = np.delete(final_boxes, delete_list, axis=0)
        final_labels = np.delete(final_labels, delete_list, axis=0)
        label_names = np.delete(label_names, delete_list, axis=0)

        target = {'boxes': final_boxes, 'labels': final_labels, 'image_id': torch.tensor([index]),
                   'label_names': label_names}

        return mosaic_img, target, index

    def __len__(self) -> int:
        return len(self.coco.getImgIds())

In [44]:
def get_train_transform():
    return A.Compose([
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        BBoxSafeRandomCrop(erosion_rate=0.0, p = 0.5),
        A.Resize(1024, 1024), 
        ToTensorV2(p=1.0)
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})


def get_valid_transform():
    return A.Compose([
        ToTensorV2(p=1.0)
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

# Util Functions

In [45]:
class Averager:
    def __init__(self):
        self.current_total = 0.0
        self.iterations = 0.0

    def send(self, value):
        self.current_total += value
        self.iterations += 1

    @property
    def value(self):
        if self.iterations == 0:
            return 0
        else:
            return 1.0 * self.current_total / self.iterations

    def reset(self):
        self.current_total = 0.0
        self.iterations = 0.0


def collate_fn(batch):
    return tuple(zip(*batch))

# Trainer

In [46]:
def train_fn(num_epochs, train_data_loader, optimizer, model, device):
    best_loss = 1000
    loss_hist = Averager()
    for epoch in range(num_epochs):
        loss_hist.reset()

        for images, targets, image_ids in tqdm(train_data_loader):

            # gpu 계산을 위해 image.to(device)
            images = list(image.float().to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # calculate loss
            loss_dict = model(images, targets)

            losses = sum(loss for loss in loss_dict.values())
            loss_value = losses.item()

            loss_hist.send(loss_value)

            # backward
            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

        print(f"Epoch #{epoch+1} loss: {loss_hist.value}")
        if loss_hist.value < best_loss:
            save_path = './checkpoints/faster_rcnn_torchvision_checkpoints.pth'
            save_dir = os.path.dirname(save_path)
            if not os.path.exists(save_dir):
                os.makedirs(save_dir)
            
            torch.save(model.state_dict(), save_path)
            best_loss = loss_hist.value

# Main

In [47]:
def main():
    # 데이터셋 불러오기
    annotation = '../../../dataset/train.json' # annotation 경로
    data_dir = '../../../dataset' # data_dir 경로
    train_dataset = CustomDataset(annotation, data_dir, get_train_transform(), mosaic=False) 
    train_data_loader = DataLoader(
        train_dataset,
        batch_size=16,
        shuffle=False,
        num_workers=0,
        collate_fn=collate_fn
    )
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    print(device)
    
    # torchvision model 불러오기
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    num_classes = 11 # class 개수= 10 + background
    # get number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    model.to(device)
    params = [p for p in model.parameters() if p.requires_grad]
    
    optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
    num_epochs = 12
    
    # training
    train_fn(num_epochs, train_data_loader, optimizer, model, device)

In [48]:
if __name__ == '__main__':
    main()

loading annotations into memory...
Done (t=0.32s)
creating index...
index created!
cuda


  0%|          | 0/306 [00:00<?, ?it/s]


NotImplementedError: Method get_params_dependent_on_targets is not implemented in class BBoxSafeRandomCrop