## Panoptic Segmentation task
#### - PyTorch 및 detectron2-oneformer 모델 구현
#### - 이미지에서 객체 검출 및 분할 수행, COCO 포맷으로 변환

In [1]:
import warnings
# Suppress warnings whose message matches the given pattern.
# NOTE(review): `message` is treated as a regular expression matched against
# the start of the warning text, so the literal "..." would match any message
# of three or more characters -- confirm whether this is a placeholder for a
# specific warning message.
warnings.filterwarnings("ignore", message="...")

In [2]:
cd

/Users/sm


In [3]:
cd myenv4/OneFormer/

/Users/sm/myenv4/OneFormer


In [4]:
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()
setup_logger(name="oneformer")

import numpy as np
import cv2
import torch
import imutils
from detectron2.config import get_cfg
from detectron2.projects.deeplab import add_deeplab_config
from detectron2.data import MetadataCatalog

from oneformer import (
    add_oneformer_config,
    add_common_config,
    add_swin_config,
    add_dinat_config,
    add_convnext_config,
)

In [5]:
# Config file (relative to the OneFormer repo root) for each dataset when the
# Swin-L backbone is selected.
SWIN_CFG_DICT = {
    "cityscapes": "configs/cityscapes/oneformer_swin_large_IN21k_384_bs16_90k.yaml",
    "coco": "configs/coco/oneformer_swin_large_IN21k_384_bs16_100ep.yaml",
    "ade20k": "configs/ade20k/oneformer_swin_large_IN21k_384_bs16_160k.yaml",
}

# Config file for each dataset when the DiNAT-L backbone is selected.
DINAT_CFG_DICT = {
    "cityscapes": "configs/cityscapes/oneformer_dinat_large_bs16_90k.yaml",
    "coco": "configs/coco/oneformer_dinat_large_bs16_100ep.yaml",
    "ade20k": "configs/ade20k/oneformer_dinat_large_IN21k_384_bs16_160k.yaml",
}

def setup_cfg(dataset, model_path, use_swin, device="cpu"):
    """Build a frozen detectron2 config for a OneFormer model.

    Args:
        dataset: Key into ``SWIN_CFG_DICT`` / ``DINAT_CFG_DICT``
            ("cityscapes", "coco" or "ade20k").
        model_path: Path to the checkpoint, stored in ``cfg.MODEL.WEIGHTS``.
        use_swin: If true, use the Swin config; otherwise the DiNAT config.
        device: Value stored in ``cfg.MODEL.DEVICE``. Defaults to "cpu",
            which is what this function previously hard-coded.

    Returns:
        The frozen config node.

    Raises:
        KeyError: If ``dataset`` is not a key of the selected config table.
    """
    cfg = get_cfg()
    # Register every config extension before merging the YAML file so that
    # the keys it contains are known to the config node.
    add_deeplab_config(cfg)
    add_common_config(cfg)
    add_swin_config(cfg)
    add_dinat_config(cfg)
    add_convnext_config(cfg)
    add_oneformer_config(cfg)

    cfg_table = SWIN_CFG_DICT if use_swin else DINAT_CFG_DICT
    cfg.merge_from_file(cfg_table[dataset])
    cfg.MODEL.DEVICE = device
    cfg.MODEL.WEIGHTS = model_path
    cfg.freeze()
    return cfg

def setup_modules(dataset, model_path, use_swin):
    """Create the predictor and the dataset metadata used for visualization.

    Args:
        dataset: Dataset key forwarded to ``setup_cfg``.
        model_path: Checkpoint path forwarded to ``setup_cfg``.
        use_swin: Backbone switch forwarded to ``setup_cfg``.

    Returns:
        A ``(predictor, metadata)`` tuple.
    """
    cfg = setup_cfg(dataset, model_path, use_swin)
    predictor = DefaultPredictor(cfg)

    # Resolve the dataset name once. The placeholder keeps both the metadata
    # lookup and the Cityscapes check below safe when TEST_PANOPTIC is empty
    # (the check previously indexed [0] without a guard).
    panoptic_datasets = cfg.DATASETS.TEST_PANOPTIC
    dataset_name = panoptic_datasets[0] if len(panoptic_datasets) else "__unused"
    metadata = MetadataCatalog.get(dataset_name)

    if 'cityscapes_fine_sem_seg_val' in dataset_name:
        # Take the stuff colors from the Cityscapes label table, skipping
        # labels whose trainId is 255.
        from cityscapesscripts.helpers.labels import labels
        stuff_colors = [k.color for k in labels if k.trainId != 255]
        metadata = metadata.set(stuff_colors=stuff_colors)

    return predictor, metadata

def panoptic_run(img, predictor, metadata):
    """Run panoptic inference on ``img`` and draw the predicted segments.

    Args:
        img: Image array; its last axis is reversed before visualization
            (presumably BGR as loaded by OpenCV -- confirm with the caller).
        predictor: Callable invoked as ``predictor(img, "panoptic")``; its
            result must provide ``"panoptic_seg"`` as a
            ``(panoptic_seg, segments_info)`` pair.
        metadata: Dataset metadata handed to the ``Visualizer``.

    Returns:
        The object returned by ``Visualizer.draw_panoptic_seg_predictions``.
    """
    # Build the CPU device locally instead of relying on a `cpu_device`
    # global that is only assigned once a later notebook cell has run.
    cpu_device = torch.device("cpu")
    visualizer = Visualizer(img[:, :, ::-1], metadata=metadata, instance_mode=ColorMode.IMAGE)
    predictions = predictor(img, "panoptic")
    panoptic_seg, segments_info = predictions["panoptic_seg"]
    out = visualizer.draw_panoptic_seg_predictions(
        panoptic_seg.to(cpu_device), segments_info, alpha=0.5
    )
    return out

def instance_run(img, predictor, metadata):
    """Run instance inference on ``img`` and draw the predicted instances.

    Args:
        img: Image array; its last axis is reversed before visualization
            (presumably BGR as loaded by OpenCV -- confirm with the caller).
        predictor: Callable invoked as ``predictor(img, "instance")``; its
            result must provide ``"instances"``.
        metadata: Dataset metadata handed to the ``Visualizer``.

    Returns:
        The object returned by ``Visualizer.draw_instance_predictions``.
    """
    # Build the CPU device locally instead of relying on a `cpu_device`
    # global that is only assigned once a later notebook cell has run.
    cpu_device = torch.device("cpu")
    visualizer = Visualizer(img[:, :, ::-1], metadata=metadata, instance_mode=ColorMode.IMAGE)
    predictions = predictor(img, "instance")
    instances = predictions["instances"].to(cpu_device)
    out = visualizer.draw_instance_predictions(predictions=instances, alpha=0.5)
    return out

def semantic_run(img, predictor, metadata):
    """Run semantic inference on ``img`` and draw the per-pixel class map.

    Args:
        img: Image array; its last axis is reversed before visualization
            (presumably BGR as loaded by OpenCV -- confirm with the caller).
        predictor: Callable invoked as ``predictor(img, "semantic")``; its
            result must provide ``"sem_seg"``, which is reduced with
            ``argmax(dim=0)`` to one label per pixel.
        metadata: Dataset metadata handed to the ``Visualizer``.

    Returns:
        The object returned by ``Visualizer.draw_sem_seg``.
    """
    # Build the CPU device locally instead of relying on a `cpu_device`
    # global that is only assigned once a later notebook cell has run.
    cpu_device = torch.device("cpu")
    visualizer = Visualizer(img[:, :, ::-1], metadata=metadata, instance_mode=ColorMode.IMAGE)
    predictions = predictor(img, "semantic")
    out = visualizer.draw_sem_seg(
        predictions["sem_seg"].argmax(dim=0).to(cpu_device), alpha=0.5
    )
    return out

# Dispatch table: task name -> function that runs that task and draws its
# predictions on the image.
TASK_INFER = {
    "panoptic": panoptic_run,
    "instance": instance_run,
    "semantic": semantic_run,
}

In [6]:
from demo.defaults import DefaultPredictor
from demo.visualizer import Visualizer, ColorMode

In [7]:
use_swin = False
import os
import subprocess

# Both backbones follow the same checkpoint naming scheme, so derive the file
# name from the backbone instead of duplicating the download logic.
backbone_tag = "swin" if use_swin else "dinat"
checkpoint_file = f"250_16_{backbone_tag}_l_oneformer_cityscapes_90k.pth"

if not os.path.exists(checkpoint_file):
    # Pass the command as an argument list (no shell involved) and use
    # check=True so a failed download raises CalledProcessError here rather
    # than surfacing later as a confusing checkpoint-loading error.
    subprocess.run(
        ["wget", f"https://shi-labs.com/projects/oneformer/cityscapes/{checkpoint_file}"],
        check=True,
    )

predictor, metadata = setup_modules("cityscapes", checkpoint_file, use_swin)

Loading config configs/cityscapes/Base-Cityscapes-UnifiedSegmentation.yaml with yaml.unsafe_load. Your machine may be at risk if the file contains malicious content.


[32m[03/28 15:48:09 d2.checkpoint.detection_checkpoint]: [0m[DetectionCheckpointer] Loading from 250_16_dinat_l_oneformer_cityscapes_90k.pth ...


The checkpoint state_dict contains keys that are not used by the model:
  [35mtext_encoder.positional_embedding[0m
  [35mtext_encoder.transformer.resblocks.0.attn.{in_proj_bias, in_proj_weight}[0m
  [35mtext_encoder.transformer.resblocks.0.attn.out_proj.{bias, weight}[0m
  [35mtext_encoder.transformer.resblocks.0.ln_1.{bias, weight}[0m
  [35mtext_encoder.transformer.resblocks.0.mlp.c_fc.{bias, weight}[0m
  [35mtext_encoder.transformer.resblocks.0.mlp.c_proj.{bias, weight}[0m
  [35mtext_encoder.transformer.resblocks.0.ln_2.{bias, weight}[0m
  [35mtext_encoder.transformer.resblocks.1.attn.{in_proj_bias, in_proj_weight}[0m
  [35mtext_encoder.transformer.resblocks.1.attn.out_proj.{bias, weight}[0m
  [35mtext_encoder.transformer.resblocks.1.ln_1.{bias, weight}[0m
  [35mtext_encoder.transformer.resblocks.1.mlp.c_fc.{bias, weight}[0m
  [35mtext_encoder.transformer.resblocks.1.mlp.c_proj.{bias, weight}[0m
  [35mtext_encoder.transformer.resblocks.1.ln_2.{bias, weight}[

In [8]:
# Maps each Cityscapes label name to the integer id that is written into the
# "categories" section of the exported COCO JSON.
# NOTE(review): ids 0-18 look like the Cityscapes train ids, with 255 marking
# labels that are ignored and -1 used for "license plate" -- confirm against
# cityscapesscripts.helpers.labels.
# NOTE(review): many names share the id 255, so the category ids derived from
# this table are not unique.
CITYSCAPES_CLASSES_SETTING = {
    "unlabeled": 255,
    "ego vehicle": 255,
    "rectification border": 255,
    "out of roi": 255,
    "static": 255,
    "dynamic": 255,
    "ground": 255,
    "road": 0,
    "sidewalk": 1,
    "parking": 255,
    "rail track": 255,
    "building": 2,
    "wall": 3,
    "fence": 4,
    "guard rail": 255,
    "bridge": 255,
    "tunnel": 255,
    "pole": 5,
    "polegroup": 255,
    "traffic light": 6,
    "traffic sign": 7,
    "vegetation": 8,
    "terrain": 9,
    "sky": 10,
    "person": 11,
    "rider": 12,
    "car": 13,
    "truck": 14,
    "bus": 15,
    "caravan": 255,
    "trailer": 255,
    "train": 16,
    "motorcycle": 17,
    "bicycle": 18,
    "license plate": -1
}

### 진행사항
##### 1. oneformer output(predictions, json file) -> coco json file format 변환
##### 2. json dump -> segmentation 필드(tensor 정보 등)
##### 3. semantic segmentation, annotation -> sem_seg 필드에
##### 4. instance segmentation, annotation -> instances 필드에
##### 5. semantic + instance 정보 합쳐서 annotations 필드에 포함

In [10]:
from pycocotools import mask as coco_mask

import json
import numpy as np

input_path = "samples/aachen/"
output_path = "./output_json/"
os.makedirs(output_path, exist_ok=True)

# os.listdir() returns entries in arbitrary order; iterate the sorted list so
# images are processed (and reported) deterministically.
img_files = sorted(os.listdir(input_path))

task = "panoptic"
# Kept as a module-level name because other cells may refer to `cpu_device`.
cpu_device = torch.device('cpu')

# The category table is identical for every image, so build it once.
# NOTE(review): several names share id 255 (and "license plate" is -1), so
# these category ids are not unique -- confirm that consumers accept this.
categories = [
    {'id': value, 'name': key}
    for key, value in CITYSCAPES_CLASSES_SETTING.items()
]


def _rle_segmentation(binary_mask):
    """Encode an (H, W, 1) uint8 mask as a JSON-serializable RLE dict."""
    rle = coco_mask.encode(np.asfortranarray(binary_mask))[0]
    # `counts` is returned as bytes; decode it so json.dump can write it.
    counts = ''.join(rle['counts'].decode('utf-8').split())
    return {
        'counts': counts,
        'size': [rle['size'][0], rle['size'][1]],
    }


for filename in img_files:
    if not filename.endswith((".jpg", ".jpeg", ".png")):
        continue

    input_filepath = os.path.join(input_path, filename)
    img = cv2.imread(input_filepath)
    if img is None:
        # cv2.imread reports an unreadable file by returning None.
        print(f"Skipping unreadable image {input_filepath}")
        continue
    img = imutils.resize(img, width=512)

    # Run the model on this image.
    predictions = predictor(img, task)
    instances = predictions["instances"]
    masks = instances.pred_masks
    boxes = instances.pred_boxes.tensor
    scores = instances.scores
    classes = instances.pred_classes
    segments_info = predictions["panoptic_seg"][1]

    annotations = []
    # Annotation ids come from one running counter so that semantic and
    # instance annotations of the same image can never share an id.
    next_annotation_id = 1
    # NOTE(review): the key below is "is_crowd", while the COCO format spells
    # it "iscrowd" -- left unchanged here; confirm what consumers expect.

    # Semantic segmentation: one annotation per class that occurs in the
    # per-pixel class map AND is reported by the panoptic head.
    sem_seg = predictions["sem_seg"].argmax(dim=0).cpu().numpy()
    panoptic_category_ids = {int(seg["category_id"]) for seg in segments_info}
    for sem_seg_val in np.unique(sem_seg):
        if int(sem_seg_val) not in panoptic_category_ids:
            continue
        mask = (sem_seg == sem_seg_val).astype(np.uint8)
        binary_mask = np.expand_dims(mask, axis=-1)
        x, y, w, h = cv2.boundingRect(mask)
        annotations.append({
            "area": float(np.sum(binary_mask)),
            "category_id": int(sem_seg_val),
            "id": next_annotation_id,
            "bbox": [x, y, w, h],
            "image_id": 0,
            "is_crowd": 0,
            "segmentation": _rle_segmentation(binary_mask),
        })
        next_annotation_id += 1

    # Instance segmentation.
    # NOTE(review): as before, only the first len(segments_info) instance
    # predictions are exported; nothing visible here ties instance order to
    # the panoptic segments -- confirm that this cut-off is intended.
    num_instances = min(len(segments_info), len(masks))
    for j in range(num_instances):
        mask = np.asarray(masks[j].cpu().numpy(), dtype=np.uint8)
        binary_mask = (mask > 0.5).astype(np.uint8)[:, :, np.newaxis]
        # Boxes are (x1, y1, x2, y2); convert to (x, y, width, height).
        bbox = boxes[j].tolist()
        bbox[2] -= bbox[0]
        bbox[3] -= bbox[1]
        # All fields of one Instances object have the same length, so
        # classes[j] and scores[j] exist whenever masks[j] does.
        annotations.append({
            "area": float(np.sum(binary_mask)),
            "category_id": int(classes[j]),
            "id": next_annotation_id,
            "bbox": bbox,
            "image_id": 0,
            "is_crowd": 0,
            "segmentation": _rle_segmentation(binary_mask),
            "score": float(scores[j]),
        })
        next_annotation_id += 1

    # Image record; image_size is indexed as (height, width).
    images = [{
        "id": 0,
        "width": int(instances.image_size[1]),
        "height": int(instances.image_size[0]),
        "file_name": filename,
    }]

    coco_dict = {
        "annotations": annotations,
        "categories": categories,
        "images": images,
    }

    # splitext keeps dots inside the stem, so "a.1.png" and "a.2.png" no
    # longer map to the same output file.
    output_filename = f"{os.path.splitext(filename)[0]}_oneformer_convert_coco.json"
    output_filepath = os.path.join(output_path, output_filename)

    with open(output_filepath, "w", encoding="utf-8") as f:
        json.dump(coco_dict, f)

    print(f"Output coco json saved at {output_filepath}")

Output coco json saved at ./output_json/aachen_1_oneformer_convert_coco.json
Output coco json saved at ./output_json/aachen_0_oneformer_convert_coco.json
Output coco json saved at ./output_json/aachen_2_oneformer_convert_coco.json
Output coco json saved at ./output_json/aachen_3_oneformer_convert_coco.json


### Update
##### 1. 각 이미지에 대한 고유 식별자인 "image_id" 지정
##### 2. input 폴더에 있는 모든 이미지에 대한 annotation 결과를 하나의 json 파일에 저장
##### 3. import coco json file (1 file) -> 모든 이미지 한 번에 annotation 가능

In [11]:
from pycocotools import mask as coco_mask

import json
import numpy as np

input_path = "samples/aachen/"
output_path = "./output_json/"
os.makedirs(output_path, exist_ok=True)

# os.listdir() returns entries in arbitrary order; iterate the sorted list so
# image ids are assigned deterministically.
img_files = sorted(os.listdir(input_path))

task = "panoptic"
# Kept as a module-level name because other cells may refer to `cpu_device`.
cpu_device = torch.device('cpu')

annotations = []
images = []
image_id = 1
# All images end up in one file, so annotation ids come from a single running
# counter and stay unique across the whole file.
next_annotation_id = 1
# NOTE(review): the key below is "is_crowd", while the COCO format spells it
# "iscrowd" -- left unchanged here; confirm what consumers expect.


def _rle_segmentation(binary_mask):
    """Encode an (H, W, 1) uint8 mask as a JSON-serializable RLE dict."""
    rle = coco_mask.encode(np.asfortranarray(binary_mask))[0]
    # `counts` is returned as bytes; decode it so json.dump can write it.
    counts = ''.join(rle['counts'].decode('utf-8').split())
    return {
        'counts': counts,
        'size': [rle['size'][0], rle['size'][1]],
    }


for filename in img_files:
    if not filename.endswith((".jpg", ".jpeg", ".png")):
        continue

    input_filepath = os.path.join(input_path, filename)
    img = cv2.imread(input_filepath)
    if img is None:
        # cv2.imread reports an unreadable file by returning None.
        print(f"Skipping unreadable image {input_filepath}")
        continue
    img = imutils.resize(img, width=512)

    predictions = predictor(img, task)
    instances = predictions["instances"]
    masks = instances.pred_masks
    boxes = instances.pred_boxes.tensor
    scores = instances.scores
    classes = instances.pred_classes
    segments_info = predictions["panoptic_seg"][1]

    # Semantic segmentation: one annotation per class that occurs in the
    # per-pixel class map AND is reported by the panoptic head.
    sem_seg = predictions["sem_seg"].argmax(dim=0).cpu().numpy()
    panoptic_category_ids = {int(seg["category_id"]) for seg in segments_info}
    for sem_seg_val in np.unique(sem_seg):
        if int(sem_seg_val) not in panoptic_category_ids:
            continue
        mask = (sem_seg == sem_seg_val).astype(np.uint8)
        binary_mask = np.expand_dims(mask, axis=-1)
        x, y, w, h = cv2.boundingRect(mask)
        annotations.append({
            "area": float(np.sum(binary_mask)),
            "category_id": int(sem_seg_val),
            "id": next_annotation_id,
            "bbox": [x, y, w, h],
            "image_id": image_id,
            "is_crowd": 0,
            "segmentation": _rle_segmentation(binary_mask),
        })
        next_annotation_id += 1

    # Instance segmentation.
    # NOTE(review): as before, only the first len(segments_info) instance
    # predictions are exported; nothing visible here ties instance order to
    # the panoptic segments -- confirm that this cut-off is intended.
    num_instances = min(len(segments_info), len(masks))
    for j in range(num_instances):
        mask = np.asarray(masks[j].cpu().numpy(), dtype=np.uint8)
        binary_mask = (mask > 0.5).astype(np.uint8)[:, :, np.newaxis]
        # Boxes are (x1, y1, x2, y2); convert to (x, y, width, height).
        bbox = boxes[j].tolist()
        bbox[2] -= bbox[0]
        bbox[3] -= bbox[1]
        # All fields of one Instances object have the same length, so
        # classes[j] and scores[j] exist whenever masks[j] does.
        annotations.append({
            "area": float(np.sum(binary_mask)),
            "category_id": int(classes[j]),
            "id": next_annotation_id,
            "bbox": bbox,
            "image_id": image_id,
            "is_crowd": 0,
            "segmentation": _rle_segmentation(binary_mask),
            "score": float(scores[j]),
        })
        next_annotation_id += 1

    # Image record; image_size is indexed as (height, width).
    images.append({
        "id": image_id,
        "width": int(instances.image_size[1]),
        "height": int(instances.image_size[0]),
        "file_name": filename,
    })
    image_id += 1

# NOTE(review): several names share id 255 (and "license plate" is -1), so
# these category ids are not unique -- confirm that consumers accept this.
categories = [
    {'id': value, 'name': key}
    for key, value in CITYSCAPES_CLASSES_SETTING.items()
]

coco_dict = {
    "annotations": annotations,
    "categories": categories,
    "images": images,
}

output_filename = "oneformer_final_coco.json"
output_filepath = os.path.join(output_path, output_filename)

with open(output_filepath, "w", encoding="utf-8") as f:
    json.dump(coco_dict, f)

print(f"Output coco json saved at {output_filepath}")

Output coco json saved at ./output_json/oneformer_final_coco.json


#### annotation 3가지 유형
##### 1. Semantic Segmentation
###### : 모든 픽셀을 클래스(카테고리) 단위의 세그먼트로 분류, 각 세그먼트에 대한 정보 기록 (같은 클래스의 개별 인스턴스는 구분하지 않음)
##### 2. Detection
###### : 물체의 경계 상자와 해당 물체의 클래스 예측
##### 3. Panoptic Segmentation
###### : Detection 및 Semantic Segmentation 결합, 픽셀의 인스턴스 ID와 모든 픽셀이 속한 세그먼트 식별
