<a href="https://colab.research.google.com/github/BrianChuan/TAICA_Computer-Vision/blob/main/HW1%20Object%20Detection/HW1_%E6%B3%A8%E5%B0%84%E5%A0%B4%E8%B1%AC%E9%9A%BB%E8%BE%A8%E8%AD%98.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# A. 資料匯入與前處理

##Colab 雲端硬碟掛載 (Mount Google Drive)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 資料路徑設定 (Data Path Setup)

In [None]:
# 設定訓練資料集的根目錄
DATA_ROOT = '/content/drive/MyDrive/Colab Notebooks/電腦視覺與深度學習/taica-cvpdl-2025-hw-1/train/'

# 圖像資料夾路徑
IMAGE_DIR = DATA_ROOT + 'img/'

# 標註檔案路徑
GT_FILE_PATH = DATA_ROOT + 'gt.txt'

print(f"圖像資料夾路徑: {IMAGE_DIR}")
print(f"標註檔案路徑: {GT_FILE_PATH}")

圖像資料夾路徑: /content/drive/MyDrive/Colab Notebooks/電腦視覺與深度學習/taica-cvpdl-2025-hw-1/train/img/
標註檔案路徑: /content/drive/MyDrive/Colab Notebooks/電腦視覺與深度學習/taica-cvpdl-2025-hw-1/train/gt.txt


## gt.txt 資料解析與格式轉換 (Data Parsing)

In [None]:
import pandas as pd
import os

# 1. 讀取 gt.txt 檔案
df = pd.read_csv(
    GT_FILE_PATH,
    header=None,
    names=['frame_id', 'bb_left', 'bb_top', 'bb_width', 'bb_height'],
    delimiter=',' # 確保分隔符是逗號
)

# 2. 轉換 frame_id 到 圖像檔案名 (Image_ID)
df['image_filename'] = df['frame_id'].apply(lambda x: f'{x:08d}.jpg')

# 3. 計算 x_center 和 y_center (常用於 One-stage 模型)
# 原始格式: [x_min, y_min, w, h] (bb_left, bb_top, bb_width, bb_height)
# 轉換目標: [x_center, y_center, w, h] (常用於 YOLO 類模型)
df['x_center'] = df['bb_left'] + df['bb_width'] / 2
df['y_center'] = df['bb_top'] + df['bb_height'] / 2

# 整理最終的 DataFrame
df_gt = df[['image_filename', 'bb_left', 'bb_top', 'bb_width', 'bb_height', 'x_center', 'y_center']]

# 顯示前幾行確認解析正確
print("\n--- Ground Truth Data (前 5 行) ---")
print(df_gt.head())
print(f"\n總標註數量: {len(df_gt)}")
print(f"總圖像數量 (不重複): {df_gt['image_filename'].nunique()}")


--- Ground Truth Data (前 5 行) ---
  image_filename  bb_left  bb_top  bb_width  bb_height  x_center  y_center
0   00000001.jpg      307      50        96         18     355.0      59.0
1   00000001.jpg      308      63       101         41     358.5      83.5
2   00000001.jpg      330      78       104         76     382.0     116.0
3   00000001.jpg      403      47        52         79     429.0      86.5
4   00000001.jpg      440      55        59         83     469.5      96.5

總標註數量: 38747
總圖像數量 (不重複): 1270


# B. 資料載入器＆模型建構

## PyTorch 資料集類別 (Dataset Class)
- 將圖像與解析後的 gt.txt 送入模型。
- 使用 PyTorch 框架來定義此類別的 Dataset。
- 關鍵功能：將像素座標轉換為訓練 One-Stage 模型所需的標準化座標。


In [None]:
import torch
from torch.utils.data import Dataset
from PIL import Image
import numpy as np
import os
# 我們需要使用 pandas DataFrame，假設您已經完成了步驟 A 的解析
# from B.1:
# df_gt = ... (您的 Ground Truth DataFrame)
# IMAGE_DIR = ... (圖像資料夾路徑)

class PigDetectionDataset(Dataset):
    def __init__(self, df_gt, image_dir, image_size=(416, 416), transform=None):
        """
        初始化資料集。
        :param df_gt: 包含所有邊界框標註的 Pandas DataFrame。
        :param image_dir: 圖像檔案的根目錄。
        :param image_size: 訓練時將圖像resize到的目標尺寸 (W, H)。
        :param transform: 資料增強 (Data Augmentation) 函數。
        """
        self.image_dir = image_dir
        self.image_size = image_size
        self.transform = transform

        # 取得所有不重複的圖像檔案名
        self.image_files = df_gt['image_filename'].unique().tolist()

        # 將 DataFrame 轉換為以檔案名為鍵的字典，方便快速查找該圖像的所有標註
        self.annotations = {
            name: df_gt[df_gt['image_filename'] == name].to_dict('records')
            for name in self.image_files
        }

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        try:
            # 1. 載入圖像
            img_filename = self.image_files[idx]
            img_path = os.path.join(self.image_dir, img_filename)
            img = Image.open(img_path).convert('RGB')

            # 2. 獲取原始圖像尺寸
            original_w, original_h = img.size

            # 3. 獲取該圖像的所有邊界框 (BB)
            boxes_data = self.annotations[img_filename]

            # 轉換為 [x_center, y_center, w, h] 格式的 NumPy 陣列
            # 並且進行標準化 (Normalization)
            # One-stage 模型通常使用 0~1 的相對座標

            targets = []
            for box in boxes_data:
                # 原始像素值
                x_min, y_min, w, h = box['bb_left'], box['bb_top'], box['bb_width'], box['bb_height']
                x_center = x_min + w / 2
                y_center = y_min + h / 2

                # 標準化 (除以原始圖像的寬高)
                norm_x_center = x_center / original_w
                norm_y_center = y_center / original_h
                norm_w = w / original_w
                norm_h = h / original_h

                # 對於單一類別目標偵測（豬），類別 ID 為 0 [cite: 114]
                # 訓練資料格式: [class_id, x_center_norm, y_center_norm, w_norm, h_norm]
                targets.append([0, norm_x_center, norm_y_center, norm_w, norm_h])

            targets = np.array(targets, dtype=np.float32)

            # 4. 圖像 Resize (並在 Data Loader 中應用 Transform/Augmentation)
            img = img.resize(self.image_size)
            img = np.array(img).transpose(2, 0, 1) / 255.0 # HWC -> CWH, 0~1 標準化

            # 5. 轉換為 PyTorch Tensor
            image_tensor = torch.from_numpy(img).float()
            target_tensor = torch.from_numpy(targets).float()

            return image_tensor, target_tensor

        except FileNotFoundError:
            # --- 檔案讀取失敗的處理機制 ---
            # print(f"⚠️ Warning: File not found at index {index}, path: {img_path}. Sampling replacement.")

            # 1. 選擇一個隨機的有效索引作為替代
            # 確保不會無限遞歸，如果隨機選到的還是錯的，會再次進入 except 區塊
            random_index = random.randint(0, len(self) - 1)

            # 2. 遞歸呼叫 __getitem__，直到找到一個有效的資料
            # ⚠️ 遞歸呼叫時，將 num_workers 設置為 0 以避免 worker 卡死
            # 這是標準且安全的方法，可以確保 worker 不會因為單個錯誤停止
            return self.__getitem__(random_index)

## One-Stage 模型核心結構定義 (Model Core)
- 開始定義一個 YOLO 模型的核心骨幹和輸出層。
- *注意：每個層都比序從零開始初始化。

### 1. 骨幹網路(Backbone)
> 這裡使用簡化的 Conv/Batch Norm/LeakyReLU 模塊作為基礎

In [None]:
import torch.nn as nn

class ConvBlock(nn.Module):
    def __init__(self, in_c, out_c, kernel_size, stride, padding):
        super().__init__()
        self.conv = nn.Conv2d(in_c, out_c, kernel_size, stride, padding, bias=False)
        self.bn = nn.BatchNorm2d(out_c)
        self.leaky_relu = nn.LeakyReLU(0.1)

    def forward(self, x):
        return self.leaky_relu(self.bn(self.conv(x)))

# 簡化的骨幹 (例如 ResNet/Darknet-like)
class SimpleBackbone(nn.Module):
    def __init__(self, in_c=3):
        super().__init__()
        self.features = nn.Sequential(
            ConvBlock(in_c, 32, 3, 1, 1),
            nn.MaxPool2d(2, 2), # 降採樣
            ConvBlock(32, 64, 3, 1, 1),
            nn.MaxPool2d(2, 2),
            ConvBlock(64, 128, 3, 1, 1),
            ConvBlock(128, 64, 1, 1, 0),
            ConvBlock(64, 128, 3, 1, 1),
            # ... 這裡可以繼續添加更多的卷積層和殘差塊來加深網路
        )
        # 確保所有層次都是從零開始初始化
        self._initialize_weights()

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                # He initialization (適合 ReLU/Leaky ReLU)
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='leaky_relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        return self.features(x)

### 檢測頭(Detection Head)
> 負責將骨幹提取的特徵轉換為最終的邊界框預測

- 對於YOLO，每個網格預測 $B$ 個錨匡。
- 每個錨匡會有：物體信心度、類別信心度、邊界匡座標，三種資訊。
- 輸出通道數：$B×(4(coords)+1(confidence)+1(class))=B×6$
- 以下實驗中假設 $B=3$

In [None]:
class YOLOHead(nn.Module):
    def __init__(self, in_c, num_anchors=3):
        super().__init__()
        # 最終輸出層
        # B * (4 + 1 + 1) = num_anchors * 6
        self.conv = nn.Conv2d(in_c, num_anchors * 6, kernel_size=1)

    def forward(self, x):
        # 輸出格式: (Batch, num_anchors * 6, Grid_H, Grid_W)
        return self.conv(x)

# C. 複雜資料增強
> 在 YOLO 系列的訓練中，Mosaic 增強是最具影響力的技術之一。

## 1. Mosaic 增強原理
> 將四張隨機選取的訓練圖像，通過隨機裁剪、縮放、旋轉等方式，拼接成一張新的訓練圖像。
- 圖像處理： 四張圖被放置在四個象限，中間的接縫點 (Center Point) 是隨機選取的。
- 標註處理： 四張圖的所有真實邊界框都必須根據其在新的 Mosaic 圖像中的位置重新計算座標。

## 2. 實作 Mosaic 函數


In [None]:
import cv2 # 通常用於資料增強和圖像處理
import random
import torch

# 圖像處理和增強的工具函數 (為了簡潔，這裡僅為示意)
# 實際的實作需要包含圖像裁剪、縮放、顏色空間轉換等細節。

def get_mosaic_item(dataset, full_mosaic_size=416):
    """
    從訓練數據集中隨機獲取並拼接四張圖像及其標註。

    :param dataset: 您的 PigDetectionDataset 實例。
    :param full_mosaic_size: 最終 Mosaic 圖像的寬高 (例如 416x416)。
    :return: 拼接後的圖像張量 (Tensor) 和新的標註 (Normalized Targets)。
    """
    mosaic_img = np.zeros((full_mosaic_size * 2, full_mosaic_size * 2, 3), dtype=np.uint8)
    mosaic_targets = []

    # 隨機決定 Mosaic 接縫點的中心 (中心點在 0.5 到 1.5 倍尺寸之間)
    # 我們將中心點隨機化到 (0.5*S ~ 1.5*S) 的範圍
    s = full_mosaic_size
    center_x = int(random.uniform(0.5 * s, 1.5 * s))
    center_y = int(random.uniform(0.5 * s, 1.5 * s))

    # 四個圖像的索引 (隨機抽取四張圖)
    indices = [random.randint(0, len(dataset) - 1) for _ in range(4)]

    # 處理四個象限 (i=0, 1, 2, 3)
    for i, index in enumerate(indices):
        # 1. 獲取單張圖像和標註
        # 注意: 這裡我們呼叫 dataset.__getitem__ 來獲取原始圖像和標準化標註
        # 為了簡化，我們假設這裡的 dataset.__getitem__ 返回的是 PIL 圖像和未經標準化的像素邊界框
        # 在實際的 YOLO 實作中，__getitem__ 應該返回原始圖像和像素 BB，然後在這裡進行隨機縮放和標準化。

        # 由於我們沒有完整的Dataset類，這裡暫時將圖像載入和標註解析整合：
        # 實際應用中，您會從 dataset[index] 獲取數據。

        # 為了讓 Colab 環境可行，這裡只能做基本示意：
        # 我們假設從 dataset 獲取的 img_i 是已經 Resize 到 S x S 的 numpy 圖像，
        # 且 targets_i 是已經標準化的 [class, x_norm, y_norm, w_norm, h_norm]

        # 實際訓練代碼中，您需要從 dataset[index] 獲取未處理的原始圖像和像素座標
        # 這裡我們用一個簡易的替代方案 (你需要自行調整 Dataset 類別):

        # --- 簡易替代方案開始 (實際訓練需要更複雜的數據管道) ---
        img_tensor, target_tensor = dataset[index]
        # 將 tensor 轉回 numpy (HWC)
        img_i = (img_tensor.permute(1, 2, 0).numpy() * 255).astype(np.uint8)
        targets_i = target_tensor.numpy()
        h, w, _ = img_i.shape
        # --- 簡易替代方案結束 ---

        # 2. 確定圖像放置的象限 (top-left, top-right, bottom-left, bottom-right)

        if i == 0: # 頂部左側 (Top-Left)
            # Mosaic 圖像上的座標範圍 (x_min, y_min) -> (x_max, y_max)
            x_min, y_min = center_x - w, center_y - h
            x_max, y_max = center_x, center_y

        elif i == 1: # 頂部右側 (Top-Right)
            x_min, y_min = center_x, center_y - h
            x_max, y_max = center_x + w, center_y

        elif i == 2: # 底部左側 (Bottom-Left)
            x_min, y_min = center_x - w, center_y
            x_max, y_max = center_x, center_y + h

        elif i == 3: # 底部右側 (Bottom-Right)
            x_min, y_min = center_x, center_y
            x_max, y_max = center_x + w, center_y + h

        # 3. 裁剪與貼合 (Crop and Paste)

        # 確保圖像不會超出 Mosaic 邊界 (防止越界)
        x_min = max(x_min, 0)
        y_min = max(y_min, 0)
        x_max = min(x_max, full_mosaic_size * 2)
        y_max = min(y_max, full_mosaic_size * 2)

        # 貼上圖像
        # (這裡需要更複雜的裁剪邏輯來處理圖像和邊界框的邊界情況)

        # 簡化貼圖（假設不考慮裁剪，僅做貼上）
        if x_max > x_min and y_max > y_min:
             mosaic_img[y_min:y_max, x_min:x_max] = img_i[:y_max-y_min, :x_max-x_min]

        # 4. 邊界框座標轉換
        # 將邊界框從原本圖像的標準化座標，轉換到 Mosaic 圖像的絕對像素座標
        for target in targets_i:
            c, x_norm, y_norm, w_norm, h_norm = target

            # 轉換回原始圖像的像素座標 (假設 img_i 是 S x S)
            # x_pixel = x_norm * w (S)
            # y_pixel = y_norm * h (S)

            # --- 簡化標註轉換開始 ---
            # 為了實作，您需要將 target_tensor 中的標準化座標先轉換回像素座標
            # 然後加上 (x_min, y_min) 的偏移量，再進行裁剪處理
            # 並最終將其標準化到 (0, 0) -> (2S, 2S) 的範圍

            # 以下是將標準化座標轉換為 Mosaic 座標的核心邏輯 (你需要自己完成像素轉換的細節)

            # new_x_center = (x_norm * w) + x_min
            # new_y_center = (y_norm * h) + y_min
            # new_w = w_norm * w
            # new_h = h_norm * h

            # --- 簡化標註轉換結束 ---

            # 假設已經完成了座標轉換和裁剪
            # final_target = [c, new_x_center, new_y_center, new_w, new_h]
            # mosaic_targets.append(final_target)
            pass

    # 5. 最終處理：將 2S x 2S 的 Mosaic 圖像 Resize/裁剪回 S x S (例如 416x416)
    # 這一步通常是隨機裁剪，並將最終的標註標準化到 S x S 範圍

    # 這是 Mosaic 實作中最複雜的環節，確保邊界框轉換和裁剪正確是關鍵。

    # 最終返回 (經過 HWC -> CWH 轉換和 0~1 標準化的 Tensor):
    # final_img_tensor = ...
    # final_targets_tensor = ...
    # return final_img_tensor, final_targets_tensor

    return None, None # 暫時返回 None

# D. 錨框 K-means 聚類分析實作

## 1. 準備程式碼環境與 IoU 距離函數
- 定義一個特殊的距離函數，用於 K-means 聚類

In [None]:
import numpy as np
from sklearn.cluster import KMeans
import pandas as pd
import torch # 假設 df_gt 已經在內存中

# 確保我們有 df_gt:
# 假設這是從步驟 A 傳遞過來的 DataFrame
# df_gt = pd.read_csv(...)

def iou_metric(boxes, clusters):
    """
    計算每個邊界框 (boxes) 與每個錨框 (clusters) 之間的 IoU (Intersection over Union)。

    :param boxes: 真實邊界框的寬高 [N, 2] (width, height)
    :param clusters: 錨框的寬高 [K, 2] (width, height)
    :return: IoU 值 [N, K]
    """
    # 假設所有框的中心點都在 (0, 0)
    # IoU = 交集面積 / 聯集面積

    # 擴展維度以便廣播運算
    box_w = np.expand_dims(boxes[:, 0], axis=1) # [N, 1]
    box_h = np.expand_dims(boxes[:, 1], axis=1) # [N, 1]
    cluster_w = np.expand_dims(clusters[:, 0], axis=0) # [1, K]
    cluster_h = np.expand_dims(clusters[:, 1], axis=0) # [1, K]

    # 交集 (Intersection) 面積
    # 由於中心點對齊，交集面積 = min(w_box, w_cluster) * min(h_box, h_cluster)
    inter_w = np.minimum(box_w, cluster_w)
    inter_h = np.minimum(box_h, cluster_h)
    intersection = inter_w * inter_h # [N, K]

    # 聯集 (Union) 面積
    box_area = box_w * box_h # [N, 1]
    cluster_area = cluster_w * cluster_h # [1, K]
    union = box_area + cluster_area - intersection # [N, K]

    # 確保不會除以零
    iou = np.where(union != 0, intersection / union, 0)
    return iou

def iou_distance(boxes, clusters):
    """
    定義 K-means 聚類使用的距離度量: 1 - IoU。
    距離越小 (IoU 越大) 代表匹配越好。
    """
    iou = iou_metric(boxes, clusters)
    # 我們需要找到最大 IoU 的錨框，所以距離是 1 - max(IoU)
    max_iou = np.max(iou, axis=1)
    # K-means 最小化的是距離的平方和，所以我們回傳 1 - IoU
    return 1 - max_iou

## 2. 執行 K-means 聚類分析
- 通常 YOLOv3 或 YOLOv4 會使用 K=9 或 K=12 個錨框來處理不同尺度的目標。
- 本次從 K=9 開始。

In [None]:
# 從 DataFrame 中提取所有真實邊界框的寬度和高度
# 這裡使用像素值 (bb_width, bb_height)
# 注意：這些是原始的像素座標，不是標準化後的 (0~1) 值
data = df_gt[['bb_width', 'bb_height']].values

# 選擇錨框的數量 (K)
NUM_ANCHORS = 9

# 初始化 K-means 模型
# 我們使用標準的 sklearn.cluster.KMeans，但需要手動實作自定義的距離計算
# 因為 sklearn 不支持直接使用自定義距離進行聚類，我們模擬其行為
# 實際的優化器需要手動實現 IoU K-means，這裡我們使用一個簡化/近似方法
# 注意: 這裡的 K-means 實作並不完全等同於 YOLO 中使用的 IoU K-means
# 但可以作為一個優秀的起點。
kmeans = KMeans(n_clusters=NUM_ANCHORS,
                random_state=42,
                n_init=10,
                max_iter=500).fit(data)

# 獲取聚類中心，即錨框的寬高（像素值）
anchor_boxes_pixel = kmeans.cluster_centers_

# 對錨框進行排序 (例如按面積大小)
anchor_boxes_pixel = anchor_boxes_pixel[
    np.argsort(anchor_boxes_pixel[:, 0] * anchor_boxes_pixel[:, 1])
]

# 計算平均 IoU (Average IoU) 作為聚類質量的指標
avg_iou = np.mean(iou_distance(data, anchor_boxes_pixel))
mean_avg_iou = 1 - avg_iou


print(f"--- K-means ({NUM_ANCHORS} 個錨框) 聚類結果 ---")
print(f"平均 IoU (Mean Avg IoU) 為: {mean_avg_iou:.4f}")
print("\n最終錨框尺寸 (像素 W, H) - 由小到大排序:")
for i, (w, h) in enumerate(anchor_boxes_pixel):
    print(f"Anchor {i+1}: 寬={w:.2f}, 高={h:.2f}")

--- K-means (9 個錨框) 聚類結果 ---
平均 IoU (Mean Avg IoU) 為: 0.7522

最終錨框尺寸 (像素 W, H) - 由小到大排序:
Anchor 1: 寬=28.62, 高=28.98
Anchor 2: 寬=35.99, 高=52.87
Anchor 3: 寬=59.45, 高=40.71
Anchor 4: 寬=92.40, 高=41.99
Anchor 5: 寬=55.08, 高=84.09
Anchor 6: 寬=129.55, 高=55.87
Anchor 7: 寬=89.84, 高=81.11
Anchor 8: 寬=81.40, 高=127.60
Anchor 9: 寬=135.95, 高=107.01


# YOLO 複合損失函數製作

主要分類：
- 邊界框定位損失 (Coord Loss)
- 物體信心度損失 (Objectness Loss)
- 分類損失 (Classification Loss)

### 預先定義

In [None]:
def calculate_iou(box1, box2):
    """
    計算兩個邊界框之間的 IoU (Intersection over Union)。

    :param box1: [..., 4] (x_min, y_min, x_max, y_max)
    :param box2: [..., 4] (x_min, y_min, x_max, y_max)
    :return: [..., 1] 的 IoU 張量
    """
    # 確保所有維度匹配，進行廣播運算

    # 獲取交集區域的座標
    x1 = torch.max(box1[..., 0], box2[..., 0])
    y1 = torch.max(box1[..., 1], box2[..., 1])
    x2 = torch.min(box1[..., 2], box2[..., 2])
    y2 = torch.min(box1[..., 3], box2[..., 3])

    # 計算交集面積 (確保面積非負)
    intersection = (x2 - x1).clamp(0) * (y2 - y1).clamp(0)

    # 計算各自面積
    area1 = (box1[..., 2] - box1[..., 0]) * (box1[..., 3] - box1[..., 1])
    area2 = (box2[..., 2] - box2[..., 0]) * (box2[..., 3] - box2[..., 1])

    # 計算聯集面積
    union = area1 + area2 - intersection

    # 防止除以零
    iou = intersection / (union + 1e-6)
    return iou.unsqueeze(-1)

In [None]:
def diou_loss(pred_boxes, target_boxes):
    """
    計算 D-IoU (Distance-IoU) 損失。

    :param pred_boxes: 預測框 [..., 4] (x_min, y_min, x_max, y_max)
    :param target_boxes: 真實框 [..., 4] (x_min, y_min, x_max, y_max)
    """
    # 1. IoU
    iou = calculate_iou(pred_boxes, target_boxes)

    # 2. 獲取中心點
    pred_c = (pred_boxes[..., 0:2] + pred_boxes[..., 2:4]) / 2
    target_c = (target_boxes[..., 0:2] + target_boxes[..., 2:4]) / 2

    # 3. 中心點距離平方 (d^2)
    center_dist_sq = torch.sum(torch.pow(pred_c - target_c, 2), dim=-1) # [..., 1]

    # 4. 最小閉合框 (C)
    c_min = torch.min(pred_boxes[..., 0:2], target_boxes[..., 0:2])
    c_max = torch.max(pred_boxes[..., 2:4], target_boxes[..., 2:4])

    # C 的對角線距離平方 (c^2)
    c_dist_sq = torch.sum(torch.pow(c_max - c_min, 2), dim=-1) + 1e-6

    # 5. D-IoU 損失
    diou_term = center_dist_sq / c_dist_sq
    diou = iou - diou_term.unsqueeze(-1) # [..., 1]

    # Loss = 1 - D-IoU
    loss = 1.0 - diou
    return loss

### 1. 核心結構

In [None]:
import torch
import torch.nn as nn

# ⚠️ 確保 calculate_iou 和 diou_loss 已經定義在 Notebook 中
# (假設這兩個函數可以正確處理 [..., 4] 的輸入並輸出 [..., 1] 的損失)

class YoloLoss(nn.Module):
    def __init__(self, anchors, num_classes=1, img_size=416, iou_threshold=0.5):
        super().__init__()
        self.anchors = anchors # [K, 2] Normalized tensor
        self.num_classes = num_classes
        self.img_size = img_size
        self.iou_threshold = iou_threshold

        self.lambda_coord = 5.0
        self.lambda_noobj = 0.5
        self.lambda_obj = 1.0

        self.bce_loss = nn.BCEWithLogitsLoss(reduction='none')

    def forward(self, predictions, targets):
        batch_size, num_anchors_x_6, grid_h, grid_w = predictions.shape
        num_anchors = len(self.anchors)

        # 調整為 (Batch, Anchors, Grid_H, Grid_W, 6)
        predictions = predictions.view(batch_size, num_anchors, 6, grid_h, grid_w).permute(0, 1, 3, 4, 2).contiguous()

        # 提取與轉換預測值 (解碼)
        pred_xy = torch.sigmoid(predictions[..., 0:2])
        pred_wh = torch.exp(predictions[..., 2:4])

        # 創建網格座標 (Grid Coordinates)
        grid_x = torch.arange(grid_w, device=predictions.device).repeat(grid_h, 1).view([1, 1, grid_h, grid_w]).type_as(predictions)
        grid_y = torch.arange(grid_h, device=predictions.device).repeat(grid_w, 1).t().view([1, 1, grid_h, grid_w]).type_as(predictions)

        # 初始化用於損失計算的掩碼 (Mask)
        obj_mask = torch.zeros(batch_size, num_anchors, grid_h, grid_w, device=predictions.device, requires_grad=False).type_as(predictions)
        noobj_mask = torch.ones(batch_size, num_anchors, grid_h, grid_w, device=predictions.device, requires_grad=False).type_as(predictions)
        coord_mask = torch.zeros(batch_size, num_anchors, grid_h, grid_w, device=predictions.device, requires_grad=False).type_as(predictions)

        target_xy = torch.zeros(batch_size, num_anchors, grid_h, grid_w, 2, device=predictions.device, requires_grad=False).type_as(predictions)
        target_wh = torch.zeros(batch_size, num_anchors, grid_h, grid_w, 2, device=predictions.device, requires_grad=False).type_as(predictions)

        # --- 關鍵：將真實標註 (targets) 映射到預測網格 (Mapping) ---
        for b in range(batch_size):
            targets_b = targets[targets[:, 0] == b, 1:]
            if targets_b.numel() == 0:
                continue

            # 1. 轉換真實框到網格座標 [0, G]
            target_gs = targets_b.clone()
            target_gs[:, 0] *= grid_w
            target_gs[:, 1] *= grid_h
            target_gs[:, 2] *= grid_w
            target_gs[:, 3] *= grid_h

            target_cx = target_gs[:, 0]
            target_cy = target_gs[:, 1]

            target_gx = target_cx.long()
            target_gy = target_cy.long()

            # 2. 準備 IoU 廣播計算
            targets_b_wh = targets_b[:, 2:4] * self.img_size # 像素尺寸 [N_targets, 2]

            # 2.1. GT Boxes: [N, 1, 4] 廣播形狀
            gt_wh = targets_b_wh.unsqueeze(1)
            gt_boxes_for_iou = torch.cat([torch.zeros_like(gt_wh), gt_wh], dim=2)

            # 2.2. Anchor Boxes: [1, K, 4] 廣播形狀
            anchors_wh_pixels = self.anchors.type_as(targets_b_wh) * self.img_size
            anchors_wh_broadcast = anchors_wh_pixels.unsqueeze(0)
            anchors_boxes_for_iou = torch.cat([torch.zeros_like(anchors_wh_broadcast), anchors_wh_broadcast], dim=2)

            # 3. 計算 IoU: [N, K, 1] - 這裡不會報錯
            anchor_ious = calculate_iou(
                gt_boxes_for_iou,
                anchors_boxes_for_iou
            )

            best_anchor_ious, best_anchor_idx = anchor_ious.max(dim=1)

            # --- 設置 Mask 和 Target ---
            for t in range(targets_b.shape[0]):
                a = best_anchor_idx[t]
                gx, gy = target_gx[t], target_gy[t]

                if gx >= grid_w or gy >= grid_h:
                    continue

                obj_mask[b, a, gy, gx] = 1.0
                noobj_mask[b, a, gy, gx] = 0.0
                coord_mask[b, a, gy, gx] = 1.0

                # 將 Python tuple 轉換為 Tensor 進行賦值
                target_xy[b, a, gy, gx] = torch.tensor(
                    [target_cx[t] - gx, target_cy[t] - gy],
                    device=target_xy.device,
                    dtype=target_xy.dtype
                )

                # 將 Python tuple 轉換為 Tensor 進行賦值
                target_wh[b, a, gy, gx] = torch.tensor(
                    [target_gs[t, 2], target_gs[t, 3]],
                    device=target_wh.device,
                    dtype=target_wh.dtype
                )

        # --- 損失計算 ---

        # 1. 座標損失 (D-IoU Loss)

        # 獲取預測框的寬高 (轉換為網格座標)
        anchors_grid_w = self.anchors.type_as(pred_wh)[..., 0].view(1, num_anchors, 1, 1) * grid_w
        anchors_grid_h = self.anchors.type_as(pred_wh)[..., 1].view(1, num_anchors, 1, 1) * grid_h

        pred_bw = pred_wh[..., 0] * anchors_grid_w
        pred_bh = pred_wh[..., 1] * anchors_grid_h

        # 預測框的 (x_min, y_min, x_max, y_max) 網格座標
        pred_x_c = pred_xy[..., 0] + grid_x
        pred_y_c = pred_xy[..., 1] + grid_y

        pred_x1 = pred_x_c - pred_bw / 2
        pred_y1 = pred_y_c - pred_bh / 2
        pred_x2 = pred_x_c + pred_bw / 2
        pred_y2 = pred_y_c + pred_bh / 2
        pred_bbox = torch.stack((pred_x1, pred_y1, pred_x2, pred_y2), dim=-1) # [B, A, G, G, 4]

        # 真實框的 (x_min, y_min, x_max, y_max) 網格座標
        target_x_c = target_xy[..., 0] + grid_x
        target_y_c = target_xy[..., 1] + grid_y

        target_x1 = target_x_c - target_wh[..., 0] / 2
        target_y1 = target_y_c - target_wh[..., 1] / 2
        target_x2 = target_x_c + target_wh[..., 0] / 2
        target_y2 = target_y_c + target_wh[..., 1] / 2
        target_bbox = torch.stack((target_x1, target_y1, target_x2, target_y2), dim=-1) # [B, A, G, G, 4]

        # 計算 D-IoU 損失
        coord_loss_all = diou_loss(pred_bbox, target_bbox)
        coord_loss = self.lambda_coord * (coord_loss_all * coord_mask.unsqueeze(-1)).sum()

        # 2. 物體信心度損失 (Objectness Loss)
        obj_loss = self.lambda_obj * (self.bce_loss(predictions[..., 4], obj_mask) * obj_mask).sum()
        noobj_loss = self.lambda_noobj * (self.bce_loss(predictions[..., 4], torch.zeros_like(obj_mask)) * noobj_mask).sum()

        # 3. 分類損失 (Classification Loss)
        total_cls_loss = (self.bce_loss(predictions[..., 5], obj_mask) * obj_mask).sum()

        # 4. 總損失 (Total Loss)
        total_loss = coord_loss + obj_loss + noobj_loss + total_cls_loss

        return total_loss

### 關鍵函數：D/C-IoU 損失實作

In [None]:
def giou_loss(pred_boxes, target_boxes):
    """
    計算 D-IoU 或 C-IoU 損失。
    這裡使用簡化版的 IoU/D-IoU 實現。
    """
    # 假設 pred_boxes 和 target_boxes 都是 [..., 4] 的張量 (x_min, y_min, x_max, y_max)

    # 1. 計算 IoU (Intersection over Union)
    # ... (需要複雜的張量運算來計算交集和聯集)
    iou = calculate_iou(pred_boxes, target_boxes)

    # 2. 計算中心點距離平方 (d^2)
    # 獲取中心點 (x_center, y_center)
    # pred_c = (pred_boxes[..., 0:2] + pred_boxes[..., 2:4]) / 2
    # target_c = (target_boxes[..., 0:2] + target_boxes[..., 2:4]) / 2
    # center_dist_sq = torch.sum(torch.pow(pred_c - target_c, 2), dim=-1)

    # 3. 計算最小閉合框對角線距離平方 (c^2)
    # c_min = torch.min(pred_boxes[..., 0:2], target_boxes[..., 0:2])
    # c_max = torch.max(pred_boxes[..., 2:4], target_boxes[..., 2:4])
    # c_dist_sq = torch.sum(torch.pow(c_max - c_min, 2), dim=-1)

    # 4. D-IoU 損失
    # diou = iou - (center_dist_sq / c_dist_sq)
    # loss = 1 - diou

    return 1 - iou.mean() # 這裡使用 IoU 替代，但在實戰中必須使用 D/C-IoU

## 模型實例化

### 1. 完整的模型架構定義(YoloModel)
> 創建一個頂級的`YoloModel`類別，將骨幹網路的特徵圖，傳遞給檢測頭。

In [None]:
import torch
import torch.nn as nn

# --- 前置模組 (假設 ConvBlock 已經在您的 Notebook 中定義) ---
class ConvBlock(nn.Module):
    def __init__(self, in_c, out_c, kernel_size, stride, padding):
        super().__init__()
        self.conv = nn.Conv2d(in_c, out_c, kernel_size, stride, padding, bias=False)
        self.bn = nn.BatchNorm2d(out_c)
        self.leaky_relu = nn.LeakyReLU(0.1)

    def forward(self, x):
        return self.leaky_relu(self.bn(self.conv(x)))

# --- 骨幹網路 (假設 SimpleBackbone 已經在您的 Notebook 中定義) ---
class SimpleBackbone(nn.Module):
    def __init__(self, in_c=3):
        super().__init__()
        # 這裡的 features 需要確保輸出通道數為 128，以便傳遞給 YOLOHead
        self.features = nn.Sequential(
            ConvBlock(in_c, 32, 3, 1, 1), # 3 -> 32
            nn.MaxPool2d(2, 2),
            ConvBlock(32, 64, 3, 1, 1), # 32 -> 64
            nn.MaxPool2d(2, 2),
            ConvBlock(64, 128, 3, 1, 1), # 64 -> 128
            ConvBlock(128, 64, 1, 1, 0),
            ConvBlock(64, 128, 3, 1, 1),
            # 確保最終輸出通道為 128
        )
        self._initialize_weights()

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='leaky_relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        return self.features(x)

# --- 檢測頭 (假設 YOLOHead 已經在您的 Notebook 中定義) ---
class YOLOHead(nn.Module):
    def __init__(self, in_c, num_anchors=3):
        super().__init__()
        # 輸出通道: B * (4(coords) + 1(conf) + 1(class))
        self.conv = nn.Conv2d(in_c, num_anchors * 6, kernel_size=1)

    def forward(self, x):
        return self.conv(x)


# --- 完整的 YOLO 模型 ---
class YoloModel(nn.Module):
    def __init__(self, num_anchors=9):
        super().__init__()
        self.backbone = SimpleBackbone(in_c=3)
        # 假設 backbone 的輸出通道是 128
        self.head = YOLOHead(in_c=128, num_anchors=num_anchors)

    def forward(self, x):
        # 1. 特徵提取 (Feature Extraction)
        features = self.backbone(x)

        # 2. 檢測預測 (Detection Prediction)
        predictions = self.head(features)

        return predictions

### 2. 模型實例化與設備設定
> 將模型實例化，必且準備在 GPU 上運行。

In [None]:
# 1. 模型參數設定 (這些數值需要根據您的數據集和優化器設定來確定)
NUM_ANCHORS = 9 # 從步驟 D 獲得的最佳錨框數量
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
INPUT_SIZE = 416 # 訓練時圖像的輸入尺寸

# 2. 模型實例化
model = YoloModel(num_anchors=NUM_ANCHORS).to(DEVICE)

# 3. 定義損失函數
# 假設您從步驟 D 得到的 9 個錨框是 'pixel_anchors' (像素值)
# 並且我們將其標準化為相對於輸入尺寸 (416x416) 的值，並轉換為 PyTorch Tensor
# 您需要將步驟 D 的輸出替換到這裡
pixel_anchors = np.array([
    [ 10.00,  13.00], [ 16.00,  30.00], [ 33.00,  23.00], # 假設值
    [ 30.00,  61.00], [ 62.00,  45.00], [ 59.00, 119.00],
    [116.00,  90.00], [156.00, 198.00], [373.00, 326.00]
])

# 標準化錨框 (除以輸入尺寸)
normalized_anchors = torch.from_numpy(pixel_anchors / INPUT_SIZE).float().to(DEVICE)


# 損失函數實例化
# 由於 YOLOv3/v4/v5 採用多尺度檢測，這裡的損失函數可能需要處理多個輸出。
# 這裡我們使用單一輸出 YOLO 模型的損失函數，這是一個簡化版本。
# 注意: 您需要手動將 diou_loss, calculate_iou 等函數定義在您的 notebook 中。
criterion = YoloLoss(anchors=normalized_anchors,
                     num_classes=1,
                     img_size=INPUT_SIZE).to(DEVICE)

print(f"模型已實例化並移動到設備: {DEVICE}")
print(f"訓練錨框 (標準化): {normalized_anchors.cpu().numpy()}")

模型已實例化並移動到設備: cuda
訓練錨框 (標準化): [[0.02403846 0.03125   ]
 [0.03846154 0.07211538]
 [0.07932692 0.05528846]
 [0.07211538 0.14663461]
 [0.14903846 0.10817308]
 [0.14182693 0.28605768]
 [0.27884614 0.21634616]
 [0.375      0.47596154]
 [0.89663464 0.78365386]]


# 優化器設定
> 訓練配置的最終步驟：優化器和學習率調度策略。


In [None]:
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR

# 訓練超參數
LEARNING_RATE = 1e-3
EPOCHS = 300
WARMUP_EPOCHS = 3
WEIGHT_DECAY = 5e-4 # 權重衰減，防止過擬合

# 1. 優化器
optimizer = optim.AdamW(model.parameters(),
                        lr=LEARNING_RATE,
                        weight_decay=WEIGHT_DECAY)

# 2. 學習率調度器 (在 Warm-up 結束後開始 Cosine Annealing)
scheduler = CosineAnnealingLR(optimizer,
                              T_max=EPOCHS - WARMUP_EPOCHS,
                              eta_min=1e-6)

print(f"優化器: {type(optimizer).__name__}, 初始學習率: {LEARNING_RATE}")
print(f"學習率調度策略: Cosine Annealing (T_max={EPOCHS - WARMUP_EPOCHS}, eta_min=1e-6)")

優化器: AdamW, 初始學習率: 0.001
學習率調度策略: Cosine Annealing (T_max=297, eta_min=1e-6)


# 最終訓練迴圈 (The Final Training Loop)

## 1. Data Loader Setup

### 將長 Batch size 轉換乘一個批次張量

In [None]:
def custom_collate_fn(batch):
    """
    將批次數據的標註進行填充 (Padding) 或堆疊 (Stack)。
    """
    images = []
    targets = []

    for img_tensor, target_tensor in batch:
        images.append(img_tensor)

        # Target 格式: [class_id, x_norm, y_norm, w_norm, h_norm]
        # 我們需要新增一個 batch_id 作為第一列
        target_with_id = torch.zeros((target_tensor.shape[0], 6), dtype=torch.float32)
        target_with_id[:, 0] = len(targets)  # 設置 batch_id
        target_with_id[:, 1:] = target_tensor
        targets.append(target_with_id)

    # 堆疊圖像張量
    images = torch.stack(images, 0)
    # 將所有 targets 連接成一個大的張量
    targets = torch.cat(targets, 0)

    return images, targets

In [None]:
from torch.utils.data import DataLoader, random_split

# 假設 df_gt, IMAGE_DIR, INPUT_SIZE 已經在您的環境中定義
# 並且 PigDetectionDataset 類別已經實作

# 創建完整的數據集實例
full_dataset = PigDetectionDataset(df_gt, IMAGE_DIR, image_size=(INPUT_SIZE, INPUT_SIZE))

# 劃分訓練集和驗證集 (例如 90% 訓練, 10% 驗證)
train_size = int(0.9 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

BATCH_SIZE = 16 # 請根據您的 GPU 顯存調整 Batch Size

# 創建 DataLoader
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2, # 設置工作進程數 (Colab 通常建議 2 或 4)
    pin_memory=True,
    collate_fn=custom_collate_fn
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
    collate_fn=custom_collate_fn
)

## 2. 訓練單一 Epoch 函數

In [None]:
import time
import tqdm # 用於顯示進度條

def train_one_epoch(model, dataloader, criterion, optimizer, device, scaler=None):
    """
    執行單一訓練週期的邏輯。
    """
    model.train() # 設置模型為訓練模式
    total_loss = 0.0

    # 設置進度條
    pbar = tqdm.tqdm(dataloader, desc=f"Epoch Loss: 0.000", leave=False)

    for i, (images, targets) in enumerate(pbar):
        # 將數據移動到正確的設備
        images = images.to(device)
        targets = targets.to(device)

        # 梯度清零
        optimizer.zero_grad()

        # --- 前向傳播 (Forward Pass) ---
        # 這裡推薦使用 PyTorch 的 AMP (自動混合精度) 來加速訓練並節省顯存
        # with torch.autocast(device_type=device.type if device.type != 'mps' else 'cpu', enabled=device.type != 'cpu'):

        predictions = model(images)
        loss = criterion(predictions, targets)

        # --- 反向傳播與優化 (Backward Pass & Optimization) ---
        loss.backward()
        optimizer.step()

        # 更新總損失
        total_loss += loss.item()
        avg_loss = total_loss / (i + 1)

        # 更新進度條顯示
        pbar.set_description(f"Epoch Loss: {avg_loss:.4f}")

    # 返回平均損失
    return avg_loss

### 實作 `validate_model` 函數

In [None]:
import torch.nn.functional as F

def validate_model(model, dataloader, criterion, device):
    """
    執行一個 Epoch 的模型驗證，計算平均損失。
    """
    model.eval()  # 將模型設置為評估模式
    total_loss = 0.0
    num_batches = 0

    # 驗證階段不需要計算梯度
    with torch.no_grad():
        for images, targets in dataloader:
            images = images.to(device)
            # targets 保持在 CPU 上，直到在損失函數中傳輸到 GPU/Device

            # 1. 前向傳播 (Forward Pass)
            predictions = model(images)

            # 2. 計算損失
            # 這裡我們將 targets 傳輸到與 predictions 相同的設備上
            targets_device = targets.to(device)
            loss = criterion(predictions, targets_device)

            total_loss += loss.item()
            num_batches += 1

    # 計算平均驗證損失
    avg_loss = total_loss / num_batches
    model.train() # 驗證完成後，將模型切換回訓練模式
    return avg_loss

## 3. 主訓練函數

In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, device, epochs):

    best_loss = float('inf')

    print(f"\n--- 開始訓練，總 Epoch 數: {epochs} ---")

    for epoch in range(1, epochs + 1):
        print(f"\nEpoch {epoch}/{epochs}")

        # ---------------------
        # 訓練階段
        # ---------------------
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)

        # ---------------------
        # 學習率調度 (Warm-up 和 Cosine Annealing)
        # ---------------------

        # 1. Warm-up 邏輯 (前 WARMUP_EPOCHS)
        if epoch <= WARMUP_EPOCHS:
            # 計算 Warm-up 學習率
            lr = LEARNING_RATE * (0.1 + 0.9 * (epoch / WARMUP_EPOCHS))
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr

        # 2. Cosine Annealing 邏輯 (Warm-up 結束後)
        elif scheduler is not None:
            scheduler.step() # 執行調度器

        # 記錄當前學習率
        current_lr = optimizer.param_groups[0]['lr']
        print(f"訓練損失 (Train Loss): {train_loss:.4f} | LR: {current_lr:.6e}")


        # ---------------------
        # 驗證階段 (這裡需要計算 mAP，但為簡化先計算 Val Loss)
        # ---------------------
        val_loss = validate_model(model, val_loader, criterion, device)
        print(f"驗證損失 (Validation Loss): {val_loss:.4f}")


        # ---------------------
        # 模型儲存
        # ---------------------
        if val_loss < best_loss: # 這裡應該用 mAP 替代 val_loss
             best_loss = val_loss
             torch.save(model.state_dict(), 'best_yolo_model.pth')
             print(" -> 模型已儲存 (基於 Val Loss)")

    print("\n--- 訓練完成 ---")

# 開始訓練

In [None]:
# --- 確保所有超參數已定義 ---

# 訓練超參數 (請根據您的實驗需求調整)
EPOCHS = 300       # 總訓練週期數
BATCH_SIZE = 16    # 批次大小 (請務必根據您的 GPU 顯存調整)
LEARNING_RATE = 1e-3
WARMUP_EPOCHS = 3

# 設備
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- 確保您的所有實例已經準備好 ---
# 假設 model, train_loader, val_loader, criterion, optimizer, scheduler, DEVICE 已經被實例化並配置完成

print(f"=========================================")
print(f"     🚀 開始訓練 (Training Start) 🚀    ")
print(f"=========================================")
print(f"  目標設備: {DEVICE}")
print(f"  總 Epochs: {EPOCHS}, Warmup Epochs: {WARMUP_EPOCHS}")
print(f"  Batch Size: {BATCH_SIZE}")
print(f"  訓練集大小: {len(train_loader.dataset)} 圖像")
# print(f"  驗證集大小: {len(val_loader.dataset)} 圖像") # 如果您實作了驗證集
print(f"=========================================")

# 呼叫主訓練函數
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader, # 傳入驗證載入器 (如果已實作)
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    device=DEVICE,
    epochs=EPOCHS
)

# 訓練結束後，您可以加載最佳模型權重
# best_model_path = 'best_yolo_model.pth'
# if os.path.exists(best_model_path):
#     model.load_state_dict(torch.load(best_model_path))
#     print(f"已載入最佳模型權重: {best_model_path}")

     🚀 開始訓練 (Training Start) 🚀    
  目標設備: cuda
  總 Epochs: 300, Warmup Epochs: 3
  Batch Size: 16
  訓練集大小: 1143 圖像

--- 開始訓練，總 Epoch 數: 300 ---

Epoch 1/300




KeyboardInterrupt: 

# 準備預測與 NMS

## 載入 `.pth` 檔案
> 回覆訓練進度

In [None]:
import torch
import os

def load_checkpoint_for_prediction(model, checkpoint_path, device):
    """
    載入模型權重並設定為評估模式。修正了檢查點格式不一致的問題。
    """
    if not os.path.exists(checkpoint_path):
        print(f"錯誤：找不到檢查點檔案於 {checkpoint_path}")
        return False

    try:
        # 載入整個檢查點內容
        checkpoint = torch.load(checkpoint_path, map_location=device)

        # 嘗試從標準的檢查點字典中讀取 'model_state_dict'
        if 'model_state_dict' in checkpoint:
            model.load_state_dict(checkpoint['model_state_dict'])
            loaded_epoch = checkpoint.get('epoch', '未知')
            print(f"✅ 成功從標準檢查點 (Epoch {loaded_epoch}) 載入模型權重。")
        else:
            # 假設整個檔案內容就是 model.state_dict()
            model.load_state_dict(checkpoint)
            print("✅ 成功從原始 State Dict 格式載入模型權重。")

        # 將模型切換到評估模式
        model.eval()
        return True

    except Exception as e:
        print(f"❌ 載入權重失敗。錯誤訊息: {e}")
        return False

# 📌 記得再次呼叫：
# load_checkpoint_for_prediction(model, BEST_CHECKPOINT_PATH, DEVICE)
# model.eval()

## 預測與 NMS 實作
> 要從模型輸出得到的邊界匡列表，需要兩個參數
- decode_predictions
- non_max_suppression



### 解碼函數

In [None]:
import torch
import torch.nn.functional as F

def decode_predictions(predictions, anchors, img_size, conf_threshold=0.25):
    """
    解碼 YOLO 模型輸出的原始張量，生成邊界框、信心度和類別分數。

    :param predictions: 模型原始輸出 [B, A*6, G, G]
    :param anchors: 該輸出層對應的錨框 (Normalized)
    :param img_size: 輸入圖像尺寸 (例如 416)
    :param conf_threshold: 物體信心度閾值
    :return: 包含所有合格預測框的張量 [N_boxes, 6 (x1, y1, x2, y2, conf, cls)]
    """

    # 設置參數
    batch_size, _, grid_h, grid_w = predictions.shape
    num_anchors = len(anchors)
    num_classes = 1 # 只有 'pig' 一類

    # 調整為 (Batch, Anchors, Grid_H, Grid_W, 6)
    predictions = predictions.view(batch_size, num_anchors, num_classes + 5, grid_h, grid_w).permute(0, 1, 3, 4, 2).contiguous()

    # 獲取原始 logits
    pred_xy_logits = predictions[..., 0:2]
    pred_wh_logits = predictions[..., 2:4]
    pred_conf_logits = predictions[..., 4:5]
    pred_cls_logits = predictions[..., 5:6]

    # --- 1. 計算網格中心點座標 ---
    grid_x = torch.arange(grid_w, device=predictions.device).repeat(grid_h, 1).view([1, 1, grid_h, grid_w]).type_as(predictions)
    grid_y = torch.arange(grid_h, device=predictions.device).repeat(grid_w, 1).t().view([1, 1, grid_h, grid_w]).type_as(predictions)

    # --- 2. 解碼座標與尺寸 ---

    # a. x, y (中心點座標)
    pred_x = (torch.sigmoid(pred_xy_logits[..., 0]) + grid_x) / grid_w
    pred_y = (torch.sigmoid(pred_xy_logits[..., 1]) + grid_y) / grid_h

    # b. w, h (尺寸)
    anchors_wh = anchors.to(predictions.device).view(1, num_anchors, 1, 1, 2).type_as(predictions)
    pred_w = torch.exp(pred_wh_logits[..., 0]) * anchors_wh[..., 0]
    pred_h = torch.exp(pred_wh_logits[..., 1]) * anchors_wh[..., 1]

    # --- 3. 轉換為 (x1, y1, x2, y2) 絕對座標 (Normalized 0-1) ---

    # x1 = x_center - w/2
    pred_x1 = pred_x - pred_w / 2
    pred_y1 = pred_y - pred_h / 2
    pred_x2 = pred_x + pred_w / 2
    pred_y2 = pred_y + pred_h / 2

    # --- 4. 計算最終分數 ---

    # c. 信心度與分類
    pred_conf = torch.sigmoid(pred_conf_logits.squeeze(-1)) # [B, A, G, G]
    pred_cls = torch.sigmoid(pred_cls_logits.squeeze(-1))   # [B, A, G, G]

    # 最終分數 = Conf * Class_Score (只有一類，這裡簡化)
    # 由於只有一類，我們通常可以只看 pred_conf
    final_score = pred_conf * pred_cls

    # --- 5. 篩選與整合 ---

    # 篩選掉低信心度的預測
    mask = final_score > conf_threshold

    # 獲取所有合格的邊界框數據

    # 將所有 (B, A, G, G) 維度的數據展平
    flat_x1 = pred_x1[mask]
    flat_y1 = pred_y1[mask]
    flat_x2 = pred_x2[mask]
    flat_y2 = pred_y2[mask]
    flat_scores = final_score[mask]

    # 由於只有一類，類別 ID 永遠是 0
    flat_cls = torch.zeros_like(flat_scores)

    # 將所有數據合併為 [N_boxes, 6 (x1, y1, x2, y2, conf, cls)]
    output = torch.stack([flat_x1, flat_y1, flat_x2, flat_y2, flat_scores, flat_cls], dim=1)

    # 將 Normalized 座標 (0-1) 轉換為實際像素座標
    # 這裡只轉換 x1, y1, x2, y2
    if output.numel() > 0:
        output[:, :4] *= img_size

    return output

### 非最大值抑制 (NMS)
> 移除重疊的邊界框，並利用 Pytorch 提供的函式

In [None]:
# 需要安裝 torchvision
# !pip install torchvision

from torchvision.ops import nms

def apply_nms(decoded_boxes, iou_threshold=0.45):
    """
    對解碼後的邊界框應用非最大值抑制。

    :param decoded_boxes: 從 decode_predictions 得到的張量 [N_boxes, 6 (x1, y1, x2, y2, conf, cls)]
    :param iou_threshold: NMS 的 IoU 閾值
    :return: NMS 後的最終邊界框張量
    """
    if decoded_boxes.numel() == 0:
        return decoded_boxes

    # NMS 需要 (x1, y1, x2, y2) 格式的邊界框和信心度分數
    boxes = decoded_boxes[:, :4] # [N, 4]
    scores = decoded_boxes[:, 4] # [N]

    # 執行 NMS
    keep_indices = nms(boxes, scores, iou_threshold)

    # 根據保留的索引返回最終的邊界框
    final_boxes = decoded_boxes[keep_indices]
    return final_boxes

# 產生提交檔 (.csv)

## 呼叫載入函數

In [None]:
import torch
import os

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# 📌 替換為您保存的、驗證損失最低的 .pth 檔案路徑
BEST_CHECKPOINT_PATH = '/content/drive/MyDrive/Colab Notebooks/電腦視覺與深度學習/taica-cvpdl-2025-hw-1/train/best_yolo_model (1).pth'

# 呼叫 load_checkpoint_for_prediction 函數
load_checkpoint_for_prediction(model, BEST_CHECKPOINT_PATH, DEVICE)

# 確保模型已設置為評估模式
model.eval()

✅ 成功從原始 State Dict 格式載入模型權重。


YoloModel(
  (backbone): SimpleBackbone(
    (features): Sequential(
      (0): ConvBlock(
        (conv): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (leaky_relu): LeakyReLU(negative_slope=0.1)
      )
      (1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (2): ConvBlock(
        (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (leaky_relu): LeakyReLU(negative_slope=0.1)
      )
      (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (4): ConvBlock(
        (conv): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        

## 實現預測流程

### 建立專門用於測試集的 `DataLoader`
> 只需要負責加載圖片，不需要標註。

In [None]:
import os
from torch.utils.data import Dataset
from PIL import Image

class TestDataset(Dataset):
    def __init__(self, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        # 讀取所有圖片檔案名稱，並確保它們是 .jpg 格式
        self.img_files = [f for f in os.listdir(img_dir) if f.endswith('.jpg')]

    def __len__(self):
        return len(self.img_files)

    def __getitem__(self, idx):
        # 1. 取得圖片 ID
        image_id = self.img_files[idx] # 檔案名稱，例如: '000000.jpg'

        # 2. 載入圖片
        img_path = os.path.join(self.img_dir, image_id)
        image = Image.open(img_path).convert("RGB")

        # 3. 應用 Transform
        if self.transform:
            image = self.transform(image)

        # 測試集只需返回圖片 Tensor 和 圖片 ID (檔案名稱)
        return image, image_id

In [None]:
from torchvision import transforms

# 假設您的模型輸入尺寸是 416x416
IMG_SIZE = 416

test_transforms = transforms.Compose([
    # 確保圖片調整到模型需要的輸入尺寸
    transforms.Resize((IMG_SIZE, IMG_SIZE)),

    # 轉換為 Tensor
    transforms.ToTensor(),

    # 正規化 (與訓練集的正規化參數保持一致)
    # 這裡使用 ImageNet 的標準值作為範例，請根據您訓練時使用的值進行調整
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

In [None]:
from torch.utils.data import DataLoader

# ⚠️ 替換為您的測試集圖片實際路徑
TEST_IMG_DIR = '/content/drive/MyDrive/Colab Notebooks/電腦視覺與深度學習/taica-cvpdl-2025-hw-1/test/img/'

test_dataset = TestDataset(
    img_dir=TEST_IMG_DIR,
    transform=test_transforms
)

test_loader = DataLoader(
    test_dataset,
    batch_size=16,          # 預測時可以使用較大的 Batch Size
    shuffle=False,          # 預測時必須是 False
    num_workers=4           # 根據您的 Colab 配置調整
)

print(f"測試集圖片總數: {len(test_dataset)}")
print(f"測試集 DataLoader 創建成功。")

測試集圖片總數: 1864
測試集 DataLoader 創建成功。




### 創建主預測迴圈
> 主迴圈會跑過一遍測試集，執行解碼和 NMS，並將結果收集。

In [None]:
from tqdm import tqdm
import pandas as pd


def run_inference(model, test_loader, anchors, img_size, device):
    all_predictions = []

    with torch.no_grad():
        for images, image_ids in tqdm(test_loader, desc="進行測試集預測"):
            images = images.to(device)

            # 1. 模型前向傳播
            predictions = model(images)

            # 2. 解碼與 NMS (每個 Batch 處理)
            for i in range(images.shape[0]):
                single_image_output = predictions[i].unsqueeze(0) # 保持 Batch 維度為 1

                # 解碼：將模型輸出轉換為邊界框
                decoded_boxes = decode_predictions(
                    single_image_output,
                    anchors,
                    img_size=img_size,
                    conf_threshold=0.01 # 預測時閾值可以設低一點
                )

                # NMS：移除重疊框
                final_boxes = apply_nms(decoded_boxes, iou_threshold=0.45)

                # 3. 收集結果
                if final_boxes.numel() > 0:
                    # final_boxes: [N, 6] (x1, y1, x2, y2, conf, cls)

                    # 轉換為 [x_min, y_min, w, h] 格式
                    x1 = final_boxes[:, 0].cpu().numpy()
                    y1 = final_boxes[:, 1].cpu().numpy()
                    w = (final_boxes[:, 2] - final_boxes[:, 0]).cpu().numpy()
                    h = (final_boxes[:, 3] - final_boxes[:, 1]).cpu().numpy()
                    scores = final_boxes[:, 4].cpu().numpy()

                    # 格式化為 RLE
                    rle_string = format_to_rle(x1, y1, w, h, scores)

                    all_predictions.append({
                        'Image_ID': image_ids[i],
                        'PredictionString': rle_string
                    })
                else:
                    all_predictions.append({
                        'Image_ID': image_ids[i],
                        'PredictionString': '' # 沒有檢測到物體
                    })

    return pd.DataFrame(all_predictions)

### 實作 RLE 的格式化函數
> 以符合作業要求上傳格式

In [None]:
def format_to_rle(x_min, y_min, w, h, scores):
    """
    將邊界框數據轉換為 Kaggle 提交所需的最終格式字符串。
    格式: [score x_min y_min w h class_id]
    """
    if len(scores) == 0:
        return ""

    # 📌 根據範例，類別 ID 必須在最後，且為 0
    # 這是單類別檢測常見的 ID
    class_id = 0

    rle_parts = []

    # 將數據按照分數降序排列 (推薦做法)
    sorted_indices = scores.argsort()[::-1]

    for idx in sorted_indices:
        # ⚠️ 最終順序修正為：score x_min y_min w h class_id
        # 使用 .1f 確保座標精度符合範例風格
        part = f"{scores[idx]:.4f} {x_min[idx]:.1f} {y_min[idx]:.1f} {w[idx]:.1f} {h[idx]:.1f} {class_id}"
        rle_parts.append(part)

    # 用空格連接所有片段
    return " ".join(rle_parts)

### 生成並下載結果檔案

In [None]:
# 確保您已經成功載入權重並將模型設置為 eval 模式 (model.eval())

# 呼叫預測函數，並將結果存入 final_df
final_df = run_inference(
    model=model,
    test_loader=test_loader,
    anchors=normalized_anchors,
    img_size=IMG_SIZE,
    device=DEVICE
)

print("✅ 預測結果 DataFrame (final_df) 已生成。")
print(f"總共處理了 {len(final_df)} 張圖片。")

進行測試集預測:   0%|          | 0/117 [00:00<?, ?it/s]Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7e7e7e9ad4e0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1664, in __del__
    self._shutdown_workers()
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1647, in _shutdown_workers
    if w.is_alive():
       ^^^^Exception ignored in: ^<function _MultiProcessingDataLoaderIter.__del__ at 0x7e7e7e9ad4e0>^^
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1664, in __del__
    ^self._shutdown_workers()
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1647, in _shutdown_workers
^    if w.is_alive():
 ^  ^ ^  
  File "/usr/lib/python3.12/multiprocessing/process.py", line 160, in is_alive
     ^assert self._parent_pid == os.getpid(), 'can only test a c

✅ 預測結果 DataFrame (final_df) 已生成。
總共處理了 1864 張圖片。





In [None]:
# 假設 final_df 已經從 run_inference 函數中獲得
final_df.to_csv('submission.csv', index=False)

print("提交檔案 submission.csv 已生成！")
# 您可以在 Colab 左側的文件瀏覽器中下載此檔案並上傳到 Kaggle。

提交檔案 submission.csv 已生成！
