# To do

This notebook covers the following parts:

* Reading data in COCO format
* Training and evaluating an object detection model

In [None]:
# import libraries

# basic
import warnings
warnings.filterwarnings('ignore')

import os
import random
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm

# visualization
import cv2
import matplotlib.pyplot as plt
from PIL import Image

# PyTorch
import torch
import torch.nn as nn
import torchvision
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset, DataLoader
from torchvision import models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from torchvision.models.detection import RetinaNet
from torchvision.models.detection.retinanet import RetinaNetClassificationHead


# object detection
!pip install pycocotools
import pycocotools
from pycocotools.coco import COCO

To evaluate the model with the COCO API, we need the following five files. Their source code is available at:

- https://raw.githubusercontent.com/pytorch/vision/main/references/detection/engine.py
- https://raw.githubusercontent.com/pytorch/vision/main/references/detection/utils.py
- https://raw.githubusercontent.com/pytorch/vision/main/references/detection/coco_utils.py
- https://raw.githubusercontent.com/pytorch/vision/main/references/detection/coco_eval.py
- https://raw.githubusercontent.com/pytorch/vision/main/references/detection/transforms.py

In [None]:
!wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/engine.py
!wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/utils.py
!wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/coco_utils.py
!wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/coco_eval.py
!wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/transforms.py
    
from engine import evaluate

In [None]:
class config:
    
    root = "/kaggle/input/hw5-dataset-new"
    num_classes = 8
    batch_size = 4
    epochs = 100
    weight_decay = 0.0005
    lr = 0.005
    momentum = 0.9
    milestones = [20,40,60,80]
    gamma = 0.1
    seed = 42
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    categories = ['normal', 'aortic_curvature', 'aortic_atherosclerosis_calcification', 
                  'cardiac_hypertrophy', 'intercostal_pleural_thickening', 'lung_field_infiltration', 
                  'degenerative_joint_disease_of_the_thoracic_spine', 'scoliosis']

In [None]:
def seed_everything(seed):
    # Set Python random seed
    random.seed(seed)
    
    # Set NumPy random seed
    np.random.seed(seed)
    
    # Set PyTorch random seed for CPU and GPU
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    
    # Set PyTorch deterministic operations for cudnn backend
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

---

# Read data information

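We can use the `pycocotools` package to read the data stored in the .json files. Below is a brief introduction to the three parts of the COCO dataset we created: <br>
(For a more detailed description, see [here](https://docs.aws.amazon.com/rekognition/latest/customlabels-dg/md-coco-overview.html).)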

## Categories

A collection of dictionaries covering every category (excluding background). Each dictionary needs 2 keys:

* `id`: category ID
* `name`: category name

In [None]:
annfile = config.root + "/train.json"
coco = COCO(annfile)
coco.cats

In [None]:
annfile_test = config.root + "/test.json"
coco_test = COCO(annfile_test)
coco_test.cats
coco_test.loadImgs(80)


## Images

Image information. Each dictionary describes one image and has 4 keys:

* `file_name`: path to the image
* `height`: image height
* `width`: image width
* `id`: image ID (unique)

In [None]:
coco_test.loadImgs(80)

## Annotations

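Annotation information. Each dictionary holds exactly one annotation (bounding box) and has 7 keys:

* `id`: ID of the annotation
* `image_id`: ID of the image the bounding box belongs to
* `category_id`: ID of the category the bounding box belongs to
* `bbox`: the bounding box annotation, in the format $[\text{xmin}, \text{ymin}, \text{width}, \text{height}]$. $\text{xmin}$ and $\text{ymin}$ are the image coordinates of the top-left corner of the bounding box, while $\text{width}$ and $\text{height}$ are its width and height
* `area`: the area enclosed by the bounding box
* `iscrowd`: whether the annotation is a single object (0) or a group of objects (1). It is used for segmentation and can simply be set to 0 here
* `segmentation`: used for segmentation; can be ignored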
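
To make the three parts concrete, here is a minimal sketch of a COCO-style dictionary built by hand. The IDs, file name, image size, and box values are made up for illustration (only the category names come from this notebook), and `check_coco_dict` is a hypothetical helper, not part of `pycocotools`.

```python
import json

# Hypothetical example data: every number and the file name are invented.
coco_dict = {
    "categories": [
        {"id": 1, "name": "aortic_curvature"},
        {"id": 7, "name": "scoliosis"},
    ],
    "images": [
        {"id": 80, "file_name": "train/example_080.jpg", "height": 1024, "width": 1024},
    ],
    "annotations": [
        {
            "id": 1,
            "image_id": 80,
            "category_id": 7,
            "bbox": [100.0, 200.0, 300.0, 400.0],  # [xmin, ymin, width, height]
            "area": 300.0 * 400.0,
            "iscrowd": 0,
            "segmentation": [],
        },
    ],
}


def check_coco_dict(d):
    """Return a list of human-readable problems found in a COCO-style dict."""
    problems = []
    image_ids = {img["id"] for img in d["images"]}
    category_ids = {cat["id"] for cat in d["categories"]}
    for ann in d["annotations"]:
        if ann["image_id"] not in image_ids:
            problems.append(f"annotation {ann['id']}: unknown image_id")
        if ann["category_id"] not in category_ids:
            problems.append(f"annotation {ann['id']}: unknown category_id")
        _, _, w, h = ann["bbox"]
        if w <= 0 or h <= 0:
            problems.append(f"annotation {ann['id']}: empty bbox")
    return problems


print(check_coco_dict(coco_dict))  # prints []

# The dictionary is plain JSON, so it can be written to a .json file as-is.
serialized = json.dumps(coco_dict)
```

A check like this is cheap to run before handing a file to `COCO(...)`, since dangling `image_id` or `category_id` references are otherwise easy to miss.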

In [None]:
ann_ids = coco.getAnnIds(imgIds = 80)
coco.loadAnns(ann_ids)

In [None]:
ann_ids = coco_test.getAnnIds(imgIds = 80)
coco_test.loadAnns(ann_ids)

---

# Data augmentation

Because a transform changes the position and size of the bounding boxes in an image, the bounding boxes must be transformed together with the image.
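
As a rough illustration of why the boxes have to follow the image, the sketch below applies a resize and a horizontal flip to a COCO-format box by hand. The helpers `resize_bbox` and `hflip_bbox` are hypothetical and only show the geometry; they are not how Albumentations implements these transforms, and the image size is an assumed example.

```python
def resize_bbox(bbox, old_size, new_size):
    """Scale a COCO box [xmin, ymin, w, h]; sizes are (width, height)."""
    sx = new_size[0] / old_size[0]
    sy = new_size[1] / old_size[1]
    x, y, w, h = bbox
    return [x * sx, y * sy, w * sx, h * sy]


def hflip_bbox(bbox, image_width):
    """Mirror a COCO box [xmin, ymin, w, h] around the vertical image axis."""
    x, y, w, h = bbox
    # The old right edge (x + w) becomes the new left edge, measured from the right.
    return [image_width - (x + w), y, w, h]


bbox = [100.0, 200.0, 300.0, 400.0]

# Assumed original image of 1600 x 3200 pixels resized to 800 x 800.
resized = resize_bbox(bbox, old_size=(1600, 3200), new_size=(800, 800))
print(resized)  # [50.0, 50.0, 150.0, 100.0]

flipped = hflip_bbox(resized, image_width=800)
print(flipped)  # [600.0, 50.0, 150.0, 100.0]
```

Note that the resize changes the box area (from 120000 to 15000 here), which is why the area has to be recomputed after the transform.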

In [None]:
def get_transforms(train = False):
    
    if train:
        transform = A.Compose([
            A.Resize(800, 800),
            A.HorizontalFlip(p = 0.3),
            A.RandomBrightnessContrast(p = 0.1),
            A.ColorJitter(p = 0.1),
            ToTensorV2()
        ], bbox_params = A.BboxParams(format = "coco"))
    else:
        transform = A.Compose([
            A.Resize(800, 800),
            ToTensorV2()
        ], bbox_params = A.BboxParams(format = "coco"))
    
    return transform

---

# Dataset

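The Dataset needs to return two things: image and target.

The image is handled much as in previous assignments; only the way it is read differs. The target is a dictionary that must contain 5 keys:

1. `boxes`: the annotations of all bounding boxes in the image, in the format $[\text{xmin}, \text{ymin}, \text{xmax}, \text{ymax}]$. $\text{xmin}$ and $\text{ymin}$ are the image coordinates of the top-left corner of the bounding box, while $\text{xmax}$ and $\text{ymax}$ are the image coordinates of its bottom-right corner
2. `labels`: the disease category of each bounding box
3. `image_id`: image ID
4. `area`: the area enclosed by each bounding box. **If the bounding boxes have been transformed, remember to recompute it**
5. `iscrowd`: whether the annotation is a single object (0) or a group of objects (1). It is used for segmentation and can simply be set to 0 here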
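
The format change from COCO boxes to the `boxes` key, and the area recomputation, can be sketched in plain Python as follows. `coco_to_xyxy` and `box_area` are hypothetical helper names, the numbers are invented, and plain lists stand in for the tensors the model actually expects.

```python
def coco_to_xyxy(bbox):
    """Convert [xmin, ymin, width, height] to [xmin, ymin, xmax, ymax]."""
    x, y, w, h = bbox[:4]
    return [x, y, x + w, y + h]


def box_area(xyxy):
    """Area of a box given as [xmin, ymin, xmax, ymax]."""
    xmin, ymin, xmax, ymax = xyxy
    return (xmax - xmin) * (ymax - ymin)


# A box as it might look after the transform (made-up values).
transformed_bbox = [50.0, 50.0, 150.0, 100.0]

xyxy = coco_to_xyxy(transformed_bbox)
area = box_area(xyxy)  # recomputed from the transformed box, not read from the JSON
print(xyxy)  # [50.0, 50.0, 200.0, 150.0]
print(area)  # 15000.0

# Shape of the target dictionary for an image with a single box.
target = {
    "boxes": [xyxy],
    "labels": [7],   # hypothetical category id
    "image_id": 80,  # hypothetical image id
    "area": [area],
    "iscrowd": [0],
}
```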

In [None]:
class CXRDataset(Dataset):
    
    def __init__(self, root, split, transforms = None):
        self.split = split
        self.root = root
        self.transforms = transforms
        # Use the root passed to the constructor rather than the global config
        self.coco = COCO(os.path.join(self.root, f"{self.split}.json"))
        self.ids = list(sorted(self.coco.imgs.keys()))
        self.ids = [img_id for img_id in self.ids if (len(self.get_annotation(img_id)) > 0)]
    
    def get_image(self, img_id: int):
        image_path = self.coco.loadImgs(img_id)[0]['file_name']
        image = cv2.imread(os.path.join(self.root, image_path))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        return image
    
    def get_annotation(self, img_id: int):
        annotation = self.coco.loadAnns(self.coco.getAnnIds(img_id))
        
        return annotation
          
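    def __getitem__(self, index):
        img_id = self.ids[index]
        image = self.get_image(img_id)
        annotation = self.get_annotation(img_id)
        # Append category_id to each COCO box so the label travels with it through the transform
        bboxes = [a['bbox'] + [a['category_id']] for a in annotation]
        
        if self.transforms:
            transformed = self.transforms(image = image, bboxes = bboxes)
            image, bboxes = transformed['image'], transformed['bboxes']
        else:
            # Without transforms, convert the HWC array to a CHW tensor ourselves
            image = torch.from_numpy(image).permute(2, 0, 1)
        
        # Scale pixel values to [0, 1]
        image = image / 255
        
        # COCO [xmin, ymin, width, height] -> [xmin, ymin, xmax, ymax]
        new_bboxes = [[x[0], x[1], x[0] + x[2], x[1] + x[3]] for x in bboxes]
        new_bboxes = torch.tensor(new_bboxes, dtype = torch.float32).reshape(-1, 4)
        
        target = {}
        target['boxes'] = new_bboxes
        # Read the labels from the transformed boxes, which carry category_id as the 5th element
        target['labels'] = torch.tensor([int(x[4]) for x in bboxes], dtype = torch.int64)
        target['image_id'] = img_id
        # Recompute the area from the transformed boxes
        target['area'] = (new_bboxes[:, 3] - new_bboxes[:, 1]) * (new_bboxes[:, 2] - new_bboxes[:, 0])
        target['iscrowd'] = torch.tensor([a['iscrowd'] for a in annotation], dtype = torch.int64)
        
        return image, target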
        
    def __len__(self):
        return len(self.ids)

## Collate_fn

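Used by the dataloader. Because object detection batches data differently from the earlier classification and segmentation tasks, we need to define our own `collate_fn`. <br>
Whether or not the result is wrapped in `tuple()` makes no difference here.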
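
The effect of this kind of `collate_fn` can be seen with plain Python objects standing in for images and targets. This is only a sketch: `collate_sketch` is a hypothetical name, and the strings below are placeholders for image tensors. The point is that each image can have a different number of boxes, so the samples are kept as separate items instead of being stacked into one tensor.

```python
def collate_sketch(batch):
    """Transpose [(image, target), ...] into (images, targets)."""
    return tuple(zip(*batch))


# Placeholder samples: two "images" with a different number of boxes each.
batch = [
    ("image_0", {"image_id": 0, "boxes": [[0, 0, 10, 10]]}),
    ("image_1", {"image_id": 1, "boxes": [[5, 5, 20, 20], [1, 2, 3, 4]]}),
]

images, targets = collate_sketch(batch)
print(images)                          # ('image_0', 'image_1')
print([t["image_id"] for t in targets])  # [0, 1]
```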

In [None]:
def collate_fn(batch: list[tuple[torch.Tensor, dict]]):
    # Transpose [(image, target), ...] into (images, targets)
    return tuple(zip(*batch))


In [None]:

train_dataset = CXRDataset(root = config.root, split = "train", transforms = get_transforms(train = True))
train_loader = DataLoader(train_dataset, batch_size = config.batch_size, shuffle = False, collate_fn = collate_fn)
print(len(train_dataset))
print(len(train_loader))
      

---

# Model: Faster R-CNN

Here we use the Faster R-CNN model built into torchvision and load its pretrained weights. Remember to change the predictor's number of classes to 8 (including background, i.e. normal), as shown below:

In [None]:
def fasterrcnn(num_classes):
    # `weights` replaces the deprecated `pretrained = True` argument in recent torchvision releases
    model = models.detection.fasterrcnn_resnet50_fpn(weights = "DEFAULT")
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

The model architecture is as follows:

In [None]:
model = fasterrcnn(num_classes = config.num_classes)
print(model)

del model

---

# Training

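With torchvision's Faster R-CNN model we do not need to define a loss function ourselves, because under `model.train()`, `model(images, targets)` returns the training losses directly. They consist of the following four losses:

1. `loss_classifier`: the classifier loss
2. `loss_box_reg`: the bounding box regressor loss
3. `loss_rpn_box_reg`: the loss of the bounding box regressor in the RPN
4. `loss_objectness`: the loss of the classifier in the RPN, which decides whether a bounding box contains an object

The total loss is the sum of these four losses.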
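
The bookkeeping around these losses can be sketched with plain floats. In the real training loop the values are tensors returned by the model; the numbers below are invented, and `epoch_means` is a hypothetical helper that mirrors the per-epoch averaging.

```python
# Made-up per-batch loss dictionaries with the four keys listed above.
batch_losses = [
    {"loss_classifier": 0.5, "loss_box_reg": 0.25,
     "loss_rpn_box_reg": 0.125, "loss_objectness": 0.125},
    {"loss_classifier": 0.25, "loss_box_reg": 0.25,
     "loss_rpn_box_reg": 0.25, "loss_objectness": 0.25},
]

# Total loss of one batch: the sum of its four components.
totals = [sum(loss.values()) for loss in batch_losses]
print(totals)  # [1.0, 1.0]


def epoch_means(losses):
    """Average each loss component over all batches of an epoch."""
    n = len(losses)
    return {key: sum(loss[key] for loss in losses) / n for key in losses[0]}


means = epoch_means(batch_losses)
print(means["loss_classifier"])  # 0.375
```

Tracking the four components separately, rather than only the total, makes it easier to see whether the RPN or the detection head is the part that stops improving.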

In [None]:
def train_one_epoch(model, train_loader, optimizer, scheduler, device):
    model.train()
    
    train_loss = []
    train_loss_dict = []
    
    for images, targets in tqdm(train_loader):
        images = list(image.to(device) for image in images)
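        # Move tensor values to the device; leave non-tensor values such as image_id unchanged
        targets = [{k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()} for t in targets]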
        
        optimizer.zero_grad()
        
        loss = model(images, targets)
        total_loss = sum(l for l in loss.values())
        loss_value = total_loss.item()
        loss_dict = {k: v.item() for k, v in loss.items()}
        train_loss.append(loss_value)
        train_loss_dict.append(loss_dict)
        
        total_loss.backward()
        optimizer.step()
    scheduler.step()
        
    train_loss = np.mean(train_loss)
    
    train_loss_dict = pd.DataFrame(train_loss_dict)
    train_loss_classifier = train_loss_dict['loss_classifier'].mean()
    train_loss_box_reg = train_loss_dict['loss_box_reg'].mean()
    train_loss_rpn_box_reg = train_loss_dict['loss_rpn_box_reg'].mean()
    train_loss_objectness = train_loss_dict['loss_objectness'].mean()
    
    return train_loss, train_loss_classifier, train_loss_box_reg, train_loss_rpn_box_reg, train_loss_objectness

---

# Validation

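With this model, setting `model.eval()` makes it return only the predicted bounding boxes, their confidence scores, and their labels.

To obtain the validation loss for selecting the best model, I keep the model in `model.train()` during validation. Doing so requires fixing the batch normalization statistics and disabling dropout. Since Faster R-CNN contains no dropout layers, only the batch normalization layers need to be frozen.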

In [None]:
@torch.no_grad()
def validation(model, val_loader, device):
    model.train()
    
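    # Put batch normalization layers back in eval mode so their running statistics are not updated
    for m in model.modules():
        if isinstance(m, (nn.modules.batchnorm._BatchNorm, torchvision.ops.Conv2dNormActivation)):
            m.eval()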
    
    val_loss = []
    val_loss_dict = []
    
    for images, targets in tqdm(val_loader):
        images = list(image.to(device) for image in images)
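        # Move tensor values to the device; leave non-tensor values such as image_id unchanged
        targets = [{k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()} for t in targets]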
        
        loss = model(images, targets)
        total_loss = sum(l for l in loss.values())
        loss_value = total_loss.item()
        loss_dict = {k: v.item() for k, v in loss.items()}
        val_loss.append(loss_value)
        val_loss_dict.append(loss_dict)
    
    val_loss = np.mean(val_loss)
    
    val_loss_dict = pd.DataFrame(val_loss_dict)
    val_loss_classifier = val_loss_dict['loss_classifier'].mean()
    val_loss_box_reg = val_loss_dict['loss_box_reg'].mean()
    val_loss_rpn_box_reg = val_loss_dict['loss_rpn_box_reg'].mean()
    val_loss_objectness = val_loss_dict['loss_objectness'].mean()
    
    return val_loss, val_loss_classifier, val_loss_box_reg, val_loss_rpn_box_reg, val_loss_objectness

# Test

In [None]:
@torch.no_grad()
def predict(input_tensor, model, device):
    # Inference mode: the model returns predictions instead of losses
    model.eval()
    outputs = model(input_tensor.to(device))
    pred_labels = outputs[0]['labels'].cpu().numpy()
    pred_classes = [config.categories[i] for i in pred_labels]
    pred_scores = outputs[0]['scores'].detach().cpu().numpy()
    pred_bboxes = outputs[0]['boxes'].detach().cpu().numpy()
    
    boxes, classes, labels, scores, indices = [], [], [], [], []
    for index in range(len(pred_scores)):
        # No score threshold is applied; every predicted box is kept
        boxes.append(pred_bboxes[index].astype(np.int32))
        classes.append(pred_classes[index])
        labels.append(pred_labels[index])
        scores.append(pred_scores[index])
        indices.append(index)
    
    return boxes, classes, labels, scores, indices

---

# Main

In [None]:
def main():
    
    seed_everything(config.seed)
    
    train_dataset = CXRDataset(root = config.root, split = "train", transforms = get_transforms(train = True))
    val_dataset = CXRDataset(root = config.root, split = "val", transforms = get_transforms(train = False))

    train_loader = DataLoader(train_dataset, batch_size = config.batch_size, shuffle = True, collate_fn = collate_fn)
    val_loader = DataLoader(val_dataset, batch_size = config.batch_size, shuffle = False, collate_fn = collate_fn)
    
    device = config.device
    model =  fasterrcnn(num_classes = config.num_classes)
    model.to(device)
    parameters = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(parameters, lr = config.lr, momentum = config.momentum, nesterov = True, weight_decay = config.weight_decay)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones = config.milestones, gamma = config.gamma)
    #scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 5, gamma = config.gamma)
    
    best_val_loss = float("inf")
    history = {
        "train": {
            "loss": [],
            "loss_classifier": [],
            "loss_box_reg": [],
            "loss_rpn_box_reg": [],
            "loss_objectness": []
        },
        "val": {
            "loss": [],
            "loss_classifier": [],
            "loss_box_reg": [],
            "loss_rpn_box_reg": [],
            "loss_objectness": []
        },
    }
    
    for epoch in range(config.epochs):
        train_loss, train_loss_classifier, train_loss_box_reg, train_loss_rpn_box_reg, train_loss_objectness = train_one_epoch(model, train_loader, optimizer, scheduler, device)
        val_loss, val_loss_classifier, val_loss_box_reg, val_loss_rpn_box_reg, val_loss_objectness = validation(model, val_loader, device)
    
        history["train"]["loss"].append(train_loss)
        history["train"]["loss_classifier"].append(train_loss_classifier)
        history["train"]["loss_box_reg"].append(train_loss_box_reg)
        history["train"]["loss_rpn_box_reg"].append(train_loss_rpn_box_reg)
        history["train"]["loss_objectness"].append(train_loss_objectness)
        
        history["val"]["loss"].append(val_loss)
        history["val"]["loss_classifier"].append(val_loss_classifier)
        history["val"]["loss_box_reg"].append(val_loss_box_reg)
        history["val"]["loss_rpn_box_reg"].append(val_loss_rpn_box_reg)
        history["val"]["loss_objectness"].append(val_loss_objectness)
        
        print(f'Epoch: {epoch+1}/{config.epochs} | LR: {optimizer.state_dict()["param_groups"][0]["lr"]:.6f}')
        print("*****Training*****")
        print(f'Loss: {train_loss:.4f} | Classifier Loss: {train_loss_classifier:.4f} | Box Reg Loss: {train_loss_box_reg:.4f} | RPN Box Reg Loss: {train_loss_rpn_box_reg:.4f} | Objectness Loss: {train_loss_objectness:.4f}')
        evaluate(model, train_loader, device = device)
        print("*****Validation*****")
        print(f'Loss: {val_loss:.4f} | Classifier Loss: {val_loss_classifier:.4f} | Box Reg Loss: {val_loss_box_reg:.4f} | RPN Box Reg Loss: {val_loss_rpn_box_reg:.4f} | Objectness Loss: {val_loss_objectness:.4f}')
        evaluate(model, val_loader, device = device)
        
        if val_loss < best_val_loss:
            save_file = {
                "model": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "scheduler": scheduler.state_dict(),
                "epoch": epoch,
                "args": config
            }
            best_val_loss = val_loss
            torch.save(save_file, "checkpoint.pth")
            
    best_ckpt = torch.load("checkpoint.pth", map_location = device)
    model.load_state_dict(best_ckpt["model"])
        
    plt.figure(figsize = (12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(range(config.epochs), history["train"]["loss"], label = 'Training Loss')
    plt.plot(range(config.epochs), history["val"]["loss"], label = 'Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Training and Validation Loss Curves')
    plt.show()
        
    plt.figure(figsize = (12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(range(config.epochs), history["train"]["loss_classifier"], label = 'Training Classifier Loss')
    plt.plot(range(config.epochs), history["val"]["loss_classifier"], label = 'Validation Classifier Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Classifier Loss')
    plt.legend()
    plt.title('Training and Validation Classifier Loss Curves')
    plt.show()
        
    plt.figure(figsize = (12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(range(config.epochs), history["train"]["loss_box_reg"], label = 'Training Box Reg Loss')
    plt.plot(range(config.epochs), history["val"]["loss_box_reg"], label = 'Validation Box Reg Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Box Reg Loss')
    plt.legend()
    plt.title('Training and Validation Box Reg Loss Curves')
    plt.show()
        
    plt.figure(figsize = (12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(range(config.epochs), history["train"]["loss_rpn_box_reg"], label = 'Training RPN Box Reg Loss')
    plt.plot(range(config.epochs), history["val"]["loss_rpn_box_reg"], label = 'Validation RPN Box Reg Loss')
    plt.xlabel('Epoch')
    plt.ylabel('RPN Box Reg Loss')
    plt.legend()
    plt.title('Training and Validation RPN Box Reg Loss Curves')
    plt.show()
        
    plt.figure(figsize = (12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(range(config.epochs), history["train"]["loss_objectness"], label = 'Training Objectness Loss')
    plt.plot(range(config.epochs), history["val"]["loss_objectness"], label = 'Validation Objectness Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Objectness Loss')
    plt.legend()
    plt.title('Training and Validation Objectness Loss Curves')
    plt.show()
    
    print("Test data")
    
    model.to(device)
    transforms =  torchvision.transforms.Compose([ torchvision.transforms.ToTensor(), ])
    # Assumes test.csv has columns such as 'ID' and 'Filename'
    test_df = pd.read_csv('/kaggle/input/hw5-dataset/hwk05_data/test.csv')
    all_images = os.listdir(os.path.join(config.root, 'test'))
    # Build a dictionary that maps each file name to its ID
    filename_to_id = dict(zip(test_df['Filename'], test_df['ID']))
    predictions_list = []
    for image_name in all_images:
        image_path = os.path.join(config.root, 'test', image_name)
        # Force 3 channels to match the RGB images used during training
        image = Image.open(image_path).convert('RGB')
        current_filename = f'{image_name.split(".")[0]}.dcm'
        # Look up the corresponding ID in the dictionary
        current_id = filename_to_id.get(current_filename, 'unknown')
        # Initialize prediction_dict for each image
        prediction_dict = {'ID': current_id, 'Filename': current_filename, 'predictions': []}
        image = np.array(image)
        image = cv2.resize(image, (800, 800))
        # ToTensor converts the HWC uint8 array to a CHW float tensor in [0, 1]
        input_tensor = transforms(image)
        input_tensor = input_tensor.to(config.device)
        input_tensor = input_tensor.unsqueeze(0)
        boxes, classes, labels, scores, indices = predict(input_tensor, model, config.device)
        for box, cls, label, score, ind in zip(boxes, classes, labels, scores, indices):
            xmin, ymin, xmax, ymax = box.tolist()
            box_dict = {
                'Class': cls,
                'Label': label,
                'Score': score,
                'xmin': xmin/800,
                'ymin': ymin/800,
                'xmax': xmax/800,
                'ymax': ymax/800
            }
            prediction_dict['predictions'].append(box_dict)
        predictions_list.append(prediction_dict)
    predictions_list = sorted(predictions_list, key=lambda x: x['Filename'])

    # Convert predictions_list to a DataFrame
    columns = ['ID', 'Category', 'Score', 'xmin', 'ymin', 'xmax', 'ymax']
    data = []
    for prediction in predictions_list:
        current_id = prediction['ID']
        for box_dict in prediction['predictions']:
            current_class = box_dict['Class']
            current_score = box_dict['Score']
            xmin = box_dict['xmin']
            ymin = box_dict['ymin']
            xmax = box_dict['xmax']
            ymax = box_dict['ymax']
                
            data.append([current_id, current_class, current_score, xmin, ymin, xmax, ymax])
    result_df = pd.DataFrame(data, columns=columns)
    # Save the result DataFrame as a CSV file
    result_df.to_csv('fastrcnn.csv', index=False)
    print("fastrcnn save")

 

In [None]:
if __name__ == "__main__":
    main()