## 0. 패키지 import

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as T
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from PIL import Image
import json
import os
from tqdm import tqdm
from torchvision.ops import box_iou
import time
import numpy as np

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive so files stored there can be read and written
drive.mount('/content/drive')

Mounted at /content/drive


## 1. 이미지 불러오기 + 변환

In [None]:
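# Read the image paths and annotations from a JSON file.
# Loaded images go into a weak-reference cache: an image is reused from memory only while
# something else still references it, so nothing is pinned in RAM (reading from Drive is slow).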

import json
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import weakref

class CustomDataset(Dataset):
    """Object-detection dataset backed by a JSON annotation file.

    The JSON file holds a mapping whose values are records with the keys
    ``"image"`` (path of the image file), ``"bbox"`` (boxes as four numbers
    each, where element 2 must exceed element 0 and element 3 must exceed
    element 1, i.e. ``xmin, ymin, xmax, ymax``) and ``"label"`` (one class id
    per box).

    Args:
        json_file: Path of the JSON annotation file.
        transforms: Optional callable applied to the loaded image only; the
            target dict is returned unchanged.
    """

    def __init__(self, json_file, transforms=None):
        with open(json_file, encoding="utf-8") as f:
            self.data = json.load(f)
        # Freeze the key order once so __getitem__ is O(1) instead of
        # rebuilding the full key list for every sample.
        self._keys = list(self.data)
        self.transforms = transforms
        # Weak-reference cache: an entry lives only while some other object
        # still references the image, so the cache never pins memory.
        self.cache = weakref.WeakValueDictionary()

    def __len__(self):
        return len(self.data)

    def _load_image(self, img_path):
        """Return the RGB image at ``img_path``, or ``None`` if the file is missing."""
        cached = self.cache.get(img_path)
        if cached is not None:
            return cached

        try:
            # The context manager closes the underlying file handle right
            # after the pixel data has been converted into a new image.
            with Image.open(img_path) as raw:
                img = raw.convert("RGB")
        except FileNotFoundError:
            print(f"File not found: {img_path}")
            return None

        self.cache[img_path] = img
        return img

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``target`` has ``"boxes"`` (float32, shape ``(N, 4)``) and ``"labels"``
        (int64, shape ``(N,)``). Degenerate boxes (non-positive width or
        height) are dropped together with their labels. ``(None, None)`` is
        returned when the image file cannot be found.
        """
        record = self.data[self._keys[idx]]
        img = self._load_image(record["image"])
        if img is None:
            return None, None

        # reshape() also normalises an empty list to (0, 4) / (0,) and a single
        # flat box to (1, 4).
        boxes = torch.as_tensor(record["bbox"], dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(record["label"], dtype=torch.int64).reshape(-1)

        # Pair boxes and labels positionally; surplus entries on either side
        # are ignored, as zip() would do.
        count = min(len(boxes), len(labels))
        boxes, labels = boxes[:count], labels[:count]

        # Keep only boxes with strictly positive width and height.
        keep = (boxes[:, 2] > boxes[:, 0]) & (boxes[:, 3] > boxes[:, 1])
        target = {"boxes": boxes[keep], "labels": labels[keep]}

        if self.transforms:
            img = self.transforms(img)

        return img, target


In [None]:
# Image transform factory (tensor conversion only)
def get_transform(train):
    """Build the image-only transform pipeline for a dataset split.

    Args:
        train: Whether the pipeline is for the training split. Kept for
            interface compatibility; it no longer changes the pipeline.

    Returns:
        A ``T.Compose`` that converts the PIL image to a tensor.
    """
    # A random horizontal flip used to be appended when ``train`` was true.
    # CustomDataset passes only the image through this pipeline, so the boxes
    # in the target were never mirrored and roughly half of the training
    # samples ended up with boxes that no longer matched the image. Geometric
    # augmentation must transform image and boxes together, so it is left out
    # of this image-only pipeline.
    return T.Compose([T.ToTensor()])

## 2. 데이터 로드

In [None]:
# Collate function: drop unusable samples and regroup the batch field by field
def collate_fn(batch):
    """Turn a list of ``(image, target)`` samples into ``(images, targets)``.

    Samples whose image or target is ``None`` are discarded. If nothing is
    left, a pair of empty lists is returned.
    """
    usable = []
    for sample in batch:
        if sample[0] is None or sample[1] is None:
            continue
        usable.append(sample)

    if not usable:
        return ([], [])
    return tuple(zip(*usable))

In [None]:
# Build the train / validation / test datasets from their annotation files
train_dataset = CustomDataset(
    '/content/drive/MyDrive/preprocessed_data_aug/dataset_train.json',
    get_transform(train=True),
)
valid_dataset = CustomDataset(
    '/content/drive/MyDrive/preprocessed_data_aug/dataset_valid.json',
    get_transform(train=False),
)
test_dataset = CustomDataset(
    '/content/drive/MyDrive/preprocessed_data_aug/dataset_test.json',
    get_transform(train=False),
)

# Batch size 16 (limited by available memory); only the training data is shuffled
train_loader = DataLoader(
    train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn
)
valid_loader = DataLoader(
    valid_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn
)

In [None]:
# Faster R-CNN with a ResNet-50 FPN backbone and a freshly initialised box head
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

model = fasterrcnn_resnet50_fpn(pretrained=True)
num_classes = 16  # Assuming 15 classes + background
in_features = model.roi_heads.box_predictor.cls_score.in_features
# Replace only the box predictor. Building a second full detector just to
# borrow its head is unnecessary: it allocates a whole extra model and
# triggers an additional backbone weight download.
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|██████████| 160M/160M [00:00<00:00, 213MB/s]
Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 193MB/s]


In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

## 3. 모델 훈련

In [None]:
# Training function
def train_one_epoch(model, data_loader, optimizer, device, epoch):
    """Run one optimisation pass over ``data_loader`` and return the mean loss.

    Args:
        model: Detection model that returns a dict of losses when called with
            ``(images, targets)`` in training mode.
        data_loader: Iterable yielding ``(images, targets)`` batches.
        optimizer: Optimizer updating the model parameters.
        device: Device the images and target tensors are moved to.
        epoch: Epoch number, used only for the progress-bar label.

    Returns:
        The summed loss averaged over the batches that were actually
        processed, or ``0.0`` if no batch was processed.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0
    for images, targets in tqdm(data_loader, desc=f"Epoch {epoch}"):
        # A batch can be empty when every sample in it was dropped by the
        # collate function; the model cannot be called on it.
        if len(images) == 0:
            continue
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        running_loss += losses.item()
        num_batches += 1
    # Guard against an empty loader (or only empty batches).
    return running_loss / num_batches if num_batches else 0.0

In [None]:
# IoU helper
def box_iou(box1, box2):
    """Pairwise Intersection over Union between two sets of boxes.

    Both inputs hold boxes as (xmin, ymin, xmax, ymax). For ``box1`` with N
    rows and ``box2`` with M rows the result has one entry per pair (N x M).
    """
    # Corners of the overlap rectangle for every (box1, box2) pair.
    top_left = torch.max(box1[..., None, :2], box2[..., :2])
    bottom_right = torch.min(box1[..., None, 2:], box2[..., 2:])
    # Negative extents mean "no overlap" and are clamped to zero.
    overlap = (bottom_right - top_left).clamp(0).prod(2)

    size1 = (box1[..., 2:] - box1[..., :2]).prod(1)
    size2 = (box2[..., 2:] - box2[..., :2]).prod(1)
    combined = size1[..., None] + size2 - overlap
    return overlap / combined

In [None]:
# Evaluation that returns accuracy only

def evaluate(model, data_loader, device, iou_threshold=0.5):
    """Return the fraction of ground-truth boxes matched by a prediction.

    A prediction matches a ground-truth box when both carry the same label
    and their IoU is greater than ``iou_threshold``. Each ground-truth box
    can be matched at most once.

    Args:
        model: Detection model returning, per image, a dict with ``"boxes"``
            and ``"labels"`` when called in eval mode.
        data_loader: Iterable yielding ``(images, targets)`` batches.
        device: Device the images and target tensors are moved to.
        iou_threshold: Minimum IoU (exclusive) for a match.

    Returns:
        ``matched / total ground-truth boxes``, or ``0`` when there are no
        ground-truth boxes.
    """
    model.eval()
    total_boxes = 0
    correct_boxes = 0
    with torch.no_grad():
        for images, targets in data_loader:
            # Skip batches in which every sample was dropped by the collate function.
            if len(images) == 0:
                continue
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            outputs = model(images)

            for target, output in zip(targets, outputs):
                remaining_boxes = target["boxes"]
                remaining_labels = target["labels"]
                total_boxes += len(remaining_boxes)

                for pred_box, pred_label in zip(output["boxes"], output["labels"]):
                    # Positions (in the remaining tensors) of same-label targets.
                    candidates = (remaining_labels == pred_label).nonzero(as_tuple=True)[0]
                    if len(candidates) == 0:
                        continue

                    ious = box_iou(pred_box.unsqueeze(0), remaining_boxes[candidates]).squeeze(0)
                    max_iou, best = ious.max(dim=0)
                    if max_iou.item() > iou_threshold:
                        correct_boxes += 1
                        # ``best`` indexes the label-filtered subset; map it
                        # back through ``candidates`` so the box that was
                        # actually matched is the one removed.
                        keep = torch.ones(
                            len(remaining_boxes), dtype=torch.bool, device=remaining_boxes.device
                        )
                        keep[candidates[best]] = False
                        remaining_boxes = remaining_boxes[keep]
                        remaining_labels = remaining_labels[keep]

    accuracy = correct_boxes / total_boxes if total_boxes > 0 else 0
    return accuracy

In [None]:
# mAP computation

def _iou_one_to_many(box, boxes):
    """Return the IoU between one (4,) box and every row of an (M, 4) array."""
    top_left = np.maximum(box[:2], boxes[:, :2])
    bottom_right = np.minimum(box[2:], boxes[:, 2:])
    inter = np.clip(bottom_right - top_left, 0, None).prod(axis=1)
    area = (box[2:] - box[:2]).prod()
    areas = (boxes[:, 2:] - boxes[:, :2]).prod(axis=1)
    union = area + areas - inter
    # Guard against 0/0 for degenerate boxes.
    return inter / np.maximum(union, np.finfo(np.float64).eps)


def _average_precision(recalls, precisions):
    """Return the area under the monotone precision envelope (all-point AP)."""
    mrec = np.concatenate(([0.0], recalls, [1.0]))
    mpre = np.concatenate(([0.0], precisions, [0.0]))
    # Precision envelope: each point takes the best precision at any higher recall.
    mpre = np.maximum.accumulate(mpre[::-1])[::-1]
    steps = np.nonzero(mrec[1:] != mrec[:-1])[0]
    return float(np.sum((mrec[steps + 1] - mrec[steps]) * mpre[steps + 1]))


def calculate_map(detections, annotations, iou_threshold=0.5):
    """Compute mean Average Precision over the classes present in the ground truth.

    Args:
        detections: One dict per image with array-likes ``'boxes'`` (K, 4),
            ``'labels'`` (K,) and ``'scores'`` (K,).
        annotations: One dict per image with array-likes ``'boxes'`` (G, 4)
            and ``'labels'`` (G,), aligned with ``detections``.
        iou_threshold: A detection is a true positive when its best-overlapping
            same-class ground-truth box has IoU greater than this value and
            has not been matched by a higher-scoring detection.

    Returns:
        The mean of the per-class APs, or ``0.0`` when there is no ground truth.
        Classes that never occur in the ground truth are skipped, because
        their AP is undefined.
    """
    gt_labels_per_image = [np.asarray(a['labels']).reshape(-1) for a in annotations]
    if not gt_labels_per_image:
        return 0.0
    classes = np.unique(np.concatenate(gt_labels_per_image))

    average_precisions = []
    for label in classes:
        scores = []
        true_positives = []
        num_ground_truths = 0

        for detection, annotation in zip(detections, annotations):
            gt_mask = np.asarray(annotation['labels']).reshape(-1) == label
            gt_boxes = np.asarray(annotation['boxes'], dtype=np.float64).reshape(-1, 4)[gt_mask]
            num_ground_truths += len(gt_boxes)

            det_mask = np.asarray(detection['labels']).reshape(-1) == label
            det_boxes = np.asarray(detection['boxes'], dtype=np.float64).reshape(-1, 4)[det_mask]
            det_scores = np.asarray(detection['scores'], dtype=np.float64).reshape(-1)[det_mask]

            # Track matches by ground-truth index so each box is claimed once,
            # by the highest-scoring detection that overlaps it.
            matched = np.zeros(len(gt_boxes), dtype=bool)
            for i in np.argsort(-det_scores, kind="stable"):
                scores.append(det_scores[i])
                hit = 0
                if len(gt_boxes) > 0:
                    ious = _iou_one_to_many(det_boxes[i], gt_boxes)
                    best = int(np.argmax(ious))
                    if ious[best] > iou_threshold and not matched[best]:
                        matched[best] = True
                        hit = 1
                true_positives.append(hit)

        # Rank all detections of this class across images by confidence.
        order = np.argsort(-np.asarray(scores, dtype=np.float64), kind="stable")
        tp = np.asarray(true_positives, dtype=np.float64)[order]
        tp_cumsum = np.cumsum(tp)
        fp_cumsum = np.cumsum(1.0 - tp)
        # num_ground_truths > 0 here because ``label`` comes from the ground truth.
        recalls = tp_cumsum / num_ground_truths
        precisions = tp_cumsum / np.maximum(tp_cumsum + fp_cumsum, np.finfo(np.float64).eps)

        average_precisions.append(_average_precision(recalls, precisions))

    mAP = float(np.mean(average_precisions)) if average_precisions else 0.0
    return mAP

In [None]:
#평가지표 출력

import torch
from torchvision.ops import box_iou
import numpy as np

def evaluate_index(model, data_loader, device, iou_threshold=0.5, score_threshold=0.0):
    """Compute precision, recall and mAP of ``model`` over ``data_loader``.

    Precision and recall come from a greedy per-image matching: predictions
    are visited in the order the model returns them, and each is matched to
    the remaining same-label ground-truth box with the highest IoU if that
    IoU is greater than ``iou_threshold``. mAP is delegated to
    ``calculate_map`` and always uses every detection.

    Args:
        model: Detection model returning, per image, a dict with ``"boxes"``,
            ``"labels"`` and ``"scores"`` when called in eval mode.
        data_loader: Iterable yielding ``(images, targets)`` batches.
        device: Device the images and target tensors are moved to.
        iou_threshold: Minimum IoU (exclusive) for a match.
        score_threshold: Predictions scoring below this value are ignored for
            precision/recall. The default ``0.0`` keeps every prediction.

    Returns:
        Tuple ``(precision, recall, mAP)``.
    """
    model.eval()
    true_positives = 0
    false_positives = 0
    false_negatives = 0
    all_detections = []
    all_annotations = []

    with torch.no_grad():
        for images, targets in data_loader:
            # Skip batches in which every sample was dropped by the collate function.
            if len(images) == 0:
                continue
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            outputs = model(images)

            for target, output in zip(targets, outputs):
                remaining_boxes = target["boxes"]
                remaining_labels = target["labels"]

                confident = output["scores"] >= score_threshold
                pred_boxes = output["boxes"][confident]
                pred_labels = output["labels"][confident]

                for pred_box, pred_label in zip(pred_boxes, pred_labels):
                    # Positions (in the remaining tensors) of same-label targets.
                    candidates = (remaining_labels == pred_label).nonzero(as_tuple=True)[0]
                    if len(candidates) == 0:
                        false_positives += 1
                        continue

                    ious = box_iou(pred_box.unsqueeze(0), remaining_boxes[candidates]).squeeze(0)
                    max_iou, best = ious.max(dim=0)
                    if max_iou.item() > iou_threshold:
                        true_positives += 1
                        # ``best`` indexes the label-filtered subset; map it
                        # back through ``candidates`` so the box that was
                        # actually matched is the one removed.
                        keep = torch.ones(
                            len(remaining_boxes), dtype=torch.bool, device=remaining_boxes.device
                        )
                        keep[candidates[best]] = False
                        remaining_boxes = remaining_boxes[keep]
                        remaining_labels = remaining_labels[keep]
                    else:
                        false_positives += 1

                # Ground-truth boxes that were never matched are misses.
                false_negatives += len(remaining_boxes)

                # Collect all annotations and detections for mAP calculation
                all_annotations.append({
                    'boxes': target['boxes'].cpu().numpy(),
                    'labels': target['labels'].cpu().numpy(),
                })
                all_detections.append({
                    'boxes': output['boxes'].cpu().numpy(),
                    'labels': output['labels'].cpu().numpy(),
                    'scores': output['scores'].cpu().numpy(),
                })

    predicted = true_positives + false_positives
    actual = true_positives + false_negatives
    precision = true_positives / predicted if predicted > 0 else 0
    recall = true_positives / actual if actual > 0 else 0
    mAP = calculate_map(all_detections, all_annotations, iou_threshold=iou_threshold)

    return precision, recall, mAP

In [None]:
# Define the optimizer: SGD with momentum over the trainable parameters only
params = [param for param in model.parameters() if param.requires_grad]
optimizer = optim.SGD(
    params,
    lr=0.0005,
    momentum=0.9,
    weight_decay=0.0005,
)

In [None]:
# @title
# Model training loop: run num_epochs passes over train_loader and print each epoch's loss
num_epochs = 10
for epoch in range(num_epochs):
    train_loss = train_one_epoch(model, train_loader, optimizer, device, epoch)
    print(f"Epoch {epoch}: Train Loss: {train_loss:.4f}")

Epoch 0:  37%|███▋      | 338/907 [1:24:31<2:16:53, 14.43s/it]

In [None]:
# Clear caches: collect unreachable Python objects, then ask PyTorch to release its cached CUDA memory
import gc
import torch

gc.collect()
torch.cuda.empty_cache()

## 4. Model 저장 및 학습 결과 출력

In [None]:
# Save the model's state_dict (not the whole model object) to Google Drive
torch.save(model.state_dict(), '/content/drive/MyDrive/Colab Notebooks/detection_model_newpre.pth')

In [None]:
# Print the evaluation metrics computed on the validation loader
precision, recall, mAP = evaluate_index(model, valid_loader, device)
print(f"precision: {precision:.4f}")
print(f"recall: {recall:.4f}")
print(f"mAP : {mAP:.4f}")

## cf) Loading saved model

In [None]:
# Reload the saved weights and re-run the evaluation on the validation loader.
# map_location lets a checkpoint saved from a GPU session load on whatever
# device this session uses (for example a CPU-only runtime).
model.load_state_dict(
    torch.load(
        '/content/drive/MyDrive/Colab Notebooks/detection_model_newpre.pth',
        map_location=device,
    )
)

precision, recall, mAP = evaluate_index(model, valid_loader, device)
print(f"precision: {precision:.4f}")
print(f"recall: {recall:.4f}")
print(f"mAP : {mAP:.4f}")

precision: 0.0521
recall: 0.8198
mAP : 0.5273
