# Method 2

## True Code 1

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
from PIL import Image, ImageFilter
from collections import deque
import random
import numpy as np

# Giả lập kiến trúc mô hình
# Sử dụng S^2A-Net làm nền tảng (như trong bài báo)
# Do không có mã nguồn S^2A-Net, chúng ta sẽ mô phỏng nó
class S2ANet(nn.Module):
    def __init__(self, num_classes):
        super(S2ANet, self).__init__()
        # Backbone: Giả lập ResNet
        self.backbone = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        # FPN (Feature Pyramid Network)
        self.fpn = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU()
        )
        # Feature Alignment Module (FAM)
        self.fam = nn.Sequential(
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.ReLU()
        )
        # Định nghĩa các đầu ra cho phân loại và hồi quy
        self.cls_head = nn.Linear(128, num_classes) # Classification head
        self.reg_head = nn.Linear(128, 5) # Regression head (x, y, w, h, angle)

    def forward(self, x):
        features = self.backbone(x)
        fpn_features = self.fpn(features)
        aligned_features = self.fam(fpn_features)
        # Giả sử pooling để lấy một vector đặc trưng duy nhất
        pooled_features = torch.mean(aligned_features, dim=[2, 3])
        
        cls_output = self.cls_head(pooled_features)
        reg_output = self.reg_head(pooled_features)
        
        return cls_output, reg_output, pooled_features

# Module Học Tương Phản Đáng Nhớ (Memorable Contrastive Learning - MCL)
class MCL(nn.Module):
    def __init__(self, feature_dim=128, memory_bank_size=4096, momentum=0.999):
        super(MCL, self).__init__()
        self.feature_dim = feature_dim
        self.memory_bank_size = memory_bank_size
        self.momentum = momentum
        
        # Hàng đợi (queue) để lưu trữ các đặc trưng và nhãn
        self.memory_bank = deque(maxlen=self.memory_bank_size)
        
        # Projection Encoder: 2 lớp conv với 1 lớp ReLU
        self.projection_encoder = nn.Sequential(
            nn.Conv2d(feature_dim, feature_dim, kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(feature_dim, feature_dim, kernel_size=1)
        )
        
    def forward(self, proposals_features, proposal_labels, proposal_ious):
        # Lấy các đặc trưng có IoU > ngưỡng (ví dụ 0.5)
        # Điều này giúp loại bỏ các proposals không tốt
        relevant_indices = (proposal_ious > 0.5)
        if not torch.any(relevant_indices):
            return torch.tensor(0.0) # Không có proposal phù hợp để tính loss
        
        relevant_features = proposals_features[relevant_indices]
        relevant_labels = proposal_labels[relevant_indices]
        
        # Ánh xạ các đặc trưng vào không gian nhúng
        embeddings = self.projection_encoder(relevant_features.unsqueeze(-1).unsqueeze(-1))
        embeddings = embeddings.squeeze()
        
        # Chuẩn hóa các vector đặc trưng
        embeddings = nn.functional.normalize(embeddings, dim=1)
        
        # Cập nhật ngân hàng bộ nhớ
        for embed, label in zip(embeddings, relevant_labels):
            self.memory_bank.append({'embedding': embed.detach(), 'label': label.item()})
            
        # Tính toán MCL loss
        if len(self.memory_bank) < 2:
            return torch.tensor(0.0)
            
        loss = 0.0
        for i in range(len(embeddings)):
            current_embed = embeddings[i]
            current_label = relevant_labels[i].item()
            
            # Tính in-batch loss
            for j in range(len(embeddings)):
                if i != j:
                    if relevant_labels[j].item() == current_label:
                        loss += -torch.log(torch.exp(torch.dot(current_embed, embeddings[j])))
                    else:
                        loss += -torch.log(torch.exp(-torch.dot(current_embed, embeddings[j])))
            
            # Tính cross-batch loss (sử dụng memory bank)
            for mem_entry in self.memory_bank:
                mem_embed = mem_entry['embedding']
                mem_label = mem_entry['label']
                if mem_label == current_label:
                    loss += -torch.log(torch.exp(torch.dot(current_embed, mem_embed)))
                else:
                    loss += -torch.log(torch.exp(-torch.dot(current_embed, mem_embed)))
        
        return loss / (len(embeddings) * (len(embeddings) + len(self.memory_bank) -1))


# Kỹ thuật Shot Masking
def apply_shot_masking(image, objects_to_keep, all_objects):
    masked_image = image.copy()
    for obj in all_objects:
        if obj not in objects_to_keep:
            # Giả lập làm mờ Gaussian cho các đối tượng không được chọn
            x, y, w, h, angle = obj['bbox']
            # Chuyển đổi OBB thành một vùng để làm mờ
            box = (int(x - w/2), int(y - h/2), int(x + w/2), int(y + h/2)) # <-- Sửa lỗi ở đây
            region = masked_image.crop(box)
            region = region.filter(ImageFilter.GaussianBlur(radius=5))
            masked_image.paste(region, box)
    return masked_image

# Giả lập quá trình đào tạo
def train_model():
    # Khởi tạo mô hình và các module
    model = S2ANet(num_classes=5)
    mcl_module = MCL()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    # Giả lập dữ liệu đào tạo (hình ảnh và nhãn)
    base_data = [{'image': Image.new('RGB', (256, 256)), 'labels': [0, 1]}, {'image': Image.new('RGB', (256, 256)), 'labels': [0, 1]}]
    novel_data = [{'image': Image.new('RGB', (256, 256)), 'labels': [2, 3]}, {'image': Image.new('RGB', (256, 256)), 'labels': [4]}]
    
    # Giai đoạn đào tạo cơ sở
    print("--- Giai đoạn đào tạo cơ sở ---")
    for epoch in range(5):
        for data in base_data:
            img = T.ToTensor()(data['image']).unsqueeze(0)
            cls_output, reg_output, _ = model(img)
            # Giả lập tính toán loss
            cls_loss = nn.CrossEntropyLoss()(cls_output, torch.tensor([data['labels'][0]]))
            reg_loss = nn.MSELoss()(reg_output, torch.rand(1, 5))
            total_loss = cls_loss + reg_loss
            
            optimizer.zero_grad()
            total_loss.backward()
            optimizer.step()
        print(f"Epoch {epoch+1}, Loss: {total_loss.item():.4f}")
        
    # Giai đoạn tinh chỉnh với dữ liệu few-shot
    print("\n--- Giai đoạn tinh chỉnh (Few-shot) ---")
    # Đóng băng backbone
    for param in model.backbone.parameters():
        param.requires_grad = False
        
    for epoch in range(5):
        for data in novel_data:
            # Áp dụng Shot Masking (giả lập)
            # Giả định chỉ chọn một số đối tượng để training
            selected_objects = [{'bbox': (100, 100, 50, 80, 45)}]
            all_objects = [{'bbox': (100, 100, 50, 80, 45)}, {'bbox': (150, 150, 30, 60, 0)}]
            masked_img = apply_shot_masking(data['image'], selected_objects, all_objects)
            img = T.ToTensor()(masked_img).unsqueeze(0)
            
            cls_output, reg_output, features = model(img)
            
            # Tính các loss
            cls_loss = nn.CrossEntropyLoss()(cls_output, torch.tensor([data['labels'][0]]))
            reg_loss = nn.MSELoss()(reg_output, torch.rand(1, 5))
            
            # Tính MCL loss (giả lập proposals)
            proposals = torch.rand(10, 128) # Giả lập 10 proposals
            ious = torch.rand(10)
            labels = torch.randint(0, 5, (10,))
            mcl_loss = mcl_module(proposals, labels, ious)
            
            # Tổng loss
            total_loss = cls_loss + reg_loss + 0.1 * mcl_loss # 0.1 là hyperparameter lambda
            
            optimizer.zero_grad()
            total_loss.backward()
            optimizer.step()
        print(f"Epoch {epoch+1}, Loss: {total_loss.item():.4f}")
    
# Chạy demo
train_model()

--- Giai đoạn đào tạo cơ sở ---
Epoch 1, Loss: 1.9536
Epoch 2, Loss: 1.6171
Epoch 3, Loss: 1.1455
Epoch 4, Loss: 0.7560
Epoch 5, Loss: 0.1160

--- Giai đoạn tinh chỉnh (Few-shot) ---
Epoch 1, Loss: 5.0564
Epoch 2, Loss: 4.1135
Epoch 3, Loss: 3.0515
Epoch 4, Loss: 2.3637
Epoch 5, Loss: 2.0044


## True Code 2

In [2]:
import torch
import torch.nn as nn
from torchvision.models import resnet18

# Giả định các module phức tạp đã được định nghĩa
# Thay thế cho việc triển khai thực tế của các thành phần trong bài báo

class OrientedDetectorHead(nn.Module):
    """
    Module đầu (head) cho phát hiện đối tượng định hướng.
    Thành phần này dự đoán các hộp giới hạn xoay (góc, chiều dài, chiều rộng, tâm).
    """
    def __init__(self, in_channels):
        super().__init__()
        # Triển khai các lớp tích chập và lớp kết nối đầy đủ để dự đoán
        # 5 giá trị cho mỗi hộp: (x, y, w, h, theta)
        self.conv1 = nn.Conv2d(in_channels, 256, kernel_size=3, padding=1)
        self.output = nn.Conv2d(256, 5, kernel_size=1) # Ví dụ, 5 giá trị cho hộp xoay

    def forward(self, features):
        return self.output(self.conv1(features))

class MemorableContrastiveLearning(nn.Module):
    """
    Module học tương phản đáng nhớ (MCL).
    Sử dụng các prototype được lưu trữ để so sánh và học.
    """
    def __init__(self, num_base_classes, feature_dim):
        super().__init__()
        # Bộ nhớ để lưu trữ các prototype của các lớp cơ sở
        # Trong thực tế, đây là một bộ nhớ động được cập nhật trong quá trình huấn luyện
        self.register_buffer("prototypes", torch.zeros(num_base_classes, feature_dim))

    def forward(self, support_features, support_labels, query_features):
        # Đây là nơi logic phức tạp của MCL diễn ra.
        # 1. Trích xuất prototype từ các mẫu hỗ trợ (support samples).
        # 2. So sánh đặc trưng của các mẫu truy vấn (query samples)
        #    với các prototype trong bộ nhớ.
        # 3. Tính toán loss tương phản (contrastive loss)
        #    để kéo các đặc trưng của cùng một lớp lại gần nhau.
        print("Executing Memorable Contrastive Learning logic...")
        # Mã thực tế ở đây sẽ rất phức tạp và cần dữ liệu support/query.
        # Vì đây là demo khái niệm, ta chỉ in ra thông báo.
        return torch.tensor(0.0) # Trả về một giá trị loss giả định

class FOMC_Model(nn.Module):
    """
    Mô hình phát hiện đối tượng FOMC hoàn chỉnh
    """
    def __init__(self, num_base_classes=10, feature_dim=512):
        super().__init__()
        # Backbone: Để trích xuất đặc trưng từ hình ảnh
        # Bài báo có thể sử dụng các mạng mạnh hơn như ResNet-50 hoặc ResNet-101
        self.backbone = resnet18(weights=None)
        self.backbone = nn.Sequential(*list(self.backbone.children())[:-2])

        # Head để phát hiện đối tượng
        self.oriented_head = OrientedDetectorHead(in_channels=feature_dim)

        # Module học tương phản
        self.mcl_module = MemorableContrastiveLearning(num_base_classes, feature_dim)
        
    def forward(self, images, support_data=None, query_data=None):
        # Trích xuất đặc trưng từ hình ảnh
        features = self.backbone(images)

        # Dự đoán các hộp giới hạn định hướng
        detections = self.oriented_head(features)
        
        # Tính toán loss tương phản nếu dữ liệu hỗ trợ được cung cấp
        contrastive_loss = torch.tensor(0.0)
        if support_data and query_data:
            support_features = self.backbone(support_data["images"])
            query_features = self.backbone(query_data["images"])
            contrastive_loss = self.mcl_module(
                support_features, support_data["labels"], query_features
            )

        return detections, contrastive_loss


# --- Cách sử dụng (khái niệm) ---

# Khởi tạo mô hình
model = FOMC_Model()

# Giả định dữ liệu đầu vào
# Một batch hình ảnh có kích thước (batch_size, channels, height, width)
dummy_image = torch.randn(1, 3, 512, 512)

# Giả định dữ liệu ít mẫu (few-shot)
# Trong quá trình huấn luyện thực tế, các tập này được tạo động
# từ dataset cơ sở.
dummy_support_data = {
    "images": torch.randn(5, 3, 512, 512),
    "labels": torch.tensor([1, 2, 3, 4, 5])
}
dummy_query_data = {
    "images": torch.randn(10, 3, 512, 512)
}

# Chạy forward pass
detections, contrastive_loss = model(
    images=dummy_image,
    support_data=dummy_support_data,
    query_data=dummy_query_data
)

print("--- Kết quả mô hình khái niệm ---")
print("Dự đoán hộp giới hạn:", detections.shape)
print("Giá trị Contrastive Loss:", contrastive_loss.item())

Executing Memorable Contrastive Learning logic...
--- Kết quả mô hình khái niệm ---
Dự đoán hộp giới hạn: torch.Size([1, 5, 16, 16])
Giá trị Contrastive Loss: 0.0


## True Code 3

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torch.utils.data import DataLoader, Subset
from torchgeo.datasets import VHR10
import random

# ----- Step 1: Load dataset -----
def preprocess(sample):
    sample["image"] = sample["image"].float() / 255.0
    return sample

ds = VHR10(
    root="data/VHR10/",
    split="positive",
    transforms=preprocess,
    download=True,
    checksum=True,
)

# ----- Step 2: Split base vs novel classes -----
# NWPU VHR-10: 10 classes (1–10). Giả sử chọn 3 novel classes: airplane=1, baseball diamond=4, tennis court=5
novel_classes = [1, 4, 5]
base_classes = [c for c in range(1, 11) if c not in novel_classes]

base_indices, novel_indices = [], []
for i in range(len(ds)):
    labels = ds[i]["label"]
    if any(l in novel_classes for l in labels):
        novel_indices.append(i)
    else:
        base_indices.append(i)

base_ds = Subset(ds, base_indices)
novel_ds = Subset(ds, novel_indices)

# Few-shot: chọn K=5 ảnh cho mỗi novel class
K = 5
fewshot_indices = []
for cls in novel_classes:
    cls_idxs = [i for i in novel_indices if cls in ds[i]["label"]]
    fewshot_indices.extend(random.sample(cls_idxs, min(K, len(cls_idxs))))
fewshot_ds = Subset(ds, fewshot_indices)

# ----- Step 3: Model (toy Faster R-CNN) -----
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
    weights="DEFAULT"
)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(
    in_features, 11  # 10 classes + background
)

# ----- Step 4: Memory Bank for MCL -----
class MemoryBank:
    def __init__(self, size=8192, feat_dim=256, device="cpu"):
        self.size = size
        self.ptr = 0
        self.feats = torch.zeros(size, feat_dim, device=device)
        self.labels = torch.zeros(size, dtype=torch.long, device=device)

    def update(self, feats, labels):
        bsz = labels.shape[0]   # <-- luôn sync với labels
        if bsz > self.size:
            feats, labels = feats[:self.size], labels[:self.size]
            bsz = self.size

        idxs = (self.ptr + torch.arange(bsz)) % self.size
        idxs = idxs.long().to(self.feats.device)

        # Debug
        # print("update:", bsz, feats.shape, labels.shape, idxs.shape)

        self.feats[idxs] = feats.detach()
        # self.labels[idxs] = labels.detach()
        self.labels[idxs] = labels.detach().view(-1)[:len(idxs)]

        self.ptr = (self.ptr + bsz) % self.size

    def get(self):
        return self.feats, self.labels

def supervised_contrastive_loss(features, labels, memory_feats, memory_labels, temperature=0.1):
    # Normalize
    features = F.normalize(features, dim=1)
    memory_feats = F.normalize(memory_feats, dim=1)

    logits = torch.mm(features, memory_feats.t()) / temperature
    labels = labels.view(-1, 1)
    mask = (labels == memory_labels.view(1, -1)).float()

    exp_logits = torch.exp(logits)
    log_prob = logits - torch.log(exp_logits.sum(1, keepdim=True) + 1e-8)
    loss = -(mask * log_prob).sum(1) / (mask.sum(1) + 1e-8)
    return loss.mean()

# ----- Step 5: Training Loop (pseudo) -----
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)

memory = MemoryBank(size=1024, feat_dim=in_features)

# def extract_roi_features(model, images, targets):
#     # Toy: use box_head from FasterRCNN to get features
#     features = model.backbone(images.tensors)
#     return features  # placeholder (need integration with roi_heads)

def extract_gt_features(model, images, targets):
    # Transform
    transformed = model.transform(images, targets)
    images_t, targets_t = transformed

    # Backbone
    features = model.backbone(images_t.tensors)
    if isinstance(features, torch.Tensor):
        features = {"0": features}

    # Lấy gt boxes làm proposal
    proposals = [t["boxes"] for t in targets_t]

    # ROI Pooling trên GT boxes
    box_features = model.roi_heads.box_roi_pool(
        features, proposals, images_t.image_sizes
    )
    # Head
    box_features = model.roi_heads.box_head(box_features)

    labels = torch.cat([t["labels"] for t in targets_t])
    return box_features, labels

for epoch in range(2):  # demo only
    for batch in DataLoader(fewshot_ds, batch_size=2, shuffle=True, collate_fn=lambda x: x):
        images = [s["image"] for s in batch]
        targets = []
        for s in batch:
            boxes = s["bbox_xyxy"]
            labels = s["label"]
            targets.append({"boxes": boxes, "labels": labels})

        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        detection_loss = sum(loss for loss in loss_dict.values())

        # MCL: giả sử có roi_feats (cần implement thêm)
        # roi_feats = torch.randn(len(targets), in_features, device=device)  # placeholder
        # roi_labels = torch.cat([t["labels"] for t in targets])

        roi_feats, roi_labels = extract_gt_features(model, images, targets)

        memory_feats, memory_labels = memory.get()
        memory_feats, memory_labels = memory_feats.to(device), memory_labels.to(device)

        if memory_labels.sum() > 0:
            mcl_loss = supervised_contrastive_loss(roi_feats, roi_labels, memory_feats, memory_labels)
        else:
            mcl_loss = torch.tensor(0.0, device=device)

        loss = detection_loss + 0.3 * mcl_loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        memory.update(roi_feats.cpu(), roi_labels.cpu())

    print(f"Epoch {epoch}, Loss {loss.item():.4f}")


Files already downloaded and verified
loading annotations into memory...
Done (t=0.05s)
creating index...
index created!
Epoch 0, Loss 2.9247
Epoch 1, Loss 2.3868
