In [3]:
import os
import json
from PIL import Image
import numpy as np
from pycocotools import mask
from skimage import measure
import cv2

# Category name -> numeric category id; ids start at 1 and follow the order
# in which the names are listed here.
CATEGORIES: dict[str, int] = {
    name: category_id
    for category_id, name in enumerate(("SA", "LI", "RI"), start=1)
}

def _shift(category_id: int, fragment_id: int) -> int:
    return 10 * (category_id - 1) + fragment_id

def load_masks(path) -> tuple[np.ndarray, list[int], list[int]]:
    """Read an encoded segmentation image from disk and split it into masks.

    Args:
        path: Location of the segmentation image file.

    Returns:
        The ``(masks, category_ids, fragment_ids)`` triple produced by
        ``seg_to_masks`` for the loaded image.
    """
    # Image.open keeps the underlying file open; the context manager closes it
    # as soon as the pixel data has been copied into the NumPy array.
    with Image.open(path) as seg_image:
        seg = np.array(seg_image)
    return seg_to_masks(seg)

def seg_to_masks(seg: np.ndarray) -> tuple[np.ndarray, list[int], list[int]]:
    """Convert a binary-encoded multi-label segmentation to masks.

    For every category in ``CATEGORIES`` and every fragment id in 1..10, the
    bit at position ``_shift(category_id, fragment_id)`` of each pixel is
    extracted. Pairs whose bit is set nowhere in ``seg`` are skipped.

    Args:
        seg: Integer array whose bits encode the per-pixel labels.

    Returns:
        A tuple ``(masks, category_ids, fragment_ids)``. ``masks`` is a uint8
        array with one 0/1 mask per detected pair, stacked along axis 0; the
        two lists give the category and fragment id of each mask, in the same
        order. When nothing is found, ``masks`` has shape ``(0, *seg.shape)``
        and both lists are empty.
    """
    category_ids: list[int] = []
    fragment_ids: list[int] = []
    masks: list[np.ndarray] = []
    for category_id in CATEGORIES.values():
        for fragment_id in range(1, 11):
            # Deliberately not named `mask`: that name is the pycocotools
            # module imported at the top of this cell.
            fragment_mask = np.right_shift(seg, _shift(category_id, fragment_id)) & 1
            if fragment_mask.any():
                masks.append(fragment_mask.astype('uint8'))
                category_ids.append(category_id)
                fragment_ids.append(fragment_id)

    if not masks:
        # np.array([]) would give a shape-(0,) float64 array; keep the dtype
        # and spatial shape consistent with the non-empty case instead.
        return np.zeros((0, *np.shape(seg)), dtype='uint8'), category_ids, fragment_ids

    return np.array(masks), category_ids, fragment_ids


def create_coco_annotation(img_id, annotation_id, category_id, binary_mask, image_size):
    """Build one COCO-style annotation dict for a single instance mask.

    Args:
        img_id: Value stored under ``"image_id"``.
        annotation_id: Value stored under ``"id"``.
        category_id: Value stored under ``"category_id"``.
        binary_mask: 2-D mask; pixels greater than 0.5 are treated as
            foreground when tracing contours.
            NOTE(review): pycocotools' encoder is documented to take uint8
            input, which is what ``seg_to_masks`` produces - confirm for
            other callers.
        image_size: Unused; kept so existing call sites keep working.

    Returns:
        A dict with the keys ``segmentation`` (list of flat
        ``[x0, y0, x1, y1, ...]`` polygons), ``area``, ``iscrowd``,
        ``image_id``, ``bbox``, ``category_id`` and ``id``.
    """
    # Area and bounding box come from the RLE encoding of the mask; the mask
    # is converted to Fortran (column-major) order before encoding.
    encoded_mask = mask.encode(np.asfortranarray(binary_mask))
    area = mask.area(encoded_mask)
    bounding_box = mask.toBbox(encoded_mask)

    contours, _ = cv2.findContours(
        (binary_mask > 0.5).astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )

    segmentation = []
    for contour in contours:
        # A polygon needs at least 3 points (6 coordinates); single pixels and
        # 2-point line contours would yield invalid COCO polygons.
        if len(contour) < 3:
            continue
        # cv2 contours are (N, 1, 2) arrays of (x, y) points, so flattening
        # already gives the x0, y0, x1, y1, ... layout. The previous
        # np.flip(axis=1) acted on the singleton axis and changed nothing.
        segmentation.append(contour.reshape(-1).tolist())

    # The original built this dict but never returned it, so callers got None.
    return {
        "segmentation": segmentation,
        "area": area.tolist(),
        "iscrowd": 0,
        "image_id": img_id,
        "bbox": bounding_box.tolist(),
        "category_id": category_id,
        "id": annotation_id,
    }

def convert_to_coco_format(img_dir, ann_dir, output_file, max_images=50):
    """Write a COCO-style JSON annotation file for a directory of images.

    Every file name found in ``img_dir`` is expected to exist in ``ann_dir``
    as well, holding the encoded segmentation for that image. Each mask
    returned by ``load_masks`` becomes one annotation.

    Args:
        img_dir: Directory containing the input images.
        ann_dir: Directory containing the segmentation files, with the same
            file names as the images.
        output_file: Path of the JSON file to write.
        max_images: Upper bound on the number of files processed, taken from
            the start of the name-sorted listing. Defaults to 50, the limit
            that used to be hard-coded; pass ``None`` to process every file.
    """
    coco_dataset = {
        "images": [],
        "annotations": [],
        # Derived from CATEGORIES so the ids here cannot drift from the ones
        # seg_to_masks assigns.
        "categories": [
            {"id": category_id, "name": name}
            for name, category_id in CATEGORIES.items()
        ],
    }

    annotation_id = 1
    img_id = 1
    files = sorted(os.listdir(img_dir))[:max_images]
    for img_filename in files:
        # Only the dimensions are needed; close the file right away instead of
        # leaving one open handle per image.
        with Image.open(os.path.join(img_dir, img_filename)) as img:
            width, height = img.size

        coco_dataset["images"].append({
            "file_name": img_filename,
            "height": height,
            "width": width,
            "id": img_id,
        })

        binary_masks, category_ids, fragment_ids = load_masks(
            os.path.join(ann_dir, img_filename)
        )

        # fragment_ids is not stored: COCO annotations only carry the category.
        for binary_mask, category_id in zip(binary_masks, category_ids):
            annotation = create_coco_annotation(
                img_id, annotation_id, category_id, binary_mask, (width, height)
            )
            coco_dataset["annotations"].append(annotation)
            annotation_id += 1
        img_id += 1
        # Progress output; printed after the increment, so this is the id the
        # next image will receive.
        print(img_id)

    with open(output_file, 'w') as f:
        json.dump(coco_dataset, f, indent=4)

# Input image directory, segmentation (annotation) directory, and the JSON file to write.
# NOTE(review): these are absolute, machine-specific paths - adjust them before
# running this cell in another environment.
img_dir = '/scratch/dr/y.nawar/pengwin/train/input/images/x-ray/'
ann_dir = '/scratch/dr/y.nawar/pengwin/train/output/images/x-ray/'
output_file = '/scratch/dr/y.nawar/pengwin/train/coco_annotations.json'

# Run the conversion; progress counters are printed while it runs.
convert_to_coco_format(img_dir, ann_dir, output_file)


2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51


In [None]:
import os

import pengwin_utils
from PIL import Image

image_path = "/scratch/dr/y.nawar/pengwin/train/input/images/x-ray/001_0000.tif"
seg_path = "/scratch/dr/y.nawar/pengwin/train/output/images/x-ray/001_0000.tif"

# load image and masks
image = pengwin_utils.load_image(image_path) # raw intensity image
masks, category_ids, fragment_ids = pengwin_utils.load_masks(seg_path)

# save visualization of image and masks
# applies CLAHE normalization to the raw intensity image before overlaying segmentations.
vis_image = pengwin_utils.visualize_sample(image, masks, category_ids, fragment_ids)
vis_path = "vis_image.png"
Image.fromarray(vis_image).save(vis_path)
print(f"Wrote visualization to {vis_path}")

# Obtain predicted masks, category_ids, and fragment_ids
# Category IDs are {"SA": 1, "LI": 2, "RI": 3}
# Fragment IDs are the integer labels from label_{category}.nii.gz, with 1 corresponding to the main fragment.
pred_masks, pred_category_ids, pred_fragment_ids = masks, category_ids, fragment_ids # replace with your model

# save the predicted masks for upload to the challenge
# Note: cv2 does not work with uint32 images. It is recommended to use PIL or imageio.v3
pred_seg = pengwin_utils.masks_to_seg(pred_masks, pred_category_ids, pred_fragment_ids)
pred_seg_path = "pred/train/output/images/x-ray/001_0000.tif"
# Image.save does not create missing parent directories, so create the output
# directory up front instead of relying on it already being there.
os.makedirs(os.path.dirname(pred_seg_path), exist_ok=True)
Image.fromarray(pred_seg).save(pred_seg_path)
print(f"Wrote segmentation to {pred_seg_path}")

In [None]:
import os.path as osp

import mmcv

from mmengine.fileio import dump, load
from mmengine.utils import track_iter_progress


def convert_balloon_to_coco(ann_file, out_file, image_prefix):
    """Convert a region-annotation file into a COCO-style JSON file.

    Args:
        ann_file: Annotation file to read. Its top-level values must each
            provide ``'filename'`` and a ``'regions'`` mapping whose entries
            carry ``'shape_attributes'`` with ``'all_points_x'`` and
            ``'all_points_y'``.
        out_file: Destination path for the COCO-format output.
        image_prefix: Directory that the ``'filename'`` entries are relative
            to; every image is read to obtain its height and width.
    """
    records = load(ann_file)

    coco_images = []
    coco_annotations = []
    for image_id, record in enumerate(track_iter_progress(records.values())):
        file_name = record['filename']
        # Only the first two shape entries (rows, columns) are needed.
        img_height, img_width = mmcv.imread(osp.join(image_prefix, file_name)).shape[:2]

        coco_images.append({
            'id': image_id,
            'file_name': file_name,
            'height': img_height,
            'width': img_width,
        })

        for region in record['regions'].values():
            assert not region['region_attributes']
            shape = region['shape_attributes']
            xs = shape['all_points_x']
            ys = shape['all_points_y']

            # Flat x0, y0, x1, y1, ... list with every coordinate shifted by 0.5.
            polygon = []
            for x, y in zip(xs, ys):
                polygon.extend((x + 0.5, y + 0.5))

            left, top, right, bottom = min(xs), min(ys), max(xs), max(ys)
            box_width = right - left
            box_height = bottom - top

            coco_annotations.append({
                'image_id': image_id,
                # Running count over all images: the number of annotations
                # emitted before this one.
                'id': len(coco_annotations),
                'category_id': 0,
                'bbox': [left, top, box_width, box_height],
                # Area of the bounding box, not of the polygon itself.
                'area': box_width * box_height,
                'segmentation': [polygon],
                'iscrowd': 0,
            })

    dump(
        {
            'images': coco_images,
            'annotations': coco_annotations,
            'categories': [{'id': 0, 'name': 'balloon'}],
        },
        out_file,
    )


if __name__ == '__main__':
    # One (ann_file, out_file, image_prefix) triple per split; the train split
    # is converted before the val split.
    for _ann_file, _out_file, _image_prefix in (
        ('data/balloon/train/via_region_data.json',
         'data/balloon/train/annotation_coco.json',
         'data/balloon/train'),
        ('data/balloon/val/via_region_data.json',
         'data/balloon/val/annotation_coco.json',
         'data/balloon/val'),
    ):
        convert_balloon_to_coco(ann_file=_ann_file,
                                out_file=_out_file,
                                image_prefix=_image_prefix)


In [None]:
# NOTE(review): not referenced by any visible cell - presumably a sample file
# name kept for later experiments; confirm it is still needed or remove the cell.
img_path = '001_002.tif'
