In [1]:
import torch
import torch.nn as nn

import albumentations as A
from albumentations.pytorch import ToTensorV2

import tqdm
import glob
import mmcv
import numpy as np
from accelerate import Accelerator
from mmdet.structures import DetDataSample

from utils import modules, custom_dataset

import json

In [2]:
# Accelerate picks the execution device for this process; every later cell
# reads it from here instead of hard-coding a device string.
accelerator = Accelerator()
device = accelerator.device

# Select the "high" float32 matmul precision mode for the whole session.
torch.set_float32_matmul_precision(precision="high")

# Parameters

In [3]:
EPOCHS = 1
BATCH_SIZE = 1
# Number of DataLoader worker processes.
NUM_WORKERS = 1
# Legacy misspelled alias, kept so cells that still read NUM_WORKDERS keep working.
NUM_WORKDERS = NUM_WORKERS
# Resize threshold in pixels; it is also passed as ``max_size`` to
# A.SmallestMaxSize, which expects an integer, hence an int rather than 1e8.
# The value is large enough that the resize branch is effectively disabled.
VALID_RESOLUTION = 100_000_000
# Directory that is scanned for the *.jpg test images.
IMG_PREFIX = "data/test1/"

# Config file handed to modules.Model.
MODEL_CFG_CONFIG = "config.py"

# Test-time resize: SmallestMaxSize is always applied (p=1) with
# VALID_RESOLUTION as its target size.
_smallest_side_resize = A.SmallestMaxSize(max_size=VALID_RESOLUTION, p=1.0)
test_transform = A.Compose([_smallest_side_resize])

# Model identifier and the checkpoint file loaded in the "Model" section.
MODEL_NAME = "RTMDet"
model_path = "work_dir/RTMDet_model.pt"

# DataLoader

In [4]:
class CustomDatset(torch.utils.data.Dataset):
    """Inference dataset over every ``*.jpg`` file found in a directory.

    Each item is an ``(image_tensor, DetDataSample)`` pair. The image is
    optionally resized by ``transform``, padded on the bottom/right so that
    both sides are multiples of ``fill_pad_factor``, then normalized and
    converted to a tensor by ``after_transform``.
    """

    # Applied to every image after the optional resize and the padding.
    after_transform = A.Compose(
        [
            A.Normalize(),
            ToTensorV2(),
        ],
    )

    def __init__(
        self,
        img_prefix="data/train/",
        transform=None,
        fill_pad_factor=32,
        fill_pad_value=114
    ):
        """Collect the image paths and store the preprocessing settings.

        Args:
            img_prefix (str): Root directory that contains the images.
            transform (albumentations transform | None): Optional resize /
                augmentation applied before padding.
            fill_pad_factor (int): Height and width are padded up to a
                multiple of this value.
            fill_pad_value (int): Pixel value used to fill the padded area.
        """
        # Sorted so that an index always maps to the same file between runs.
        self.images = sorted(glob.glob(f"{img_prefix}/*.jpg"))

        self.fill_pad_factor = fill_pad_factor
        self.fill_pad_value = fill_pad_value
        self.transform = transform
        self.len = len(self.images)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and run it through :meth:`pipeline`.

        The dataset index is forwarded as ``image_id`` so that predictions
        can be traced back to their source image (it used to be left as
        ``None``).
        """
        img = mmcv.imread(self.images[idx], channel_order="rgb")
        return self.pipeline(img, image_id=idx)

    def pipeline(self, img, image_id=None):
        """Resize (optionally), pad, normalize and wrap one image.

        Args:
            img (np.ndarray): Image array of shape ``(h, w, c)``.
            image_id: Identifier stored in the sample's meta info.

        Returns:
            tuple: ``(tensor_image, DetDataSample)``. The meta info holds
            ``img_shape`` (size before padding), ``pad_shape`` (size after
            padding), ``scale_factor``, ``image_id``, ``keep_ratio`` and
            ``flip``.
        """
        h, w, c = img.shape
        scale_factor = (1., 1.)
        # Resize only when BOTH sides exceed the threshold and a transform
        # was actually supplied.
        if (
            self.transform is not None and
            h > VALID_RESOLUTION and
            w > VALID_RESOLUTION
        ):
            img = self.transform(image=img)["image"]
            new_h, new_w, c = img.shape
            # mmdet stores scale_factor as (w_scale, h_scale); the box
            # rescaling multiplies x by the first entry and y by the second.
            scale_factor = (new_w / w, new_h / h)
            h, w = new_h, new_w

        # Round each side up to the next multiple of fill_pad_factor
        # (ceil division); a side that is already aligned stays unchanged.
        factor = self.fill_pad_factor
        pad_h = -(-h // factor) * factor
        pad_w = -(-w // factor) * factor
        if (pad_h, pad_w) != (h, w):
            # The original pixels stay in the top-left corner; the padding is
            # added on the bottom and right.
            pad_img = np.full((pad_h, pad_w, c), self.fill_pad_value, dtype=img.dtype)
            pad_img[:h, :w] = img
            img = pad_img

        img = self.after_transform(image=img)["image"]

        data_sample = DetDataSample()
        img_meta = dict(
            img_shape=(h, w),
            pad_shape=(pad_h, pad_w),
            scale_factor=scale_factor,
            image_id=image_id,
            keep_ratio=True,
            flip=False,
            # Legacy misspelled key, kept in case a consumer still reads it.
            filp=False,
        )

        data_sample.set_metainfo(img_meta)

        return img, data_sample

    def __len__(self):
        """Return the number of images found at construction time."""
        return self.len

In [5]:
# Build the dataset first so it can be inspected on its own, then wrap it in
# a DataLoader that keeps the file order (no shuffling) for inference.
test_dataset = CustomDatset(IMG_PREFIX, test_transform)
test_dataloader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKDERS,
    collate_fn=custom_dataset.collate_fn,
)

# Model

In [6]:
# Instantiate the detector from its config, then restore the trained weights.
model = modules.Model(MODEL_CFG_CONFIG)
# NOTE: torch.load unpickles the file, so only load checkpoints you trust.
# The checkpoint is mapped to CPU first; accelerate moves the model later.
checkpoint = torch.load(model_path, map_location="cpu")
model.load_state_dict(checkpoint["model"])
model.eval();

All Keys Matching


# Runtime

In [7]:
# Hand the model and the dataloader to accelerate; both names are rebound to
# the objects that accelerator.prepare returns.
model, test_dataloader = accelerator.prepare(model, test_dataloader)

@torch.no_grad()
def test_one_epoch(model, dataloader):
    """Run inference over ``dataloader`` and collect every detection.

    Args:
        model: Callable invoked as ``model(batch, mode="predict")``. It must
            return an iterable of samples exposing ``image_id`` and
            ``pred_instances`` (with ``bboxes``, ``scores`` and ``labels``).
        dataloader: Iterable of batches accepted by ``model``.

    Returns:
        list[np.ndarray]: One array per detection, laid out as
        ``[image_id, x, y, w, h, score, label]``. Previously the collected
        list was built but never returned.
    """
    dataloader = tqdm.tqdm(dataloader, disable=not accelerator.is_main_process)

    model.eval()
    predict = []
    for batch in dataloader:
        for data_sample in model(batch, mode="predict"):
            image_id = data_sample.image_id
            pred_instances = data_sample.pred_instances
            # Move each field to the CPU once per sample instead of once per
            # detection.
            bboxes = pred_instances.bboxes.detach().cpu()
            scores = pred_instances.scores.detach().cpu()
            labels = pred_instances.labels.detach().cpu()
            for bbox, score, label in zip(bboxes, scores, labels):
                # .cpu() is a no-op for tensors already on the CPU, so clone
                # before the in-place edit to leave the model output intact.
                bbox = bbox.clone()
                # Turn the last two entries into width/height
                # (assumes boxes arrive as x1, y1, x2, y2 -- TODO confirm).
                bbox[2:] = bbox[2:] - bbox[:2]
                predict.append(
                    np.array([
                        image_id,
                        *bbox.tolist(),
                        score.item(),
                        label.item(),
                    ])
                )
    return predict

In [None]:
# Class-name -> label-id mapping used by this dataset:
#   'knife': 1
#   'tongs': 2
#   'glassbottle': 3
#   'pressure': 4
#   'laptop': 5
#   'umbrella': 6
#   'metalcup': 7
#   'scissor': 8
#   'lighter': 9
# Order in which the per-class detection lists are written for each image.
# NOTE(review): these ids are 1-based; confirm that the labels produced by the
# model are 1-based too, otherwise some classes will never match.
LABEL_ORDER = [1, 4, 3, 8, 9, 6, 5, 2, 7]

# results[image][k] is the list of [x1, y1, x2, y2, score] detections whose
# label equals LABEL_ORDER[k]. One entry is appended per image (not per
# batch), so the layout stays correct when BATCH_SIZE > 1.
results = []
dataloader = tqdm.tqdm(test_dataloader, disable=not accelerator.is_main_process)

with torch.no_grad():
    for batch in dataloader:
        for data_sample in model(batch, mode="predict"):
            pred_instances = data_sample.pred_instances
            # Convert each field to plain Python values once per image rather
            # than calling .item() per element inside the class loop.
            labels = pred_instances.labels.detach().cpu().tolist()
            bboxes = pred_instances.bboxes.detach().cpu().tolist()
            scores = pred_instances.scores.detach().cpu().tolist()

            per_class = []
            for label_id in LABEL_ORDER:
                per_class.append([
                    # Scores are clamped into [0.001, 1].
                    list(bbox) + [min(1, max(0.001, score))]
                    for label, bbox, score in zip(labels, bboxes, scores)
                    if label == label_id
                ])
            results.append(per_class)

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]
  5%|▌         | 59/1132 [00:07<02:09,  8.27it/s]

In [None]:
# Serialize the collected detections first, then write them out in one call.
results_payload = json.dumps(results)
with open("results.json", "w") as out_file:
    out_file.write(results_payload)