# 实验环境设置与项目导入

**目标**: 克隆 `SED` 仓库


In [1]:

import os
import sys

# Local checkout directory and remote URL of the SED repository.
repo_dir = 'SED'  # local folder name
repo_url = 'https://github.com/shengmengmeng/SED.git'


# Clone only when the checkout directory does not exist yet.
if not os.path.exists(repo_dir):
    print(f"克隆仓库 '{repo_url}'...")

    # IPython shell escape: {repo_url} is interpolated by the notebook.
    !git clone {repo_url}
else:

    print(f"仓库 '{repo_dir}' 已存在。")

# Append the checkout directory to sys.path at most once.
if repo_dir not in sys.path:
    sys.path.append(repo_dir)
    print(f"'{repo_dir}' 已成功添加到系统路径。")
else:
    print(f"'{repo_dir}' 已经在系统路径中。")

克隆仓库 'https://github.com/shengmengmeng/SED.git'...
Cloning into 'SED'...
remote: Enumerating objects: 44, done.
remote: Counting objects: 100% (44/44), done.
remote: Compressing objects: 100% (41/41), done.
remote: Total 44 (delta 7), reused 0 (delta 0), pack-reused 0 (from 0)
Receiving objects: 100% (44/44), 1.09 MiB | 8.06 MiB/s, done.
Resolving deltas: 100% (7/7), done.
'SED' 已成功添加到系统路径。


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import timm
import numpy as np
from PIL import Image
from torchvision import transforms
from sklearn.mixture import GaussianMixture
from tqdm.notebook import tqdm
import random

class Config:
    """Notebook-wide settings: dataset paths, model choice and training hyperparameters."""
    # Dataset parameters
    DATA_PATH = "/kaggle/input/webfg400-train/train"  # corrected variable name
    TEST_DATA_PATH = "/kaggle/input/webfg400-test-a/test_A"
    NUM_CLASSES = 12  # only the first 12 classes are used
    IMAGE_SIZE = 224  # fine-grained recognition usually needs a higher resolution
    VALIDATION_RATIO = 0.1  # hold out 10% of the training set as a validation set

    # Model parameters
    MODEL_NAME = 'vit_base_patch16_224' # base model for TransFG
    PRETRAINED = True
    PART_SELECT_LAYER = 8 # select key regions after layer 8
    NUM_SELECTED_PARTS = 16 # keep 16 key image patches

    # Training parameters
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    EPOCHS = 50
    BATCH_SIZE = 8 # tunable
    LR = 1e-5 # learning rate
    WEIGHT_DECAY = 1e-4 # weight decay
    SEED = 42

    WARMUP_EPOCHS = 10 # first 10 epochs use standard training before SED starts
    

    OUTPUT_DIR = "/kaggle/working/output"


# Shared configuration object referenced by the cells below.
CFG = Config()

# Output directory (created if missing)
os.makedirs(CFG.OUTPUT_DIR, exist_ok=True)

# Random seeding
def set_seed(seed):
    """Seed the Python, NumPy and PyTorch RNGs and force deterministic cuDNN.

    Args:
        seed: Integer seed applied to every generator.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    # CUDA generators only exist when a GPU is visible.
    if torch.cuda.is_available():
        for cuda_seed_fn in (torch.cuda.manual_seed, torch.cuda.manual_seed_all):
            cuda_seed_fn(seed)

    # Trade cuDNN autotuning for reproducible kernels.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Apply the configured seed, then report which device was selected.
set_seed(CFG.SEED)

print(f"设备:{CFG.DEVICE}")

设备:cuda


# CUB-200-2011 数据集定义

**目标**: 创建一个自定义的 `Dataset` 类，用于加载 CUB 数据集。

**说明**:
- 这个类将只加载指定的12个类别 (文件夹 '001' 到 '012')。
- 它会按 `validation_ratio` 自动划分训练集和验证集 (默认 90/10 比例)。
- 它会应用适当的数据增强和预处理。

In [None]:
class FineGrainedCUBDataset(Dataset):
    """Image-folder dataset restricted to the first ``num_classes`` class folders.

    Sub-directories of ``root_dir`` are sorted by name and the first
    ``num_classes`` are kept; a folder's position in that order is its integer
    label. Inside each class the sorted image list is split deterministically:
    the leading ``1 - validation_ratio`` share is the training split, the rest
    is the validation split.

    Args:
        root_dir: Directory containing one sub-directory per class.
        num_classes: Number of class folders (in sorted order) to load.
        train: ``True`` selects the training share, ``False`` the held-out share.
        transform: Optional callable applied to each loaded PIL image.
        validation_ratio: Fraction of every class reserved for validation.

    Each item is ``(image, label, idx)`` where ``idx`` is the sample's position
    in this dataset.
    """

    # File extensions (lower-case) accepted as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, root_dir, num_classes, train=True, transform=None, validation_ratio=0.1):
        self.root_dir = root_dir
        self.transform = transform
        self.train = train
        self.image_paths = []
        self.labels = []

        # Collect all class folders; sorting keeps the label mapping stable.
        class_folders = sorted(
            d for d in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, d))
        )

        for label, class_folder in enumerate(class_folders[:num_classes]):
            class_path = os.path.join(root_dir, class_folder)
            images_in_class = sorted(
                os.path.join(class_path, f) for f in os.listdir(class_path)
                if f.lower().endswith(self.IMAGE_EXTENSIONS)
            )

            # Deterministic per-class train/validation split.
            split_idx = int(len(images_in_class) * (1 - validation_ratio))
            if self.train:
                chosen = images_in_class[:split_idx]
            else:
                chosen = images_in_class[split_idx:]

            self.image_paths.extend(chosen)
            self.labels.extend([label] * len(chosen))

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # Open through a context manager so the file handle is released right
        # away; convert() returns an image that no longer depends on the file.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        if self.transform is not None:
            image = self.transform(image)

        return image, label, idx

class TestDataset(Dataset):
    """Unlabeled image dataset over the files directly inside ``test_dir``.

    Args:
        test_dir: Directory holding the test images (not searched recursively).
        transform: Optional callable applied to each loaded PIL image.

    Each item is ``(image, filename)``; the base filename is returned so that
    predictions can be matched to files when writing a submission.
    """

    # File extensions (lower-case) accepted as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, test_dir, transform=None):
        self.test_dir = test_dir
        self.transform = transform
        # Sorted so the sample order is reproducible across runs.
        self.image_paths = sorted(
            os.path.join(test_dir, f) for f in os.listdir(test_dir)
            if f.lower().endswith(self.IMAGE_EXTENSIONS)
        )

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]

        # Open through a context manager so the file handle is released right
        # away; convert() returns an image that no longer depends on the file.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        if self.transform is not None:
            image = self.transform(image)

        # Return the image together with its file name for the submission file.
        filename = os.path.basename(img_path)
        return image, filename

数据集创建完成:
 - 训练集样本数: 1001
 - 测试集样本数: 256


# TransFG 模型定义

实现 TransFG 模型的关键部分。


In [4]:
class PartSelection(nn.Module):
    """Pick the patch tokens that receive the most attention from the CLS token."""

    def __init__(self, num_selected_parts):
        super().__init__()
        # Number of patch tokens kept per image.
        self.num_selected_parts = num_selected_parts

    def forward(self, x, attention_matrix):
        """
        Select the most-attended patch tokens.

        Args:
            x: token features [batch_size, num_patches+1, embed_dim]
            attention_matrix: attention weights
                [batch_size, num_heads, num_patches+1, num_patches+1]
        Returns:
            Selected patch features [batch_size, num_selected_parts, embed_dim]
        """
        # Row 0 is the CLS query; drop its self-attention column and average heads.
        cls_to_patches = attention_matrix[:, :, 0, 1:].mean(dim=1)

        # Rank patches by attention and keep the K best per sample.
        ranked_patches = torch.topk(cls_to_patches, self.num_selected_parts, dim=1)[1]

        # Patch j sits at token position j + 1 because CLS occupies position 0.
        token_positions = ranked_patches + 1

        # Column vector of sample ids so the indexing pairs each row with its tokens.
        sample_ids = torch.arange(x.size(0)).unsqueeze(1).to(x.device)

        return x[sample_ids, token_positions]

class TransFG(nn.Module):
    """
    TransFG model: a fine-grained classifier built on a Vision Transformer.
    Combines a global feature with a key-region (part) feature for fine-grained recognition.
    """
    def __init__(self, model_name, num_classes, pretrained=True, part_select_layer=8, num_selected_parts=16):
        """
            model_name: name of the base ViT model
            num_classes: number of output classes
            pretrained: whether to load pretrained weights
            part_select_layer: number of blocks to run before key regions are selected
            num_selected_parts: number of key image patches to select
        """
        super().__init__()
        self.part_select_layer = part_select_layer
        
        self.vit = timm.create_model(model_name, pretrained=pretrained)
        
        # Split the ViT blocks into two groups so key regions can be selected in between.
        # NOTE(review): the same block modules remain reachable through self.vit.blocks,
        # so they are registered under two names — confirm this is acceptable for checkpoints.
        self.vit_part1 = nn.Sequential(*self.vit.blocks[:part_select_layer])  # leading Transformer blocks
        self.vit_part2 = nn.Sequential(*self.vit.blocks[part_select_layer:])  # trailing Transformer blocks
        
        # Key-region selector
        self.part_selector = PartSelection(num_selected_parts)
        
        # Embedding dimension of the backbone
        embed_dim = self.vit.embed_dim
        
        # Classification head; input is twice embed_dim because forward() concatenates two CLS features
        self.head = nn.Linear(embed_dim * 2, num_classes)
        
        # Remove the original ViT classification head
        self.vit.head = nn.Identity()

    def forward(self, x):
        """
        Forward pass.
        Args:
            x: input images [batch_size, channels, height, width]
        Returns:
            logits: classification logits [batch_size, num_classes]
        """
        # Patch embedding
        x = self.vit.patch_embed(x) 
        
        # Prepend the classification token
        cls_token = self.vit.cls_token.expand(x.shape[0], -1, -1) 
        x = torch.cat((cls_token, x), dim=1)          
        # Positional embedding
        x = x + self.vit.pos_embed
        x = self.vit.pos_drop(x)

        # Run the leading blocks one by one.
        for blk in self.vit_part1:
            x = blk(x)
        
        # Attention weights recorded by the last leading block. get_attention_map is
        # attached to timm's Attention class at module level in this file, and the
        # map is only populated by the patched Attention.forward.
        attn_weights = self.vit_part1[-1].attn.get_attention_map()
        selected_parts = self.part_selector(x, attn_weights)
        
        # Build the part sequence: a CLS token followed by the selected patch tokens.
        # NOTE(review): this uses the raw self.vit.cls_token parameter (no positional
        # embedding, not the CLS state already processed by vit_part1) — confirm intended.
        part_cls_token = self.vit.cls_token.expand(selected_parts.shape[0], -1, -1)
        part_x = torch.cat([part_cls_token, selected_parts], dim=1)
        
        # Global stream: the full token sequence through the trailing blocks.
        # NOTE(review): no final normalization layer of the backbone is called
        # anywhere in this forward — confirm whether self.vit's norm should be applied.
        global_stream = self.vit_part2(x)
        global_cls = global_stream[:, 0]

        # Part stream: the shortened sequence through the same trailing blocks (shared weights).
        part_stream = self.vit_part2(part_x)
        part_cls = part_stream[:, 0]
        
        # Concatenate both CLS features and classify.
        final_feature = torch.cat([global_cls, part_cls], dim=1)
        logits = self.head(final_feature)
        return logits

# Add an accessor to timm's Attention class for retrieving the stored attention map
def get_attention_map(self):
    """Return the ``attention_map`` attribute stored on this module."""
    return self.attention_map # the attention weight matrix

# Attach the accessor to every timm Attention instance (class-level monkey patch).
timm.models.vision_transformer.Attention.get_attention_map = get_attention_map


# Self-attention forward pass that also records the attention weights
def forward_attn(self, x, attn_mask=None):
    """Multi-head self-attention that keeps its post-softmax weights.

    Written as a drop-in replacement for the ``forward`` of an attention
    module exposing ``qkv``, ``num_heads``, ``scale``, ``attn_drop``, ``proj``
    and ``proj_drop``. The softmax output is stored on the module as
    ``attention_map`` (overwritten on every call) so it can be read back after
    the forward pass.

    Args:
        x: Input tokens of shape ``(B, N, C)``.
        attn_mask: Optional mask broadcastable to ``(B, num_heads, N, N)``.
            A boolean mask marks positions allowed to attend with ``True``;
            any other dtype is added to the attention scores. Accepted so the
            patched forward keeps working when the caller passes
            ``attn_mask=...`` as a keyword.

    Returns:
        Tensor of shape ``(B, N, C)``.
    """
    B, N, C = x.shape
    qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
    q, k, v = qkv.unbind(0)

    # Apply per-head query/key normalization when the module defines it, so a
    # model built with q/k norm is not silently evaluated without it.
    q_norm = getattr(self, 'q_norm', None)
    k_norm = getattr(self, 'k_norm', None)
    if q_norm is not None:
        q = q_norm(q)
    if k_norm is not None:
        k = k_norm(k)

    attn = (q @ k.transpose(-2, -1)) * self.scale
    if attn_mask is not None:
        if attn_mask.dtype == torch.bool:
            attn = attn.masked_fill(~attn_mask, float('-inf'))
        else:
            attn = attn + attn_mask
    attn = attn.softmax(dim=-1)

    # Keep the weights (before dropout) for later part selection.
    self.attention_map = attn

    attn = self.attn_drop(attn)
    x = (attn @ v).transpose(1, 2).reshape(B, N, C)
    x = self.proj(x)
    x = self.proj_drop(x)
    return x

# Replace timm's Attention.forward (class-level monkey patch) with forward_attn.
timm.models.vision_transformer.Attention.forward = forward_attn

print("TransFG 模型定义完成。")

TransFG 模型定义完成。


# SED 训练与评估函数

SED 训练策略的核心逻辑和标准的评估函数。

In [None]:
def evaluate(model, dataloader, device):
    """Return the top-1 accuracy (in percent) of ``model`` over ``dataloader``.

    Args:
        model: Callable module mapping an image batch to class logits.
        dataloader: Iterable of ``(images, labels, index)`` batches; the third
            element is ignored.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Accuracy in the range ``[0, 100]``; ``0.0`` when the loader yields no
        samples (instead of raising ``ZeroDivisionError``).

    The model is switched to eval mode and left there; callers that continue
    training must call ``model.train()`` again.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels, _ in dataloader:
            images, labels = images.to(device), labels.to(device)
            predicted = model(images).argmax(dim=1)  # predicted class per sample
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        return 0.0
    return 100 * correct / total

# Produce predictions for the unlabeled test set
def predict_test_set(model, test_loader, device):
    """Predict a class index for every test image.

    Args:
        model: Module mapping an image batch to class logits; put in eval mode.
        test_loader: Iterable of ``(images, filenames)`` batches.
        device: Device the image batches are moved to.

    Returns:
        ``(filenames, predictions)``: two lists in loader order, pairing each
        file name with its predicted class index.
    """
    model.eval()
    all_names, all_preds = [], []

    with torch.no_grad():
        for batch_images, batch_names in test_loader:
            logits = model(batch_images.to(device))
            # Index of the largest logit per row is the predicted class.
            batch_preds = logits.argmax(dim=1)
            all_preds += batch_preds.tolist()
            all_names += list(batch_names)

    return all_names, all_preds

def train_sed_epoch(epoch, model, train_loader, optimizer, device, num_classes):
    """Run one SED-style epoch: split samples by loss, then train each group.

    Steps:
      1. Compute the per-sample cross-entropy loss of every training sample
         with the model in eval mode.
      2. For each class, fit a two-component Gaussian mixture to those losses;
         samples assigned to the lower-mean component are "clean", the rest
         are "noisy" (hard).
      3. Train on the clean samples with ordinary cross-entropy against their
         given labels.
      4. For the noisy samples, take the model's softmax outputs as soft
         pseudo-labels and train with a soft-label cross-entropy whose
         per-sample weight balances the classes within each batch.

    Args:
        epoch: Zero-based epoch index (used only for the progress message).
        model: Network being trained; left in train mode on return.
        train_loader: DataLoader whose dataset exposes ``labels`` and yields
            ``(image, label, index)`` items.
        optimizer: Optimizer stepping ``model``'s parameters.
        device: Device batches are moved to.
        num_classes: Number of classes.

    Reads ``CFG.BATCH_SIZE`` and ``CFG.EPOCHS`` from the module-level config.
    """
    dataset = train_loader.dataset
    eps = 1e-8  # lower bound that keeps the class weights finite

    # --- 1. Per-sample losses (no gradient, eval mode) ---
    losses = torch.zeros(len(dataset))
    model.eval()
    with torch.no_grad():
        for images, labels, indices in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = F.cross_entropy(outputs, labels, reduction='none')  # one loss per sample
            # One bulk copy per batch instead of a device sync per sample.
            losses[torch.as_tensor(indices, dtype=torch.long)] = loss.float().cpu()
    model.train()

    # --- 2. Per-class GMM split into clean and noisy samples ---
    all_labels = np.asarray(dataset.labels)
    clean_idx, noisy_idx = [], []
    for c in range(num_classes):
        class_indices = np.where(all_labels == c)[0]
        if len(class_indices) == 0:
            continue
        if len(class_indices) < 2:
            # A two-component mixture cannot be fitted to a single sample;
            # keep the sample in the clean set.
            clean_idx.extend(class_indices.tolist())
            continue

        # The mixture gives an adaptive, per-class loss threshold.
        class_losses = losses[class_indices].numpy().reshape(-1, 1)
        gmm = GaussianMixture(n_components=2, max_iter=10, tol=1e-2, reg_covar=5e-4)
        gmm.fit(class_losses)
        clean_component_idx = gmm.means_.argmin()  # lower-mean component = clean
        clean_mask = gmm.predict(class_losses) == clean_component_idx

        clean_idx.extend(class_indices[clean_mask].tolist())
        noisy_idx.extend(class_indices[~clean_mask].tolist())

    # --- 3. Standard cross-entropy on the clean samples ---
    clean_sampler = torch.utils.data.SubsetRandomSampler(clean_idx)
    clean_loader = DataLoader(dataset, batch_size=CFG.BATCH_SIZE, sampler=clean_sampler, num_workers=2)

    print(f"Epoch [{epoch+1}/{CFG.EPOCHS}] - 划分完成. 干净样本: {len(clean_idx)}, 困难样本: {len(noisy_idx)}")

    for images, labels, _ in clean_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()

    # --- 4. Soft pseudo-labels with class balancing on the noisy samples ---
    if len(noisy_idx) > 0:
        noisy_sampler = torch.utils.data.SubsetRandomSampler(noisy_idx)
        noisy_loader = DataLoader(dataset, batch_size=CFG.BATCH_SIZE, sampler=noisy_sampler, num_workers=2)

        # Generate pseudo-labels; rows of samples outside the noisy set stay zero
        # and are never read.
        model.eval()
        pseudo_labels = torch.zeros(len(dataset), num_classes).to(device)
        with torch.no_grad():
            for images, _, indices in noisy_loader:
                images = images.to(device)
                outputs = model(images)
                pseudo_labels[indices] = F.softmax(outputs, dim=1)  # soft labels
        model.train()

        # The given (possibly wrong) labels are deliberately not used here.
        for images, _, indices in noisy_loader:
            images = images.to(device)
            pt = pseudo_labels[indices]

            # Class-balance weights: inverse of each class's share of the
            # batch's soft-label mass. The clamp avoids 0 * inf = NaN when a
            # class's mass underflows to zero for the whole batch.
            pt_per_class = pt.sum(dim=0)
            pt_per_class = pt_per_class / pt_per_class.sum()
            class_weights = 1.0 / (num_classes * pt_per_class.clamp_min(eps))
            batch_weights = torch.matmul(pt, class_weights)

            optimizer.zero_grad()
            outputs = model(images)
            # Soft-label cross-entropy against the pseudo-labels.
            loss = -torch.sum(F.log_softmax(outputs, dim=1) * pt, dim=1)
            loss = (loss * batch_weights).mean()  # apply the per-sample weights
            loss.backward()
            optimizer.step()

# Confirmation message printed once the cell's functions are defined.
print("SED 训练和评估函数定义完成。")

SED 训练和评估函数定义完成。


# 主程序：模型训练与评估

**目标**: 将所有部分组合在一起，启动完整的训练和评估流程。

**说明**:
1.  实例化模型、优化器和数据加载器。
2.  执行 `WARMUP_EPOCHS` 次数的标准训练。
3.  执行剩余次数的 `SED` 训练。
4.  在每个 epoch 结束后，在验证集上评估模型性能，并保存表现最好的模型。

In [None]:
# --- Data preprocessing and augmentation ---
# Training: resize, random horizontal flip, tensor conversion, normalization.
train_transform = transforms.Compose([
    transforms.Resize((CFG.IMAGE_SIZE, CFG.IMAGE_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation/test: same pipeline without the random flip.
val_transform = transforms.Compose([
    transforms.Resize((CFG.IMAGE_SIZE, CFG.IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# --- 1. Instantiate the model, optimizer and scheduler ---
model = TransFG(
    model_name=CFG.MODEL_NAME,
    num_classes=CFG.NUM_CLASSES,
    pretrained=CFG.PRETRAINED,
    part_select_layer=CFG.PART_SELECT_LAYER,
    num_selected_parts=CFG.NUM_SELECTED_PARTS
).to(CFG.DEVICE)

optimizer = optim.AdamW(model.parameters(), lr=CFG.LR, weight_decay=CFG.WEIGHT_DECAY)
# Cosine schedule spanning all CFG.EPOCHS epochs.
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG.EPOCHS)

# --- Create dataset instances ---
# Training and validation datasets read the same root directory and differ
# only in the `train` flag and the transform.
try:
    train_dataset = FineGrainedCUBDataset(
        CFG.DATA_PATH, 
        CFG.NUM_CLASSES, 
        train=True, 
        transform=train_transform, 
        validation_ratio=CFG.VALIDATION_RATIO
    )
    
    val_dataset = FineGrainedCUBDataset(
        CFG.DATA_PATH, 
        CFG.NUM_CLASSES, 
        train=False, 
        transform=val_transform, 
        validation_ratio=CFG.VALIDATION_RATIO
    )
    
    test_dataset = TestDataset(CFG.TEST_DATA_PATH, transform=val_transform)
    
    print(f"数据集创建完成:")
    print(f" - 训练集样本数: {len(train_dataset)}")
    print(f" - 验证集样本数: {len(val_dataset)}")
    print(f" - 测试集样本数: {len(test_dataset)}")
except FileNotFoundError as e:
    print(f"错误: {e}")
    print("确保已添加正确的数据集路径。")
    # Re-raise after the hint: continuing would leave the dataset variables
    # undefined and fail later with an unrelated NameError.
    raise

数据集创建完成:
 - 训练集样本数: 1001
 - 测试集样本数: 256


model.safetensors:   0%|          | 0.00/346M [00:00<?, ?B/s]

数据集创建完成:
 - 训练集样本数: 1001
 - 测试集样本数: 256


In [None]:
# Imported up front so a missing dependency fails before training, not after it.
import pandas as pd

train_loader = DataLoader(train_dataset, batch_size=CFG.BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=CFG.BATCH_SIZE, shuffle=False, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=CFG.BATCH_SIZE, shuffle=False, num_workers=2)

# --- 2. Training ---
# Start below any reachable accuracy so the first epoch always writes a
# checkpoint; otherwise the final load fails when accuracy never exceeds 0.
best_accuracy = float("-inf")
model_path = os.path.join(CFG.OUTPUT_DIR, 'best_model.pth')

for epoch in range(CFG.EPOCHS):
    print(f"--- Epoch {epoch+1}/{CFG.EPOCHS} ---")
    
    if epoch < CFG.WARMUP_EPOCHS:
        # Warm-up: plain cross-entropy on all training samples.
        print("模式: 标准训练 (热身)")
        model.train()
        for images, labels, _ in tqdm(train_loader, desc="训练中"):
            images, labels = images.to(CFG.DEVICE), labels.to(CFG.DEVICE)
            
            optimizer.zero_grad()
            outputs = model(images)
            loss = F.cross_entropy(outputs, labels)
            loss.backward()
            optimizer.step()
    else:
        # --- SED training ---
        print("模式: SED 训练")
        train_sed_epoch(epoch, model, train_loader, optimizer, CFG.DEVICE, CFG.NUM_CLASSES)

    # --- Evaluate on the validation split and keep the best checkpoint ---
    current_accuracy = evaluate(model, val_loader, CFG.DEVICE)
    print(f"Epoch [{epoch+1}/{CFG.EPOCHS}] - 验证集准确率: {current_accuracy:.2f}%")

    
    if current_accuracy > best_accuracy:
        best_accuracy = current_accuracy
        
        torch.save(model.state_dict(), model_path)
        print(f"得分更高，模型已保存至: {model_path}")
            
    scheduler.step()

print("\n--- 训练完成 ---")
print(f"最佳验证集准确率: {best_accuracy:.2f}%")

# Predict on the test set with the best checkpoint and save the results
print("\n--- 生成测试集预测结果 ---")
# map_location keeps the load working if the checkpoint's device differs
# from the one currently in use.
model.load_state_dict(torch.load(model_path, map_location=CFG.DEVICE))
filenames, predictions = predict_test_set(model, test_loader, CFG.DEVICE)

# Write the submission file: one row per test image with its predicted class index
submission_path = os.path.join(CFG.OUTPUT_DIR, 'submission.csv')
submission = pd.DataFrame({
    'image': filenames,
    'label': predictions
})
submission.to_csv(submission_path, index=False)
print(f"测试集预测结果已保存至: {submission_path}")

--- Epoch 1/50 ---


热身中:   0%|          | 0/126 [00:00<?, ?it/s]

Epoch [1/50] - 测试集准确率: 75.00%
--- Epoch 2/50 ---


热身中:   0%|          | 0/126 [00:00<?, ?it/s]

Epoch [2/50] - 测试集准确率: 75.39%
--- Epoch 3/50 ---


热身中:   0%|          | 0/126 [00:00<?, ?it/s]

Epoch [3/50] - 测试集准确率: 80.08%
--- Epoch 4/50 ---


热身中:   0%|          | 0/126 [00:00<?, ?it/s]

Epoch [4/50] - 测试集准确率: 81.25%
--- Epoch 5/50 ---


热身中:   0%|          | 0/126 [00:00<?, ?it/s]

Epoch [5/50] - 测试集准确率: 82.03%
--- Epoch 6/50 ---


热身中:   0%|          | 0/126 [00:00<?, ?it/s]

Epoch [6/50] - 测试集准确率: 81.64%
--- Epoch 7/50 ---


热身中:   0%|          | 0/126 [00:00<?, ?it/s]

Epoch [7/50] - 测试集准确率: 82.81%
--- Epoch 8/50 ---


热身中:   0%|          | 0/126 [00:00<?, ?it/s]

Epoch [8/50] - 测试集准确率: 83.20%
--- Epoch 9/50 ---


热身中:   0%|          | 0/126 [00:00<?, ?it/s]

Epoch [9/50] - 测试集准确率: 81.25%
--- Epoch 10/50 ---


热身中:   0%|          | 0/126 [00:00<?, ?it/s]

Epoch [10/50] - 测试集准确率: 83.20%
--- Epoch 11/50 ---
模式: SED 训练
Epoch [11/50] - 划分完成. 干净样本: 994, 困难样本: 7
Epoch [11/50] - 测试集准确率: 84.38%
--- Epoch 12/50 ---
模式: SED 训练
Epoch [12/50] - 划分完成. 干净样本: 994, 困难样本: 7
Epoch [12/50] - 测试集准确率: 83.59%
--- Epoch 13/50 ---
模式: SED 训练
Epoch [13/50] - 划分完成. 干净样本: 994, 困难样本: 7
Epoch [13/50] - 测试集准确率: 83.98%
--- Epoch 14/50 ---
模式: SED 训练
Epoch [14/50] - 划分完成. 干净样本: 994, 困难样本: 7
Epoch [14/50] - 测试集准确率: 84.38%
--- Epoch 15/50 ---
模式: SED 训练
Epoch [15/50] - 划分完成. 干净样本: 994, 困难样本: 7
Epoch [15/50] - 测试集准确率: 84.38%
--- Epoch 16/50 ---
模式: SED 训练
Epoch [16/50] - 划分完成. 干净样本: 994, 困难样本: 7
Epoch [16/50] - 测试集准确率: 83.98%
--- Epoch 17/50 ---
模式: SED 训练
Epoch [17/50] - 划分完成. 干净样本: 994, 困难样本: 7
Epoch [17/50] - 测试集准确率: 83.98%
--- Epoch 18/50 ---
模式: SED 训练
Epoch [18/50] - 划分完成. 干净样本: 994, 困难样本: 7
Epoch [18/50] - 测试集准确率: 84.38%
--- Epoch 19/50 ---
模式: SED 训练
Epoch [19/50] - 划分完成. 干净样本: 994, 困难样本: 7
Epoch [19/50] - 测试集准确率: 83.98%
--- Epoch 20/50 ---
模式: SED 训练
Epoch [20/5