In [3]:
import json
import os
import cv2
import numpy as np
from tqdm import tqdm
import re
import shutil
from concurrent.futures import ThreadPoolExecutor


In [4]:
# Dataset split handled by this notebook. To process the training split,
# replace 'val2017' with 'train2017' in all three names below.
json_file = 'panoptic_val2017.json'
panoptic_label_dir = 'panoptic_val2017'
data_dir = 'val2017'
# Label inputs whose names get compared against the selected split.
files_to_check = [panoptic_label_dir, json_file]

In [5]:
# Root folder that receives everything this notebook writes.
save_dir = 'val_coco_dataset_ex'
# Output folder names are the input folder names with an '_edited' suffix.
save_panoptic_label_dir_name = f'{panoptic_label_dir}_edited'
save_panoptic_data_dir_name = f'{data_dir}_edited'
# Full output paths, relative to the working directory.
save_panoptic_label_dir_path = f'{save_dir}/{save_panoptic_label_dir_name}'
save_panoptic_data_dir_path = f'{save_dir}/{save_panoptic_data_dir_name}'

In [7]:
# Show the two output folders, one per line.
print(save_panoptic_label_dir_path, save_panoptic_data_dir_path, sep='\n')

val_coco_dataset_ex/panoptic_val2017_edited
val_coco_dataset_ex/val2017_edited


In [8]:
# Create the output root and both sub-folders; folders that already exist
# are left as they are.
for _out_dir in (save_dir, save_panoptic_label_dir_path, save_panoptic_data_dir_path):
    os.makedirs(_out_dir, exist_ok=True)

In [9]:
def check_file_endings(files_to_check, expected_ending):
    """Verify that every path is named after the expected dataset split.

    For a path that is an existing regular file, the text between the first
    underscore of its basename and the following dot must equal
    ``expected_ending``. Any other path (a directory, or a path that does not
    exist) only has to end with ``expected_ending``.

    Args:
        files_to_check: Iterable of path strings to validate.
        expected_ending: Split name the paths must carry, e.g. ``'val2017'``.

    Raises:
        AssertionError: On the first path whose name does not match.
    """
    for file_path in files_to_check:
        if os.path.isfile(file_path):
            match = re.search(r'_([^.]+)', os.path.basename(file_path))
            if match is None:
                # A basename without '_' used to crash with AttributeError on
                # `.group(1)`; report it as the same kind of validation error.
                raise AssertionError(f"Error: File '{file_path}' has no '_<ending>' part in its name, but expected '{expected_ending}'.")
            result = match.group(1)
            if result != expected_ending:
                raise AssertionError(f"Error: File '{file_path}' has ending '{result}', but expected '{expected_ending}'.")
        elif not file_path.endswith(expected_ending):
            actual_ending = os.path.splitext(file_path)[-1]
            raise AssertionError(f"Error: File '{file_path}' does not end with '{expected_ending}', found ending '{actual_ending}' instead.")


In [10]:
# Abort early if the label inputs are not named after the image split.
check_file_endings(files_to_check=files_to_check, expected_ending=data_dir)

In [11]:
# Will hold the label file name of every annotation entry.
annotations_count = list()

In [12]:
# Load the panoptic annotation file. The encoding is given explicitly so the
# result does not depend on the platform's default text encoding.
with open(json_file, 'r', encoding='utf-8') as f:
    data = json.load(f)
# Rebuild the list from scratch instead of appending to it, so re-running
# this cell does not duplicate every file name.
annotations_count = [annotation['file_name'] for annotation in data['annotations']]

In [13]:
def check_region(zeros_slice, bbox):
    """Tell whether all 255-valued pixels of a mask lie inside a bounding box.

    Args:
        zeros_slice: 2-D array that is indexed as ``[row, column]``.
        bbox: Sequence ``(x, y, w, h)``; the box covers rows ``y:y+h`` and
            columns ``x:x+w``.

    Returns:
        Truthy when the number of 255-valued pixels inside the box equals the
        number in the whole mask (also the case for a mask without any).
    """
    left, top, box_w, box_h = bbox
    total_hits = (zeros_slice == 255).sum()
    hits_in_box = (zeros_slice[top:top + box_h, left:left + box_w] == 255).sum()
    return total_hits == hits_in_box

# check_sizes(non_human_sizes, human_sizes, non_human_masks, human_masks , mask_layers[i])
def check_sizes(no_h_sizes, h_sizes, no_h_memory, h_memory, mask_layer_i):
    """Decide whether a mask fits a non-human box better than any human box.

    For every segment flagged in ``no_h_memory`` / ``h_memory`` the fill ratio
    ``(number of 255-valued mask pixels) / (bbox area)`` is computed. The best
    (largest) ratio of each group is then compared.

    Args:
        no_h_sizes: Bounding-box areas of the non-human segments.
        h_sizes: Bounding-box areas of the human segments.
        no_h_memory: Flags, parallel to ``no_h_sizes``, selecting the
            segments to consider.
        h_memory: Flags, parallel to ``h_sizes``, selecting the segments to
            consider.
        mask_layer_i: Mask array whose 255-valued pixels are counted.

    Returns:
        ``True`` when the best non-human ratio is strictly greater than the
        best human ratio, ``False`` otherwise (including when no segment is
        selected in either group).
    """
    # The pixel count is the same for every segment, so scan the mask once
    # instead of once per candidate.
    mask_pixels = np.sum(mask_layer_i == 255)

    def _best_ratio(sizes, selected):
        # Largest fill ratio among the selected segments; 0 when none is selected.
        best = 0
        for idx in range(len(selected)):
            if not selected[idx]:
                continue
            ratio = mask_pixels / sizes[idx]
            if ratio > best:
                best = ratio
        return best

    best_non_human = _best_ratio(no_h_sizes, no_h_memory)
    best_human = _best_ratio(h_sizes, h_memory)
    return bool(best_non_human > best_human)

In [14]:
# Print, for comparison, the number of annotation entries followed by the
# number of entries in the image folder and in the label folder.
print(len(annotations_count))
for _folder in (data_dir, panoptic_label_dir):
    print(len(os.listdir(_folder)))

5000
5000
5000


In [None]:
def extract_segments(annotations, filename):
    """Collect the segments of one label file, grouped by category range.

    Every annotation whose ``'file_name'`` equals ``filename`` contributes its
    ``'segments_info'`` entries, which are routed by ``'category_id'``:

    * ``1``            -> first returned list (human segments)
    * ``2`` .. ``90``  -> third returned list (object segments)
    * ``>= 92``        -> second returned list (non-human segments)

    Segments with any other category id are dropped.

    Returns:
        Tuple ``(human_bbox, no_human_bbox, obj_bbox)`` of lists holding the
        segment dicts in encounter order.
    """
    people, background, things = [], [], []
    matching = (entry for entry in annotations if entry['file_name'] == filename)
    for entry in matching:
        for seg in entry['segments_info']:
            category = seg['category_id']
            if category == 1:
                target = people
            elif 1 < category <= 90:
                target = things
            elif category >= 92:
                target = background
            else:
                continue
            target.append(seg)
    return people, background, things

def process_file(filename):
    """Build and save a mask for one panoptic label image.

    The label image is split into one layer per distinct colour. A layer is
    kept only if all of its pixels lie inside the bounding box of at least
    one human segment, and ``check_sizes`` does not report a better fit for a
    non-human or object segment whose box also contains the layer. The kept
    layers are merged into a single 0/255 image.

    If anything is kept, the merged mask is written to
    ``save_panoptic_label_dir_path`` under the same file name and the
    matching ``.jpg`` from ``data_dir`` is copied to
    ``save_panoptic_data_dir_path``. Otherwise nothing is written.

    Args:
        filename: Name of a file inside ``panoptic_label_dir``.
    """
    human_segments, non_human_segments, object_segments = extract_segments(data['annotations'], filename)

    im = cv2.imread(os.path.join(panoptic_label_dir, filename))
    if im is None:
        # cv2.imread signals failure by returning None (e.g. for a directory
        # or a non-image file). Skip the entry instead of letting the
        # resulting AttributeError abort the whole run.
        tqdm.write(f"Skipping '{filename}': could not be read as an image.")
        return
    height, width, _ = im.shape

    # Bounding-box areas (w * h) do not depend on the colour layer, so they
    # are computed once per file.
    human_sizes = np.array([s['bbox'][2] * s['bbox'][3] for s in human_segments])
    non_human_sizes = np.array([s['bbox'][2] * s['bbox'][3] for s in non_human_segments])
    object_sizes = np.array([s['bbox'][2] * s['bbox'][3] for s in object_segments])

    # Kept layers are merged straight into one image. Each pixel has exactly
    # one colour, so the layers never overlap and this equals their sum.
    overlay = np.zeros((height, width), dtype=np.uint8)

    for color in np.unique(im.reshape(-1, 3), axis=0):
        layer = (im == color).all(axis=2).astype(np.uint8) * 255

        human_masks = np.array([check_region(layer, s['bbox']) for s in human_segments])
        if not np.any(human_masks):
            # Layer is not fully contained in any human bounding box.
            continue

        non_human_masks = np.array([check_region(layer, s['bbox']) for s in non_human_segments])
        if np.any(non_human_masks) and check_sizes(non_human_sizes, human_sizes, non_human_masks, human_masks, layer):
            continue

        object_masks = np.array([check_region(layer, s['bbox']) for s in object_segments])
        if np.any(object_masks) and check_sizes(object_sizes, human_sizes, object_masks, human_masks, layer):
            continue

        overlay[layer == 255] = 255

    if not overlay.any():
        return
    base_name, _ext = os.path.splitext(filename)
    cv2.imwrite(f"{save_panoptic_label_dir_path}/{filename}", overlay)
    shutil.copy(f'{data_dir}/{base_name}.jpg', f"{save_panoptic_data_dir_path}/{base_name}.jpg")


def main():
    """Run ``process_file`` on every entry of the label folder.

    The work is spread over a thread pool with one worker per CPU reported by
    ``os.cpu_count()``, and a progress bar tracks the completed files.
    """
    filenames = os.listdir(panoptic_label_dir)

    with ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
        outcomes = pool.map(process_file, filenames)
        progress = tqdm(outcomes, desc="Processing files", unit="file", total=len(filenames))
        # Drain the iterator so every task finishes and any exception a
        # worker raised is re-raised here.
        for _ in progress:
            pass


if __name__ == "__main__":
    main()


Processing files:   2%|▏         | 77/5000 [00:21<22:50,  3.59file/s]  
