In [None]:
# --- Step 1: Mount Google Drive and Set Dataset Path ---
from google.colab import drive
import os

# Mount Google Drive at /content/drive so the dataset stored there is reachable.
drive.mount('/content/drive')
# Root folder of the CarDD COCO export; the training script joins the
# annotation files and image directories onto this path.
extract_path = "/content/drive/MyDrive/Colab Notebooks/CarDD/CarDD_COCO"

# --- Step 2: Install Required Libraries ---
!pip install -q timm pycocotools
!pip install albumentations --quiet

# --- Step 3: Import Libraries ---
import torch
import torch.nn as nn
from torchvision.ops import generalized_box_iou_loss
from torchvision.ops.boxes import box_convert
import timm
import json
import numpy as np
import random
import cv2
from pycocotools.coco import COCO
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# Disable nvFuser for TorchScript. `_jit_set_nvfuser_enabled` is a private hook
# on torch._C, so only call it when the installed build exposes it; otherwise
# this setup cell would abort with an AttributeError.
if hasattr(torch._C, "_jit_set_nvfuser_enabled"):
    torch._C._jit_set_nvfuser_enabled(False)



In [None]:
# --- Step 4: Define COCO-Style Dataset Loader ---
class CarDDDataset(Dataset):
    """COCO-style detection dataset.

    Each item is ``(image, boxes, labels, image_id)``:

    * ``image``  - RGB image as read by OpenCV (optionally passed through
      ``transforms``, which receives the image only).
    * ``boxes``  - float32 tensor of shape ``(N, 4)`` in absolute ``xyxy``
      pixel coordinates (COCO ``xywh`` converted). ``N`` may be 0.
    * ``labels`` - int64 tensor of shape ``(N,)`` holding the raw COCO
      ``category_id`` of every kept annotation.
    * ``image_id`` - the COCO image id.

    Args:
        annotation_path: Path to the COCO annotation JSON file.
        image_dir: Directory containing the image files.
        transforms: Optional callable applied to the image only.
    """

    def __init__(self, annotation_path, image_dir, transforms=None):
        self.coco = COCO(annotation_path)
        self.image_dir = image_dir
        self.transforms = transforms
        self.image_ids = self.coco.getImgIds()

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Load one image with its filtered boxes and labels.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_id = self.image_ids[idx]
        img_info = self.coco.loadImgs(img_id)[0]
        img_path = os.path.join(self.image_dir, img_info['file_name'])

        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None; fail here with the
            # offending path instead of an opaque error inside cvtColor.
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        anns = self.coco.loadAnns(self.coco.getAnnIds(imgIds=img_id))

        boxes = []
        labels = []
        for ann in anns:
            x, y, w, h = ann['bbox'][:4]
            # skip crowd or near-zero boxes
            if ann.get('iscrowd', 0) == 1 or w <= 1 or h <= 1:
                continue
            # Category 6 is dropped on purpose; the remaining labels keep their
            # raw category ids.
            # NOTE(review): presumably ids are 1-based so index 0 stays unused
            # downstream - confirm against the annotation file.
            if ann['category_id'] == 6:
                continue
            boxes.append([x, y, x + w, y + h])
            labels.append(ann['category_id'])

        # reshape keeps the (N, 4) contract even when no annotation survives;
        # torch.tensor([]) alone would yield shape (0,).
        boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.tensor(labels, dtype=torch.int64)

        if self.transforms:
            img = self.transforms(img)

        return img, boxes, labels, img_id  # include ID if needed for logging or eval


In [None]:
# --- Step 5: Define Positional Encoding ---
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding for batch-first sequences.

    Args:
        d_model: Embedding size of each token (even or odd).
        max_len: Longest sequence length for which encodings are precomputed.

    The table is stored in the non-trainable buffer ``pe`` with shape
    ``(1, max_len, d_model)``.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer cosine slot than sine slot, so
        # trim the angle table instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)  # shape: (1, max_len, d_model) to match batch_first=True
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encoding for the first ``x.size(1)`` positions to ``x``.

        ``x`` is indexed as (batch, sequence, d_model); its sequence length
        must not exceed ``max_len``.
        """
        return x + self.pe[:, :x.size(1), :]


In [None]:

# --- Step 6: Define Model Skeleton with CBAM ---
import torch
import torch.nn as nn
import timm

# Prefer a standalone positional_encoding module when one is on the path;
# otherwise fall back to the PositionalEncoding class defined in the Step 5
# cell of this notebook, so a fresh "Restart & Run All" does not abort here.
try:
    from positional_encoding import PositionalEncoding
except ModuleNotFoundError:
    if "PositionalEncoding" not in globals():
        raise

class ChannelAttention(nn.Module):
    """Channel-attention gate (CBAM style).

    Global average- and max-pooled descriptors go through one shared
    bottleneck of 1x1 convolutions; their sum is squashed with a sigmoid.

    Args:
        in_planes: Number of input channels.
        reduction: Bottleneck reduction ratio. The hidden width is
            ``in_planes // reduction`` but never less than 1.

    ``forward`` returns per-channel weights of shape ``(B, in_planes, 1, 1)``.
    """

    def __init__(self, in_planes, reduction=16):
        super(ChannelAttention, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)

        # Clamp so that in_planes < reduction does not yield a zero-channel conv.
        hidden_planes = max(1, in_planes // reduction)
        self.fc = nn.Sequential(
            nn.Conv2d(in_planes, hidden_planes, 1, bias=False),
            nn.ReLU(),
            nn.Conv2d(hidden_planes, in_planes, 1, bias=False)
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        avg_out = self.fc(self.avg_pool(x))
        max_out = self.fc(self.max_pool(x))
        out = avg_out + max_out
        return self.sigmoid(out)


class SpatialAttention(nn.Module):
    """Spatial-attention gate (CBAM style).

    Collapses the channel axis into a mean map and a max map, stacks the two
    and runs one bias-free convolution followed by a sigmoid.

    Args:
        kernel_size: Convolution kernel size; only 3 and 7 are accepted.

    ``forward`` returns a single-channel map with the input's spatial size.
    """

    def __init__(self, kernel_size=7):
        super().__init__()
        assert kernel_size in (3, 7), "kernel size must be 3 or 7"

        # "Same" padding: 3 for a 7x7 kernel, 1 for a 3x3 kernel.
        same_padding = kernel_size // 2
        self.conv = nn.Conv2d(2, 1, kernel_size, padding=same_padding, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        channel_mean = x.mean(dim=1, keepdim=True)
        channel_max = x.max(dim=1, keepdim=True)[0]
        stacked = torch.cat((channel_mean, channel_max), dim=1)
        return self.sigmoid(self.conv(stacked))


class CBAM(nn.Module):
    """Channel attention followed by spatial attention.

    Args:
        channels: Number of channels of the feature map being refined.
        reduction: Reduction ratio forwarded to ``ChannelAttention``.
        kernel_size: Kernel size forwarded to ``SpatialAttention``.
    """

    def __init__(self, channels, reduction=16, kernel_size=7):
        super().__init__()
        self.channel_attention = ChannelAttention(channels, reduction)
        self.spatial_attention = SpatialAttention(kernel_size)

    def forward(self, x):
        # Re-weight channels first, then re-weight spatial positions of the
        # channel-refined map.
        channel_gate = self.channel_attention(x)
        refined = x * channel_gate
        spatial_gate = self.spatial_attention(refined)
        return refined * spatial_gate


class CoDETR(nn.Module):
    """DETR-style detector: Swin backbone -> CBAM -> transformer encoder/decoder.

    Args:
        num_classes: Size of the classification head output. No extra
            "no-object" class is added.
        hidden_dim: Transformer model width.
        num_queries: Number of learned object queries, i.e. the number of
            predictions emitted per image.

    ``forward`` returns a dict with
        * ``pred_logits``: ``(B, num_queries, num_classes)`` raw class scores.
        * ``pred_boxes``:  ``(B, num_queries, 4)`` sigmoid outputs in [0, 1].
          The training code treats them as normalised ``cxcywh`` boxes.
    """

    def __init__(self, num_classes, hidden_dim=256, num_queries=100):
        super().__init__()
        assert num_classes > 0, "num_classes must be greater than zero"
        assert num_queries > 0, "num_queries must be greater than zero"

        self.num_classes = num_classes
        self.register_buffer("valid_class_range", torch.arange(num_classes))

        self.backbone = timm.create_model('swin_base_patch4_window12_384', pretrained=True, features_only=True)
        backbone_channels = self.backbone.feature_info[-1]['num_chs']
        self.cbam = CBAM(backbone_channels)  # Inject CBAM
        self.input_proj = nn.Conv2d(backbone_channels, hidden_dim, kernel_size=1)

        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=8, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=6)

        decoder_layer = nn.TransformerDecoderLayer(d_model=hidden_dim, nhead=8, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=6)

        self.query_embed = nn.Embedding(num_queries, hidden_dim)
        self.positional_encoding = PositionalEncoding(hidden_dim)

        self.bbox_embed = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 4)
        )
        self.class_embed = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        # NOTE(review): the permute assumes the backbone returns channels-last
        # (B, H, W, C) feature maps - confirm for the installed timm version.
        features = self.backbone(x)[-1]                      # [B, H, W, C]
        features = features.permute(0, 3, 1, 2).contiguous() # [B, C, H, W]
        features = self.cbam(features)                       # Apply CBAM here
        x = self.input_proj(features)                        # [B, hidden_dim, H, W]

        bs = x.shape[0]
        x = x.flatten(2).transpose(1, 2)                     # (B, HW, C)
        x = self.positional_encoding(x)
        memory = self.encoder(x)                             # (B, HW, C)

        queries = self.query_embed.weight.unsqueeze(0).repeat(bs, 1, 1)  # (B, num_queries, C)

        if not hasattr(self, "_logged_once"):
            print(f"[DEBUG] features: {features.shape}, x: {x.shape}, memory: {memory.shape}, queries: {queries.shape}")
            self._logged_once = True

        hs = self.decoder(queries, memory)  # (B, num_queries, C)
        outputs_class = self.class_embed(hs)                 # (B, num_queries, num_classes)
        outputs_coord = self.bbox_embed(hs).sigmoid()        # normalized boxes (B, num_queries, 4)

        # The decoder output is already the final layer, with queries on dim 1.
        # Indexing [:, -1] would keep only the last query and hand the matcher
        # 1-D per-image predictions, so return every query.
        return {'pred_logits': outputs_class, 'pred_boxes': outputs_coord}


        # Pseudocode: Replace GIoU with SCYLLA-IoU by implementing a custom loss function per the paper.
        # Suggested usage: replace criterion_bbox with scylla_iou_loss
        # Example:
        # def scylla_iou_loss(pred_boxes, target_boxes):
        #     # Implement SCYLLA-IoU formula using angle, distance, aspect ratio components
        #     return loss_value
        # criterion_bbox = scylla_iou_loss
        # See: scylla_iou_loss skeleton code file

In [None]:
# --- Step 7: Training Loop and Evaluation ---
from scipy.optimize import linear_sum_assignment
from torch.nn.functional import cross_entropy, one_hot
from torchvision.ops import generalized_box_iou, box_convert
from tqdm import tqdm

def hungarian_match(pred_logits, pred_boxes, tgt_labels, tgt_boxes,
                    cost_class=1.0, cost_bbox=5.0, cost_giou=2.0):
    """One-to-one assignment between predictions and targets of ONE image.

    Args:
        pred_logits: ``(num_queries, num_classes)`` raw class scores.
        pred_boxes:  ``(num_queries, 4)`` boxes in ``cxcywh`` format.
        tgt_labels:  ``(num_targets,)`` integer class indices.
        tgt_boxes:   ``(num_targets, 4)`` boxes in ``cxcywh`` format, in the
            same coordinate scale as ``pred_boxes``.
        cost_class: Weight of the classification cost (negative probability
            of the target class).
        cost_bbox: Weight of the L1 distance between boxes.
        cost_giou: Weight of ``1 - GIoU``.

    Returns:
        ``(row_ind, col_ind)``: matched prediction indices and target indices.
        Both are empty lists when there is nothing to match or the inputs do
        not have the expected rank.
    """
    with torch.no_grad():
        if (tgt_labels.numel() == 0 or pred_logits.numel() == 0
                or pred_logits.ndim != 2 or pred_boxes.ndim != 2 or tgt_boxes.ndim != 2):
            return [], []

        num_classes = pred_logits.shape[-1]
        out_prob = pred_logits.softmax(-1)

        # reshape(-1) also accepts a 0-dim label tensor (single target).
        tgt_onehot = one_hot(tgt_labels.reshape(-1), num_classes=num_classes).float()

        # class_cost[q, t] = -P(query q predicts the class of target t)
        class_cost = -torch.matmul(out_prob, tgt_onehot.T)
        bbox_cost = torch.cdist(pred_boxes, tgt_boxes, p=1)

        pred_xyxy = box_convert(pred_boxes, 'cxcywh', 'xyxy')
        tgt_xyxy = box_convert(tgt_boxes, 'cxcywh', 'xyxy')
        giou_cost = 1 - generalized_box_iou(pred_xyxy, tgt_xyxy)

        total_cost = cost_class * class_cost + cost_bbox * bbox_cost + cost_giou * giou_cost
        row_ind, col_ind = linear_sum_assignment(total_cost.cpu().numpy())
        return row_ind, col_ind

def _matched_losses(logits, coords, tgt_boxes, tgt_labels, criterion_cls, criterion_bbox):
    """Per-image classification / box losses on Hungarian-matched pairs.

    ``tgt_boxes`` must already be in the same format and scale as ``coords``
    (normalised ``cxcywh``). Images without targets or without a match
    contribute nothing.
    """
    cls_losses, bbox_losses = [], []
    for i in range(len(tgt_labels)):
        if len(tgt_labels[i]) == 0 or coords[i].numel() == 0 or logits[i].numel() == 0:
            continue

        row_ind, col_ind = hungarian_match(logits[i], coords[i], tgt_labels[i], tgt_boxes[i])
        if len(row_ind) == 0 or len(col_ind) == 0:
            continue

        rows = torch.as_tensor(row_ind, dtype=torch.long, device=logits.device)
        cols = torch.as_tensor(col_ind, dtype=torch.long, device=logits.device)

        cls_losses.append(criterion_cls(logits[i][rows], tgt_labels[i][cols]))
        bbox_losses.append(criterion_bbox(coords[i][rows], tgt_boxes[i][cols]))

    return cls_losses, bbox_losses


def train_one_epoch(model, dataloader, optimizer, criterion_cls, criterion_bbox, device):
    """Run one training epoch.

    Args:
        model: Detector returning ``pred_logits`` / ``pred_boxes`` (and
            optionally ``aux_outputs`` with the same keys).
        dataloader: Yields ``(images, boxes, labels, img_ids)`` where
            ``images`` is a sequence of equally sized image tensors and
            ``boxes`` holds absolute ``xyxy`` pixel boxes per image.
        optimizer: Optimizer over ``model`` parameters.
        criterion_cls: ``(logits, labels) -> scalar`` classification loss.
        criterion_bbox: ``(pred_cxcywh, target_cxcywh) -> scalar`` box loss.
        device: Device to run on.

    Returns:
        Mean loss over the batches that produced an optimizer step
        (0.0 if none did).
    """
    model.train()
    total_loss = 0.0
    num_updates = 0

    progress = tqdm(dataloader, desc="Training", leave=True, ncols=100)
    for batch_idx, (images, boxes, labels, img_ids) in enumerate(progress):
        if all(len(b) == 0 for b in boxes) or all(len(l) == 0 for l in labels):
            print(f"[SKIP] Batch {batch_idx} is empty — skipping.")
            continue

        images = torch.stack(images).to(device)
        labels = [l.to(device) for l in labels]

        # Predictions are sigmoid outputs in [0, 1], so the pixel-space targets
        # must be normalised by the image size before matching and the losses.
        img_h, img_w = images.shape[-2:]
        box_scale = torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32, device=device)
        tgt_boxes = [
            box_convert(b.to(device).reshape(-1, 4) / box_scale, 'xyxy', 'cxcywh')
            for b in boxes
        ]

        optimizer.zero_grad()
        outputs = model(images)

        all_cls_losses, all_bbox_losses = _matched_losses(
            outputs['pred_logits'], outputs['pred_boxes'], tgt_boxes, labels,
            criterion_cls, criterion_bbox,
        )
        for aux in outputs.get('aux_outputs', []) if isinstance(outputs, dict) else []:
            aux_cls, aux_bbox = _matched_losses(
                aux['pred_logits'], aux['pred_boxes'], tgt_boxes, labels,
                criterion_cls, criterion_bbox,
            )
            all_cls_losses.extend(aux_cls)
            all_bbox_losses.extend(aux_bbox)

        if not all_cls_losses:
            continue

        loss_cls = torch.stack(all_cls_losses).mean()
        loss_bbox = torch.stack(all_bbox_losses).mean()
        loss = loss_cls + 5.0 * loss_bbox
        loss.backward()

        # Gradient sanity check: stepping with NaN gradients would poison the
        # weights for the rest of the run, so drop this update instead.
        nan_params = [
            name for name, param in model.named_parameters()
            if param.grad is not None and torch.isnan(param.grad).any()
        ]
        if nan_params:
            for name in nan_params:
                print(f"[WARNING] NaN gradients in {name}")
            optimizer.zero_grad()
            continue

        optimizer.step()
        total_loss += loss.item()
        num_updates += 1
        progress.set_postfix(loss=f"{loss.item():.4f}")

    # Average over batches that actually trained; skipped batches must not
    # dilute the figure, and an empty loader must not divide by zero.
    return total_loss / max(1, num_updates)

def evaluate(model, dataloader, device):
    """Classification accuracy over Hungarian-matched prediction/target pairs.

    For every image the predictions are matched one-to-one to the ground-truth
    objects (same matcher as training); a target counts as correct when its
    matched query predicts the target's class. Targets left without a match
    count as misses.

    Note: the matcher's cost includes the class probability, so this figure is
    an optimistic proxy, not a detection metric such as mAP.

    Args:
        model: Detector returning ``pred_logits`` / ``pred_boxes``.
        dataloader: Yields ``(images, boxes, labels, img_ids)`` with equally
            sized image tensors and absolute ``xyxy`` pixel boxes.
        device: Device to run on.

    Returns:
        Accuracy in [0, 1]; 0 when the loader holds no targets.
    """
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, boxes, labels, img_ids in tqdm(dataloader, desc="Evaluating", leave=True, ncols=100):
            if all(len(l) == 0 for l in labels):
                continue

            images = torch.stack(images).to(device)
            outputs = model(images)

            # Bring pixel-space targets into the normalised space of the predictions.
            img_h, img_w = images.shape[-2:]
            box_scale = torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32, device=device)

            per_image = zip(outputs['pred_logits'], outputs['pred_boxes'], boxes, labels)
            for logits, coords, tgt_boxes, tgt_labels in per_image:
                if len(tgt_labels) == 0:
                    continue
                tgt_labels = tgt_labels.to(device)
                tgt_boxes = box_convert(tgt_boxes.to(device).reshape(-1, 4) / box_scale, 'xyxy', 'cxcywh')
                total += len(tgt_labels)

                row_ind, col_ind = hungarian_match(logits, coords, tgt_labels, tgt_boxes)
                if len(row_ind) == 0:
                    continue
                rows = torch.as_tensor(row_ind, dtype=torch.long, device=device)
                cols = torch.as_tensor(col_ind, dtype=torch.long, device=device)
                correct += int((logits[rows].argmax(-1) == tgt_labels[cols]).sum().item())

    acc = correct / total if total else 0
    print(f"Validation Accuracy: {acc:.4f}")
    return acc


In [None]:
import os

# Debug aid: ask CUDA to run kernels synchronously.
# NOTE(review): presumably this has to run before the first CUDA call to take
# effect - confirm the cell order.
os.environ.update({"CUDA_LAUNCH_BLOCKING": "1"})


In [None]:

# --- Step 8: Main Training Script with Albumentations ---
if __name__ == '__main__':
    import torchvision.transforms as T
    from sklearn.metrics import average_precision_score, roc_auc_score, precision_score, recall_score
    from torch.nn.functional import one_hot
    from collections import Counter
    import torch
    import numpy as np
    import time
    from tqdm import tqdm
    import albumentations as A
    from albumentations.pytorch import ToTensorV2

    # Albumentations transform blocks
    def get_train_transforms():
        """Build the training augmentation pipeline (boxes in pascal_voc format)."""
        # NOTE(review): the transform names and keyword arguments below depend
        # on the installed albumentations release (Mosaic / Cutout and the
        # height=/width= arguments of RandomResizedCrop in particular) -
        # confirm against the pinned version.
        return A.Compose([
            A.OneOf([
                A.Mosaic(p=1.0),
                A.Cutout(num_holes=8, max_h_size=32, max_w_size=32, p=1.0),
            ], p=0.7),
            A.RandomBrightnessContrast(p=0.5),
            A.HueSaturationValue(p=0.5),
            A.RGBShift(p=0.3),
            A.HorizontalFlip(p=0.5),
            A.RandomResizedCrop(height=384, width=384, scale=(0.8, 1.0), ratio=(0.75, 1.33), p=1.0),
            A.Blur(blur_limit=3, p=0.3),
            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            ToTensorV2()
        ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels'], min_visibility=0.3))

    def get_val_transforms():
        """Build the deterministic validation pipeline (resize + normalise)."""
        return A.Compose([
            A.Resize(height=384, width=384),
            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            ToTensorV2()
        ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))

    # Pipelines are built once on first use and then reused for every sample.
    _pipeline_cache = {}

    def _collate_with(pipeline_factory, batch):
        """Apply an Albumentations pipeline to each sample and collate to lists."""
        pipeline = _pipeline_cache.get(pipeline_factory)
        if pipeline is None:
            pipeline = _pipeline_cache[pipeline_factory] = pipeline_factory()

        images, boxes, labels, img_ids = [], [], [], []
        for img, box, label, img_id in batch:
            # Hand plain Python lists to Albumentations; reshape keeps the
            # no-box case well formed.
            transformed = pipeline(
                image=img,
                bboxes=torch.as_tensor(box).reshape(-1, 4).tolist(),
                labels=torch.as_tensor(label).reshape(-1).tolist(),
            )
            images.append(transformed['image'])
            boxes.append(torch.tensor(transformed['bboxes'], dtype=torch.float32).reshape(-1, 4))
            labels.append(torch.tensor(transformed['labels'], dtype=torch.int64))
            img_ids.append(img_id)
        return images, boxes, labels, img_ids

    # Albumentations-aware collate functions
    def albumentations_collate_fn(batch):
        """Collate a training batch through the training augmentations."""
        return _collate_with(get_train_transforms, batch)

    def val_collate_fn(batch):
        """Collate a validation batch through the validation transforms."""
        return _collate_with(get_val_transforms, batch)

    # Hyperparameters
    num_classes = 6
    batch_size = 2
    num_epochs = 1  # short for debugging
    learning_rate = 1e-4
    num_queries = 100
    patience = 10

    # Device setup
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print("Using device:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else device)

    # Dataset paths
    train_json = os.path.join(extract_path, 'annotations/instances_train2017.json')
    val_json = os.path.join(extract_path, 'annotations/instances_val2017.json')
    train_img_dir = os.path.join(extract_path, 'train2017')
    val_img_dir = os.path.join(extract_path, 'val2017')

    # Datasets and loaders
    train_dataset = CarDDDataset(train_json, train_img_dir)
    val_dataset = CarDDDataset(val_json, val_img_dir)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=albumentations_collate_fn)
    # Validation images need resizing / normalising / tensor conversion too;
    # a raw zip collate would hand numpy images of differing sizes to evaluate().
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=val_collate_fn)

    # ⚖️ Recalculate class frequencies after filtering
    # (goes through the dataset so the counts reflect its annotation filter;
    # note that this also loads every training image once)
    label_counter = Counter()
    for _, _, labels, _ in DataLoader(train_dataset, batch_size=1, collate_fn=lambda x: tuple(zip(*x))):
        for label_tensor in labels:
            label_counter.update(label_tensor.tolist())
    print("Actual class counts after filtering:", label_counter)

    class_counts = torch.tensor([label_counter.get(i, 0) for i in range(num_classes)], dtype=torch.float)
    # Inverse-frequency weights. Classes that never occur get weight 0: with
    # 1 / (0 + eps) a single absent class would absorb essentially all of the
    # normalised weight and drive every real class weight to ~0.
    class_weights = torch.where(
        class_counts > 0,
        1.0 / class_counts.clamp(min=1.0),
        torch.zeros_like(class_counts),
    )
    if class_weights.sum() > 0:
        class_weights /= class_weights.sum()

    # Model
    model = CoDETR(num_classes=num_classes, num_queries=num_queries).to(device)

    # Loss functions
    class WeightedFocalLoss(nn.Module):
        """Multi-class focal loss with per-class alpha weights.

        Args:
            alpha: 1-D tensor of per-class weights, indexed by class id.
            gamma: Focusing exponent.
        """

        def __init__(self, alpha, gamma=2.0):
            super().__init__()
            self.alpha = alpha
            self.gamma = gamma

        def forward(self, inputs, targets):
            # pt must be the true-class probability, so it is derived from the
            # UNWEIGHTED cross-entropy; alpha is applied afterwards.
            ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')
            pt = torch.exp(-ce_loss)
            alpha_t = self.alpha.to(inputs.device)[targets]
            focal_loss = alpha_t * (1 - pt) ** self.gamma * ce_loss
            return focal_loss.mean()

    criterion_cls = WeightedFocalLoss(alpha=class_weights, gamma=2.0)

    from torchvision.ops import generalized_box_iou_loss, box_convert

    def criterion_bbox(pred, target):
        """Mean GIoU loss between matched cxcywh boxes."""
        return generalized_box_iou_loss(
            box_convert(pred, 'cxcywh', 'xyxy'),
            box_convert(target, 'cxcywh', 'xyxy'),
            reduction='mean'
        )

    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

    best_map = 0
    epochs_no_improve = 0
    best_model = None

    print(f"Starting training for {num_epochs} epochs...")


In [None]:
# Sanity check: every label must be a valid index for the classification head.
for i, (_, _, labels, _) in enumerate(train_loader):
    for label_tensor in labels:
        # Images can end up without any box; max()/min() on an empty tensor
        # raises, so there is nothing to check for them.
        if label_tensor.numel() == 0:
            continue
        if label_tensor.max() >= num_classes or label_tensor.min() < 0:
            print(f"❌ Label index out of range in sample {i}: {label_tensor}")


In [None]:
# --- Step 9: Logging and Training Metrics Visualization ---
import matplotlib.pyplot as plt

# Per-epoch histories, appended to by log_metrics and read by the plotting helper.
loss_log = []
acc_log = []


def log_metrics(loss, acc):
    """Record one loss value and one accuracy value in the module-level histories."""
    for history, value in ((loss_log, loss), (acc_log, acc)):
        history.append(value)

def plot_training_metrics(save_path="/content/drive/MyDrive/Colab Notebooks/cardd_output/training_metrics.png"):
    """Plot the logged loss and accuracy histories side by side and save the figure.

    Args:
        save_path: Destination image file. Missing parent directories are
            created.
    """
    # savefig does not create directories; make sure the target folder exists.
    out_dir = os.path.dirname(save_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(10, 5))

    loss_ax.plot(loss_log, label="Loss", color='red')
    loss_ax.set_xlabel("Epoch")
    loss_ax.set_ylabel("Loss")
    loss_ax.set_title("Training Loss")
    loss_ax.grid(True)

    acc_ax.plot(acc_log, label="Accuracy", color="green")
    acc_ax.set_xlabel("Epoch")
    acc_ax.set_ylabel("Accuracy")
    acc_ax.set_title("Validation Accuracy")
    acc_ax.grid(True)

    fig.tight_layout()
    fig.savefig(save_path)
    print(f"📈 Training metrics plot saved to: {save_path}")