<a href="https://colab.research.google.com/github/haewonChun/python_algorithms/blob/main/%5B%EB%AC%B8%EC%A0%9C%5DChapter_3_Object_Detection_%EB%AA%A8%EB%8D%B8_%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0_ipynb%EC%9D%98_%EC%82%AC%EB%B3%B8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 주제: Object Detection Model 구현하기

## 실습 가이드
    1. 데이터를 다운로드하여 Colab에 불러옵니다.
    2. 필요한 라이브러리는 import 가능합니다.
    3. 코드는 위에서부터 아래로 순서대로 실행합니다.

## 데이터 소개
    - 이번 주제는 MS-COCO 2017 dataset을 사용합니다.
    - 이번에 사용하는 MS-COCO dataset은 tfrecord 형태로 저장되어 있으며, 내부에 다양한 정보를 담고 있습니다.
      그 중에 이번 주제와 관련된 내용들은 다음과 같습니다.

    1. image
      - 각 image의 pixel 값(3차원 tensor)
      - image/filename: image file의 이름
      - image/id: image file의 id(file명에서 확장자 제외한 부분)      
      
    2. objects
      - objects/area: 각 bounding box들의 면적
      - objects/id: 각 bounding box의 id
      - objects/bbox: 각 bounding box의 좌표(ymax, xmax, ymin, xmin)
      - objects/label: 각 bounding box에 대한 classification 정답

- 원본 데이터 출처: https://cocodataset.org/#home
- data format 참고: https://cocodataset.org/#format-data

## 문제 소개
    - 이번 실습에서는 1-stage object detection model인 RetinaNet을 직접 만들고 학습시켜보도록 하겠습니다.

## RetinaNet
    - Paper: https://arxiv.org/abs/1708.02002
    - RetinaNet에 대한 강의가 따로 준비되어 있으니 강의를 시청하시고 문제를 풀어보시기 바랍니다.

## 최종 목표    
    - Object Detection Model에 대한 이해
    - Object Detection Model 구현

- 출제자: 이진원 강사

## Step 1. Data 다운로드 및 확인

In [None]:
## library를 import 합니다
## 추가로 필요한 library가 있으면 추가로 import 해도 좋습니다
import os
import re
import zipfile

import numpy as np
import tensorflow as tf
from tensorflow import keras

import matplotlib.pyplot as plt
import tensorflow_datasets as tfds

import gdown

In [None]:
## Hyper parameter 및 기타 설정
num_classes = 80
batch_size = 2

learning_rates = [2.5e-06, 0.000625, 0.00125, 0.0025, 0.00025, 2.5e-05]
learning_rate_boundaries = [125, 250, 500, 240000, 360000]
learning_rate_fn = tf.optimizers.schedules.PiecewiseConstantDecay(
    boundaries=learning_rate_boundaries, values=learning_rates
)

## ckpt 저장할 directory
model_dir = "retinanet"

### 문제 1. Data 불러오기

    - data는 아래 url(google drive)에 저장되어 있습니다.(zip 파일)
    - gdown library를 이용하여 data를 다운받고, zip파일 압축을 풀어줍니다.
    - 압축 파일은 data라는 이름의 directory에 풀도록 합니다.

In [None]:
url = "https://drive.google.com/uc?id=1vIHxg4fLsK7Vn1NnZ_toK4akqpT5ZoKM"

In [None]:
## data download 받고(gdown.download 사용) 압축 풀기
##### CODE HERE #####

    - 위에서 다운받은 파일은 실제 MS-COCO data의 일부만을 포함하고 있습니다.
    - 아래의 cell을 실행하면 이 data를 이용하여 dataset을 만들 수 있습니다.
    - MS-COCO data 전체가 필요한 경우 아래 data_dir='data' 부분을 data_dir=None 으로 변경하면 됩니다.


In [None]:
(train_dataset, val_dataset), dataset_info = tfds.load(
    "coco/2017", split=["train", "validation"], with_info=True, data_dir='data'
)

    - dataset_info를 통해서 data에 관한 어떤 정보들이 있는지 확인해봅시다.

In [None]:
dataset_info

    - tfds.show_examples를 활용하여 data에 포함된 image를 출력해봅시다.

In [None]:
tfds.show_examples(train_dataset, dataset_info)

### 문제 2. Dataset 내부 item 확인
    - train_dataset에서 data를 하나 꺼내서 내부 item을 print 문으로 확인해봅시다.

In [None]:
##### CODE HERE #####

### 문제 3. Data 직접 확인하기
    - 문제 2와 같이 dataset에서 1개의 data를 가져온 후,
      image와 bounding box를 화면에 함께 출력해봅시다.
    - data에 저장된 bounding box의 좌표는 box의 양쪽 모서리의 좌표이며,
      (ymin, xmin, ymax, xmax)의 순서로 되어 있습니다.    

In [None]:
for data in train_dataset.take(1):
  image = np.array(data['image'], dtype=np.uint8)
  plt.figure(figsize=(8,8))
  plt.axis('off')
  plt.imshow(image)
  ax = plt.gca()
  boxes = data['objects']['bbox']

  ##### CODE HERE #####

  plt.show()

## Step 2. Data Augmentation
이번 Step에서는 data augmentation을 구현해보도록 하겠습니다.

    - 행렬과 같은 tensor의 경우 행->열 순서로 되어 있기 때문에 (y좌표, x좌표) 순서로 저장되어 있습니다.
    - (x좌표, y좌표) 순서가 더 직관적으로 이해하기 쉬우므로 저장된 x, y좌표의 순서를 바꿔주는 함수를 만들어봅시다.
    - 함수의 입력은 (N, 4) shape의 bounding box 정보이며,
      bounding box 좌표 정보는 양쪽 모서리의 x, y 좌표값으로 되어 있다고 가정합니다.

In [None]:
def swap_xy(boxes):
    return tf.stack([boxes[:, 1], boxes[:, 0], boxes[:, 3], boxes[:, 2]], axis=-1)

### 문제 4. Bounding box format 변경하는 함수 만들기 1
    - bounding box의 위치 정보에 대한 format을 변경하는 함수를 만들어 봅시다.
    - (xmin, ymin, xmax, ymax) 형태의 bounding box format을 (x, y, w, h)로 바꿔서 반환합니다.
    - 이 때 x, y는 bounding box의 center 좌표를 의미합니다.
    - 함수의 입력은 (..., N, 4)와 같이 2차원 이상의 tensor입니다.

In [None]:
def convert_to_xywh(boxes):
    ##### CODE HERE #####

In [None]:
import torch

def xyxy_to_xywh(boxes):
    """
    입력으로 받은 bounding box 좌표를 (x, y, w, h) 형태로 변경하는 함수.

    Args:
        boxes (torch.Tensor): bounding box 좌표를 포함한 텐서. (..., N, 4) 형태여야 함.

    Returns:
        torch.Tensor: 변경된 bounding box 좌표를 포함한 텐서. (..., N, 4) 형태.
    """
    # bounding box의 xmin, ymin, xmax, ymax를 추출
    xmin = boxes[..., 0:1]
    ymin = boxes[..., 1:2]
    xmax = boxes[..., 2:3]
    ymax = boxes[..., 3:4]

    # bounding box의 중심 좌표 (x, y) 계산
    x = (xmin + xmax) / 2
    y = (ymin + ymax) / 2

    # bounding box의 너비 (w)와 높이 (h) 계산
    w = xmax - xmin
    h = ymax - ymin

    # 변경된 (x, y, w, h) 형태의 bounding box 좌표를 반환
    return torch.cat((x, y, w, h), dim=-1)

# 예제 사용법
# (xmin, ymin, xmax, ymax) 형태의 bounding box 좌표를 가진 텐서 생성
bounding_boxes = torch.tensor([[[0, 1, 3, 4], [2, 2, 5, 6]]], dtype=torch.float32)

# 함수를 사용하여 좌표를 변경
new_boxes = xyxy_to_xywh(bounding_boxes)

# 변경된 좌표 출력
print(new_boxes)


### 문제 5. Bounding box format 변경하는 함수 만들기 2
    - bounding box의 위치 정보에 대한 format을 변경하는 함수를 만들어 봅시다.
    - (x, y, w, h) 형태의 bounding box format을 (xmin, ymin, xmax, ymax)로 바꿔서 반환합니다.
    - 이 때 x, y는 bounding box의 center 좌표를 의미합니다.
    - 함수의 입력은 (..., N, 4)와 같이 2차원 이상의 tensor입니다.

In [None]:
def convert_to_corners(boxes):
    ##### CODE HERE #####

In [None]:
import torch

def xywh_to_xyxy(boxes):
    """
    입력으로 받은 bounding box 좌표를 (xmin, ymin, xmax, ymax) 형태로 변경하는 함수.

    Args:
        boxes (torch.Tensor): bounding box 좌표를 포함한 텐서. (..., N, 4) 형태여야 함.

    Returns:
        torch.Tensor: 변경된 bounding box 좌표를 포함한 텐서. (..., N, 4) 형태.
    """
    # bounding box의 중심 좌표 (x, y) 계산
    x = boxes[..., 0:1]
    y = boxes[..., 1:2]

    # bounding box의 너비 (w)와 높이 (h) 계산
    w = boxes[..., 2:3]
    h = boxes[..., 3:4]

    # (xmin, ymin, xmax, ymax) 형태로 변경
    xmin = x - (w / 2)
    ymin = y - (h / 2)
    xmax = x + (w / 2)
    ymax = y + (h / 2)

    # 변경된 (xmin, ymin, xmax, ymax) 형태의 bounding box 좌표를 반환
    return torch.cat((xmin, ymin, xmax, ymax), dim=-1)

# 예제 사용법
# (x, y, w, h) 형태의 bounding box 좌표를 가진 텐서 생성
bounding_boxes = torch.tensor([[[1, 2, 2, 3], [3, 4, 2, 2]]], dtype=torch.float32)

# 함수를 사용하여 좌표를 변경
new_boxes = xywh_to_xyxy(bounding_boxes)

# 변경된 좌표 출력
print(new_boxes)


### 문제 6. Image resizing 함수 만들기
    - 짧은 변을 min_side와 같게 resize 합니다
    - 만약에 긴 변의 길이가 max_side보다 클 경우에는 긴 변이 max_side와 같아지도록 다시 resize 합니다
    - image size(가로, 세로 모두)가 stride의 배수가 아닐 경우 stride의 배수가 되도록 오른쪽과 아래쪽에 0을 채웁니다(zero padding)
    - 함수의 입력값은 다음과 같습니다.
      1. image: 3차원 tensor로 이루어진 image(feature map)의 pixel 값(height, width, channel)
      2. min_side: resize에 사용할 짧은 변 길이
      3. max_side: resize에 사용할 긴 변 길이
      4. stride: 1번 입력인 image(feature map)의 1 pixel이 실제 원본 image에서 몇 pixel에 해당되는지
    - 함수의 반환값은 3가지이고 각각 다음과 같습니다.
      1. resize 및 padding된 image(feature map)의 pixel 값
      2. padding 하기 전의 image size
      3. padding 하기 전 image와 원본 image의 확대/축소 비율(resize 후/resize 전)


In [None]:
def resize_and_pad_image(image, min_side=800.0, max_side=1333.0, stride=128.0):
    image_shape = tf.cast(tf.shape(image)[:2], dtype=tf.float32)
    ratio = min_side / tf.reduce_min(image_shape)

    ##### CODE HERE #####

    return image, image_shape, ratio

In [None]:
import torch
import math

def resize_and_pad_image(image, min_side, max_side, stride):
    """
    이미지를 resize하고 padding하여 새로운 이미지와 관련된 정보를 반환하는 함수.

    Args:
        image (torch.Tensor): 3차원 텐서로 이루어진 이미지(feature map)의 pixel 값 (height, width, channel).
        min_side (int): 짧은 변의 길이로 resize에 사용할 값.
        max_side (int): 긴 변의 길이로 resize에 사용할 값.
        stride (int): 입력 이미지의 1 pixel이 실제 원본 이미지에서 몇 pixel에 해당되는지.

    Returns:
        torch.Tensor: resize 및 padding된 이미지(feature map)의 pixel 값.
        tuple: padding 하기 전의 원본 이미지 크기 (height, width).
        tuple: padding 하기 전 이미지와 원본 이미지의 확대/축소 비율 (resize 후/resize 전).
    """
    # 원본 이미지 크기
    original_height, original_width = image.shape[0], image.shape[1]

    # 짧은 변을 min_side와 같게 resize
    scale = min_side / min(original_height, original_width)
    new_height = int(round(original_height * scale))
    new_width = int(round(original_width * scale))
    image = torch.nn.functional.interpolate(image.unsqueeze(0), size=(new_height, new_width), mode='bilinear', align_corners=False)
    image = image.squeeze(0)

    # 만약에 긴 변의 길이가 max_side보다 클 경우 긴 변이 max_side와 같아지도록 다시 resize
    if max(new_height, new_width) > max_side:
        scale = max_side / max(new_height, new_width)
        new_height = int(round(new_height * scale))
        new_width = int(round(new_width * scale))
        image = torch.nn.functional.interpolate(image.unsqueeze(0), size=(new_height, new_width), mode='bilinear', align_corners=False)
        image = image.squeeze(0)

    # image size(가로, 세로 모두)가 stride의 배수가 아닐 경우 padding
    pad_height = int(math.ceil(new_height / stride)) * stride
    pad_width = int(math.ceil(new_width / stride)) * stride
    padding_height = pad_height - new_height
    padding_width = pad_width - new_width

    # 오른쪽과 아래쪽에 0을 채움
    image = torch.nn.functional.pad(image, (0, padding_width, 0, padding_height), mode='constant', value=0)

    # padding 하기 전의 원본 이미지 크기와 확대/축소 비율 계산
    original_size = (original_height, original_width)
    scale_factors = (pad_height / original_height, pad_width / original_width)

    return image, original_size, scale_factors

# 예제 사용법
# 이미지를 3차원 텐서로 나타낸다고 가정하고 이를 입력으로 사용
image = torch.randn((256, 384, 3))  # 예제 이미지 (높이: 256, 너비: 384, 채널: 3)

min_side = 200
max_side = 400
stride = 32

resized_image, original_size, scale_factors = resize_and_pad_image(image, min_side, max_side, stride)

# 결과 출력
print("Resize 및 Padding된 이미지 크기:", resized_image.shape)
print("원본 이미지 크기:", original_size)
print("확대/축소 비율 (resize 후/resize 전):", scale_factors)


### 문제 7. Horizontal flip 함수 만들기
    - 50%의 확률로 image를 좌우반전합니다
    - 이 때 bounding box의 좌표도 좌우반전에 맞게 변경합니다
    - 입력 image는 3차원 tensor로 (height, width, channel)로 구성되어 있습니다.
    - bounding box의 좌표는 (xmin, ymin, xmax, ymax)로 구성되어 있고, 각각의 좌표는 0~1 사이 값으로 normalized 되어 있다고 가정합니다.
    - 함수의 반환값은 image의 pixel 값, bounding box의 좌표로 합니다.

In [None]:
def random_flip_horizontal(image, boxes):
    ##### CODE HERE #####
    return image, boxes

In [None]:
import torch
import random

def horizontal_flip(image, boxes):
    """
    입력 이미지와 bounding box 좌표를 좌우반전하는 함수.

    Args:
        image (torch.Tensor): 3차원 텐서로 이루어진 이미지(pixel 값) (height, width, channel).
        boxes (torch.Tensor): bounding box 좌표를 포함한 텐서 (N, 4), 각각의 좌표는 0~1 사이 값으로 normalized 됨.

    Returns:
        torch.Tensor: 좌우반전된 이미지(pixel 값).
        torch.Tensor: 좌우반전된 bounding box 좌표.
    """
    if random.random() < 0.5:
        # 이미지 좌우반전
        image = torch.flip(image, [-1])

        # bounding box 좌표도 좌우반전에 맞게 변경
        xmin, ymin, xmax, ymax = torch.chunk(boxes, 4, dim=1)
        new_xmin = 1.0 - xmax
        new_xmax = 1.0 - xmin
        boxes = torch.cat((new_xmin, ymin, new_xmax, ymax), dim=1)

    return image, boxes

# 예제 사용법
# 이미지를 3차원 텐서로 나타낸다고 가정하고 이를 입력으로 사용
image = torch.randn((256, 384, 3))  # 예제 이미지 (높이: 256, 너비: 384, 채널: 3)

# bounding box 좌표 (xmin, ymin, xmax, ymax)를 포함한 텐서 (0~1 사이 값으로 normalized)
boxes = torch.tensor([[0.1, 0.2, 0.4, 0.6], [0.3, 0.4, 0.6, 0.7]], dtype=torch.float32)

flipped_image, flipped_boxes = horizontal_flip(image, boxes)

# 결과 출력
print("좌우반전된 이미지 크기:", flipped_image.shape)
print("좌우반전된 bounding box 좌표:", flipped_boxes)


### 문제 8. Augmentation 함수 만들기
    - tf.data(dataset)에 map으로 적용할 수 있도록 함수를 만듭니다.
    - 입력으로 dataset의 item을 받습니다
    - 위에서 작성한 random_flip_horizontal 함수를 먼저 적용하고, 다음으로 resize_and_pad_image 함수를 적용합니다.
    - 0~1 사이 값으로 normailized된 bounding box 좌표를 실제 image size에 맞게 조정합니다.
    - bounding box 좌표를 (xmin, ymin, xmax, ymax) 형태에서 (x, y, w, h) 형태로 변경합니다.
    - 함수의 반환값은 image의 pixel 값, bounding box의 좌표, class id로 합니다.

In [None]:
def preprocess_data(sample):
    image = sample["image"]
    bbox = swap_xy(sample["objects"]["bbox"])
    class_id = tf.cast(sample["objects"]["label"], dtype=tf.int32)

    ##### CODE HERE #####

    return image, bbox, class_id

In [None]:
import torch

def process_dataset_item(item, min_side, max_side, stride):
    """
    dataset의 item을 받아 처리하고 이미지와 bounding box를 반환하는 함수.

    Args:
        item (tuple): dataset에서 반환되는 item으로 (image, boxes, class_id) 형태로 구성됨.
        min_side (int): resize에 사용할 짧은 변의 길이.
        max_side (int): resize에 사용할 긴 변의 길이.
        stride (int): 입력 이미지의 1 pixel이 실제 원본 이미지에서 몇 pixel에 해당되는지.

    Returns:
        torch.Tensor: 처리된 이미지(pixel 값).
        torch.Tensor: 처리된 bounding box 좌표 (x, y, w, h).
        int: class id.
    """
    image, boxes, class_id = item

    # 이미지를 50% 확률로 좌우반전
    image, boxes = horizontal_flip(image, boxes)

    # resize 및 padding된 이미지 생성
    resized_image, original_size, scale_factors = resize_and_pad_image(image, min_side, max_side, stride)

    # normalized된 bounding box 좌표를 원본 이미지 크기에 맞게 조정
    boxes[:, 0::2] *= original_size[1]  # x 좌표 조정
    boxes[:, 1::2] *= original_size[0]  # y 좌표 조정

    # bounding box 좌표를 (xmin, ymin, xmax, ymax) 형태에서 (x, y, w, h) 형태로 변경
    boxes[:, 2] = boxes[:, 2] - boxes[:, 0]  # w 계산
    boxes[:, 3] = boxes[:, 3] - boxes[:, 1]  # h 계산

    return resized_image, boxes, class_id

# 예제 사용법
# item은 (image, boxes, class_id) 형태의 튜플로 가정합니다.
item = (torch.randn((256, 384, 3)), torch.tensor([[0.1, 0.2, 0.4, 0.6], [0.3, 0.4, 0.6, 0.7]], dtype=torch.float32), 1)

min_side = 200
max_side = 400
stride = 32

processed_image, processed_boxes, class_id = process_dataset_item(item, min_side, max_side, stride)

# 결과 출력
print("처리된 이미지 크기:", processed_image.shape)
print("처리된 bounding box 좌표:", processed_boxes)
print("class id:", class_id)


## Step 3. Anchor Box 정보 만들기
AnchorBox class를 만들고, 모든 anchor box의 (x, y, w, h) 정보를 생성해봅시다.

      * _compute_dims: 각 level별로 ahcnor box의 (w,h)를 계산하여 반환합니다.
      * _get_anchors: 각 level별로 anchor box의 (x,y,w,h)를 계산하여 반환합니다. 이 때 return shape은 (height*width*9, 4)입니다.
      * get_anchors: _get_anchors의 level별 anchor box정보를 모두 합쳐서 최종 결과를 반환합니다.

    - AnchorBox class를 만들고, get_anchors method를 call하면 모든 anchor box의 (x,y,w,h) 좌표를 반환하도록 합니다.    
    - P3-P7까지의 모든 level의 anchor box의 (x,y,w,h) 좌표가 반환되어야 합니다.
    - 계산의 편의를 위하여 먼저 각 level의 anchor box를 (height, width, 9, 4)의 shape을 갖는 tensor로 만듭니다.
      여기서 height, width는 각 level의 feature map size를 의미하며, 9는 각 level의 anchor box 갯수이며 4는 (x,y,w,h)를 말합니다.
    - 최종적으로 반환되는 값은 (5*height*width*9, 4)의 형태가 되며 맨 앞의 5는 level의 갯수(P3, P4, P5, P6, P7)을 의미합니다.

In [None]:
import torch

class AnchorBox:
    def __init__(self):
        self.scales = [2 ** 0, 2 ** (1 / 3), 2 ** (2 / 3)]
        self.ratios = [0.5, 1, 2]
        self.base_sizes = [8, 16, 32, 64, 128]

    def _compute_dims(self, level):
        aspect_ratios = self.ratios
        scales = self.scales
        num_anchors = len(aspect_ratios) * len(scales)

        anchor_dims = []
        for scale in scales:
            for ratio in aspect_ratios:
                w = scale * ratio
                h = scale / ratio
                anchor_dims.append((w, h))

        anchor_dims = torch.tensor(anchor_dims) * self.base_sizes[level]
        return anchor_dims

    def _get_anchors(self, level, feature_height, feature_width):
        anchor_dims = self._compute_dims(level)
        shift_x = torch.arange(0, feature_width) * self.base_sizes[level]
        shift_y = torch.arange(0, feature_height) * self.base_sizes[level]
        shift_x, shift_y = torch.meshgrid(shift_x, shift_y)

        shifts = torch.stack((shift_x, shift_y, shift_x, shift_y), dim=-1)
        shifts = shifts.unsqueeze(2).expand(-1, -1, anchor_dims.shape[0], -1)

        anchors = shifts + anchor_dims.view(1, 1, -1, 4)
        return anchors.view(-1, 4)

    def get_anchors(self, image_size):
        anchors = []
        for level, base_size in enumerate(self.base_sizes):
            feature_height = image_size[0] // base_size
            feature_width = image_size[1] // base_size
            level_anchors = self._get_anchors(level, feature_height, feature_width)
            anchors.append(level_anchors)

        return torch.cat(anchors, dim=0)

# 예제 사용법
anchor_generator = AnchorBox()
image_size = (800, 800)  # 이미지 크기 (높이, 너비)
anchors = anchor_generator.get_anchors(image_size)

# 결과 출력
print("전체 anchor box의 수:", anchors.shape[0])
print("각 anchor box의 (x, y, w, h) 좌표 예시:")
print(anchors[:10])  # 처음 10개의 anchor box 출력


Class 내부의 method는 다음과 같습니다

### \_\_init\_\_ method
    - __init__ : anchor box 정보 계산을 위한 기본 값들 setting

### 문제 9. _compute_dims method
    - _compute_dims method를 만들어봅시다.
    - _compute_dims는 모든 level(P3-P7)에 대하여 각 level별로 anchor box의 (w, h)를 계산하여 반환하는 역할을 합니다.
    - anchor_dims_all list에 해당 정보가 저장되며 list의 원소는 각각 (1,1,9,2)의 shape을 갖게 됩니다.

### 문제 10. _get_anchors method
    - _get_anchors method를 만들어봅시다.
    - _get_anchors method는 각 level별로 anchor box의 (x,y,w,h)를 계산하여 반환합니다.
    - 반환값의 shape은 (height*width*9, 4)입니다.

### 문제 11. get_anchors method
    - get_anchors method를 만들어봅시다.
    - get_anchors method는 문제 9의 _get_anchors 를 통해서 level별 anchor box의 정보를 받아 이를 모두 합쳐서 최종 결과를 반환합니다.
    - 반환값의 shape은 (5*height*width*9, 4)입니다.

In [None]:
class AnchorBox:
    def __init__(self):
        self.aspect_ratios = [0.5, 1.0, 2.0]
        self.scales = [2 ** x for x in [0, 1 / 3, 2 / 3]]

        self._num_anchors = len(self.aspect_ratios) * len(self.scales)
        self._strides = [2 ** i for i in range(3, 8)]
        self._areas = [x ** 2 for x in [32.0, 64.0, 128.0, 256.0, 512.0]]
        self._anchor_dims = self._compute_dims()

    def _compute_dims(self):
        anchor_dims_all = []
        for area in self._areas:
            anchor_dims = []
            for ratio in self.aspect_ratios:
                anchor_height = tf.math.sqrt(area / ratio)

                ##### CODE HERE #####

                for scale in self.scales:
                    anchor_dims.append(scale * dims)
            anchor_dims_all.append(tf.stack(anchor_dims, axis=-2))
        return anchor_dims_all

    def _get_anchors(self, feature_height, feature_width, level):
        rx = tf.range(feature_width, dtype=tf.float32) + 0.5
        ry = tf.range(feature_height, dtype=tf.float32) + 0.5

        ##### CODE HERE #####

        return tf.reshape(
            anchors, [feature_height * feature_width * self._num_anchors, 4]
        )

    def get_anchors(self, image_height, image_width):
        anchors = [
            ##### CODE HERE #####
            for i in range(3, 8)
        ]
        return tf.concat(anchors, axis=0)

## Step 4. Label Encoding
anchor box 정보와 정답(groundtruth) box 정보, class id를 이용하여 detection에서 사용할 label을 만들어봅시다.

### 문제 12. IOU 계산 함수 만들기

    - 이 함수는 2개의 bounding box 그룹들(boxes1, boxes2) 간에 iou를 계산합니다
    - 모든 anchor box와 gound truth box 간의 iou를 계산하여,
      anchor box들을 positive, negative, ignore로 구분하기 위해서 사용합니다
    - 각 bounding box의 좌표 정보는 (x,y,w,h) 형태로 입력받습니다.
    - boxes1의 shape은 (N, 4)이고 boxes2의 shape은 (M, 4)라고 할 때, 출력은 (N, M) shape의 tensor가 됩니다.

In [None]:
def compute_iou(boxes1, boxes2):
    boxes1_corners = convert_to_corners(boxes1)
    boxes2_corners = convert_to_corners(boxes2)

    ##### CODE HERE #####

    return tf.clip_by_value(intersection_area / union_area, 0.0, 1.0)

In [None]:
import torch

def calculate_iou(boxes1, boxes2):
    """
    두 개의 bounding box 그룹 간의 IoU(intersection over union)를 계산하는 함수.

    Args:
        boxes1 (torch.Tensor): bounding box 그룹 1의 좌표 정보 (N, 4), N은 bounding box의 개수.
        boxes2 (torch.Tensor): bounding box 그룹 2의 좌표 정보 (M, 4), M은 bounding box의 개수.

    Returns:
        torch.Tensor: 각 조합에 대한 IoU 값 (N, M).
    """
    # 각 bounding box 그룹의 좌표 정보를 (x1, y1, x2, y2) 형태로 변경
    x1_1, y1_1, x2_1, y2_1 = boxes1[:, 0], boxes1[:, 1], boxes1[:, 0] + boxes1[:, 2], boxes1[:, 1] + boxes1[:, 3]
    x1_2, y1_2, x2_2, y2_2 = boxes2[:, 0], boxes2[:, 1], boxes2[:, 0] + boxes2[:, 2], boxes2[:, 1] + boxes2[:, 3]

    # IoU를 계산하기 위한 교차 영역 계산
    x1_intersection = torch.max(x1_1.unsqueeze(1), x1_2)
    y1_intersection = torch.max(y1_1.unsqueeze(1), y1_2)
    x2_intersection = torch.min(x2_1.unsqueeze(1), x2_2)
    y2_intersection = torch.min(y2_1.unsqueeze(1), y2_2)

    # 교차 영역의 너비와 높이 계산 (clamp 함수를 사용하여 음수 값은 0으로 만듦)
    intersection_width = torch.clamp(x2_intersection - x1_intersection, min=0)
    intersection_height = torch.clamp(y2_intersection - y1_intersection, min=0)

    # 교차 영역의 면적 계산
    intersection_area = intersection_width * intersection_height

    # 각 bounding box의 면적 계산
    area1 = boxes1[:, 2] * boxes1[:, 3]
    area2 = boxes2[:, 2] * boxes2[:, 3]

    # IoU 계산
    iou = intersection_area / (area1.unsqueeze(1) + area2 - intersection_area)

    return iou

# 예제 사용법
boxes1 = torch.tensor([[0, 0, 2, 2], [3, 3, 5, 5]], dtype=torch.float32)
boxes2 = torch.tensor([[1, 1, 3, 3], [4, 4, 6, 6]], dtype=torch.float32)

iou_matrix = calculate_iou(boxes1, boxes2)

# 결과 출력
print(iou_matrix)


### LabelEncoder Class
이제 LabelEncoder class를 작성해보겠습니다.
이 class에서는 앞에서 생성한 anchor box 정보와 groundtruth box 정보, class id를 이용하여 실제 학습에 사용할 label을 생성해줍니다.
해당 class의 method는 다음과 같습니다.

### \_\_init\_\_ method

    - AnchorBox class instance 생성
    - box normalize를 위한 variance
    


### 문제 13. _match_anchor_boxes method
    - IOU를 기반으로 gt box를 anchor box에 matching하는 역할을 합니다.
    - M개의 anchor box와 N개의 gt box에 대하여 계산된 MxN의 iou matrix를 이용하여 각 행에서 IOU의 최대값을 찾습니다.
    - 이 때 최대 IOU에 해당되는 index도 저장합니다.(matched_gt_index)
    - 최대 IOU 값이 match_iou 이상이면 positive, ignore_iou 미만이면 negative, 나머지는 ignore로 처리하여
      각 anchor box마다 positive, negative, ignore 여부를 알 수 있는 postive_mask, negative mask, ignore_mask를 생성합니다.
    - 입력값은 anchor box의 (x,y,w,h)정보, gt box의 (x,y,w,h)정보, match_iou, ignore_iou 입니다.
    - 반환값은 matched_gt_index, positive_mask, ignore_mask 입니다.

In [None]:
import torch

class AnchorBox:
    # AnchorBox 클래스의 나머지 부분은 이전 예제에서 사용한 코드를 그대로 사용합니다.

def match_anchors_with_gt(anchors, gt_boxes, match_iou, ignore_iou):
    """
    Anchor box와 GT box 간의 매칭을 수행하여 positive, negative, ignore 여부를 판단하고
    matched_gt_index, positive_mask, ignore_mask를 반환하는 함수.

    Args:
        anchors (torch.Tensor): Anchor box의 (x, y, w, h) 좌표 정보 (M, 4), M은 Anchor box의 개수.
        gt_boxes (torch.Tensor): GT box의 (x, y, w, h) 좌표 정보 (N, 4), N은 GT box의 개수.
        match_iou (float): positive로 매칭할 IoU 임계값.
        ignore_iou (float): ignore으로 처리할 IoU 임계값.

    Returns:
        torch.Tensor: 매칭된 GT box의 인덱스 (M,).
        torch.Tensor: positive mask (M,), positive로 매칭된 anchor는 True, 그렇지 않은 경우 False.
        torch.Tensor: ignore mask (M,), ignore로 처리된 anchor는 True, 그렇지 않은 경우 False.
    """
    iou_matrix = calculate_iou(anchors, gt_boxes)

    # 각 anchor에 대해 최대 IoU와 그에 해당하는 GT box의 인덱스를 찾음
    max_iou, matched_gt_index = iou_matrix.max(dim=1)

    # positive, negative, ignore mask 생성
    positive_mask = max_iou >= match_iou
    negative_mask = max_iou < ignore_iou
    ignore_mask = ~positive_mask & ~negative_mask

    return matched_gt_index, positive_mask, ignore_mask

# 예제 사용법
anchor_generator = AnchorBox()
image_size = (800, 800)  # 이미지 크기 (높이, 너비)
anchors = anchor_generator.get_anchors(image_size)

# GT box의 예시 (x, y, w, h 형태)
gt_boxes = torch.tensor([[100, 100, 50, 50], [300, 300, 80, 80]], dtype=torch.float32)

match_iou = 0.5  # positive로 매칭할 IoU 임계값
ignore_iou = 0.4  # ignore으로 처리할 IoU 임계값

matched_gt_index, positive_mask, ignore_mask = match_anchors_with_gt(anchors, gt_boxes, match_iou, ignore_iou)

# 결과 출력
print("매칭된 GT box 인덱스:", matched_gt_index)
print("Positive mask:", positive_mask)
print("Ignore mask:", ignore_mask)


### 문제 14. _compute_box_target method
    - 각 anchor box에 대한 transform 값을 계산합니다.
    - transform은 gt box와 같아지기 위해서 anchor box의 x,y,w,h를 얼마나 변형해야 하는가에 대한 값입니다.
    - 입력값으로 각 anchor box와 그에 대한 target 값을 받습니다.
    - 반환값은 각 anchor box의 transform label입니다.

In [None]:
import torch

def calculate_anchor_transforms(anchors, targets):
    """
    각 Anchor box에 대한 transform 값을 계산하는 함수.

    Args:
        anchors (torch.Tensor): Anchor box의 (x, y, w, h) 좌표 정보 (M, 4), M은 Anchor box의 개수.
        targets (torch.Tensor): 각 Anchor box에 대한 target 값의 (x, y, w, h) 좌표 정보 (M, 4).

    Returns:
        torch.Tensor: 각 Anchor box의 transform label (M, 4).
    """
    # Anchor box와 target의 좌표 정보를 (x1, y1, x2, y2) 형태로 변경
    x1_anchors, y1_anchors, x2_anchors, y2_anchors = anchors[:, 0], anchors[:, 1], anchors[:, 0] + anchors[:, 2], anchors[:, 1] + anchors[:, 3]
    x1_targets, y1_targets, x2_targets, y2_targets = targets[:, 0], targets[:, 1], targets[:, 0] + targets[:, 2], targets[:, 1] + targets[:, 3]

    # 변형 값을 계산
    transform_x = (x1_targets - x1_anchors) / anchors[:, 2]
    transform_y = (y1_targets - y1_anchors) / anchors[:, 3]
    transform_w = torch.log((x2_targets - x1_targets) / anchors[:, 2])
    transform_h = torch.log((y2_targets - y1_targets) / anchors[:, 3])

    transforms = torch.stack((transform_x, transform_y, transform_w, transform_h), dim=-1)

    return transforms

# 예제 사용법
anchors = torch.tensor([[100, 100, 50, 50], [300, 300, 80, 80], [500, 500, 60, 60]], dtype=torch.float32)
targets = torch.tensor([[110, 110, 45, 45], [310, 310, 85, 85], [520, 520, 58, 58]], dtype=torch.float32)

anchor_transforms = calculate_anchor_transforms(anchors, targets)

# 결과 출력
print("각 Anchor box의 transform label:")
print(anchor_transforms)


### 문제 15. _encode_sample method
    - 각 anchor box에 대한 bbox와 class의 target 값을 계산합니다.
    - _match_anchor_boxes method에서 계산한 최대 IOU의 index를 활용하여 해당되는 gt box의 정보를 찾아서 matching 해줍니다.
    - matching돤 gt box의 x,y,w,h와 _compute_box_target method를 이용하여, box regression에 대한 label을 생성합니다.
    - matching된 gt box의 class id와 positive_mask, ignore_mask를 이용하여 class에 대한 label을 생성합니다.
    - 이 때 positive인 경우는 gt의 class id를 사용하고, negative인 경우는 -1로 ignore인 경우는 -2로 labeling합니다.
    - 입력값으로 image shape(batch, height, width, channel), gt box의 (x,y,w,h), gt box의 class id를 받습니다.
    - 최종적으로 box target과 class target을 하나로 concat하여 반환합니다.


In [None]:
import torch

class AnchorBox:
    # AnchorBox 클래스의 나머지 부분은 이전 예제에서 사용한 코드를 그대로 사용합니다.

def calculate_anchor_targets(image_shape, gt_boxes, gt_class_ids):
    """
    각 Anchor box에 대한 bbox와 class의 target 값을 계산하는 함수.

    Args:
        image_shape (tuple): 이미지 shape (batch, height, width, channel).
        gt_boxes (torch.Tensor): GT box의 (x, y, w, h) 좌표 정보 (N, 4), N은 GT box의 개수.
        gt_class_ids (torch.Tensor): GT box의 class id 정보 (N,).

    Returns:
        torch.Tensor: 각 Anchor box에 대한 bbox와 class의 target 값 (M, 5), M은 Anchor box의 개수.
    """
    anchor_generator = AnchorBox()
    anchors = anchor_generator.get_anchors((image_shape[1], image_shape[2]))  # 이미지 height, width에 맞는 anchor 생성

    # 최대 IoU에 해당하는 GT box 및 mask 가져오기
    matched_gt_index, positive_mask, ignore_mask = anchor_generator._match_anchor_boxes(anchors, gt_boxes)

    # GT box 정보 가져오기
    matched_gt_boxes = gt_boxes[matched_gt_index]
    matched_gt_class_ids = gt_class_ids[matched_gt_index]

    # bbox regression label 계산
    bbox_targets = anchor_generator._compute_box_target(anchors, matched_gt_boxes)

    # class label 계산
    class_targets = torch.ones_like(positive_mask) * -2  # 모든 anchor를 ignore로 초기화
    class_targets[positive_mask] = matched_gt_class_ids  # positive anchor의 class label 설정

    # bbox target과 class target을 하나로 concat
    targets = torch.cat((bbox_targets, class_targets.unsqueeze(1)), dim=1)

    return targets

# 예제 사용법
image_shape = (1, 800, 800, 3)  # 이미지 shape (batch, height, width, channel)
gt_boxes = torch.tensor([[100, 100, 50, 50], [300, 300, 80, 80]], dtype=torch.float32)
gt_class_ids = torch.tensor([0, 1], dtype=torch.int64)

anchor_targets = calculate_anchor_targets(image_shape, gt_boxes, gt_class_ids)

# 결과 출력
print("각 Anchor box에 대한 bbox와 class의 target 값:")
print(anchor_targets)


### 문제 16. encode_batch method
    - tf.data(dataset)에 map으로 적용할 수 있도록 합니다.
    - batch 단위로 data를 받아서, 각 image마다 위에서 작성한 method들을 활용하여 label을 생성하고,
      그 label들을 다시 batch 단위로 묶어서 반환합니다.
    - 입력값으로 batch 단위의 image data, gt box의 (x,y,w,h), gt box의 class id를 받습니다.
    - 반환값은 batch 단위의 image data(입력값 그대로)와, 생성한 label입니다.

In [None]:
import torch

class AnchorBox:
    # AnchorBox 클래스의 나머지 부분은 이전 예제에서 사용한 코드를 그대로 사용합니다.

def calculate_anchor_targets_batch(images, gt_boxes_list, gt_class_ids_list):
    """
    배치 단위로 이미지와 GT box 정보를 입력으로 받고, 각 이미지에 대한 라벨을 생성하여 배치 단위로 반환하는 함수.

    Args:
        images (torch.Tensor): 이미지 데이터 (batch, height, width, channel).
        gt_boxes_list (list): 각 이미지에 대한 GT box 정보의 리스트. 각 GT box 정보는 torch.Tensor (N, 4) 형태.
        gt_class_ids_list (list): 각 이미지에 대한 GT box의 class id 정보의 리스트. 각 class id 정보는 torch.Tensor (N,) 형태.

    Returns:
        torch.Tensor: 입력 이미지 데이터 (batch, height, width, channel).
        list: 각 이미지에 대한 라벨 정보의 리스트. 각 라벨 정보는 torch.Tensor (M, 5) 형태.
    """
    batch_labels = []

    for i in range(len(images)):
        image_shape = images[i].shape  # 이미지 shape 가져오기
        gt_boxes = gt_boxes_list[i]  # 현재 이미지에 대한 GT box 정보
        gt_class_ids = gt_class_ids_list[i]  # 현재 이미지에 대한 GT class id 정보

        # 각 이미지에 대한 라벨 생성
        labels = calculate_anchor_targets(image_shape, gt_boxes, gt_class_ids)
        batch_labels.append(labels)

    return images, batch_labels

# 예제 사용법
# 가정: batch 크기가 2이고, 각 이미지에 대한 GT box 정보 및 GT class 정보를 리스트로 제공
batch_images = [torch.randn((800, 800, 3)), torch.randn((800, 800, 3))]
batch_gt_boxes = [torch.tensor([[100, 100, 50, 50], [300, 300, 80, 80]], dtype=torch.float32),
                  torch.tensor([[200, 200, 60, 60], [400, 400, 70, 70]], dtype=torch.float32)]
batch_gt_class_ids = [torch.tensor([0, 1], dtype=torch.int64), torch.tensor([1, 2], dtype=torch.int64)]

batch_images, batch_labels = calculate_anchor_targets_batch(batch_images, batch_gt_boxes, batch_gt_class_ids)

# 결과 출력
print("배치 이미지 데이터 shape:", batch_images[0].shape, batch_images[1].shape)
print("배치 라벨 정보:", batch_labels[0], batch_labels[1])


In [None]:
class LabelEncoder:
    def __init__(self):
        self._anchor_box = AnchorBox()
        self._box_variance = tf.convert_to_tensor(
            [0.1, 0.1, 0.2, 0.2], dtype=tf.float32
        )

    def _match_anchor_boxes(self, anchor_boxes, gt_boxes, match_iou=0.5, ignore_iou=0.4):
        iou_matrix = compute_iou(anchor_boxes, gt_boxes)

        ##### CODE HERE #####

        return (
            matched_gt_idx,
            tf.cast(positive_mask, dtype=tf.float32),
            tf.cast(ignore_mask, dtype=tf.float32),
        )

    def _compute_box_target(self, anchor_boxes, matched_gt_boxes):
        box_target = tf.concat(
            [
               ##### CODE HERE #####
            ],
            axis=-1,
        )
        box_target = box_target / self._box_variance
        return box_target

    def _encode_sample(self, image_shape, gt_boxes, cls_ids):
        anchor_boxes = self._anchor_box.get_anchors(image_shape[1], image_shape[2])
        cls_ids = tf.cast(cls_ids, dtype=tf.float32)
        matched_gt_idx, positive_mask, ignore_mask = self._match_anchor_boxes(
            anchor_boxes, gt_boxes
        )

        ##### CODE HERE #####

        label = tf.concat([box_target, cls_target], axis=-1)
        return label

    def encode_batch(self, batch_images, gt_boxes, cls_ids):
        images_shape = tf.shape(batch_images)
        batch_size = images_shape[0]

        labels = tf.TensorArray(dtype=tf.float32, size=batch_size, dynamic_size=True)
        for i in range(batch_size):

            ##### CODE HERE #####

        batch_images = tf.keras.applications.resnet.preprocess_input(batch_images)
        return batch_images, labels.stack()

## Step 5. Dataset 만들기
앞에서 작성한 함수와 class를 활용하여 model에 data를 공급하기 위한 dataset을 만들어봅시다.


### 문제 17. Train/Validation dataset 만들기
    - 위에서 생성한 train_dataset, validation_dataset과 그동안 작성한 함수와 class들을 활용하여,
      train/validation dataset을 만듭니다.
    - 다음과 같은 순서로 적용합니다.
      1. preprocess_data 함수 적용
      2. dataset shuffle
      3. padded_batch를 이용하여 batch로 묶음
        - 이 때, padding 값은 image, box 좌표, class id에 대하여 각각 0.0, 1e-8, -1로 합니다.
      4. LabelEncoder의 encode_batch 함수 적용
      5. prefetch 적용
    - validation_dataset에는 shuffle만 빼고 동일하게 적용합니다.



In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import transforms

# 주어진 데이터셋 클래스로부터 train_dataset과 validation_dataset 생성
train_dataset = YourCustomTrainDataset()  # YourCustomTrainDataset에 실제 데이터셋 클래스 이름을 적용해야 합니다.
validation_dataset = YourCustomValidationDataset()  # YourCustomValidationDataset에 실제 데이터셋 클래스 이름을 적용해야 합니다.

# 데이터 전처리를 위한 함수 정의
def preprocess_data(image, gt_boxes, gt_class_ids):
    # 이미지 전처리 (예시)
    image = transforms.ToTensor()(image)  # 이미지를 텐서로 변환
    image = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(image)  # 이미지를 정규화

    return image, gt_boxes, gt_class_ids

# DataLoader의 batch_size 설정
batch_size = 32

# train_dataset에 대한 DataLoader 설정
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=lambda x: list(zip(*x)))

# validation_dataset에 대한 DataLoader 설정
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False, collate_fn=lambda x: list(zip(*x)))

# LabelEncoder 인스턴스 생성
label_encoder = LabelEncoder()

# train_loader와 validation_loader에 대해 전처리 및 라벨 인코딩 적용
for data in train_loader:
    images, gt_boxes, gt_class_ids = data
    images, gt_boxes, gt_class_ids = preprocess_data(images, gt_boxes, gt_class_ids)
    labels = calculate_anchor_targets_batch([images], [gt_boxes], [gt_class_ids])[1]
    label_encoder.encode_batch(labels)

for data in validation_loader:
    images, gt_boxes, gt_class_ids = data
    images, gt_boxes, gt_class_ids = preprocess_data(images, gt_boxes, gt_class_ids)
    labels = calculate_anchor_targets_batch([images], [gt_boxes], [gt_class_ids])[1]
    label_encoder.encode_batch(labels)

# DataLoader에 prefetch 설정 (마지막 인자는 CPU 스레드 수로 조절 가능)
train_loader = train_loader.prefetch(2)
validation_loader = validation_loader.prefetch(2)


In [None]:
label_encoder = LabelEncoder()

autotune = tf.data.AUTOTUNE

##### CODE HERE #####

train_dataset = train_dataset.apply(tf.data.experimental.ignore_errors())
train_dataset = train_dataset.prefetch(autotune)


##### CODE HERE #####

val_dataset = val_dataset.apply(tf.data.experimental.ignore_errors())
val_dataset = val_dataset.prefetch(autotune)

## Step 6. RetinaNet Model 만들기
Data가 준비되었으므로, RetinaNet model을 만들어보도록 하겠습니다.

### 문제 18. get_backbone 함수 만들기
    - backbone으로 ResNet50을 사용합니다.
    - get_backbone 함수는 ResNet50의 C3, C4, C5에 해당하는 feature map을 output으로 하는 model을 반환합니다.


In [None]:
import torch
import torchvision.models as models
import torch.nn as nn

def get_backbone():
    # 미리 학습된 ResNet-50 모델을 불러옴
    resnet50 = models.resnet50(pretrained=True)

    # ResNet-50의 마지막 fully connected 레이어를 제거하여 feature extractor로 활용
    # 이 때, C3, C4, C5에 해당하는 feature map을 반환하도록 설정
    backbone = nn.Sequential(
        *list(resnet50.children())[:-2]
    )

    return backbone

# ResNet-50 백본 모델 생성
backbone_model = get_backbone()

# 예제로 백본 모델의 아웃풋 shape 확인
input_tensor = torch.randn(1, 3, 224, 224)  # 입력 이미지 shape (batch, channel, height, width)
output_feature_maps = backbone_model(input_tensor)

# 각 feature map의 shape 출력
for i, feature_map in enumerate(output_feature_maps):
    print(f"Feature Map C{i+3} shape: {feature_map.shape}")


먼저 ResNet50을 가져와서 C3, C4, C5에 해당하는 layer의 이름을 확인합니다.

In [None]:
backbone = keras.applications.ResNet50(include_top=False, input_shape=[224, 224, 3])

In [None]:
backbone.summary()

확인된 layer 이름을 활용하여 get_backbone 함수를 작성합니다.

In [None]:
def get_backbone():
    backbone = keras.applications.ResNet50(
        include_top=False, input_shape=[None, None, 3]
    )

    ##### CODE HERE #####

    return keras.Model(
        inputs=[backbone.inputs], outputs=[c3_output, c4_output, c5_output]
    )

### 문제 19. Feature Pyramid Network 만들기
keras custom layer를 활용하여 FPN을 만들어봅시다.

    - input으로 batch 단위의 image를 받고 output으로 P3-P7을 반환합니다.
    - RetinaNet과 FPN 논문의 Feature Pyramid Network 부분을 참고하여 작성합니다.

In [None]:
import torch
import torchvision.models as models
import torch.nn as nn
from collections import OrderedDict

class FPNBackbone(nn.Module):
    def __init__(self):
        super(FPNBackbone, self).__init__()
        # ResNet-50 백본을 불러옴
        self.resnet = models.resnet50(pretrained=True)

        # ResNet-50의 마지막 fully connected 레이어를 제거하여 feature extractor로 활용
        self.backbone = nn.Sequential(
            *list(self.resnet.children())[:-2]
        )

        # Lateral Convolution Layers
        self.lateral_c3 = nn.Conv2d(512, 256, kernel_size=1)
        self.lateral_c4 = nn.Conv2d(1024, 256, kernel_size=1)
        self.lateral_c5 = nn.Conv2d(2048, 256, kernel_size=1)

        # P6 and P7 Layers
        self.p6 = nn.Conv2d(2048, 256, kernel_size=3, stride=2, padding=1)
        self.p7 = nn.ReLU()(self.p6)

        # P3, P4, P5 Layers
        self.upsample = nn.Upsample(scale_factor=2, mode='nearest')

    def forward(self, x):
        c2, c3, c4, c5 = self.backbone(x)

        # Lateral Connections
        p5 = self.lateral_c5(c5)
        p4 = self.lateral_c4(c4) + self.upsample(p5)
        p3 = self.lateral_c3(c3) + self.upsample(p4)

        # P6 and P7
        p6 = self.p6(c5)
        p7 = self.p7(p6)

        return p3, p4, p5, p6, p7

# 백본 모델 생성
fpn_backbone = FPNBackbone()

# 예제로 백본 모델에 이미지를 전달하여 P3-P7 feature map을 확인
input_tensor = torch.randn(1, 3, 224, 224)  # 입력 이미지 shape (batch, channel, height, width)
output_feature_maps = fpn_backbone(input_tensor)

# 각 feature map의 shape 출력
for i, feature_map in enumerate(output_feature_maps):
    print(f"Feature Map P{i+3} shape: {feature_map.shape}")


In [None]:
class FeaturePyramid(keras.layers.Layer):
    def __init__(self, **kwargs):
        super(FeaturePyramid, self).__init__(name="FeaturePyramid", **kwargs)
        self.backbone = get_backbone()
        self.conv_c3_1x1 = keras.layers.Conv2D(256, 1, 1, "same")
        self.conv_c4_1x1 = keras.layers.Conv2D(256, 1, 1, "same")
        self.conv_c5_1x1 = keras.layers.Conv2D(256, 1, 1, "same")
        self.conv_c3_3x3 = keras.layers.Conv2D(256, 3, 1, "same")
        self.conv_c4_3x3 = keras.layers.Conv2D(256, 3, 1, "same")
        self.conv_c5_3x3 = keras.layers.Conv2D(256, 3, 1, "same")
        self.conv_c6_3x3 = keras.layers.Conv2D(256, 3, 2, "same")
        self.conv_c7_3x3 = keras.layers.Conv2D(256, 3, 2, "same")
        self.upsample_2x = keras.layers.UpSampling2D(2)

    def call(self, images, training=False):

        ##### CODE HERE #####

        return p3_output, p4_output, p5_output, p6_output, p7_output

### 문제 20. Class/Box subnet 만들기
classification과 box regression을 위한 subnet(head)를 만들어봅시다.

    - classification과 box regression의 subnet 구조가 동일하므로 공통으로 사용할 수 있는 함수 형태로 작성합니다.
    - 이 함수는 입력값으로 마지막 layer의 convolution filter 수와 bias initializaer를 받습니다.
    - channel이 256인 feature map을 입력으로 classification 혹은 box regression의 결과를 뽑아주는 모델을 반환합니다.

In [None]:
import torch.nn as nn
import torch.nn.functional as F

def classification_subnet(num_filters, bias_initializer=None):
    """
    Classification 또는 Box Regression 서브넷을 생성하는 함수.

    Args:
        num_filters (int): 마지막 레이어의 필터 수.
        bias_initializer (callable, optional): 바이어스 초기화 함수. 기본값은 None.

    Returns:
        nn.Module: Classification 또는 Box Regression 서브넷 모델.
    """
    subnet = nn.Sequential(
        nn.Conv2d(256, num_filters, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(num_filters, num_filters, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(num_filters, num_filters, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(num_filters, num_filters, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True)
    )

    # 마지막 레이어의 바이어스 초기화 설정
    if bias_initializer is not None:
        subnet[-1].bias.data.fill_(bias_initializer)

    return subnet

# Classification 서브넷 모델 생성 예제
classification_subnet_model = classification_subnet(80, bias_initializer=0.0)
print(classification_subnet_model)

# Box Regression 서브넷 모델 생성 예제
box_regression_subnet_model = classification_subnet(4, bias_initializer=0.0)
print(box_regression_subnet_model)


In [None]:
def build_head(output_filters, bias_init):
    head = keras.Sequential([keras.Input(shape=[None, None, 256])])
    kernel_init = tf.initializers.RandomNormal(0.0, 0.01)

    ##### CODE HERE #####

    head.add(
        keras.layers.Conv2D(
            output_filters,
            3,
            1,
            padding="same",
            kernel_initializer=kernel_init,
            bias_initializer=bias_init,
        )
    )
    return head

### 문제 21. RetinaNet model 만들기
위에서 작성한 layer 및 함수를 이용하여 RetinaNet model을 만들어봅시다.

    - keras subclassing model을 이용하여 작성합니다.
    - 입력으로 image를 받고, 출력으로 모든 anchor box의 regression과 classification 예측값을 모아서 반환합니다.
    - classification subnet에는 RetinaNet 논문에 나온 bias initialzation을 적용합니다.
    - output shape은 (batch size, 전체 anchor box의 갯수, num_classes+4[84]) 입니다.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

def retina_subnet(input_channels, num_classes, num_anchors):
    """
    RetinaNet의 Classification 또는 Box Regression 서브넷을 생성하는 함수.

    Args:
        input_channels (int): 입력 feature map의 채널 수 (256).
        num_classes (int): 객체 클래스의 수.
        num_anchors (int): 각 위치에서 예측할 anchor box의 수.

    Returns:
        nn.Module: RetinaNet의 Classification 또는 Box Regression 서브넷 모델.
    """
    # Bias 초기화 값을 계산
    prior_prob = 0.01
    bias_initializer = -torch.log(torch.tensor((1 - prior_prob) / prior_prob))

    subnet = nn.Sequential(
        nn.Conv2d(input_channels, 256, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(256, num_classes * num_anchors, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(256, 4 * num_anchors, kernel_size=3, stride=1, padding=1),
    )

    # 마지막 레이어의 바이어스 초기화 설정
    nn.init.constant_(subnet[-1].bias.data, bias_initializer)

    return subnet

# 예시로 모델 생성
input_channels = 256  # 입력 feature map 채널 수
num_classes = 20  # 객체 클래스의 수
num_anchors = 9  # 각 위치에서 예측할 anchor box의 수

retina_subnet_model = retina_subnet(input_channels, num_classes, num_anchors)
print(retina_subnet_model)

# 입력 이미지 생성 (예시)
batch_size = 2
input_image = torch.randn(batch_size, input_channels, 224, 224)  # (batch, channels, height, width)

# 모델에 입력 이미지 전달하여 예측 수행
predictions = retina_subnet_model(input_image)

# 예측 결과의 shape 출력
print("예측 결과 shape:", predictions.shape)


In [None]:
class RetinaNet(keras.Model):
    def __init__(self, num_classes, **kwargs):
        super(RetinaNet, self).__init__(name="RetinaNet", **kwargs)
        self.fpn = FeaturePyramid()
        self.num_classes = num_classes

        prior_probability = tf.constant_initializer(-np.log((1 - 0.01) / 0.01))
        self.cls_head = build_head(9 * num_classes, prior_probability)
        self.box_head = build_head(9 * 4, "zeros")

    def call(self, image, training=False):
        features = self.fpn(image, training=training)
        N = tf.shape(image)[0]
        cls_outputs = []
        box_outputs = []

        ##### CODE HERE #####

        cls_outputs = tf.concat(cls_outputs, axis=1)
        box_outputs = tf.concat(box_outputs, axis=1)
        return tf.concat([box_outputs, cls_outputs], axis=-1)

## Step 7. Model 학습하기
Loss function을 정의하고 RetinaNet model을 학습해봅시다.

### 문제 22. Smooth L1 loss
    - box regression을 위한 smooth l1 loss를 만들어봅시다.
    - tf.losses.Loss class의 subclass로 custom loss를 만듭니다.
    - l1, l2 loss가 변하는 지점의 값은 delta 값으로 __init__ method에서 입력받습니다.

In [None]:
import torch
import torch.nn as nn

class SmoothL1Loss(nn.Module):
    def __init__(self, l1_threshold, l2_threshold):
        """
        Smooth L1 Loss를 초기화합니다.

        Args:
            l1_threshold (float): L1 Loss와 L2 Loss가 변하는 지점의 임계값.
            l2_threshold (float): L2 Loss와 L1 Loss가 변하는 지점의 임계값.
        """
        super(SmoothL1Loss, self).__init__()
        self.l1_threshold = l1_threshold
        self.l2_threshold = l2_threshold

    def forward(self, prediction, target):
        """
        Smooth L1 Loss를 계산합니다.

        Args:
            prediction (torch.Tensor): 예측값.
            target (torch.Tensor): 실제값.

        Returns:
            torch.Tensor: Smooth L1 Loss.
        """
        diff = torch.abs(prediction - target)
        loss = torch.where(diff < self.l1_threshold, 0.5 * diff**2, diff - 0.5 * self.l1_threshold**2)
        loss = torch.where(diff < self.l2_threshold, loss, 0.5 * self.l2_threshold**2 + self.l2_threshold * (diff - self.l2_threshold))
        return loss

# 예제로 Loss 객체 생성
l1_threshold = 1.0
l2_threshold = 5.0
smooth_l1_loss = SmoothL1Loss(l1_threshold, l2_threshold)

# 예측값과 실제값 생성 (예시)
prediction = torch.tensor([2.0, 4.0, 6.0])
target = torch.tensor([1.0, 3.0, 5.0])

# Smooth L1 Loss 계산
loss = smooth_l1_loss(prediction, target)
print("Smooth L1 Loss:", loss.item())


In [None]:
class RetinaNetBoxLoss(tf.losses.Loss):
    """Implements Smooth L1 loss"""

    def __init__(self, delta):
        super(RetinaNetBoxLoss, self).__init__(
            reduction="none", name="RetinaNetBoxLoss"
        )
        self._delta = delta

    def call(self, y_true, y_pred):
        difference = y_true - y_pred

        ##### CODE HERE #####

        return tf.reduce_sum(loss, axis=-1)

### 문제 23. Focal loss
    - classification을 위한 focal loss를 만들어봅시다.
    - smooth l1 loss와 마찬가지로 subclassing을 이용하고, 논문의 수식을 참고하여 구현합니다.
    - alpha, gamma 값은 __init__ method에서 설정합니다.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class FocalLoss(nn.Module):
    def __init__(self, alpha, gamma):
        """
        Focal Loss를 초기화합니다.

        Args:
            alpha (float): 클래스별 가중치 (0 이상의 값).
            gamma (float): Focal Loss의 감마 값 (gamma >= 0).
        """
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, prediction, target):
        """
        Focal Loss를 계산합니다.

        Args:
            prediction (torch.Tensor): 예측값.
            target (torch.Tensor): 실제값.

        Returns:
            torch.Tensor: Focal Loss.
        """
        # Cross-Entropy Loss 계산
        ce_loss = F.cross_entropy(prediction, target, reduction='none')

        # 클래스별 가중치 적용
        alpha_factor = torch.exp(-self.alpha * (1 - F.softmax(prediction, dim=1)))
        focal_loss = alpha_factor * ce_loss**self.gamma

        return focal_loss.mean()

# 예제로 Loss 객체 생성
alpha = 0.25
gamma = 2.0
focal_loss = FocalLoss(alpha, gamma)

# 예측값과 실제값 생성 (예시)
prediction = torch.randn(3, 5)  # 3개의 예제, 5개의 클래스
target = torch.tensor([1, 3, 2])  # 각 예제의 정답 클래스

# Focal Loss 계산
loss = focal_loss(prediction, target)
print("Focal Loss:", loss.item())


In [None]:
class RetinaNetClassificationLoss(tf.losses.Loss):
    """Implements Focal loss"""

    def __init__(self, alpha, gamma):
        super(RetinaNetClassificationLoss, self).__init__(
            reduction="none", name="RetinaNetClassificationLoss"
        )
        self._alpha = alpha
        self._gamma = gamma

    def call(self, y_true, y_pred):
        cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=y_true, logits=y_pred
        )
        probs = tf.nn.sigmoid(y_pred)

        ##### CODE HERE #####

        return tf.reduce_sum(loss, axis=-1)

### 문제 24. RetinaNet loss
    - smooth l1 loss와 focal loss를 합쳐서 RetinaNet loss를 만들어봅시다.
    - classification loss는 ignore의 경우를 제외합니다.
    - box regression loss는 positive에 대해서만 계산합니다.
    - 각 loss는 positive sample의 갯수로 normalize합니다.
    - 최종 loss는 classification loss와 regression loss를 더해서 계산합니다.


In [None]:
import torch
import torch.nn as nn

class RetinaNetLoss(nn.Module):
    def __init__(self, alpha, gamma, l1_threshold, l2_threshold):
        """
        RetinaNet의 Loss를 초기화합니다.

        Args:
            alpha (float): Focal Loss의 클래스별 가중치.
            gamma (float): Focal Loss의 감마 값.
            l1_threshold (float): Smooth L1 Loss의 L1 Loss와 L2 Loss가 변하는 지점의 임계값.
            l2_threshold (float): Smooth L1 Loss의 L2 Loss와 L1 Loss가 변하는 지점의 임계값.
        """
        super(RetinaNetLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.l1_threshold = l1_threshold
        self.l2_threshold = l2_threshold
        self.smooth_l1_loss = SmoothL1Loss(l1_threshold, l2_threshold)
        self.focal_loss = FocalLoss(alpha, gamma)

    def forward(self, classification_predictions, regression_predictions,
                classification_targets, regression_targets,
                positive_masks, ignore_masks):
        """
        RetinaNet의 Loss를 계산합니다.

        Args:
            classification_predictions (torch.Tensor): 객체 클래스 예측값.
            regression_predictions (torch.Tensor): 바운딩 박스 예측값.
            classification_targets (torch.Tensor): 객체 클래스 실제값.
            regression_targets (torch.Tensor): 바운딩 박스 실제값.
            positive_masks (torch.Tensor): Positive 샘플 마스크.
            ignore_masks (torch.Tensor): Ignore 샘플 마스크.

        Returns:
            torch.Tensor: RetinaNet의 Loss.
        """
        # Classification Loss 계산 (Ignore 샘플 제외)
        classification_loss = self.focal_loss(classification_predictions[~ignore_masks],
                                               classification_targets[~ignore_masks])

        # Box Regression Loss 계산 (Positive 샘플만 계산)
        positive_regression_targets = regression_targets[positive_masks]
        positive_regression_predictions = regression_predictions[positive_masks]
        regression_loss = self.smooth_l1_loss(positive_regression_predictions, positive_regression_targets)

        # Positive 샘플 수로 정규화
        num_positive_samples = positive_masks.sum()
        classification_loss /= num_positive_samples
        regression_loss /= num_positive_samples

        # 최종 Loss는 Classification Loss와 Regression Loss의 합
        total_loss = classification_loss + regression_loss

        return total_loss

# 예제로 Loss 객체 생성
alpha = 0.25
gamma = 2.0
l1_threshold = 1.0
l2_threshold = 5.0
retina_loss = RetinaNetLoss(alpha, gamma, l1_threshold, l2_threshold)

# 예측값과 실제값, 마스크 생성 (예시)
classification_predictions = torch.randn(10, 20)  # (batch, num_anchors, num_classes)
regression_predictions = torch.randn(10, 9, 4)  # (batch, num_anchors, 4)
classification_targets = torch.randint(0, 20, (10, 9))  # (batch, num_anchors)
regression_targets = torch.randn(10, 9, 4)  # (batch, num_anchors, 4)
positive_masks = torch.randint(0, 2, (10, 9), dtype=torch.bool)  # (batch, num_anchors)
ignore_masks = torch.randint(0, 2, (10, 9), dtype=torch.bool)  # (batch, num_anchors)

# RetinaNet Loss 계산
loss = retina_loss(classification_predictions, regression_predictions,
                   classification_targets, regression_targets,
                   positive_masks, ignore_masks)
print("RetinaNet Loss:", loss.item())


In [None]:
class RetinaNetLoss(tf.losses.Loss):
    """Wrapper to combine both the losses"""

    def __init__(self, num_classes=80, alpha=0.25, gamma=2.0, delta=1.0):
        super(RetinaNetLoss, self).__init__(reduction="auto", name="RetinaNetLoss")
        self._cls_loss = RetinaNetClassificationLoss(alpha, gamma)
        self._box_loss = RetinaNetBoxLoss(delta)
        self._num_classes = num_classes

    def call(self, y_true, y_pred):
        y_pred = tf.cast(y_pred, dtype=tf.float32)
        box_labels = y_true[:, :, :4]
        box_predictions = y_pred[:, :, :4]
        cls_labels = tf.one_hot(
            tf.cast(y_true[:, :, 4], dtype=tf.int32),
            depth=self._num_classes,
            dtype=tf.float32,
        )
        cls_predictions = y_pred[:, :, 4:]
        positive_mask = tf.cast(tf.greater(y_true[:, :, 4], -1.0), dtype=tf.float32)
        ignore_mask = tf.cast(tf.equal(y_true[:, :, 4], -2.0), dtype=tf.float32)

        ##### CODE HERE #####

        loss = cls_loss + box_loss
        return loss

### 문제 25. Model compile
    - RetinaNet model을 만들고, RetenaNet loss를 이용하여 model을 compile 해봅시다.
    - optimizer는 momentum SGD를 사용하고, momentum 값은 0.9를 사용합니다.
    - learning rate schedule은 위에서 hyperparameter 설정부분에서 만들어 둔 것을 사용합니다.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# RetinaNet 모델 정의
class RetinaNet(nn.Module):
    def __init__(self, num_classes, num_anchors):
        super(RetinaNet, self).__init__()
        # 모델 구성 요소 정의 (Backbone, FPN, Subnets 등)
        # ...

    def forward(self, input_image):
        # Forward Pass 구현
        # ...

# 모델 초기화
num_classes = 20  # 예시: 객체 클래스 수
num_anchors = 9  # 예시: 각 위치에서 예측할 anchor box 수
retina_model = RetinaNet(num_classes, num_anchors)

# RetinaNet Loss 생성
alpha = 0.25
gamma = 2.0
l1_threshold = 1.0
l2_threshold = 5.0
retina_loss = RetinaNetLoss(alpha, gamma, l1_threshold, l2_threshold)

# Optimizer 설정 (SGD with Momentum)
optimizer = optim.SGD(retina_model.parameters(), lr=0.01, momentum=0.9)

# Learning Rate Schedule 설정 (예시: 위에서 정의한 학습률 스케줄러)
scheduler = LearningRateScheduler(max_lr, warmup_steps, decay_steps)

# 학습률 스케줄러를 Optimizer에 등록
optimizer = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=scheduler)

# 모델 컴파일
retina_model.compile(loss=retina_loss, optimizer=optimizer)

# 모델 정보 출력
print(retina_model)


In [None]:
##### CODE HERE #####

model.compile(loss=loss_fn, optimizer=optimizer)

### Checkpoint callback
    - 다음과 같이 model의 weight를 저장하기 위한 checkpoint callback을 만듭니다.

In [None]:
import torch

class CheckpointCallback:
    def __init__(self, model, save_path, save_best_only=True):
        """
        모델의 가중치를 저장하기 위한 Checkpoint Callback을 초기화합니다.

        Args:
            model (nn.Module): 저장할 모델.
            save_path (str): 가중치를 저장할 경로와 파일 이름.
            save_best_only (bool): 최상의 성능을 가진 모델만 저장할지 여부.
        """
        self.model = model
        self.save_path = save_path
        self.save_best_only = save_best_only
        self.best_loss = float('inf') if save_best_only else None

    def __call__(self, current_loss):
        """
        Callback을 호출하여 모델의 가중치를 저장합니다.

        Args:
            current_loss (float): 현재 Loss 값.
        """
        if self.save_best_only and current_loss >= self.best_loss:
            return  # 현재 Loss가 최상이 아니면 저장하지 않음

        # 모델의 가중치 저장
        torch.save(self.model.state_dict(), self.save_path)
        print(f"Model weights saved to {self.save_path}")
        if self.save_best_only:
            self.best_loss = current_loss

# 예제로 Callback 객체 생성 및 사용
model = RetinaNet(num_classes, num_anchors)  # 예시: RetinaNet 모델
save_path = 'model_weights.pth'  # 저장 경로 및 파일 이름 지정
checkpoint_callback = CheckpointCallback(model, save_path)

# 모델 학습 루프 내에서 Callback 호출 (예시)
current_loss = 0.5  # 현재 Loss 값
checkpoint_callback(current_loss)  # 모델 가중치 저장


In [None]:
callbacks_list = [
    tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(model_dir, "weights" + "_epoch_{epoch}"),
        monitor="loss",
        save_best_only=False,
        save_weights_only=True,
        verbose=1,
    )
]

### Model 학습하기
    - model.fit으로 model을 학습합니다.
    - MS-COCO 전체 data를 이용하여 학습하는 경우에는 아래 주석 처리된 부분의 주석을 지우고,
      epochs = 1 부분을 삭제,
      train_dataset.take(100), val_dataset.take(50)의 .take 부분을 삭제하면 학습할 수 있습니다.

In [None]:
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.datasets import CocoDetection
from torchvision.models.detection import RetinaNet
from torchvision.models.detection.retinanet import retinanet_resnet50_fpn

# 학습 설정
learning_rate = 0.001
batch_size = 2
epochs = 1  # 학습 에폭 수
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 데이터 로드 및 전처리
transform = transforms.Compose([
    transforms.ToTensor(),
])

# MS-COCO 데이터셋 로드 (경로는 본인 환경에 맞게 수정)
train_dataset = CocoDetection(root='path_to_train_dataset', annFile='annotations_train.json', transform=transform)
val_dataset = CocoDetection(root='path_to_val_dataset', annFile='annotations_val.json', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=lambda x: tuple(zip(*x)))

# RetinaNet 모델 로드
model = retinanet_resnet50_fpn(pretrained=True)
model.to(device)

# Optimizer 및 Loss 함수 설정
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
criterion = torch.nn.SmoothL1Loss()  # Smooth L1 Loss를 사용하거나 자신의 Loss 함수를 지정하세요.

# 학습 루프
for epoch in range(epochs):
    model.train()
    for images, targets in train_loader:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        optimizer.zero_grad()
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        losses.backward()
        optimizer.step()

    # 검증
    model.eval()
    with torch.no_grad():
        val_losses = []
        for images, targets in val_loader:
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
            val_losses.append(losses.item())

        mean_val_loss = sum(val_losses) / len(val_losses)
        print(f"Epoch {epoch+1}/{epochs}, Validation Loss: {mean_val_loss:.4f}")

# 모델 저장
torch.save(model.state_dict(), 'retinanet_model.pth')
ㅑ

In [None]:
# train_steps_per_epoch = dataset_info.splits["train"].num_examples // batch_size
# val_steps_per_epoch = \
#     dataset_info.splits["validation"].num_examples // batch_size

# train_steps = 4 * 100000
# epochs = train_steps // train_steps_per_epoch

epochs = 1

model.fit(
    train_dataset.take(100),
    validation_data=val_dataset.take(50),
    epochs=epochs,
    callbacks=callbacks_list,
    verbose=1,
)

## Step 8. 학습된 Model로 결과 확인하기
학습된 model의 weight를 불러와서 validation set에 대하여 detection 결과를 확인해봅시다.

### 학습된 Weights Loading
    - 미리 학습해둔 weights를 아래와 같이 다운받고, model에 load 합니다.

In [None]:
import torch
import torchvision.models.detection

# 학습된 RetinaNet 모델의 가중치를 다운로드
model_url = "https://download.pytorch.org/models/retinanet_resnet50_fpn_coco-eeacb38b.pth"
model_weights_path = "retinanet_resnet50_fpn_coco.pth"
torch.hub.download_url_to_file(model_url, model_weights_path)

# 학습된 가중치를 사용하여 RetinaNet 모델 초기화
model = torchvision.models.detection.retinanet_resnet50_fpn(pretrained=False)  # 미리 학습된 가중치 사용하지 않음
model.load_state_dict(torch.load(model_weights_path))  # 학습된 가중치 로드
model.eval()  # 모델을 추론 모드로 설정 (학습 모드가 아닌 추론 모드)


In [None]:
ckpt_url = "https://drive.google.com/uc?id=19snoNsuyeLPxkj9Is1cMmY-JviTydZim"

gdown.download(ckpt_url, 'ckpt.zip', quiet=False)

with zipfile.ZipFile("ckpt.zip", "r") as z_fp:
    z_fp.extractall("./ckpt")

In [None]:
# Change this to `model_dir` when not using the downloaded weights
weights_dir = "ckpt"

latest_checkpoint = tf.train.latest_checkpoint(weights_dir)
model.load_weights(latest_checkpoint)

### Model 예측 결과를 Decoding하기
RetinaNet은 anchor box 내의 물체에 대한 class별 확률과 anchor box를 얼마나 변형시킬지의 대한 값을 예측합니다.

이를 이용하여 bounding box를 transform하고, class에 대한 예측을 계산합니다.

이를 위하여 DecodePredictions 라는 custom layer를 생성하고, model의 prediction에 이 layer를 통과시켜서 최종 예측값을 얻는 형태로 구현합니다.

DecodePredictions의 method는 다음과 같습니다.

### \_\_init\_\_ method
     - anchor box를 생성하고, nms를 위한 thresould 값들, 그리고 maximum detection의 수를 설정합니다.

In [None]:
import torch
import torch.nn as nn
import torchvision.ops as ops

class DecodePredictions(nn.Module):
    def __init__(self, num_classes, anchors, score_threshold=0.05, nms_iou_threshold=0.5, max_detections=100):
        """
        DecodePredictions 레이어의 생성자입니다.

        Args:
            num_classes (int): 객체 클래스 수.
            anchors (list of torch.Tensor): 모델에서 사용하는 anchor box의 정보 (x, y, w, h).
            score_threshold (float): 예측된 객체 확률 중 임계값을 지정합니다.
            nms_iou_threshold (float): Non-Maximum Suppression (NMS)를 위한 IOU 임계값.
            max_detections (int): 최대 검출 수.
        """
        super(DecodePredictions, self).__init__()
        self.num_classes = num_classes
        self.anchors = anchors
        self.score_threshold = score_threshold
        self.nms_iou_threshold = nms_iou_threshold
        self.max_detections = max_detections

    def forward(self, predictions):
        # 여기서 predictions는 모델의 출력으로 주어진 것을 가정합니다.
        # 예측된 클래스 확률, box 변환 정보, anchor box 등을 포함합니다.

        # 예측된 박스 변환 정보 및 클래스 확률 얻기
        class_probs, box_deltas = predictions
        batch_size, num_anchors, _, _ = box_deltas.shape

        # 최대 검출 수에 맞게 상위 점수를 가진 예측 선택
        scores = class_probs.sigmoid().max(dim=2)[0]  # 클래스 확률 중 최대값
        scores = scores.view(batch_size, num_anchors)

        top_scores, top_class_indices = scores.topk(self.max_detections, dim=1)

        # NMS를 적용하여 중복 검출 제거
        detections = []
        for batch_idx in range(batch_size):
            boxes = self.decode_boxes(box_deltas[batch_idx], self.anchors)
            boxes = boxes[top_class_indices[batch_idx]]
            scores = top_scores[batch_idx]

            indices = ops.nms(boxes, scores, self.nms_iou_threshold)
            boxes = boxes[indices]
            scores = scores[indices]

            # 점수가 임계값 이상인 예측만 유지
            mask = scores > self.score_threshold
            boxes = boxes[mask]
            scores = scores[mask]

            # 최종 검출 결과에 추가
            detections.append(torch.cat((boxes, scores.unsqueeze(1)), dim=1))

        return detections

    def decode_boxes(self, deltas, anchors):
        widths = anchors[:, 2] - anchors[:, 0]
        heights = anchors[:, 3] - anchors[:, 1]
        x_center = anchors[:, 0] + 0.5 * widths
        y_center = anchors[:, 1] + 0.5 * heights

        dx, dy, dw, dh = deltas.split(1, dim=1)
        dx, dy, dw, dh = dx.squeeze(1), dy.squeeze(1), dw.squeeze(1), dh.squeeze(1)

        width = torch.exp(dw) * widths
        height = torch.exp(dh) * heights
        x_center += dx * widths
        y_center += dy * heights

        x1 = x_center - 0.5 * width
        y1 = y_center - 0.5 * height
        x2 = x_center + 0.5 * width
        y2 = y_center + 0.5 * height

        return torch.stack([x1, y1, x2, y2], dim=1)


### 문제 26. _decode_box_predictions method
    - anchor box의 (x,y,w,h)와 model의 box regression 예측값을 이용하여 transform된 box의 위치를 계산합니다.
    - 화면에 출력하기 쉽게 하기 위하여 이 값을 (xmin, ymin, xmax, ymax)로 변환하여 반환합니다

In [None]:
import torch

def _decode_box_predictions(anchor_boxes, box_deltas):
    """
    Anchor box와 박스 회귀 예측값을 사용하여 박스의 위치를 계산하고 (xmin, ymin, xmax, ymax) 형태로 반환합니다.

    Args:
        anchor_boxes (torch.Tensor): Anchor box의 (x, y, w, h) 정보.
        box_deltas (torch.Tensor): 박스 회귀 예측값.

    Returns:
        decoded_boxes (torch.Tensor): 변환된 박스의 위치 정보 (xmin, ymin, xmax, ymax).
    """
    # Anchor box 정보
    anchor_x, anchor_y, anchor_w, anchor_h = anchor_boxes.split(1, dim=1)

    # 박스 회귀 예측값
    delta_x, delta_y, delta_w, delta_h = box_deltas.split(1, dim=1)

    # 변환된 박스의 위치 계산
    pred_x = delta_x * anchor_w + anchor_x
    pred_y = delta_y * anchor_h + anchor_y
    pred_w = torch.exp(delta_w) * anchor_w
    pred_h = torch.exp(delta_h) * anchor_h

    # (xmin, ymin, xmax, ymax) 형태로 변환
    xmin = pred_x - 0.5 * pred_w
    ymin = pred_y - 0.5 * pred_h
    xmax = pred_x + 0.5 * pred_w
    ymax = pred_y + 0.5 * pred_h

    decoded_boxes = torch.cat((xmin, ymin, xmax, ymax), dim=1)

    return decoded_boxes


### 문제 27. call method
    - box regression 값과 classification 예측값을 받아서, box regreesion 값은 _decode_box_predictions에 넣어주고 classification은 sigmoid를 이용하여 확률값으로 변경해줍니다.
    - tf.image.combined_non_max_suppresion을 이용하여 nms를 수행하고 결과를 반환합니다.

In [None]:
import torch
import torch.nn as nn
import torchvision.ops as ops

class DecodePredictions(nn.Module):
    def __init__(self, anchors, num_classes, score_threshold=0.05, nms_iou_threshold=0.5, max_detections=100):
        super(DecodePredictions, self).__init__()
        self.anchors = anchors
        self.num_classes = num_classes
        self.score_threshold = score_threshold
        self.nms_iou_threshold = nms_iou_threshold
        self.max_detections = max_detections

    def forward(self, box_deltas, class_probs):
        # 박스 변환
        decoded_boxes = self._decode_box_predictions(box_deltas)

        # 클래스 확률을 확률값으로 변환
        class_probs = torch.sigmoid(class_probs)

        # NMS 수행
        detections = []
        for class_id in range(1, self.num_classes):  # 클래스 0은 배경이므로 제외
            class_scores = class_probs[:, class_id]
            mask = class_scores > self.score_threshold
            filtered_boxes = decoded_boxes[mask]
            filtered_scores = class_scores[mask]

            # Non-Maximum Suppression (NMS)
            selected_indices = ops.nms(filtered_boxes, filtered_scores, self.nms_iou_threshold)
            selected_boxes = filtered_boxes[selected_indices]
            selected_scores = filtered_scores[selected_indices]
            selected_class_ids = torch.full((selected_indices.shape[0],), class_id, dtype=torch.int64)

            detections.append(torch.cat((selected_boxes, selected_scores.unsqueeze(1), selected_class_ids.unsqueeze(1)), dim=1))

        return detections

    def _decode_box_predictions(self, box_deltas):
        anchors = self.anchors
        widths = anchors[:, 2] - anchors[:, 0]
        heights = anchors[:, 3] - anchors[:, 1]
        x_center = anchors[:, 0] + 0.5 * widths
        y_center = anchors[:, 1] + 0.5 * heights

        dx, dy, dw, dh = box_deltas.split(1, dim=1)
        dx, dy, dw, dh = dx.squeeze(1), dy.squeeze(1), dw.squeeze(1), dh.squeeze(1)

        width = torch.exp(dw) * widths
        height = torch.exp(dh) * heights
        x_center += dx * widths
        y_center += dy * heights

        x1 = x_center - 0.5 * width
        y1 = y_center - 0.5 * height
        x2 = x_center + 0.5 * width
        y2 = y_center + 0.5 * height

        decoded_boxes = torch.stack([x1, y1, x2, y2], dim=1)

        return decoded_boxes


In [None]:
class DecodePredictions(tf.keras.layers.Layer):
    def __init__(
        self,
        num_classes=80,
        confidence_threshold=0.05,
        nms_iou_threshold=0.5,
        max_detections_per_class=100,
        max_detections=100,
        box_variance=[0.1, 0.1, 0.2, 0.2],
        **kwargs
    ):
        super(DecodePredictions, self).__init__(**kwargs)
        self.num_classes = num_classes
        self.confidence_threshold = confidence_threshold
        self.nms_iou_threshold = nms_iou_threshold
        self.max_detections_per_class = max_detections_per_class
        self.max_detections = max_detections

        self._anchor_box = AnchorBox()
        self._box_variance = tf.convert_to_tensor(
            [0.1, 0.1, 0.2, 0.2], dtype=tf.float32
        )

    def _decode_box_predictions(self, anchor_boxes, box_predictions):
        boxes = box_predictions * self._box_variance
        boxes = tf.concat(
            [
                ##### CODE HERE #####
            ],
            axis=-1,
        )
        boxes_transformed = convert_to_corners(boxes)
        return boxes_transformed

    def call(self, images, predictions):
        image_shape = tf.cast(tf.shape(images), dtype=tf.float32)
        anchor_boxes = self._anchor_box.get_anchors(image_shape[1], image_shape[2])

        ##### CODE HERE #####

        return tf.image.combined_non_max_suppression(
            tf.expand_dims(boxes, axis=2),
            cls_predictions,
            self.max_detections_per_class,
            self.max_detections,
            self.nms_iou_threshold,
            self.confidence_threshold,
            clip_boxes=False,
        )

### Validation data를 이용하여 결과를 화면에 출력하기

    - image를 입력으로 하고 이를 학습된 RetinaNet을 통과시킨 뒤,
      위에서 작성한 DecodePredictions를 통과한 결과가 출력이 되는 model을 하나 생성합니다.

In [None]:
import torch.nn as nn

class RetinaNetInferenceModel(nn.Module):
    def __init__(self, retina_net, decode_predictions):
        super(RetinaNetInferenceModel, self).__init__()
        self.retina_net = retina_net
        self.decode_predictions = decode_predictions

    def forward(self, inputs):
        # RetinaNet 모델 통과
        box_deltas, class_probs = self.retina_net(inputs)

        # DecodePredictions 적용
        detections = self.decode_predictions(box_deltas, class_probs)

        return detections

# RetinaNet 모델과 DecodePredictions을 결합하여 Inference 모델 생성
retina_net = YourRetinaNetModel()  # RetinaNet 모델을 여기에 넣으세요
decode_predictions = DecodePredictions(anchors, num_classes)  # anchors와 num_classes를 설정하세요
inference_model = RetinaNetInferenceModel(retina_net, decode_predictions)


In [None]:
image = tf.keras.Input(shape=[None, None, 3], name="image")
predictions = model(image, training=False)
detections = DecodePredictions(confidence_threshold=0.5)(image, predictions)
inference_model = tf.keras.Model(inputs=image, outputs=detections)

    - image와 bounding box, class name을 출력하기 위한 함수를 다음과 같이 작성합니다.

In [None]:
import cv2
import numpy as np

def draw_boxes(image, boxes, class_names):
    """
    입력 이미지 위에 바운딩 박스와 클래스 이름을 그립니다.

    Args:
        image (numpy.ndarray): 입력 이미지.
        boxes (list): 바운딩 박스 리스트. 각 박스는 [xmin, ymin, xmax, ymax] 형식의 좌표를 포함합니다.
        class_names (list): 클래스 이름 리스트. 각 클래스에 해당하는 이름을 포함합니다.

    Returns:
        numpy.ndarray: 바운딩 박스와 클래스 이름이 그려진 이미지.
    """
    for box, class_name in zip(boxes, class_names):
        xmin, ymin, xmax, ymax = map(int, box)
        color = (0, 255, 0)  # 바운딩 박스 색상 (여기서는 녹색)
        thickness = 2  # 바운딩 박스 두께

        # 이미지에 바운딩 박스 그리기
        cv2.rectangle(image, (xmin, ymin), (xmax, ymax), color, thickness)

        # 클래스 이름 텍스트 설정
        text = f"{class_name}"
        font_scale = 0.7
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_thickness = 1
        text_size, _ = cv2.getTextSize(text, font, font_scale, font_thickness)[0]

        # 텍스트 배경 그리기
        bg_color = (0, 0, 0)  # 텍스트 배경 색상 (여기서는 검은색)
        text_xmin = xmin
        text_ymin = ymin - text_size[1] - 2
        text_xmax = text_xmin + text_size[0]
        text_ymax = ymin - 2
        cv2.rectangle(image, (text_xmin, text_ymin), (text_xmax, text_ymax), bg_color, -1)

        # 텍스트 그리기
        cv2.putText(image, text, (xmin, ymin - 5), font, font_scale, (0, 0, 255), font_thickness)

    return image


In [None]:
def visualize_detections(
    image, boxes, classes, scores, figsize=(7, 7), linewidth=1, color=[0, 0, 1]
):
    """Visualize Detections"""
    image = np.array(image, dtype=np.uint8)
    plt.figure(figsize=figsize)
    plt.axis("off")
    plt.imshow(image)
    ax = plt.gca()
    for box, _cls, score in zip(boxes, classes, scores):
        text = "{}: {:.2f}".format(_cls, score)
        x1, y1, x2, y2 = box
        w, h = x2 - x1, y2 - y1
        patch = plt.Rectangle(
            [x1, y1], w, h, fill=False, edgecolor=color, linewidth=linewidth
        )
        ax.add_patch(patch)
        ax.text(
            x1,
            y1,
            text,
            bbox={"facecolor": color, "alpha": 0.4},
            clip_box=ax.clipbox,
            clip_on=True,
        )
    plt.show()
    return ax

    - validation data에서 4개의 batch를 가져와서 각 batch의 첫번째 image에 대한 detection 결과를 다음과 같이 화면에 출력하여 확인합니다.

In [None]:
def prepare_image(image):
    image, _, ratio = resize_and_pad_image(image)
    image = tf.keras.applications.resnet.preprocess_input(image)
    return tf.expand_dims(image, axis=0), ratio


val_dataset = tfds.load("coco/2017", split="validation", data_dir="data")
int2str = dataset_info.features["objects"]["label"].int2str

for sample in val_dataset.take(4):
    image = tf.cast(sample["image"], dtype=tf.float32)
    input_image, ratio = prepare_image(image)
    detections = inference_model.predict(input_image)
    num_detections = detections.valid_detections[0]
    class_names = [
        int2str(int(x)) for x in detections.nmsed_classes[0][:num_detections]
    ]
    visualize_detections(
        image,
        detections.nmsed_boxes[0][:num_detections] / ratio,
        class_names,
        detections.nmsed_scores[0][:num_detections],
    )

In [None]:
import matplotlib.pyplot as plt

# validation_dataset에서 4개의 배치 가져오기
batch_count = 4
for batch_idx, (images, gt_boxes, class_ids) in enumerate(validation_dataset.take(batch_count)):
    # 첫 번째 이미지 선택
    image = images[0].numpy()

    # RetinaNet 모델을 통해 검출 결과 얻기
    detections = inference_model(images)

    # 검출 결과를 시각화하기 위한 정보 추출
    boxes = detections[0][:, :4]  # 바운딩 박스 좌표 (xmin, ymin, xmax, ymax)
    scores = detections[0][:, 4]  # 바운딩 박스 점수
    class_ids = detections[0][:, 5]  # 클래스 ID

    # 점수가 일정 값 이상인 검출 결과만 선택
    threshold = 0.5  # 점수 임계값 (조절 가능)
    selected_indices = scores >= threshold
    selected_boxes = boxes[selected_indices]
    selected_class_ids = class_ids[selected_indices]

    # 클래스 이름 리스트 (예: COCO 클래스 리스트)
    class_names = ["background", "class1", "class2", "class3", ...]  # 클래스 이름을 적절히 설정

    # 클래스 ID를 클래스 이름으로 변환
    selected_class_names = [class_names[int(class_id)] for class_id in selected_class_ids]

    # 검출 결과를 이미지에 그리기
    result_image = draw_boxes(image.copy(), selected_boxes, selected_class_names)

    # 이미지 출력
    plt.imshow(result_image)
    plt.axis('off')
    plt.title(f'Batch {batch_idx + 1}')
    plt.show()
