In [1]:
# python native
import os

# external library
import cv2
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

# torch
import torch
from torch.utils.data import Dataset, DataLoader

# visualization
import matplotlib.pyplot as plt

from sklearn.model_selection import GroupKFold
from prettytable import PrettyTable

from torch import nn

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd
  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from mmseg.registry import DATASETS, TRANSFORMS, MODELS, METRICS
from mmseg.datasets import BaseSegDataset
from mmseg.models.segmentors import EncoderDecoder
from mmseg.models.decode_heads import ASPPHead, FCNHead, SegformerHead 
from mmseg.models.utils.wrappers import resize


from mmengine.config import Config
from mmengine.dataset import Compose
from mmengine.runner import Runner, load_checkpoint
from mmengine.evaluator import BaseMetric
from mmengine.logging import MMLogger, print_log
from mmengine.structures import PixelData

from mmcv.transforms import BaseTransform

In [3]:
IMAGE_ROOT = "/data/ephemeral/home/level2-cv-semanticsegmentation-cv-13/data/test/DCM" # 이미지 데이터 경로설정
SAVE_DIR = "/data/ephemeral/home/level2-cv-semanticsegmentation-cv-13/model/uper" # CSV파일 저장 경로설정

In [5]:
from mmseg.apis import init_model

config_path = 'configs/_teamconfig_/upernet/config.py'
checkpoint_path = '../model/uper/iter_19200.pth'

cfg = Config.fromfile(config_path)
cfg.launcher = "none"
cfg.work_dir = os.path.join('../predictions', "upernet")

# init model and load checkpoint
model = init_model(config_path, checkpoint_path)


Loads checkpoint by local backend from path: ../model/uper/iter_19200.pth


In [6]:
from collections import defaultdict

CLASSES = [
    'finger-1', 'finger-2', 'finger-3', 'finger-4', 'finger-5',
    'finger-6', 'finger-7', 'finger-8', 'finger-9', 'finger-10',
    'finger-11', 'finger-12', 'finger-13', 'finger-14', 'finger-15',
    'finger-16', 'finger-17', 'finger-18', 'finger-19', 'Trapezium',
    'Trapezoid', 'Capitate', 'Hamate', 'Scaphoid', 'Lunate',
    'Triquetrum', 'Pisiform', 'Radius', 'Ulna',
]

CLASS2IND = {v: i for i, v in enumerate(CLASSES)}
IND2CLASS = {v: k for k, v in CLASS2IND.items()}

def _preprare_data(imgs, model):

    pipeline = Compose([
            dict(type='LoadImageFromFile'),
            # dict(type='Resize', scale=(1450, 1450)),
            # dict(type='LoadXRayAnnotations'),
            # dict(type='TransposeAnnotations'),
            dict(type='PackSegInputs')
        ])
            
    is_batch = True
    if not isinstance(imgs, (list, tuple)):
        imgs = [imgs]
        is_batch = False

    if isinstance(imgs[0], np.ndarray):
        cfg.test_pipeline[0]['type'] = 'LoadImageFromNDArray'

    # TODO: Consider using the singleton pattern to avoid building
    # a pipeline for each inference

    data = defaultdict(list)
    for img in imgs:
        if isinstance(img, np.ndarray):
            data_ = dict(img=img)
        else:
            data_ = dict(img_path=img)
        data_ = pipeline(data_)
        data['inputs'].append(data_['inputs'])
        data['data_samples'].append(data_['data_samples'])

    return data, is_batch

# define colors
PALETTE = [
    (220, 20, 60), (119, 11, 32), (0, 0, 142), (0, 0, 230), (106, 0, 228),
    (0, 60, 100), (0, 80, 100), (0, 0, 70), (0, 0, 192), (250, 170, 30),
    (100, 170, 30), (220, 220, 0), (175, 116, 175), (250, 0, 30), (165, 42, 42),
    (255, 77, 255), (0, 226, 252), (182, 182, 255), (0, 82, 0), (120, 166, 157),
    (110, 76, 0), (174, 57, 255), (199, 100, 0), (72, 0, 118), (255, 179, 240),
    (0, 125, 92), (209, 0, 151), (188, 208, 182), (0, 220, 176),
]

# utility function
# this does not care overlap
def label2rgb(label):
    image_size = label.shape[1:] + (3, )
    image = np.zeros(image_size, dtype=np.uint8)
    
    for i, class_label in enumerate(label):
        image[class_label == 1] = PALETTE[i]
        
    return image

In [7]:
pngs = {
    os.path.relpath(os.path.join(root, fname), start=IMAGE_ROOT)
    for root, _dirs, files in os.walk(IMAGE_ROOT)
    for fname in files
    if os.path.splitext(fname)[1].lower() == ".png"
}

In [8]:
class XRayInferenceDataset(Dataset):
    def __init__(self, transforms=None):
        _filenames = pngs
        _filenames = np.array(sorted(_filenames))
        
        self.filenames = _filenames
        # self.transforms = transforms
    
    def __len__(self):
        return len(self.filenames)
    
    def __getitem__(self, item):
        image_name = self.filenames[item]
        image_path = os.path.join(IMAGE_ROOT, image_name)
        image = cv2.imread(image_path)
            
        return image, image_name , image_path

In [9]:
def encode_mask_to_rle(mask):
    '''
    mask: numpy array binary mask 
    1 - mask 
    0 - background
    Returns encoded run length 
    '''
    pixels = mask.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return ' '.join(str(x) for x in runs)

def decode_rle_to_mask(rle, height, width):
    s = rle.split()
    starts, lengths = [np.asarray(x, dtype=int) for x in (s[0:][::2], s[1:][::2])]
    starts -= 1
    ends = starts + lengths
    img = np.zeros(height * width, dtype=np.uint8)
    
    for lo, hi in zip(starts, ends):
        img[lo:hi] = 1
    
    return img.reshape(height, width)

def test(model, data_loader, thr=0.5):
    rles = []
    filename_and_class = []
    with torch.no_grad():
        n_class = len(CLASSES)

        for step, (images, image_names, image_path) in tqdm(enumerate(data_loader), total=len(data_loader)):
            # images = images.cuda()    
            images = image_path
            data, is_batch = _preprare_data(images, model)
            results = model.test_step(data)
            results = results[0].pred_sem_seg.data
            results = (results > thr).detach().cpu().numpy()
            results = results.astype(np.uint8)
            results = results.reshape(1,29,2048,2048)
            # print(results.shape)
            
            for output, image_name in zip(results, image_names):
                for c, segm in enumerate(output):
                    rle = encode_mask_to_rle(segm)
                    rles.append(rle)
                    filename_and_class.append(f"{IND2CLASS[c]}_{image_name}")
                    
    return rles, filename_and_class

In [10]:
test_dataset = XRayInferenceDataset(transforms=None)

test_loader = DataLoader(
    dataset=test_dataset, 
    batch_size=1,
    shuffle=False,
    num_workers=2,
    drop_last=False
)

In [11]:
rles, filename_and_class = test(model, test_loader)

100%|██████████| 288/288 [07:48<00:00,  1.63s/it]


In [12]:
classes, filename = zip(*[x.split("_") for x in filename_and_class])

image_name = [os.path.basename(f) for f in filename]

df = pd.DataFrame(
    {
        "image_name": image_name,
        "class": classes,
        "rle": rles,
    }
)

df.to_csv(os.path.join(SAVE_DIR, "output.csv"), index=False)