In [None]:
from typing import List, Mapping, Union, Any

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision.ops import nms

import numpy as np
from tqdm.auto import tqdm
from PIL import Image, ImageDraw

def num_parameters(model):
    """Return the total number of elements in ``model``'s trainable parameters.

    Only parameters whose ``requires_grad`` flag is set are counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Allow TF32 for CUDA matmuls and for cuDNN operations.
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

In [None]:
!pip install albumentations

In [8]:
import os
import glob
import json
from PIL import Image
from datasets import Dataset
from pycocotools.coco import COCO


# Directories found directly under testdata/.
# NOTE(review): `subsets` is not referenced by any other cell visible here.
subsets = [
    path for path in glob.glob("testdata/*") if os.path.isdir(path)
]

def coco2dataset():
    """Generate one example dict per entry of ``train.json``.

    The file is parsed as JSON and its entries are visited in order.  Each
    entry's ``image`` value is passed to ``Image.open``; ``width``, ``height``
    and ``objects`` are copied through unchanged, and the entry's position in
    the file becomes its ``image_id``.
    """
    with open("train.json", "r", encoding="utf-8") as handle:
        entries = json.load(handle)

    for image_id, entry in enumerate(entries):
        picture = Image.open(entry["image"])
        example = {
            "image_id": image_id,
            "width": entry["width"],
            "height": entry["height"],
            "objects": entry["objects"],
            "image": picture,
        }
        yield example

# Class names; a name's position in this list is its integer class id.
categories = ["crack", "pothole"]
id2label = dict(enumerate(categories))
label2id = {name: class_id for class_id, name in id2label.items()}
# Build the dataset from the examples yielded by coco2dataset().
dataset = Dataset.from_generator(coco2dataset)

In [None]:
from transformers import AutoImageProcessor, AutoModelForObjectDetection, get_cosine_schedule_with_warmup


# Hub checkpoint used for both the image processor and the detector weights.
model_name_or_path = "PekingU/rtdetr_r50vd"
image_processor = AutoImageProcessor.from_pretrained(
    model_name_or_path,
    do_resize=True,
)

# The label maps describe this notebook's two classes ("crack", "pothole").
# NOTE(review): ignore_mismatched_sizes=True is presumably required because the
# pretrained head was built for a different number of classes - confirm.
# NOTE(review): the effect of anchor_image_size=None is not visible from this
# notebook; check the RT-DETR config documentation.
model = AutoModelForObjectDetection.from_pretrained(
    model_name_or_path,
    id2label=id2label,
    label2id=label2id,
    anchor_image_size=None,
    ignore_mismatched_sizes=True,
)

In [10]:
import albumentations as A


# Side length (in pixels) used by the resize + bbox-safe crop branch below.
max_size = 640

# Training-time augmentation pipeline.  Boxes are passed in COCO format
# (x, y, width, height) and their labels travel in the "category" field.
train_augmentation_and_transform = A.Compose(
    [
        A.ShiftScaleRotate(p=0.3),
        # Resize followed by a crop to max_size x max_size; the pair is applied with p=0.2.
        A.Compose(
            [
                A.SmallestMaxSize(max_size=max_size, p=1.0),
                A.RandomSizedBBoxSafeCrop(height=max_size, width=max_size, p=1.0),
            ],
            p=0.2,
        ),
        # One of three blur variants, with p=0.2 for the group as a whole.
        A.OneOf(
            [
                A.Blur(blur_limit=7, p=0.5),
                A.MotionBlur(blur_limit=7, p=0.5),
                A.Defocus(radius=(1, 5), alias_blur=(0.1, 0.25), p=0.1),
            ],
            p=0.2,
        ),
        A.Perspective(p=0.2),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        A.HueSaturationValue(p=0.1),
        A.CLAHE(p=0.1),
        A.RGBShift(p=0.1),
        A.ChannelShuffle(p=0.1),
        A.Emboss(p=0.1),
    ],
    bbox_params=A.BboxParams(format="coco", label_fields=["category"], clip=True, min_area=25),
)


# No-op pipeline, used only to make sure boxes are clipped to the image size
# and that there are no boxes with an area below 1 pixel.
# NOTE(review): validation_transform is not used by any other cell visible here.
validation_transform = A.Compose(
    [A.NoOp()],
    bbox_params=A.BboxParams(format="coco", label_fields=["category"], clip=True, min_area=1),
)

In [11]:
# Preview the training augmentation on the first example of the dataset.
for i in [0]:
    # Fetch the sample once instead of decoding it for every field access.
    sample = dataset[i]
    annotations = sample["objects"]
    # Convert to RGB before augmenting, as CPPE5Dataset.__getitem__ does, so the
    # colour transforms always receive a 3-channel array.
    image = sample["image"].convert("RGB")

    # Apply the augmentation
    output = train_augmentation_and_transform(
        image=np.array(image),
        bboxes=annotations["bbox"],
        category=annotations["category"],
    )

    # Unpack the output.  The aug_* names keep this cell from overwriting the
    # module-level `categories` list of class names.
    image = Image.fromarray(output["image"])
    aug_categories, aug_boxes = output["category"], output["bboxes"]

    # Draw the augmented boxes (COCO format: x, y, width, height) and labels
    draw = ImageDraw.Draw(image)
    for category, box in zip(aug_categories, aug_boxes):
        x, y, w, h = box
        draw.rectangle((x, y, x + w, y + h), outline="red", width=1)
        draw.text((x, y), id2label[category], fill="fuchsia")
    image.show()

In [12]:
from torch.utils.data import Dataset, DataLoader


class CPPE5Dataset(Dataset):
    """Map-style dataset that turns raw samples into image-processor outputs.

    Each item is produced by reading a sample from ``dataset``, optionally
    running ``transform`` on its image, boxes and categories, and then calling
    ``image_processor`` on the image together with COCO-style annotations.
    """

    def __init__(self, dataset, image_processor, transform=None):
        self.transform = transform
        self.image_processor = image_processor
        self.dataset = dataset

    @staticmethod
    def format_image_annotations_as_coco(image_id, categories, boxes):
        """Bundle per-object categories and boxes into one COCO-style dict.

        ``categories`` and ``boxes`` are paired positionally.  Each box is
        copied into a list, and its area is the product of its third and
        fourth elements.
        """
        annotations = [
            {
                "image_id": image_id,
                "category_id": category_id,
                "bbox": list(box),
                "iscrowd": 0,
                "area": box[2] * box[3],
            }
            for category_id, box in zip(categories, boxes)
        ]
        return {"image_id": image_id, "annotations": annotations}

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        record = self.dataset[idx]

        image_id = record["image_id"]
        source_image = record["image"]
        boxes = record["objects"]["bbox"]
        categories = record["objects"]["category"]

        # The augmentations and the processor both work on an RGB numpy array.
        pixels = np.array(source_image.convert("RGB"))

        # Optional augmentation step; it may change the image, boxes and labels.
        if self.transform:
            augmented = self.transform(image=pixels, bboxes=boxes, category=categories)
            pixels, boxes, categories = (
                augmented["image"],
                augmented["bboxes"],
                augmented["category"],
            )

        coco_target = self.format_image_annotations_as_coco(image_id, categories, boxes)

        # The processor is called on a single image; take element 0 of every
        # returned value to drop the leading batch axis.
        encoded = self.image_processor(
            images=pixels, annotations=coco_target, return_tensors="pt"
        )
        return {key: value[0] for key, value in encoded.items()}
    

def collate_fn(batch: list) -> Mapping[str, Union[torch.Tensor, List[Any]]]:
    """Merge a list of per-sample dicts into a single batch dict.

    ``pixel_values`` (and ``pixel_mask``, when the first sample has one) are
    stacked along a new leading axis.  ``labels`` are gathered into a plain
    list, one entry per sample, without stacking.
    """
    collated = {
        "pixel_values": torch.stack([item["pixel_values"] for item in batch]),
        "labels": [item["labels"] for item in batch],
    }
    if "pixel_mask" in batch[0]:
        collated["pixel_mask"] = torch.stack([item["pixel_mask"] for item in batch])
    return collated
    

# Training set: the augmentation pipeline runs inside CPPE5Dataset.__getitem__.
train_dataset = CPPE5Dataset(dataset, image_processor, transform=train_augmentation_and_transform)
# collate_fn stacks the image tensors and keeps the per-image labels as a list.
train_loader = DataLoader(train_dataset, batch_size=12, shuffle=True, collate_fn=collate_fn)

In [None]:
model = model.to(device)

# Split the parameters by whether their name contains "backbone"; the backbone
# group is trained with a 10x smaller learning rate than the rest.
backbone_params = []
other_params = []
for param_name, param in model.named_parameters():
    if "backbone" in param_name:
        backbone_params.append(param)
    else:
        other_params.append(param)

optimizer = optim.AdamW(
    [
        {"params": other_params, "lr": 1e-4},
        {"params": backbone_params, "lr": 1e-5},
    ]
)
# Cosine schedule with warmup, defined over 10000 optimizer steps.
scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=500,
    num_training_steps=10000,
)
# Gradient scaler used together with autocast in the training loop.
scaler = torch.cuda.amp.GradScaler()

In [None]:
# Number of optimisation steps to run.  This matches the num_training_steps
# the scheduler was built with; stepping past it would leave the range the
# schedule is defined for, so the loop stops once it is reached.
max_steps = 10000
cur_step = 0
bpar = tqdm(total=max_steps)

# from_pretrained() hands the model back in evaluation mode; switch to
# training mode explicitly before optimising.
model.train()

# 300 epochs is only an upper bound; the loop exits early at max_steps.
for epoch in range(300):
    for data in train_loader:
        # Move the stacked tensors and every per-image label tensor to the device.
        data = {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in data.items()}
        data["labels"] = [{k: v.to(device) for k, v in labels.items()} for labels in data["labels"]]

        # Mixed-precision forward pass; the model computes the loss from the labels.
        with torch.cuda.amp.autocast():
            outputs = model(**data)
            loss = outputs["loss"]

        # Scaled backward pass and optimizer step, then advance the LR schedule.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()
        scheduler.step()

        cur_step += 1
        bpar.update(1)
        if cur_step % 10 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item()}")

        if cur_step >= max_steps:
            break

    # Propagate the early exit out of the epoch loop as well.
    if cur_step >= max_steps:
        break

bpar.close()

In [15]:
# Save the fine-tuned model to a local checkpoint directory.
# NOTE(review): the inference cell further down loads "ckpt_rtdetr_r50vd_75epoch",
# not this directory - confirm which checkpoint is intended.
model.save_pretrained("ckpt_rtdetr_r50vd_1epoch")

In [None]:
from typing import List, Mapping, Union, Any
import os
import json
import glob

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision.ops import nms

import numpy as np
from tqdm.auto import tqdm
from PIL import Image, ImageDraw

from transformers import AutoImageProcessor, AutoModelForObjectDetection, get_cosine_schedule_with_warmup

def num_parameters(model):
    """Return the total number of elements in ``model``'s trainable parameters.

    Only parameters whose ``requires_grad`` flag is set are counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Allow TF32 for CUDA matmuls and for cuDNN operations.
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# The image processor is loaded from the hub checkpoint, not from the
# fine-tuned checkpoint directory.
model_name_or_path = "PekingU/rtdetr_r50vd"
image_processor = AutoImageProcessor.from_pretrained(
    model_name_or_path,
    do_resize=True,
)

# Load fine-tuned weights from a local directory, switch to eval mode and move
# the model to the selected device.
# NOTE(review): the training section saves to "ckpt_rtdetr_r50vd_1epoch";
# confirm that this directory exists and is the intended checkpoint.
model = AutoModelForObjectDetection.from_pretrained(
    "ckpt_rtdetr_r50vd_75epoch",
).eval().to(device)

In [None]:
# glob returns paths in arbitrary order; sort them so that the order of the
# submission rows is reproducible between runs.
image_files = sorted(glob.glob("data/test/images/*.jpg"))
print(f"Found {len(image_files)} images")

In [None]:
batch_size = 8


def _load_rgb(path):
    """Open ``path`` and return it as a loaded RGB image; the file is closed on return."""
    with Image.open(path) as opened:
        return opened.convert("RGB")


# One entry per test image: {"id": <file stem>, "labels": [<detection dicts>]}.
submission = []
for idx in tqdm(range(0, len(image_files), batch_size)):
    batch_images = image_files[idx:idx + batch_size]
    images = [_load_rgb(image_file) for image_file in batch_images]

    # Preprocess the batch and move every tensor to the model's device
    inputs = image_processor(images=images, return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Perform inference
    with torch.no_grad():
        outputs = model(**inputs)

    # Post-process into per-image detections.  PIL reports size as
    # (width, height); it is reversed here to (height, width).
    target_sizes = torch.tensor([image.size[::-1] for image in images])
    results = image_processor.post_process_object_detection(
        outputs, target_sizes=target_sizes, threshold=0.2
    )

    # Each result is paired with the image it was computed from, so boxes are
    # normalised by that image's own width and height.
    for image_file, image, result in zip(batch_images, images, results):
        width, height = image.size
        labels = []

        # Indices of the detections that survive NMS, visited in their original
        # order so the emitted label order is unchanged.
        keep = nms(result["boxes"], result["scores"], iou_threshold=0.5)
        for ridx in sorted(keep.tolist()):
            score = result["scores"][ridx]
            label = result["labels"][ridx]

            # Corner format (x1, y1, x2, y2) -> centre + size
            x1, y1, x2, y2 = result["boxes"][ridx]
            w = x2 - x1
            h = y2 - y1
            xc = x1 + w / 2
            yc = y1 + h / 2

            labels.append(
                {
                    "class_id": label.item(),
                    "conf": score.item(),
                    "x": float(xc / width),
                    "y": float(yc / height),
                    "w": float(w / width),
                    "h": float(h / height),
                }
            )

        image_id = os.path.basename(image_file).split(".")[0]
        submission.append({"id": image_id, "labels": labels})


In [40]:
import pandas as pd

# One row per test image: its id and the list of kept detections.
df_submission = pd.DataFrame(submission)
df_submission.to_csv("205submission.csv", index=False)

In [None]:
# Display the submission frame built in the previous cell.
df_submission

In [None]:
# Number of kept detections for each submission entry.
[len(entry["labels"]) for entry in submission]

In [None]:
# Debug inspection: `result` is whatever the inference loop assigned last.
result

In [None]:
# Shape of every tensor in the last batch passed to the model.
{name: tensor.shape for name, tensor in inputs.items()}

In [None]:
# Debug inspection: post-processed detections of the last inference batch.
results

In [None]:
# Rebuild `submission` from the rows of df_test, copying each label's fields
# into a fresh dict with a fixed key order, then write it out as CSV.
# NOTE(review): df_test is not defined in any cell visible here - confirm
# where it comes from before running this cell.
submission = []

for _, row in tqdm(df_test.iterrows(), total=len(df_test)):
    image_id = row["id"]
    labels = [
        {
            "class_id": label["class_id"],
            "conf": label["conf"],
            "x": label["x"],
            "y": label["y"],
            "w": label["w"],
            "h": label["h"],
        }
        for label in row["labels"]
    ]
    submission.append({"id": image_id, "labels": labels})

df_submission = pd.DataFrame(submission)
df_submission.to_csv("submission.csv", index=False)