In [52]:
import os
import copy
import torch
import detectron2
from detectron2.data import detection_utils as utils
from detectron2.utils.logger import setup_logger
setup_logger()

from detectron2 import model_zoo
from detectron2.config import get_cfg
from detectron2.engine import DefaultTrainer
from detectron2.data import DatasetCatalog, MetadataCatalog
from detectron2.data.datasets import register_coco_instances
from detectron2.evaluation import COCOEvaluator
from detectron2.data import build_detection_test_loader, build_detection_train_loader

import albumentations as A

import numpy as np
import random
from albumentations.pytorch.transforms import ToTensorV2

In [53]:
# Register the COCO-format train/test splits with detectron2.
# Re-running this cell in a live kernel must stay harmless, so a split is only
# registered when its name is not yet in the DatasetCatalog. This replaces a
# blanket `except AssertionError: pass`, which would also have hidden
# unrelated assertion failures raised during registration.
for split_name, json_path in (
    ('coco_trash_train', '../../dataset/train.json'),
    ('coco_trash_test', '../../dataset/test.json'),
):
    if split_name not in DatasetCatalog.list():
        register_coco_instances(split_name, {}, json_path, '../../dataset/')

# Class names for the training split; the position in the list is presumably
# the category_id used by the annotations (0..9) -- confirm against train.json.
MetadataCatalog.get('coco_trash_train').thing_classes = ["General trash", "Paper", "Paper pack", "Metal", 
                                                         "Glass", "Plastic", "Styrofoam", "Plastic bag", "Battery", "Clothing"]

In [54]:
# Load the config: start from detectron2's default config node, then merge the
# model-zoo Faster R-CNN R101-FPN (3x) config file on top of it.
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file('COCO-Detection/faster_rcnn_R_101_FPN_3x.yaml'))

In [55]:
# Adjust the config for this training run.
cfg.DATASETS.TRAIN = ('coco_trash_train',)
cfg.DATASETS.TEST = ('coco_trash_test',)

# The key must be spelled NUM_WORKERS: assigning to a misspelled attribute is
# silently stored as a new, unused key and the default worker count stays in
# effect.
cfg.DATALOADER.NUM_WORKERS = 2

# Start from the pretrained checkpoint that matches the merged config file.
cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url('COCO-Detection/faster_rcnn_R_101_FPN_3x.yaml')

# Optimizer / schedule.
cfg.SOLVER.IMS_PER_BATCH = 4
cfg.SOLVER.BASE_LR = 0.001
cfg.SOLVER.MAX_ITER = 15000
cfg.SOLVER.STEPS = (8000,12000)
# NOTE(review): GAMMA is the LR multiplier applied at each entry of STEPS;
# 0.005 shrinks the LR very aggressively -- confirm this is intended.
cfg.SOLVER.GAMMA = 0.005
cfg.SOLVER.CHECKPOINT_PERIOD = 3000

cfg.OUTPUT_DIR = './output'

cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 128
# Must agree with the number of entries in thing_classes (10 trash categories).
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 10

# Evaluate with the same period as checkpointing.
cfg.TEST.EVAL_PERIOD = 3000

In [None]:
# Materialize both registered splits. The training records are indexed below
# as a sequence of dicts carrying "annotations" and "file_name" entries.
train_ds = DatasetCatalog.get('coco_trash_train')
test_ds = DatasetCatalog.get('coco_trash_test')

In [57]:
# Metadata handle for the training split (the object whose thing_classes was
# assigned in the registration cell).
metadata = MetadataCatalog.get('coco_trash_train')
# metadata

In [None]:
# Display one training record to inspect the layout of a dataset dict.
train_ds[1]

In [59]:
# Bucket the training records by the category of their FIRST annotation.
# class_list[c] holds every record whose first annotation has category_id c,
# and num[c] is the size of that bucket. The mosaic mapper uses these buckets
# to draw partner images of the same leading class.
class_list = [[] for _ in range(10)]  # one bucket per category_id 0..9

for record in train_ds:
    record_annotations = record["annotations"]
    if not record_annotations:
        # An image without annotations has no leading class; skip it instead
        # of failing with IndexError on annotations[0].
        continue
    category_id = record_annotations[0]["category_id"]
    # Ids outside 0..9 are ignored (no bucket exists for them).
    if 0 <= category_id < len(class_list):
        class_list[category_id].append(record)

num = [len(bucket) for bucket in class_list]

# Per-class names kept as aliases of the buckets / counts above so any other
# cell that refers to them keeps working.
(general_trash_list, paper_list, paper_pack_list, metal_list,
 glass_list, plastic_list, styrofoam_list, plastic_bag_list,
 battery_list, clothing_list) = class_list
(num_general_trash, num_paper, num_paper_pack, num_metal,
 num_glass, num_plastic, num_styrofoam, num_plastic_bag,
 num_battery, num_clothing) = num

In [60]:
# print(class_list)

In [61]:
def train_ds2Numpy_arr(annotations):
    """Split a list of annotation dicts into parallel box and class lists.

    Args:
        annotations: sequence of dicts, each with a "bbox" and a
            "category_id" entry.

    Returns:
        (gt_bboxes, classes): two lists of equal length. The boxes are the
        very same objects stored in the annotations (no copy is made), so
        mutating a returned box also mutates the annotation it came from.
    """
    # Boxes are taken as-is; no format conversion happens here.
    gt_bboxes = [annotation["bbox"] for annotation in annotations]
    # Class label of each box, in the same order.
    classes = [annotation["category_id"] for annotation in annotations]
    return gt_bboxes, classes

In [71]:
def _tile_pipeline(*leading_steps):
    """Build a mosaic-tile pipeline: the given steps, then random H/V flips.

    Boxes are expected in "pascal_voc" format with their labels passed under
    the "bbox_classes" field.
    """
    return A.Compose(
        [*leading_steps, A.HorizontalFlip(p=0.5), A.VerticalFlip(p=0.5)],
        bbox_params=A.BboxParams(format="pascal_voc", label_fields=["bbox_classes"]),
    )


# Tile recipe 1 (applied to the original-size image): rotate, then centre crop.
# border_mode=1 is presumably cv2.BORDER_REPLICATE -- confirm against OpenCV.
# A 325x440 crop (325/440 = 65/85 = 520/704) was tried earlier; 400x400 is used.
ROTATED_CENTRER_CROP = _tile_pipeline(
    A.Rotate(limit=175, border_mode=1, p=1.0),
    A.CenterCrop(height=400, width=400, p=1.0),
)

# Tile recipe 2 (applied to the original-size image): transpose, then random crop.
# Same 400x400 tile size as recipe 1.
TRANSPOSE_RANDOM_CROP = _tile_pipeline(
    A.Transpose(p=1.0),
    A.RandomCrop(height=400, width=400, p=1.0),
)

def is_tiny_box(height, width, min_area=30):
    """Return True when a box of the given height and width covers less than min_area."""
    return bool(height * width < min_area)


def create_a_mosaic_set(image_list, xc, yc, H_mosaic_img, W_mosaic_img):
    """Paste up to four tiles into one mosaic image and merge their boxes.

    The canvas is split at column ``xc`` and row ``yc`` into four quadrants,
    filled in the order top-left, top-right, bottom-left, bottom-right.
    Entries beyond the fourth are written into the bottom-right quadrant
    again, overwriting its pixels.

    Args:
        image_list: sequence of dicts with keys "image" (H x W x 3 array whose
            shape matches its quadrant), "bboxes" (boxes in pascal_voc
            ``[x_min, y_min, x_max, y_max]`` form, relative to the tile) and
            "bbox_classes" (one label per box).
        xc, yc: column / row where the four quadrants meet.
        H_mosaic_img, W_mosaic_img: size of the mosaic canvas.

    Returns:
        (mosaic_image, mosaic_bboxes, mosaic_bbox_classes): the uint8 canvas,
        the kept boxes in coco ``[x, y, width, height]`` form expressed in
        canvas coordinates, and their labels.

    Raises:
        AssertionError: if a non-tiny box starts outside its tile or is
            wider / taller than the tile.
    """
    mosaic_image = np.full((H_mosaic_img, W_mosaic_img, 3), 1, dtype=np.uint8)
    mosaic_bboxes, mosaic_bbox_classes = [], []

    # (x offset, y offset, tile width, tile height) for each quadrant.
    quadrants = (
        (0, 0, xc, yc),                                     # top-left
        (xc, 0, W_mosaic_img - xc, yc),                     # top-right
        (0, yc, xc, H_mosaic_img - yc),                     # bottom-left
        (xc, yc, W_mosaic_img - xc, H_mosaic_img - yc),     # bottom-right
    )

    for i, image_set in enumerate(image_list):
        x_off, y_off, tile_w, tile_h = quadrants[min(i, 3)]
        shape = image_set["image"].shape
        mosaic_image[y_off:y_off + tile_h, x_off:x_off + tile_w, :] = image_set["image"]

        for box, bbox_class in zip(image_set["bboxes"], image_set["bbox_classes"]):
            box = list(box)  # work on a copy; the caller's box is left untouched
            # "pascal_voc" to "coco": turn x_max / y_max into width / height.
            box[2] -= box[0]
            box[3] -= box[1]
            # Drop tiny boxes first, in every quadrant: a box that is going to
            # be discarded anyway must not be able to trip the bounds check.
            if is_tiny_box(height=box[3], width=box[2]):
                continue
            assert box[0] < tile_w and box[1] < tile_h and box[2] <= tile_w and box[3] <= tile_h, f"sub-image shape: {shape} || box: {box}"
            # Shift from tile coordinates to canvas coordinates.
            box[0] += x_off
            box[1] += y_off
            mosaic_bboxes.append(box)
            mosaic_bbox_classes.append(bbox_class)

    return mosaic_image, mosaic_bboxes, mosaic_bbox_classes

In [74]:
# mapper - decides the format in which input data is returned (so it also covers data preprocessing such as augmentation)
import detectron2.data.transforms as T

# Geometry shared by both mosaic variants and by the final network input.
_MOSAIC_SIZE = 800     # side length of the assembled mosaic canvas
_MOSAIC_CENTER = 400   # column / row where the four mosaic tiles meet
_OUTPUT_SIZE = 1024    # side length of the image handed to the model


def _load_sample(record, image_format, as_voc=False):
    """Read a record's image and return (image, boxes, classes) without mutating the record."""
    # With format="BGR" read_image returns a channel-reversed view; make the
    # array contiguous so ToTensorV2 can wrap it in a tensor.
    image = np.ascontiguousarray(utils.read_image(record["file_name"], format=image_format))
    coco_boxes, classes = train_ds2Numpy_arr(record["annotations"])
    if as_voc:
        # "coco" [x, y, w, h] -> "pascal_voc" [x_min, y_min, x_max, y_max]
        boxes = [[x, y, x + w, y + h] for x, y, w, h in coco_boxes]
    else:
        boxes = [list(box) for box in coco_boxes]
    return image, boxes, list(classes)


def _make_tile(pipeline, sample):
    """Run one (image, boxes, classes) sample through a pipeline and pack it as a mosaic tile dict."""
    image, boxes, classes = sample
    out = pipeline(image=image, bboxes=boxes, bbox_classes=classes)
    return {"image": out["image"], "bboxes": out["bboxes"], "bbox_classes": out["bbox_classes"]}


def _random_crop_pipeline(height, width):
    """Random crop of the given size followed by random flips, for pascal_voc boxes."""
    return A.Compose([
        A.RandomCrop(height=height, width=width, p=1),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
    ], bbox_params=A.BboxParams(format="pascal_voc", label_fields=["bbox_classes"]))


def _finalize(image, bboxes, bbox_classes, with_flips):
    """Resize to the output size and convert to a tensor; boxes are in coco format."""
    steps = [A.HorizontalFlip(p=0.5), A.VerticalFlip(p=0.5)] if with_flips else []
    steps += [
        A.Resize(height=_OUTPUT_SIZE, width=_OUTPUT_SIZE, p=1.0),
        ToTensorV2(p=1.0),
    ]
    pipeline = A.Compose(steps, bbox_params=A.BboxParams(format="coco", label_fields=["bbox_classes"]))
    out = pipeline(image=image, bboxes=bboxes, bbox_classes=bbox_classes)
    return out["image"], out["bboxes"], out["bbox_classes"]


def AlbumentationsMapper(dataset_dict, image_format="BGR"):
    """Turn one dataset record into a training sample, with optional mosaic augmentation.

    With probability 0.6 (or always, when the record has no annotations) the
    image is only flipped at random and resized. Otherwise three more records
    are drawn from ``class_list`` -- the bucket matching the category of this
    record's first annotation -- and the four images are combined into an
    800x800 mosaic, using either the rotate/transpose tile recipes (tiles
    shuffled) or plain random crops, each chosen with probability 0.5. The
    result is always resized to 1024x1024.

    Args:
        dataset_dict: record with "file_name" and "annotations" (each
            annotation carrying "bbox" in coco [x, y, w, h] form,
            "bbox_mode" and "category_id"). It is not modified.
        image_format: channel format passed to ``utils.read_image``. It has
            to match ``cfg.INPUT.FORMAT`` so training sees the same channel
            order as the pretrained weights and the evaluation loader; the
            default "BGR" is detectron2's default for that setting.

    Returns:
        dict with "image" (tensor produced by ToTensorV2), "height", "width"
        and "instances" (empty instances filtered out).
    """
    annotations = dataset_dict["annotations"]

    if not annotations or random.random() < 0.6:
        # Single-image path; boxes stay in coco format throughout.
        image, boxes, classes = _load_sample(dataset_dict, image_format)
        image, boxes, classes = _finalize(image, boxes, classes, with_flips=True)
    else:
        # Mosaic path: this image plus three partners whose first annotation
        # has the same category.
        partner_pool = class_list[annotations[0]["category_id"]]
        samples = [_load_sample(dataset_dict, image_format, as_voc=True)]
        samples += [
            _load_sample(random.choice(partner_pool), image_format, as_voc=True)
            for _ in range(3)
        ]

        xc = yc = _MOSAIC_CENTER
        if random.random() < 0.5:
            # Customised mosaic: two "rotate + centre crop" tiles and two
            # "transpose + random crop" tiles, placed in random quadrants.
            pipelines = (ROTATED_CENTRER_CROP, ROTATED_CENTRER_CROP,
                         TRANSPOSE_RANDOM_CROP, TRANSPOSE_RANDOM_CROP)
            tiles = [_make_tile(pipeline, sample) for pipeline, sample in zip(pipelines, samples)]
            random.shuffle(tiles)
        else:
            # Plain mosaic: each image is randomly cropped to the size of its
            # quadrant (top-left, top-right, bottom-left, bottom-right).
            tile_shapes = (
                (yc, xc),
                (yc, _MOSAIC_SIZE - xc),
                (_MOSAIC_SIZE - yc, xc),
                (_MOSAIC_SIZE - yc, _MOSAIC_SIZE - xc),
            )
            tiles = [
                _make_tile(_random_crop_pipeline(tile_h, tile_w), sample)
                for (tile_h, tile_w), sample in zip(tile_shapes, samples)
            ]

        mosaic_image, mosaic_boxes, mosaic_classes = create_a_mosaic_set(
            tiles, xc=xc, yc=yc, H_mosaic_img=_MOSAIC_SIZE, W_mosaic_img=_MOSAIC_SIZE
        )
        image, boxes, classes = _finalize(mosaic_image, mosaic_boxes, mosaic_classes, with_flips=False)

    # Convert the boxes and labels into instance annotations. Boxes leave the
    # pipeline in the same coco layout the dataset uses, so the record's own
    # bbox_mode applies (only looked up when at least one box exists).
    annos = [
        {'bbox': box, 'bbox_mode': annotations[0]["bbox_mode"], 'category_id': category}
        for box, category in zip(boxes, classes)
    ]

    final_dataset_dict = {'image': image, 'height': _OUTPUT_SIZE, 'width': _OUTPUT_SIZE}
    instances = utils.annotations_to_instances(annos, image_size=(_OUTPUT_SIZE, _OUTPUT_SIZE))
    final_dataset_dict["instances"] = utils.filter_empty_instances(instances)

    return final_dataset_dict

In [64]:
# trainer - subclasses DefaultTrainer
class MyTrainer(DefaultTrainer):
    """DefaultTrainer variant that trains with AlbumentationsMapper and evaluates with COCOEvaluator."""

    @classmethod
    def build_train_loader(cls, cfg, sampler=None):
        """Build the training loader, routing every record through AlbumentationsMapper."""
        return build_detection_train_loader(cfg, mapper=AlbumentationsMapper, sampler=sampler)

    @classmethod
    def build_evaluator(cls, cfg, dataset_name, output_folder=None):
        """Create a COCOEvaluator for ``dataset_name``.

        When no output folder is given, './output_eval' is created if needed
        and used.
        """
        if output_folder is None:
            output_folder = './output_eval'
            os.makedirs(output_folder, exist_ok=True)

        return COCOEvaluator(dataset_name, cfg, False, output_folder)

In [None]:
# train
# Make sure the output directory named in the config exists before training.
os.makedirs(cfg.OUTPUT_DIR, exist_ok = True)

trainer = MyTrainer(cfg)
# resume=False: presumably starts from cfg.MODEL.WEIGHTS instead of picking up
# a previous checkpoint in OUTPUT_DIR -- confirm against DefaultTrainer docs.
trainer.resume_or_load(resume=False)
trainer.train()