### 1. 필요 모듈 설치 (torch, torchvision, torchaudio, transformers, pytorch-lightning)
### 2. 데이터 셋 준비
    - roboflow 에서 데이터셋 구현 후 coco format 으로 다운로드
### 3. Dataset : VisDrone Dataset(2019)

**Pytorch Custom dataset : CustomDataset 생성**

In [None]:
# from torch.utils.data import Dataset

# class CustomDataset(Dataset):
#     # Constructor: load and pre-process the data here
#     def __init__(self):
#     # Return the total number of samples in the dataset
#     def __len__(self):
#     # Return the input/target pair stored at index idx
#     def __getitem__(self, idx):

- COCO-Format 이라서 CustomDataset 활용

In [1]:
import torchvision
import os
from torch.utils.data import Dataset

class CocoDetection(torchvision.datasets.CocoDetection):
    """COCO-format detection dataset whose samples are pre-processed for DETR.

    Args:
        img_folder: Directory that holds the images and `_annotations.coco.json`.
        processor: Callable invoked as
            `processor(images=..., annotations=..., return_tensors='pt')`;
            its result must expose `"pixel_values"` and `"labels"`.
        train: Kept for interface compatibility. Both splits use the same
            annotation file name, so the flag does not change the path.
    """

    def __init__(self, img_folder, processor, train=True):
        # The annotation file has the same name in every split directory.
        ann_file = os.path.join(img_folder, "_annotations.coco.json")
        super().__init__(img_folder, ann_file)
        self.processor = processor

    # This must be a method of the class (not a function nested in __init__),
    # otherwise indexing falls back to the parent and skips the processor.
    def __getitem__(self, idx):
        """Return `(pixel_values, target)` for the sample at position `idx`."""
        # Read the PIL image and the raw COCO annotations.
        # Data augmentation could be applied here before pre-processing.
        img, target = super().__getitem__(idx)

        # Wrap the annotations in the dict layout the processor is called
        # with, then let it pre-process both the image and the target.
        image_id = self.ids[idx]
        target = {'image_id': image_id, 'annotations': target}
        encoding = self.processor(images=img, annotations=target, return_tensors='pt')

        pixel_values = encoding["pixel_values"].squeeze()   # drop the batch dimension
        target = encoding['labels'][0]  # drop the batch dimension
        return pixel_values, target


- COCODetection - CustomDataset 적용

In [3]:
# DetrImageProcessor handles the pre-processing of images and annotations

from transformers import DetrImageProcessor

# Image processor matching the pretrained DETR checkpoint
processor_DETR = DetrImageProcessor.from_pretrained("facebook/detr-resnet-50")

train_dataset = CocoDetection(
    img_folder='C:\\Users\\SVT\\Desktop\\PyTorch\\datasets\\train',
    processor=processor_DETR,
)
val_dataset = CocoDetection(
    img_folder='C:\\Users\\SVT\\Desktop\\PyTorch\\datasets\\valid',
    processor=processor_DETR,
    train=False,
)

print("Number of training examples:", len(train_dataset))
print("Number of validation examples:", len(val_dataset))


loading annotations into memory...
Done (t=1.37s)
creating index...
index created!
loading annotations into memory...
Done (t=0.41s)
creating index...
index created!
Number of training examples: 6469
Number of validation examples: 547


**Pytorch Dataloader 수행**

- collate_fn : 한 배치로 묶인 이미지들을 같은 크기로 padding 하고, pixel_mask 를 함께 만들어 주는 함수
    - batch_size=1 : 이미지 크기와 상관 없이 동작하지만, 2 이상이면 이미지마다 크기가 달라 하나의 텐서로 묶을 수 없어 오류 발생
    - 이를 방지하기 위해 collate_fn 적용

In [None]:
from torch.utils.data import DataLoader

def collate_fn(batch):
    """Merge a list of `(pixel_values, target)` samples into one batch dict.

    The images are handed to `processor_DETR.pad`, which supplies the batched
    `pixel_values` and the matching `pixel_mask`. The targets are kept as a
    plain list, one entry per sample.

    Returns:
        dict with the keys `pixel_values`, `pixel_mask` and `labels`.
    """
    images = [sample[0] for sample in batch]
    padded = processor_DETR.pad(images, return_tensors="pt")
    return {
        'pixel_values': padded['pixel_values'],
        'pixel_mask': padded['pixel_mask'],
        'labels': [sample[1] for sample in batch],
    }

# NOTE(review): num_workers=79 presumably matches the original training
# machine's CPU count; confirm it suits the host before running.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    num_workers=79,
    collate_fn=collate_fn,
)
val_dataloader = DataLoader(
    val_dataset,
    batch_size=2,
    shuffle=False,
    num_workers=79,
    collate_fn=collate_fn,
)

# Pull a single batch to inspect what collate_fn produces
batch = next(iter(train_dataloader))

print('batch_key :',batch.keys())
print()

# Inspect one un-batched sample as well
pixel_values, target = train_dataset[0]
print(pixel_values.shape)
print(target)



**Pytorch Lightning 수행**

- pytorch lightning : High-level 인터페이스를 제공하는 오픈소스 Python 라이브러리 (학습 코드를 정돈된 구조로 작성할 수 있음)
    - ignore_mismatched_sizes=True 로 설정해야 함
    - 사전 학습된 체크포인트의 분류 헤드 크기가 num_labels 와 맞지 않아도 오류 없이 로드하고, 크기가 맞지 않는 가중치는 새로 초기화하도록 하는 인자

In [None]:
import pytorch_lightning as pl
from transformers import DetrForObjectDetection
import torch

class Detr(pl.LightningModule):
    """LightningModule wrapping a pretrained DETR with a custom label set.

    Args:
        lr: Learning rate for every parameter whose name does not contain
            "backbone".
        lr_backbone: Learning rate for parameters whose name contains
            "backbone".
        weight_decay: Weight decay passed to AdamW.

    Note:
        `id2label`, `train_dataloader` and `val_dataloader` are read from
        module scope, so they must exist before the model is built / fitted.
    """

    def __init__(self, lr, lr_backbone, weight_decay):
        super().__init__()
        # Replace the COCO classification head with one sized for our labels.
        # revision="no_timm" selects the variant whose convolutional backbone
        # does not depend on the timm library.
        # num_labels = number of classes.
        # ignore_mismatched_sizes lets the checkpoint load although the head
        # size differs from the pretrained one.
        self.model = DetrForObjectDetection.from_pretrained("facebook/detr-resnet-50", revision="no_timm", id2label=id2label, num_labels=len(id2label), ignore_mismatched_sizes=True)
        self.lr = lr
        self.lr_backbone = lr_backbone
        self.weight_decay = weight_decay

    def forward(self, pixel_values, pixel_mask):
        """Run the wrapped model without labels and return its outputs."""
        outputs = self.model(pixel_values=pixel_values, pixel_mask=pixel_mask)
        return outputs

    def common_step(self, batch, batch_idx):
        """Compute the loss for one batch; shared by training and validation.

        Returns:
            Tuple `(loss, loss_dict)` taken from the model outputs.
        """
        pixel_values = batch['pixel_values']
        pixel_mask = batch['pixel_mask']
        # The labels are a list of dicts, so each tensor is moved individually.
        labels = [{k: v.to(self.device) for k, v in t.items()} for t in batch['labels']]

        outputs = self.model(pixel_values=pixel_values, pixel_mask=pixel_mask, labels=labels)

        loss = outputs.loss
        loss_dict = outputs.loss_dict
        return loss, loss_dict

    def training_step(self, batch, batch_idx):
        """Log the total loss and every loss component, then return the loss."""
        loss, loss_dict = self.common_step(batch, batch_idx)
        self.log('training_loss', loss)
        for k, v in loss_dict.items():
            self.log('train_' + k, v.item())

        # Return only after all components are logged; returning inside the
        # loop would log just the first one and yield None for an empty dict.
        return loss

    def validation_step(self, batch, batch_idx):
        """Log the total loss and every loss component, then return the loss."""
        loss, loss_dict = self.common_step(batch, batch_idx)
        self.log('validation_loss', loss)
        for k, v in loss_dict.items():
            self.log('validation_' + k, v.item())

        return loss

    def configure_optimizers(self):
        """Build AdamW with a separate learning rate for backbone parameters."""
        param_dicts = [
            # Non-backbone parameters use the optimizer-wide default lr.
            {"params": [p for n, p in self.named_parameters() if "backbone" not in n and p.requires_grad]},
            {"params": [p for n, p in self.named_parameters() if "backbone" in n and p.requires_grad],
             "lr": self.lr_backbone},
        ]
        optimizer = torch.optim.AdamW(param_dicts, lr=self.lr, weight_decay=self.weight_decay)
        return optimizer

    def train_dataloader(self):
        """Return the module-level training DataLoader."""
        return train_dataloader

    def val_dataloader(self):
        """Return the module-level validation DataLoader."""
        return val_dataloader
    
# Detr.__init__ reads `id2label` from module scope, so build it first:
# map every COCO category id in the training annotations to its name.
id2label = {cat_id: cat['name'] for cat_id, cat in train_dataset.coco.cats.items()}

model_detr = Detr(lr=1e-4, lr_backbone=1e-5, weight_decay=1e-4)

# Smoke-test the forward pass on the batch fetched earlier
outputs = model_detr(pixel_values=batch['pixel_values'], pixel_mask=batch['pixel_mask'])

# Print the state_dict keys to check the structure after modifying the model
print('model state_dict keys ')
for key in model_detr.state_dict():
    print(key)

# Total / trainable / frozen parameter counts
total_params = sum(p.numel() for p in model_detr.parameters())
print("Total Parameters:", total_params)

total_trainable_params = sum(p.numel() for p in model_detr.parameters() if p.requires_grad)
print("Total Trainable Parameters:", total_trainable_params)

total_fixed_params = sum(p.numel() for p in model_detr.parameters() if not p.requires_grad)
print("Total Fixed Parameters:", total_fixed_params)

**Training**
- early_stopping 사용 가능

In [None]:
from pytorch_lightning import Trainer
import os

## Early-stopping variant (kept for reference)
# from lightning.pytorch.callbacks.early_stopping import EarlyStopping
# early_stop_callback = pl.callbacks.EarlyStopping(monitor="validation_loss", min_delta=0.00, patience=3, verbose=False, mode="min")
# trainer = Trainer(accelerator="gpu", devices=[3], max_epochs=30, callbacks=[early_stop_callback], gradient_clip_val=0.1, accumulate_grad_batches=8, log_every_n_steps=5)
# trainer.fit(model_detr)

# Train on GPU 0 with gradient clipping and gradient accumulation over 8 batches
trainer = Trainer(accelerator="gpu", devices=[0], max_epochs=150, gradient_clip_val=0.1, accumulate_grad_batches=8, log_every_n_steps=5)
trainer.fit(model_detr)

**Huggingface 에 올리기**

In [None]:
import huggingface_hub

# Authenticate with the Hugging Face Hub from Python; the shell command
# `huggingface-cli login` is not valid inside a Python cell.
huggingface_hub.login()

# Upload the fine-tuned model and its image processor to the same repository
model_detr.model.push_to_hub("name/detr_custom")
processor_DETR.push_to_hub("name/detr_custom")



**모델 평가(Evaluation)**
- HuggingFace 에서 작성했던 디렉토리에 맞게 주소 입력해서 다운

In [None]:
# model load
from transformers import DetrImageProcessor, DetrForObjectDetection
import torch

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the fine-tuned weights and place them on the selected device
model = DetrForObjectDetection.from_pretrained("name/detr_custom")
model.to(device)

# Load the image processor stored under the same repository name
processor = DetrImageProcessor.from_pretrained("name/detr_custom")

In [None]:
def convert_to_xywh(boxes):
    """Convert boxes from (xmin, ymin, xmax, ymax) to (xmin, ymin, width, height).

    Args:
        boxes: 2-D tensor whose second dimension holds the four corner values.

    Returns:
        Tensor of the same shape with the last two columns replaced by the
        box width and height.
    """
    left, top, right, bottom = boxes.unbind(1)
    width = right - left
    height = bottom - top
    return torch.stack([left, top, width, height], dim=1)

def prepare_for_coco_detection(predictions):
    """Flatten per-image predictions into a list of COCO result records.

    Args:
        predictions: Mapping from image id to a prediction holding the
            entries 'boxes', 'scores' and 'labels'. Empty predictions are
            skipped.

    Returns:
        List of dicts with the keys "image_id", "category_id", "bbox"
        (converted with `convert_to_xywh`) and "score", one per box.
    """
    coco_results = []
    for original_id, prediction in predictions.items():
        if not len(prediction):
            continue

        xywh_boxes = convert_to_xywh(prediction['boxes']).tolist()
        scores = prediction['scores'].tolist()
        labels = prediction['labels'].tolist()

        for k, box in enumerate(xywh_boxes):
            record = {
                "image_id": original_id,
                "category_id": labels[k],
                "bbox": box,
                "score": scores[k],
            }
            coco_results.append(record)
    return coco_results

- 평가 코드 실행

In [None]:
from coco_eval import CocoEvaluator
from tqdm.notebook import tqdm

import numpy as np

# Initialize the evaluator with the ground truth (gt) of the validation set.
# Without this line `evaluator` is undefined when the loop calls update().
evaluator = CocoEvaluator(coco_gt=val_dataset.coco, iou_types=["bbox"])

for idx, batch in enumerate(tqdm(val_dataloader)):
    # Get the inputs
    pixel_values = batch["pixel_values"].to(device)
    pixel_mask = batch["pixel_mask"].to(device)
    # The labels were produced by the image processor in the dataset
    labels = [{k: v.to(device) for k, v in t.items()} for t in batch["labels"]]

    # Forward pass
    with torch.no_grad():
        outputs = model(pixel_values=pixel_values, pixel_mask=pixel_mask)

    # Convert the outputs to a list of dicts, one entry per example in the
    # batch, with boxes post-processed for each image's original size
    orig_target_sizes = torch.stack([target["orig_size"] for target in labels], dim=0)
    results = processor.post_process_object_detection(outputs, target_sizes=orig_target_sizes, threshold=0)

    # Feed the metric: a list of dicts, each carrying the keys
    # image_id, category_id, bbox and score
    predictions = {target['image_id'].item(): output for target, output in zip(labels, results)}
    predictions = prepare_for_coco_detection(predictions)
    evaluator.update(predictions)

evaluator.synchronize_between_processes()
evaluator.accumulate()
evaluator.summarize()


**Inference(시각화)**

In [None]:
# Take an arbitrary validation sample: val_dataset[1]
pixel_values, target = val_dataset[1]
# Add a leading batch dimension and move the tensor to the selected device
pixel_values = pixel_values.unsqueeze(0).to(device)
print(pixel_values.shape)

In [None]:
# Gradient tracking is disabled for this inference-only forward pass
with torch.no_grad():
    # forward pass to get class logits and bounding boxes
    outputs = model(pixel_values=pixel_values, pixel_mask=None)
print("Outputs:", outputs.keys())

In [None]:
import matplotlib.pyplot as plt

# colors for visualization
COLORS = [[0.000, 0.447, 0.741], [0.850, 0.325, 0.098], [0.929, 0.694, 0.125],
          [0.494, 0.184, 0.556], [0.466, 0.674, 0.188], [0.301, 0.745, 0.933]]

def plot_results(pil_img, scores, labels, boxes):
    """Draw every predicted box with its class name and score on the image.

    Args:
        pil_img: Image passed to `plt.imshow`.
        scores: Tensor of confidence scores, one per box.
        labels: Tensor of class ids, looked up in `model.config.id2label`.
        boxes: Tensor of boxes given as (xmin, ymin, xmax, ymax).
    """
    plt.figure(figsize=(16, 10))
    plt.imshow(pil_img)
    ax = plt.gca()
    # Repeat the palette so zip() does not stop after six boxes
    colors = COLORS * 100
    for score, label, (xmin, ymin, xmax, ymax), c in zip(scores.tolist(), labels.tolist(), boxes.tolist(), colors):
        ax.add_patch(plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin, fill=False, color=c, linewidth=3))

        text = f"{model.config.id2label[label]}: {score:0.2f}"
        ax.text(xmin, ymin, text, fontsize=15, bbox=dict(facecolor='yellow', alpha=0.5))

    # Show the figure once, after all boxes are drawn; calling show() inside
    # the loop would display the figure after the first box only.
    plt.axis('off')
    plt.show()

In [None]:
# `Image` was used without being imported
from PIL import Image

# Load the original image based on its COCO image id
image_id = target['image_id'].item()
image_info = val_dataset.coco.loadImgs(image_id)[0]
# The file name belongs inside os.path.join (not as Image.open's mode), and
# the backslashes are escaped because a plain 'C:\Users' literal is a
# "\U" unicode-escape syntax error.
image = Image.open(os.path.join('C:\\Users\\SVT\\Desktop\\PyTorch\\datasets\\valid', image_info['file_name']))

# Post-process the model outputs to the original image size
width, height = image.size
postprocessed_outputs = processor.post_process_object_detection(outputs, target_sizes=[(height, width)],
                                                                threshold=0.9)
results = postprocessed_outputs[0]
plot_results(image, results['scores'], results['labels'], results['boxes'])

**기타**
- Params
    - Trainable params : Encoder, Decoder, FFN
    - Non-trainable params : ResNet