In [None]:
!pip install kaggle --upgrade



In [None]:
!kaggle datasets download -d s076923/pytorch-transformer

Dataset URL: https://www.kaggle.com/datasets/s076923/pytorch-transformer
License(s): other
Downloading pytorch-transformer.zip to /content
 99% 908M/916M [00:08<00:00, 108MB/s]
100% 916M/916M [00:08<00:00, 111MB/s]


In [None]:
# 먼저, 압축 파일(pytorch-transformer.zip)을 지정된 경로(현재 디렉토리)로 압축 해제합니다.
import shutil
shutil.unpack_archive(
    filename="pytorch-transformer.zip",  # 압축 해제할 파일 경로
    extract_dir="./",                     # 압축 해제할 디렉토리 (현재 디렉토리)
    format="zip"                          # 압축 파일 형식 지정
)


In [None]:
# 작업 디렉토리를 변경합니다.
import os
os.chdir("/content/datasets/")  # 작업할 데이터셋 폴더로 이동 (실행 환경에 따라 경로가 달라질 수 있음)

### SSD 백본(Backbone) 정의
먼저, ResNet과 같이 사전 학습된 네트워크의 일부 층을 사용하여 SSD의 특징 추출기(backbone)를 구성합니다.

In [None]:
from torch import nn
from collections import OrderedDict

class SSDBackbone(nn.Module):
    def __init__(self, backbone):
        super().__init__()
        # backbone의 초기 층: conv1, bn1, relu를 하나의 sequential로 묶습니다.
        layer0 = nn.Sequential(backbone.conv1, backbone.bn1, backbone.relu)
        # ResNet의 레이어들 (layer1 ~ layer4)를 저장
        layer1 = backbone.layer1
        layer2 = backbone.layer2
        layer3 = backbone.layer3
        layer4 = backbone.layer4

        # features는 layer0부터 layer3까지 연결한 것입니다.
        self.features = nn.Sequential(layer0, layer1, layer2, layer3)

        # upsampling 모듈: features의 출력 채널 수를 변경하고 활성화 함수(ReLU)를 적용합니다.
        self.upsampling= nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=1),
            nn.ReLU(inplace=True),
        )

        # extra 모듈: SSD에서는 다양한 크기의 특징맵(feature map)을 사용하기 위해 추가적인 계층을 만듭니다.
        # ModuleList에 여러 sequential 블록을 추가하여 점진적으로 다운샘플링 하면서 채널 수를 조정합니다.
        self.extra = nn.ModuleList(
            [
                # 첫 번째 extra block: layer4와 1x1 conv를 사용해 채널 수를 늘립니다.
                nn.Sequential(
                    layer4,
                    nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=1),
                    nn.ReLU(inplace=True),
                ),
                # 두 번째 extra block
                nn.Sequential(
                    nn.Conv2d(1024, 256, kernel_size=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(256, 512, kernel_size=3, padding=1, stride=2),
                    nn.ReLU(inplace=True),
                ),
                # 세 번째 extra block
                nn.Sequential(
                    nn.Conv2d(512, 128, kernel_size=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(128, 256, kernel_size=3, padding=1, stride=2),
                    nn.ReLU(inplace=True),
                ),
                # 네 번째 extra block
                nn.Sequential(
                    nn.Conv2d(256, 128, kernel_size=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(128, 256, kernel_size=3),
                    nn.ReLU(inplace=True),
                ),
                # 다섯 번째 extra block
                nn.Sequential(
                    nn.Conv2d(256, 128, kernel_size=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(128, 256, kernel_size=3),
                    nn.ReLU(inplace=True),
                ),
                # 여섯 번째 extra block: kernel_size가 4로 지정되어 다운샘플링의 정도가 다릅니다.
                nn.Sequential(
                    nn.Conv2d(256, 128, kernel_size=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(128, 256, kernel_size=4),
                    nn.ReLU(inplace=True),
                )
            ]
        )

    def forward(self, x):
        # 기본 feature 추출 (features: layer0 ~ layer3)
        x = self.features(x)
        # upsampling을 적용하여 첫 번째 출력 feature map을 만듭니다.
        output = [self.upsampling(x)]

        # extra 모듈들을 순차적으로 적용하며 추가적인 feature map들을 생성합니다.
        for block in self.extra:
            x = block(x)
            output.append(x)

        # 결과를 OrderedDict 형태로 반환 (각 단계별 특징맵에 index를 부여)
        return OrderedDict([(str(i), v) for i, v in enumerate(output)])


- features: ResNet의 앞부분(layer0~layer3)으로 입력 이미지의 기본 특징(feature)을 추출합니다.
- upsampling: 기본 특징맵의 채널 수를 512로 변환합니다.
- extra: SSD에서 여러 크기의 특징맵을 사용하기 위한 추가 계층들로, 점진적으로 해상도를 줄이면서 채널 수를 조정합니다.
- forward: 입력 이미지를 차례로 features, upsampling, extra 모듈에 통과시켜 여러 수준의 특징맵을 생성한 후, 이를 OrderedDict로 반환합니다.


### 모델과 앵커 생성자(Anchor Generator) 초기화
사전 학습된 ResNet34를 기반으로 SSD 백본을 구성하고, SSD 모델 및 앵커 박스 생성자를 정의합니다.

In [None]:
import torch
from torchvision.models import resnet34
from torchvision.models.detection import ssd
from torchvision.models.detection.anchor_utils import DefaultBoxGenerator

# 사전 학습된 ResNet34 모델을 불러옵니다.
backbone_base = resnet34(weights="ResNet34_Weights.IMAGENET1K_V1")
# 위에서 정의한 SSDBackbone으로 ResNet34의 일부 층을 래핑합니다.
backbone = SSDBackbone(backbone_base)

# DefaultBoxGenerator: SSD의 앵커 박스를 생성하기 위한 객체
anchor_generator = DefaultBoxGenerator(
    aspect_ratios=[[2], [2, 3], [2, 3], [2, 3], [2, 3], [2], [2]],
    scales=[0.07, 0.15, 0.33, 0.51, 0.69, 0.87, 1.05, 1.20],
    steps=[8, 16, 32, 64, 100, 300, 512],
)

# 사용 가능한 device(CPU 또는 GPU)를 선택합니다.
device = "cuda" if torch.cuda.is_available() else "cpu"

# SSD 모델 생성: backbone, 앵커 생성자, 입력 이미지 크기, 클래스 수를 지정합니다.
model = ssd.SSD(
    backbone=backbone,
    anchor_generator=anchor_generator,
    size=(512, 512),
    num_classes=3
).to(device)


Downloading: "https://download.pytorch.org/models/resnet34-b627a593.pth" to /root/.cache/torch/hub/checkpoints/resnet34-b627a593.pth
100%|██████████| 83.3M/83.3M [00:00<00:00, 116MB/s]


- ResNet34를 사전 학습된 가중치와 함께 불러오고, 이를 SSDBackbone으로 감싸 SSD에 맞게 구성합니다.
- DefaultBoxGenerator를 통해 SSD에서 사용할 여러 크기의 앵커 박스를 설정합니다.
- 최종적으로 SSD 모델을 초기화하고, GPU 또는 CPU로 할당합니다.


### COCO 데이터셋 클래스 정의
COCO 데이터셋의 이미지와 어노테이션(JSON 파일)을 읽어와서 모델 학습에 사용할 수 있도록 Dataset 클래스를 정의합니다.

In [None]:
import os
import torch
from PIL import Image
from pycocotools.coco import COCO
from torch.utils.data import Dataset

class COCODataset(Dataset):
    def __init__(self, root, train, transform=None):
        super().__init__()
        # 학습/검증 데이터를 구분하여 파일 경로 설정
        directory = "train" if train else "val"
        annotations = os.path.join(root, "annotations", f"{directory}_annotations.json")

        # COCO 어노테이션 파일을 읽어옵니다.
        self.coco = COCO(annotations)
        self.image_path = os.path.join(root, directory)
        self.transform = transform

        # 카테고리 정보 및 데이터셋의 이미지-어노테이션 페어를 로드합니다.
        self.categories = self._get_categories()
        self.data = self._load_data()

    def _get_categories(self):
        # COCO의 카테고리 정보를 {id: name} 형태의 딕셔너리로 저장합니다.
        categories = {0: "background"}
        for category in self.coco.cats.values():
            categories[category["id"]] = category["name"]
        return categories

    def _load_data(self):
        data = []
        # 모든 이미지에 대해 어노테이션 정보를 로드합니다.
        for _id in self.coco.imgs:
            file_name = self.coco.loadImgs(_id)[0]["file_name"]
            image_path = os.path.join(self.image_path, file_name)
            image = Image.open(image_path).convert("RGB")

            boxes = []
            labels = []
            anns = self.coco.loadAnns(self.coco.getAnnIds(_id))
            # 각 어노테이션에서 bbox 좌표와 레이블을 추출합니다.
            for ann in anns:
                x, y, w, h = ann["bbox"]
                boxes.append([x, y, x + w, y + h])
                labels.append(ann["category_id"])

            target = {
                "image_id": torch.LongTensor([_id]),
                "boxes": torch.FloatTensor(boxes),
                "labels": torch.LongTensor(labels)
            }
            data.append([image, target])
        return data

    def __getitem__(self, index):
        image, target = self.data[index]
        if self.transform:
            image = self.transform(image)
        return image, target

    def __len__(self):
        return len(self.data)


- _get_categories(): COCO 어노테이션 파일에서 카테고리 정보를 읽어와서, 배경 클래스(0번)와 함께 딕셔너리 형태로 저장합니다.
- _load_data(): 각 이미지 파일과 해당 이미지에 대한 어노테이션(바운딩 박스, 레이블)을 불러와서 리스트에 저장합니다.
- getitem(): DataLoader에서 호출될 때 이미지와 어노테이션을 반환하며, 필요시 transform을 적용합니다.

### DataLoader와 데이터 전처리
이미지를 텐서로 변환하는 transform과 함께 학습 및 테스트 데이터셋을 로드합니다.

In [None]:
from torchvision import transforms
from torch.utils.data import DataLoader

# DataLoader가 배치 데이터를 올바르게 묶을 수 있도록 custom collator 함수 정의
def collator(batch):
    return tuple(zip(*batch))

# 이미지 전처리: PIL 이미지를 텐서로 변환하고 데이터 타입을 float으로 변환
transform = transforms.Compose(
    [
        transforms.PILToTensor(),
        transforms.ConvertImageDtype(dtype=torch.float)
    ]
)

# 학습 및 테스트 데이터셋 초기화 (datasets 경로는 사용자 환경에 맞게 수정 필요)
train_dataset = COCODataset("../datasets/coco", train=True, transform=transform)
test_dataset = COCODataset("../datasets/coco", train=False, transform=transform)

# DataLoader: 배치 크기, 셔플링 여부, collate 함수 등을 지정
train_dataloader = DataLoader(
    train_dataset, batch_size=4, shuffle=True, drop_last=True, collate_fn=collator
)
test_dataloader = DataLoader(
    test_dataset, batch_size=1, shuffle=True, drop_last=True, collate_fn=collator
)


loading annotations into memory...
Done (t=0.09s)
creating index...
index created!
loading annotations into memory...
Done (t=0.01s)
creating index...
index created!


- transform: 이미지를 텐서로 변환하여 모델에 바로 입력할 수 있도록 합니다.
- collator: DataLoader에서 여러 샘플을 묶어 배치를 만들 때, 이미지와 어노테이션을 별도로 묶어주는 함수입니다.

### 모델 학습
SGD 옵티마이저와 StepLR 스케줄러를 사용하여 모델을 10 에폭(epoch) 동안 학습합니다.

In [None]:
from torch import optim
from tqdm import tqdm

# 학습 가능한 파라미터만 모아서 optimizer에 전달
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# 총 10 에폭 동안 학습
for epoch in range(10):
    train_cost = 0.0
    model.train()  # 학습 모드
    # tqdm으로 training loop 진행 상황 표시
    for images, targets in tqdm(train_dataloader, desc=f"Training Epoch {epoch+1}"):
        # 배치의 각 이미지와 어노테이션을 device(GPU/CPU)로 이동
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # 모델에 입력하여 손실(loss) 계산
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        # 역전파 및 파라미터 업데이트
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        # loss.item()을 사용하여 스칼라 값을 누적
        train_cost += losses.item()

    # 에폭 종료 후 학습률 갱신
    lr_scheduler.step()
    avg_train_loss = train_cost / len(train_dataloader)

    print(f"Epoch: {epoch+1:4d}, Train Loss: {avg_train_loss:.3f}")


Training Epoch 1: 100%|██████████| 607/607 [04:04<00:00,  2.48it/s]


Epoch:    1, Train Loss: 6.339


Training Epoch 2: 100%|██████████| 607/607 [04:05<00:00,  2.48it/s]


Epoch:    2, Train Loss: 5.425


Training Epoch 3: 100%|██████████| 607/607 [04:05<00:00,  2.47it/s]


Epoch:    3, Train Loss: 5.077


Training Epoch 4: 100%|██████████| 607/607 [04:05<00:00,  2.47it/s]


Epoch:    4, Train Loss: 4.727


Training Epoch 5: 100%|██████████| 607/607 [04:05<00:00,  2.47it/s]


Epoch:    5, Train Loss: 4.387


Training Epoch 6: 100%|██████████| 607/607 [04:05<00:00,  2.47it/s]


Epoch:    6, Train Loss: 3.899


Training Epoch 7: 100%|██████████| 607/607 [04:05<00:00,  2.47it/s]


Epoch:    7, Train Loss: 3.750


Training Epoch 8: 100%|██████████| 607/607 [04:05<00:00,  2.47it/s]


Epoch:    8, Train Loss: 3.653


Training Epoch 9: 100%|██████████| 607/607 [04:05<00:00,  2.47it/s]


Epoch:    9, Train Loss: 3.559


Training Epoch 10: 100%|██████████| 607/607 [04:05<00:00,  2.47it/s]

Epoch:   10, Train Loss: 3.460





- 매 배치마다 모델에 이미지와 타겟(어노테이션)을 입력하고, 손실값을 구한 뒤 역전파 및 업데이트를 수행합니다.
- 에폭이 끝나면 학습률을 조정하며, 에폭마다 평균 손실을 출력합니다.


### 객체 검출 결과 시각화
테스트 데이터셋을 이용해 모델이 예측한 바운딩 박스를 시각화합니다.
빨간색은 모델의 예측 결과, 파란색은 실제 정답(ground truth)을 나타냅니다.

In [None]:
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt
from torchvision.transforms.functional import to_pil_image

# 바운딩 박스를 그리는 함수 정의
def draw_bbox(ax, box, text, color):
    ax.add_patch(
        plt.Rectangle(
            xy=(box[0], box[1]),
            width=box[2] - box[0],
            height=box[3] - box[1],
            fill=False,
            edgecolor=color,
            linewidth=2,
        )
    )
    ax.annotate(
        text=text,
        xy=(box[0] - 5, box[1] - 5),
        color=color,
        weight="bold",
        fontsize=13,
    )

threshold = 0.5  # 예측 점수 임계값
categories = test_dataset.categories

with torch.no_grad():
    model.eval()  # 평가 모드로 전환
    for images, targets in test_dataloader:
        images = [image.to(device) for image in images]
        outputs = model(images)

        # 모델 출력에서 바운딩 박스, 레이블, 점수를 추출
        boxes = outputs[0]["boxes"].to("cpu").numpy()
        labels = outputs[0]["labels"].to("cpu").numpy()
        scores = outputs[0]["scores"].to("cpu").numpy()

        # 점수가 threshold 이상인 경우만 선택
        boxes = boxes[scores >= threshold].astype(np.int32)
        labels = labels[scores >= threshold]
        scores = scores[scores >= threshold]

        # 이미지 시각화
        fig = plt.figure(figsize=(8, 8))
        ax = fig.add_subplot(1, 1, 1)
        plt.imshow(to_pil_image(images[0]))

        # 예측 결과 (빨간색)
        for box, label, score in zip(boxes, labels, scores):
            draw_bbox(ax, box, f"{categories[label]} - {score:.4f}", "red")

        # 실제 정답 (파란색)
        tboxes = targets[0]["boxes"].numpy()
        tlabels = targets[0]["labels"].numpy()
        for box, label in zip(tboxes, tlabels):
            draw_bbox(ax, box, f"{categories[label]}", "blue")

        plt.show()


- 모델 예측 결과에서 점수가 일정 임계값 이상인 박스들만 선택하여 시각화합니다.
- draw_bbox 함수로 각 박스와 텍스트(클래스 이름 및 점수 혹은 정답)를 그림에 표시합니다.

### COCO 평가 (COCOeval)
모델의 검출 성능을 COCO 평가 방식으로 계산합니다.

In [None]:
import numpy as np
from pycocotools.cocoeval import COCOeval

with torch.no_grad():
    model.eval()
    coco_detections = []
    for images, targets in test_dataloader:
        images = [img.to(device) for img in images]
        outputs = model(images)

        for i in range(len(targets)):
            # 각 이미지에 대해 image_id, 박스, 점수, 레이블 추출
            image_id = targets[i]["image_id"].data.cpu().numpy().tolist()[0]
            boxes = outputs[i]["boxes"].data.cpu().numpy()
            # COCO 형식에 맞게 박스 좌표 (x, y, w, h)로 변환
            boxes[:, 2] = boxes[:, 2] - boxes[:, 0]
            boxes[:, 3] = boxes[:, 3] - boxes[:, 1]
            scores = outputs[i]["scores"].data.cpu().numpy()
            labels = outputs[i]["labels"].data.cpu().numpy()

            for instance_id in range(len(boxes)):
                box = boxes[instance_id, :].tolist()
                prediction = np.array(
                    [
                        image_id,
                        box[0],
                        box[1],
                        box[2],
                        box[3],
                        float(scores[instance_id]),
                        int(labels[instance_id]),
                    ]
                )
                coco_detections.append(prediction)
    coco_detections = np.asarray(coco_detections)

    # COCO ground truth와 검출 결과를 로드하여 평가를 수행
    coco_gt = test_dataloader.dataset.coco
    coco_dt = coco_gt.loadRes(coco_detections)
    coco_evaluator = COCOeval(coco_gt, coco_dt, iouType="bbox")
    coco_evaluator.evaluate()
    coco_evaluator.accumulate()
    coco_evaluator.summarize()


Loading and preparing results...
Converting ndarray to lists...
(27088, 7)
0/27088
DONE (t=0.27s)
creating index...
index created!
Running per image evaluation...
Evaluate annotation type *bbox*
DONE (t=0.55s).
Accumulating evaluation results...
DONE (t=0.18s).
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.198
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets=100 ] = 0.472
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets=100 ] = 0.115
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.047
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = 0.233
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = 0.192
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=  1 ] = 0.298
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets= 10 ] = 0.424
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.438
 Average Recall   

### 백본 출력 채널 확인 함수
모델의 백본에서 각 단계별 출력 채널 수를 확인하기 위한 헬퍼 함수입니다.

In [None]:
def retrieve_out_channels(model, size):
    model.eval()  # 평가 모드로 전환
    with torch.no_grad():
        device = next(model.parameters()).device
        # 더미 이미지 생성: 지정한 사이즈로 0으로 채워진 텐서
        image = torch.zeros((1, 3, size[1], size[0]), device=device)
        features = model(image)

        # features가 Tensor인 경우 OrderedDict 형태로 변환
        if isinstance(features, torch.Tensor):
            features = OrderedDict([("0", features)])
        # 각 특징맵의 채널 수를 리스트로 반환
        out_channels = [x.size(1) for x in features.values()]

    model.train()
    return out_channels

# 백본의 각 출력 채널 수 출력 (예: [512, 1024, 512, 256, ...])
print(retrieve_out_channels(backbone, (512, 512)))


[512, 1024, 512, 256, 256, 256, 256]


- 더미 입력 이미지를 백본에 통과시켜서 각 단계에서 생성되는 특징맵의 채널 수(Depth)를 반환합니다.
- 이 함수는 모델의 구조를 이해하거나 디버깅할 때 유용합니다.


In [None]:
cnt = 0
threshold = 0.5  # 예측 점수 임계값
categories = test_dataset.categories

with torch.no_grad():
    model.eval()  # 평가 모드로 전환
    for images, targets in test_dataloader:
        images = [image.to(device) for image in images]
        outputs = model(images)

        # 모델 출력에서 바운딩 박스, 레이블, 점수를 추출
        boxes = outputs[0]["boxes"].to("cpu").numpy()
        labels = outputs[0]["labels"].to("cpu").numpy()
        scores = outputs[0]["scores"].to("cpu").numpy()

        # 점수가 threshold 이상인 경우만 선택
        boxes = boxes[scores >= threshold].astype(np.int32)
        labels = labels[scores >= threshold]
        scores = scores[scores >= threshold]

        # 이미지 시각화
        fig = plt.figure(figsize=(8, 8))
        ax = fig.add_subplot(1, 1, 1)
        plt.imshow(to_pil_image(images[0]))

        # 예측 결과 (빨간색)
        for box, label, score in zip(boxes, labels, scores):
            draw_bbox(ax, box, f"{categories[label]} - {score:.4f}", "red")

        # 실제 정답 (파란색)
        tboxes = targets[0]["boxes"].numpy()
        tlabels = targets[0]["labels"].numpy()
        for box, label in zip(tboxes, tlabels):
            draw_bbox(ax, box, f"{categories[label]}", "blue")

        plt.show()

        cnt += 1
        if cnt == 5 :
            break