In [None]:
from ensemble_boxes import nms, soft_nms, non_maximum_weighted, weighted_boxes_fusion

from PIL import Image
from tqdm.autonotebook import tqdm
from glob import glob

import json
import os
import numpy as np

In [None]:
# Directory holding the test images. Set the TEST_IMAGE_DIR environment variable to
# run on another machine; the default is the original hard-coded location.
TEST_IMAGE_DIR = os.environ.get(
    "TEST_IMAGE_DIR", "/home/tupleteam/git/chula_icip/test/data"
)

NMS_THRESH = 0.55  # passed to process_fusion as iou_thr
BOX_THRESH = 0.55  # passed to process_fusion as skip_box_thr
PP_THRESH = 0.55   # score cut-off applied to the fused boxes afterwards

# Prediction files (2022-05-20) of the models taking part in the fusion.
# Each entry: "name" is the model key, "path" the prediction JSON, "weight" the fusion weight.
result_files = [
    {
        "name": "htc-x101-64x4d",
        "path": "predictions/best_20220520/htc-x101-64x4d-fpn-dconv_pseudo-fusion_mIoU=0.928_threshold=0.1.json",
        "weight": 0.2,
    },
    {
        "name": "htc-hrnet-w32",
        "path": "predictions/best_20220520/htc-hrnet-w32_pseudo-fusion_mIoU=0.928_threshold=0.1.json",
        "weight": 0.2,
    },
    {
        "name": "tood-r101",
        "path": "predictions/best_20220520/tood-r101-dconv_12epoch_pseudo-fusion_mIoU=0.926_threshold=0.1.json",
        "weight": 0.2,
    },
    {
        "name": "hrnetv2p",
        "path": "predictions/best_20220520/cascade-rcnn-hrnetv2p-w32_10epoch_pseudo-fusion_mIoU=0.927_threshold=0.1.json",
        "weight": 0.2,
    },
    {
        "name": "gfl-r101",
        # NOTE(review): "threshod" looks like a typo but is kept verbatim because the
        # path has to match the file on disk -- confirm before renaming.
        "path": "predictions/best_20220520/gfl-r101-dcn_pseudo-fusion_mIoU=0.923_threshod=0.1.json",
        "weight": 0.2,
    },
]

In [None]:
def format_annotations(file_path):
    """Load one model's predictions and group them by image file name.

    The JSON file may be either a dict with an "annotations" list or a bare
    list of annotations. Each annotation must provide "bbox" (COCO
    [x, y, width, height]), "score" and "category_id", plus either
    "file_name" or "image_id"; when only "image_id" is present the file name
    is derived as the id zero-padded to four characters followed by ".jpg".

    Relies on the module-level ``image_dict`` (file name -> width/height),
    which therefore has to be built before this function is called.

    Args:
        file_path: Path of the prediction JSON file.

    Returns:
        dict mapping file name to a dict with four parallel lists:
        "original_boxes_list" (boxes as stored in the file), "boxes_list"
        (boxes normalised by ``norm_coco_bbox``), "labels_list" and
        "scores_list".

    Raises:
        KeyError: if a required key is missing or the image is not in
            ``image_dict``.
    """
    # Parse the file once and close the handle deterministically.
    with open(file_path, "r") as fp:
        payload = json.load(fp)

    # Accept both layouts: {"annotations": [...]} or a bare list.
    annotations = payload["annotations"] if isinstance(payload, dict) else payload

    data = {}
    for ann in annotations:
        if "file_name" in ann:
            file_name = ann["file_name"]
        else:
            file_name = "{}".format(ann["image_id"]).zfill(4) + ".jpg"

        image_info = image_dict[file_name]

        orig_bbox = ann["bbox"]
        bbox = norm_coco_bbox(orig_bbox, image_info["width"], image_info["height"])
        score = ann["score"]
        category_id = ann["category_id"]

        per_image = data.setdefault(
            file_name,
            {
                "original_boxes_list": [],
                "boxes_list": [],
                "labels_list": [],
                "scores_list": [],
            },
        )
        per_image["original_boxes_list"].append(orig_bbox)
        per_image["boxes_list"].append(bbox)
        per_image["labels_list"].append(category_id)
        per_image["scores_list"].append(score)

    return data

def norm_coco_bbox(coco_bbox, w, h):
    """Turn a COCO box [x, y, width, height] into corner form scaled by the image size.

    Args:
        coco_bbox: Box as [x, y, width, height].
        w: Image width used to scale the horizontal values.
        h: Image height used to scale the vertical values.

    Returns:
        [x1, y1, x2, y2], each coordinate divided by the matching image dimension.
    """
    left = coco_bbox[0] / w
    top = coco_bbox[1] / h

    # The far corner is the near corner plus the scaled extent.
    right = left + (coco_bbox[2] / w)
    bottom = top + (coco_bbox[3] / h)

    return [left, top, right, bottom]

def convert_norm_box_to_coco_bbox(norm_box, w, h):
    """Convert a normalised corner box back to a COCO box in pixels.

    Args:
        norm_box: Box as [x1, y1, x2, y2], scaled by the image size.
        w: Image width in pixels.
        h: Image height in pixels.

    Returns:
        [x, y, width, height] as builtin ``float`` values. The coercion keeps
        the result JSON-serialisable even when ``norm_box`` holds numpy
        scalars (``np.float32`` cannot be encoded by ``json``).
    """
    x1, y1, x2, y2 = norm_box[0], norm_box[1], norm_box[2], norm_box[3]
    return [
        float(x1 * w),
        float(y1 * h),
        float((x2 - x1) * w),
        float((y2 - y1) * h),
    ]

In [None]:
# Build the image width/height lookup (keyed by file name) and the COCO "images" records.

image_dict = {}
image_paths = sorted(glob(f"{TEST_IMAGE_DIR}/*.jpg"))

coco_images = []

image_id = 0  # stays 0 when no image is found; ids are 1-based in sorted path order
for image_id, p in enumerate(tqdm(image_paths), start=1):
    file_name = os.path.basename(p)

    # Only the size is needed; the context manager closes the file right after reading it.
    with Image.open(p) as im:
        w, h = im.size

    image_dict[file_name] = {"width": w, "height": h, "image_id": image_id}

    coco_images.append({
        "id": image_id,
        "file_name": file_name,
        "height": h,
        "width": w,
        "license": 1,
        "coco_url": None
    })

### Format each model's annotations before fusion

In [None]:
# Parse every model's prediction file and collect the image names seen by any model.
format_annotation_dict = {}
file_paths = []
for rf in result_files:
    prediction_path = rf["path"]
    print(prediction_path)

    format_ann = format_annotations(prediction_path)
    format_annotation_dict[rf["name"]] = format_ann

    # Iterating the dict yields its keys, i.e. the image file names.
    file_paths += list(format_ann)

# De-duplicated, sorted image names; the count is shown as the cell output.
uniq_file_paths = sorted(set(file_paths))
len(uniq_file_paths)

In [None]:
# Fusion weight per model name, taken from the result_files configuration.
weight_dict = dict((entry["name"], entry["weight"]) for entry in result_files)
weight_dict

### Fusion model

In [None]:
def process_fusion(image_names, iou_thr, skip_box_thr):
    """Run weighted boxes fusion over every image, combining the per-model predictions.

    Reads the module-level ``format_annotation_dict`` (model name -> per-image
    predictions) and ``weight_dict`` (model name -> weight).

    Args:
        image_names: Image file names to fuse.
        iou_thr: Forwarded to ``weighted_boxes_fusion`` as ``iou_thr``.
        skip_box_thr: Forwarded to ``weighted_boxes_fusion`` as ``skip_box_thr``.

    Returns:
        dict mapping each image name to {"boxes", "scores", "labels"} as
        returned by ``weighted_boxes_fusion``.
    """
    fused_by_image = {}

    for path in tqdm(image_names):
        # Look each image up in every model, then drop models with no prediction for it.
        lookups = [
            (model_name, format_ann.get(path))
            for model_name, format_ann in format_annotation_dict.items()
        ]
        contributing = [(model_name, info) for model_name, info in lookups if info]

        boxes_list = [info["boxes_list"] for _, info in contributing]
        scores_list = [info["scores_list"] for _, info in contributing]
        labels_list = [info["labels_list"] for _, info in contributing]
        weights = [weight_dict.get(model_name) for model_name, _ in contributing]

        # Diagnostic for images covered by few models.
        # NOTE(review): the literal 4 is kept as-is; confirm it is still the intended
        # cut-off for the current number of models.
        if len(boxes_list) < 4:
            print(path, len(boxes_list) )
            print(labels_list)
            print(scores_list)
            print("-" * 20)

        boxes, scores, labels = weighted_boxes_fusion(
            boxes_list,
            scores_list,
            labels_list,
            weights=weights,
            iou_thr=iou_thr,
            skip_box_thr=skip_box_thr,
        )

        fused_by_image[path] = dict(boxes=boxes, scores=scores, labels=labels)

    return fused_by_image

# Fuse all images that have a prediction from at least one model (a copy of the name list is passed).
fusions_dict = process_fusion(
    list(uniq_file_paths),
    NMS_THRESH,
    BOX_THRESH,
)

## Convert fusions_dict back to submission

In [None]:
# Turn the fused boxes into submission items, keeping only boxes that reach the threshold.
items = []
cnt_id = 0                 # running annotation id, continued by the fallback cell below
cnt_lower_threshold = 0    # number of fused boxes rejected by the score threshold
threshold = PP_THRESH

for file_name, info in fusions_dict.items():
    if len(info["scores"]) == 0:
        # nothing survived the fusion for this image
        continue

    image_info = image_dict[file_name]

    # Boxes are emitted in ascending score order.
    for index in np.argsort(info["scores"]):
        score = info["scores"][index]

        if score < threshold:
            cnt_lower_threshold += 1
            continue

        coco_bbox = convert_norm_box_to_coco_bbox(
            info["boxes"][index], image_info["width"], image_info["height"]
        )

        items.append({
            "id": cnt_id,
            "bbox": coco_bbox,
            "category_id": int(info["labels"][index]),
            "file_name": file_name,
            "score": float(score),
        })
        cnt_id += 1

## Fill missing predictions with the selected model

In [None]:
# Images from image_dict that have no entry in items; the count is shown as the cell output.
predicted_file_names = {r["file_name"] for r in items}
missing_pred_images = set(image_dict) - predicted_file_names
len(missing_pred_images)

In [None]:
# Key into format_annotation_dict: the model whose top-scoring box is used for images without any item.
selected_model = "tood-r101"

In [None]:
# For every image without an item, fall back to the single highest-scoring box of
# `selected_model`. The set is visited in sorted order so ids and output order are
# reproducible between runs. The fallback items are also recorded in `missing_items`.
missing_items = []
for file_name in sorted(missing_pred_images):
    print(file_name)

    info = format_annotation_dict[selected_model].get(file_name, None)
    if info is None:
        print("Not found: {}".format(file_name))
        continue

    image_info = image_dict[file_name]

    # keep only the highest-scoring prediction
    index = np.argmax(info["scores_list"])

    label = info["labels_list"][index]
    score = info["scores_list"][index]
    box = info["boxes_list"][index]
    category_id = int(label)

    coco_bbox = convert_norm_box_to_coco_bbox(box, image_info["width"], image_info["height"])

    item = {
        "id": cnt_id,
        "bbox": coco_bbox,
        "category_id": category_id,
        "file_name": file_name,
        "score": float(score),
    }
    missing_items.append(item)
    items.append(item)
    cnt_id += 1

In [None]:
# Wrap the collected items in the top-level submission structure.
submission_output = dict(annotations=items)

In [None]:
# Write the submission; the context manager guarantees the file is flushed and closed.
submission_path = "submission_fusion_20220530_01_thr={}.json".format(threshold)
with open(submission_path, "w") as fp:
    json.dump(submission_output, fp, indent=2, sort_keys=False)