# **DEtection TRansformer**

Postup vytvorenia modelu DETR v tomto notebooku je založený na <a href="https://github.com/roboflow/notebooks/blob/main/notebooks/train-huggingface-detr-on-custom-dataset.ipynb">originálnom notebooku.</a>



### Príprava prostredia

In [None]:
import os
# Remember the working directory at the time this cell runs; later cells
# change back to it (`%cd {HOME}`) and build the model export path from it.
HOME = os.getcwd()
print(HOME)

In [None]:
# Install the notebook's third-party dependencies.
# supervision is pinned to 0.3.0 (presumably because the inference cell relies
# on that release's Detections API - confirm before upgrading).
!pip install supervision==0.3.0
# The remaining packages are installed unpinned; -q reduces pip's output.
!pip install -q transformers
!pip install -q pytorch-lightning
!pip install -q roboflow
!pip install -q timm
!pip install pycocotools

In [None]:
import torch
!nvcc --version
TORCH_VERSION = ".".join(torch.__version__.split(".")[:2])
CUDA_VERSION = torch.__version__.split("+")[-1]
print("torch: ", TORCH_VERSION, "; cuda: ", CUDA_VERSION)

import roboflow
import supervision
import transformers
import pytorch_lightning

print(
    #"roboflow:", roboflow.__version__,
    "; supervision:", supervision.__version__,
    "; transformers:", transformers.__version__,
    "; pytorch_lightning:", pytorch_lightning.__version__
)

### Načítanie predtrénovaného modelu

In [None]:
import torch
from transformers import DetrForObjectDetection, DetrImageProcessor


# --- settings ---------------------------------------------------------------
# Run on the first CUDA device when one is available, otherwise on the CPU.
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Hugging Face checkpoint used for both the image processor and the model.
CHECKPOINT = "facebook/detr-resnet-50"
# NOTE(review): the two thresholds below are not referenced by any other cell
# visible in this notebook; the inference cell defines its own
# CONFIDENCE_THRESHOLD / NMS_THRESHOLD.
CONFIDENCE_TRESHOLD = 0.5
IOU_TRESHOLD = 0.8
num_classes = 1

# The processor prepares images (and annotations) for DETR; the model is loaded
# with a classification head sized for `num_classes`, which is why mismatched
# checkpoint weights have to be tolerated.
image_processor = DetrImageProcessor.from_pretrained(CHECKPOINT)
model = DetrForObjectDetection.from_pretrained(
    CHECKPOINT,
    num_labels=num_classes,
    ignore_mismatched_sizes=True,
)
model.to(DEVICE)

### Príprava dát 

In [None]:
import os
import torchvision

ANNOTATION_FILE_NAME = "_annotations.coco.json" # Name of the COCO annotation .json file

# Paths to the image directories of the individual dataset splits.
# Each directory is also where CocoDetection looks for ANNOTATION_FILE_NAME.
TRAIN_DIRECTORY = "../dataset/train/images/"
VAL_DIRECTORY = "../dataset/val/images/"
TEST_DIRECTORY = "../dataset/TP_test/images"

class CocoDetection(torchvision.datasets.CocoDetection):
    """COCO detection dataset whose samples are encoded by an image processor.

    The annotation file is looked up inside the image directory itself, at
    ``<image_directory_path>/<ANNOTATION_FILE_NAME>``.
    """

    def __init__(
        self,
        image_directory_path: str,
        image_processor,
        train: bool = True
    ):
        """Open the dataset rooted at ``image_directory_path``.

        Args:
            image_directory_path: Directory holding the images and the
                annotation file.
            image_processor: Callable invoked in ``__getitem__`` with
                ``images=``, ``annotations=`` and ``return_tensors="pt"``.
            train: Accepted by the signature but not used by this class.
        """
        super().__init__(
            image_directory_path,
            os.path.join(image_directory_path, ANNOTATION_FILE_NAME),
        )
        self.image_processor = image_processor

    def __getitem__(self, idx):
        """Return ``(pixel_values, target)`` for the sample at position ``idx``."""
        image, raw_annotations = super().__getitem__(idx)

        # The processor expects the annotations wrapped together with the id
        # of the image they belong to.
        coco_target = {'image_id': self.ids[idx], 'annotations': raw_annotations}
        encoded = self.image_processor(
            images=image,
            annotations=coco_target,
            return_tensors="pt",
        )

        # A single image was encoded: squeeze() drops the size-1 dimensions
        # (presumably just the leading batch dimension) and the only entry of
        # the labels list is taken.
        return encoded["pixel_values"].squeeze(), encoded["labels"][0]


# Build one dataset per split, in train / val / test order. The splits differ
# only in their image directory and in the (currently unused) `train` flag.
TRAIN_DATASET, VAL_DATASET, TEST_DATASET = (
    CocoDetection(
        image_directory_path=split_directory,
        image_processor=image_processor,
        train=is_training_split,
    )
    for split_directory, is_training_split in (
        (TRAIN_DIRECTORY, True),
        (VAL_DIRECTORY, False),
        (TEST_DIRECTORY, False),
    )
)

# Report the size of every split.
print("Number of training examples:", len(TRAIN_DATASET))
print("Number of validation examples:", len(VAL_DATASET))
print("Number of test examples:", len(TEST_DATASET))

### Nastavenie konfigurácií pre tréning

In [None]:
from torch.utils.data import DataLoader

def collate_fn(batch):
    """Collate ``(pixel_values, target)`` samples into one padded batch.

    DETR is trained on images of varying size, so the samples of a batch
    cannot be stacked directly. The images are therefore padded to the largest
    resolution in the batch via ``image_processor.pad``, which also yields a
    binary ``pixel_mask`` telling real pixels from padding.

    Args:
        batch: Sequence of samples; index 0 of each sample is the image
            tensor, index 1 its target.

    Returns:
        Dict with the padded ``pixel_values``, the matching ``pixel_mask`` and
        ``labels`` - the per-sample targets as a list, in batch order.
    """
    padded = image_processor.pad(
        [sample[0] for sample in batch],
        return_tensors="pt",
    )
    return {
        'pixel_values': padded['pixel_values'],
        'pixel_mask': padded['pixel_mask'],
        'labels': [sample[1] for sample in batch]
    }

# One loader per split, all batching four samples through collate_fn.
# Only the training loader shuffles its samples.
TRAIN_DATALOADER = DataLoader(
    dataset=TRAIN_DATASET,
    batch_size=4,
    shuffle=True,
    collate_fn=collate_fn,
)
VAL_DATALOADER = DataLoader(
    dataset=VAL_DATASET,
    batch_size=4,
    collate_fn=collate_fn,
)
TEST_DATALOADER = DataLoader(
    dataset=TEST_DATASET,
    batch_size=4,
    collate_fn=collate_fn,
)

In [None]:
import pytorch_lightning as pl
from transformers import DetrForObjectDetection
import torch


class Detr(pl.LightningModule):
    """LightningModule that fine-tunes the DETR checkpoint ``CHECKPOINT``.

    The wrapped Hugging Face model is available as ``self.model``. Training
    and validation batches are the dicts produced by ``collate_fn``
    (``pixel_values``, ``pixel_mask``, ``labels``).
    """

    def __init__(self, lr, lr_backbone, weight_decay, num_labels=None):
        """Load the pretrained model and store the optimizer settings.

        Args:
            lr: Learning rate for all non-backbone parameters.
            lr_backbone: Learning rate for the backbone parameters.
            weight_decay: Weight decay passed to AdamW.
            num_labels: Number of object classes for the classification head.
                When omitted, it is the number of COCO categories in
                ``TRAIN_DATASET``.
        """
        super().__init__()
        if num_labels is None:
            # Count the categories straight from the training annotations so
            # that the module does not depend on `id2label`, which the
            # notebook only builds in the later inference cell.
            num_labels = len(TRAIN_DATASET.coco.cats)

        self.model = DetrForObjectDetection.from_pretrained(
            pretrained_model_name_or_path=CHECKPOINT,
            num_labels=num_labels,
            ignore_mismatched_sizes=True
        )

        self.lr = lr
        self.lr_backbone = lr_backbone
        self.weight_decay = weight_decay

    def forward(self, pixel_values, pixel_mask):
        """Run the wrapped model without labels and return its outputs."""
        return self.model(pixel_values=pixel_values, pixel_mask=pixel_mask)

    def common_step(self, batch, batch_idx):
        """Compute the loss for one batch.

        Returns:
            Tuple ``(loss, loss_dict)`` taken from the model outputs.
        """
        pixel_values = batch["pixel_values"]
        pixel_mask = batch["pixel_mask"]
        # The targets are a list of dicts of tensors; move each tensor to the
        # module's device explicitly.
        labels = [{k: v.to(self.device) for k, v in t.items()} for t in batch["labels"]]

        outputs = self.model(pixel_values=pixel_values, pixel_mask=pixel_mask, labels=labels)

        return outputs.loss, outputs.loss_dict

    def _log_loss_components(self, prefix, loss_dict):
        """Log every entry of ``loss_dict`` as a float under ``prefix + name``."""
        for name, value in loss_dict.items():
            self.log(prefix + name, value.item())

    def training_step(self, batch, batch_idx):
        """Log the training loss and its components, then return the loss."""
        loss, loss_dict = self.common_step(batch, batch_idx)
        self.log("training_loss", loss)
        self._log_loss_components("train_", loss_dict)

        return loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss and its components, then return the loss."""
        loss, loss_dict = self.common_step(batch, batch_idx)
        # "validation/loss" is the key an EarlyStopping callback has to monitor.
        self.log("validation/loss", loss)
        self._log_loss_components("validation_", loss_dict)

        return loss

    def configure_optimizers(self):
        """Build AdamW with a separate learning rate for the backbone."""
        # DETR authors decided to use different learning rate for backbone
        # you can learn more about it here:
        # - https://github.com/facebookresearch/detr/blob/3af9fa878e73b6894ce3596450a8d9b89d918ca9/main.py#L22-L23
        # - https://github.com/facebookresearch/detr/blob/3af9fa878e73b6894ce3596450a8d9b89d918ca9/main.py#L131-L139
        param_dicts = [
            {
                # Trainable non-backbone parameters use the optimizer-wide lr.
                "params": [p for n, p in self.named_parameters() if "backbone" not in n and p.requires_grad]},
            {
                "params": [p for n, p in self.named_parameters() if "backbone" in n and p.requires_grad],
                "lr": self.lr_backbone,
            },
        ]
        return torch.optim.AdamW(param_dicts, lr=self.lr, weight_decay=self.weight_decay)

    def train_dataloader(self):
        """Return the module-level training dataloader."""
        return TRAIN_DATALOADER

    def val_dataloader(self):
        """Return the module-level validation dataloader."""
        return VAL_DATALOADER

In [None]:
# Instantiate the Lightning wrapper with the fine-tuning hyper-parameters.
model = Detr(lr=1e-4, lr_backbone=1e-5, weight_decay=1e-4)

# Pull one training batch and pass it through the model once. `outputs` is not
# consumed by this cell (presumably a sanity check of the data pipeline).
batch = next(iter(TRAIN_DATALOADER))
outputs = model(
    pixel_values=batch["pixel_values"],
    pixel_mask=batch["pixel_mask"],
)

### Trénovanie modelu

In [None]:
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks.early_stopping import EarlyStopping

%cd {HOME}

MAX_EPOCHS = 100          # Počet epoch pre trénovanie 

# technika predčasného ukončenia trénovania
early_stopping = EarlyStopping(
    monitor='val_loss',   # monitorovaná metrika
    patience=15,          # počet epoch bez zlepšenia metriky, predtým ako sa zastaví trénovanie
    verbose=True,         # zobrazenie informácií pre monitoring a debugging
    mode='min',           # 'min' pre minimalizáciu metriky, 'max' pre maximalizáciu
    min_delta=0.0001      # minimálna zmena, aby sa kvalifikovalo ako zlepšenie
)

trainer = Trainer(devices=1, 
                  accelerator="gpu", 
                  max_epochs=MAX_EPOCHS, 
                  gradient_clip_val=0.1, 
                  accumulate_grad_batches=8, 
                  log_every_n_steps=5,  
                  callbacks=[early_stop_callback])

trainer.fit(model)

### Uloženie natrénovaného modelu

In [None]:
# Place the trained module on DEVICE before it is saved and reloaded below.
model.to(DEVICE) 

In [None]:
# Directory under HOME where the fine-tuned model is written and later read back.
MODEL_PATH = os.path.join(HOME, 'custom-model')

In [None]:
# Save the wrapped Hugging Face model (`model.model`), not the Lightning
# wrapper, so that DetrForObjectDetection.from_pretrained can load it again.
model.model.save_pretrained(MODEL_PATH)

### Načítanie natrénovaného modelu

In [None]:
# Reload the fine-tuned weights from MODEL_PATH.
# NOTE: this rebinds `model` from the Detr LightningModule to a plain
# DetrForObjectDetection, which is what the inference cell calls.
model = DetrForObjectDetection.from_pretrained(MODEL_PATH)
model.to(DEVICE)

### Vizualizácia predikcií

Vizualizácia predikcií pre všetky obrázky v testovacej množine a ich následné uloženie do priečinkov:

*inferenced*:
>TP - priečinok pre obrázky s detegovanými objektmi
 
>TN - priečinok pre obrázky bez detegovaných objektov

In [None]:
# Thresholds applied to the predictions
# CONFIDENCE_THRESHOLD is passed as `threshold` to post_process_object_detection;
# NMS_THRESHOLD is passed to Detections.with_nms.
CONFIDENCE_THRESHOLD = 0.5
NMS_THRESHOLD = 0.75

In [None]:
import os
import cv2
import torch
import numpy as np
# The notebook only imports `supervision` without an alias elsewhere; this
# cell uses it as `sv`.
import supervision as sv

input_folder = TEST_DIRECTORY
output_folder = "inferenced"

# Map COCO category ids to class names, taken from the test split.
categories = TEST_DATASET.coco.cats
id2label = {k: v['name'] for k, v in categories.items()}
box_annotator = sv.BoxAnnotator()

# Images with at least one detection go to TP, all others to TN.
detection_folder = os.path.join(output_folder, "TP")
no_detection_folder = os.path.join(output_folder, "TN")

# makedirs also creates output_folder, since both targets live inside it.
for folder in (detection_folder, no_detection_folder):
    os.makedirs(folder, exist_ok=True)

for image_filename in os.listdir(input_folder):
    # The directory also holds non-image files (e.g. the annotation json).
    if not image_filename.lower().endswith(".jpg"):
        continue

    image_path = os.path.join(input_folder, image_filename)
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread returns None instead of raising when a file cannot be read.
        print(f"Skipping unreadable image: {image_path}")
        continue

    # cv2.imread yields BGR channel order, whereas the training samples are
    # loaded by torchvision's CocoDetection (PIL, RGB). Convert so the model
    # sees the same channel order at inference time; the BGR original is kept
    # for drawing and for cv2.imwrite.
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    with torch.no_grad():
        inputs = image_processor(images=rgb_image, return_tensors='pt').to(DEVICE)
        outputs = model(**inputs)

        # Boxes are scaled back to the original (height, width) of the image.
        target_sizes = torch.tensor([image.shape[:2]]).to(DEVICE)
        results = image_processor.post_process_object_detection(
            outputs=outputs,
            threshold=CONFIDENCE_THRESHOLD,
            target_sizes=target_sizes
        )[0]

    detections = sv.Detections.from_transformers(transformers_results=results)

    # Truthiness follows len(detections) (assumes Detections defines __len__).
    if detections:
        detections = detections.with_nms(threshold=NMS_THRESHOLD)

        labels = [f"{id2label[class_id]} {confidence:.2f}" for _, confidence, class_id, _ in detections]
        frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)

        # Use a separate name so the base `output_folder` is not overwritten.
        destination_folder = detection_folder
    else:
        frame = image.copy()
        destination_folder = no_detection_folder

    output_image_path = os.path.join(destination_folder, image_filename)
    cv2.imwrite(output_image_path, frame)
    print(f"Annotated image saved: {output_image_path}")