In [None]:
# 导入必要的库
import os
import torch
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import precision_recall_curve


# Set environment variables.
# NOTE(review): KMP_DUPLICATE_LIB_OK=TRUE is presumably the usual workaround for the
# "duplicate OpenMP runtime" abort that occurs when several packages bundle their own
# copy of the runtime — confirm it is still required. It is assigned after torch/numpy
# have already been imported above; verify that this ordering is early enough.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# 导入自定义模块
from config import Config
from data.dataset import create_data_loaders
from models.efficientnet import efficientnetv2_s
from models.components import ModelEMA
from utils.trainer import train_model, LabelSmoothingCrossEntropy, get_cosine_schedule_with_warmup
from utils.metrics import validate
from utils.cam import visualize_cam_for_test
from error_analysis.error_analyzer import test_model_and_analyze_errors

# TensorBoard summary writer shared by the cells below; its log directory is
# taken from Config.LOG_DIR.
writer = SummaryWriter(Config.LOG_DIR)


In [None]:
# Build the four data loaders used by the rest of the notebook.
# NOTE(review): test_loader is unpacked here, but the evaluation cells below
# pass full_loader instead — confirm that this is intentional.
(
    train_loader,
    val_loader,
    test_loader,
    full_loader,
) = create_data_loaders()

In [None]:
# Build the two-class EfficientNetV2-S model and move it to the configured device.
model = efficientnetv2_s(num_classes=2, dropout_rate=0.3)
model = model.to(Config.DEVICE)

# Record the model graph in TensorBoard using one random input of the
# configured image size.
input_shape = (1, 3, Config.IMG_SIZE, Config.IMG_SIZE)
dummy_input = torch.randn(*input_shape).to(Config.DEVICE)
writer.add_graph(model, dummy_input)

# AdamW optimizer with weight decay (the original author's stated goal:
# better generalization).
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=Config.BASE_LR,
    weight_decay=Config.WEIGHT_DECAY,
)

# Learning-rate scheduler.
# NOTE(review): the last argument is presumably the minimum LR expressed as a
# fraction of the base LR — confirm against get_cosine_schedule_with_warmup.
min_lr_ratio = Config.MIN_LR / Config.BASE_LR
scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    Config.WARMUP_EPOCHS,
    Config.EPOCHS,
    min_lr_ratio,
)

# Loss function with label smoothing.
criterion = LabelSmoothingCrossEntropy(smoothing=0.1)

# Optional EMA copy of the model; None when EMA is disabled in the config.
ema_model = (
    ModelEMA(model, decay=Config.EMA_DECAY, device=Config.DEVICE)
    if Config.USE_EMA
    else None
)

# Gradient scaler for automatic mixed precision; enabled/disabled via Config.USE_AMP.
scaler = torch.amp.GradScaler(enabled=Config.USE_AMP)


In [None]:
# Run training with the objects built in the previous cells.
print("开始训练...")
training_kwargs = dict(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    scaler=scaler,
    ema_model=ema_model,
    writer=writer,
)
best_metrics = train_model(**training_kwargs)


In [None]:
# Evaluate the best saved checkpoint.
print("\n开始测试...")
best_model_path = os.path.join(Config.MODEL_DIR, "Cervix_Classifier_best.pth")
# SECURITY NOTE: weights_only=False unpickles a complete model object, which can
# execute arbitrary code. Only load checkpoints produced by this project.
# map_location keeps the load working when the checkpoint was saved on a
# different device than Config.DEVICE (same approach as the MobileNet load below).
best_model = torch.load(best_model_path, map_location=Config.DEVICE, weights_only=False)
best_model.eval()

# NOTE(review): metrics are computed on full_loader, not on the test_loader
# created earlier — confirm that this is the intended evaluation split.
test_metrics = validate(best_model, full_loader, criterion)

# CAM visualisation for a sample of images.
print("\n生成CAM可视化结果...")
cam_dir, cam_results = visualize_cam_for_test(best_model, full_loader, num_samples=20)
print(f"CAM可视化结果保存在: {cam_dir}")

print("\n测试结果:")
print(f"准确率: {test_metrics['acc']:.4f}")
print(f"精确率: {test_metrics['precision']:.4f}")
print(f"召回率: {test_metrics['recall']:.4f}")
print(f"F1分数: {test_metrics['f1']:.4f}")
print(f"AUC: {test_metrics['auc']:.4f}")

# Pick the decision threshold that maximises F1.
precision, recall, thresholds = precision_recall_curve(
    test_metrics['labels'], test_metrics['probs']
)
# The small epsilon avoids division by zero when precision + recall == 0.
f1_scores = 2 * recall * precision / (recall + precision + 1e-10)
# precision/recall have one more entry than thresholds: the final
# (precision=1, recall=0) point has no threshold. Exclude it so the argmax is
# always a valid index into `thresholds`.
best_threshold_idx = int(np.argmax(f1_scores[:-1]))
best_threshold = thresholds[best_threshold_idx]
print(f"最佳决策阈值: {best_threshold:.4f}")

# Close the TensorBoard writer.
writer.close()


In [1]:
# Load the best checkpoint.
# NOTE(review): this cell relies on names created by earlier cells (torch, os,
# Config, full_loader, criterion); it fails with NameError on a fresh kernel.
print("加载最佳模型...")
# SECURITY NOTE: weights_only=False unpickles a complete model object, which can
# execute arbitrary code. Only load checkpoints produced by this project.
# map_location keeps the load working when the checkpoint was saved on a
# different device than Config.DEVICE.
best_model = torch.load(
    os.path.join(Config.MODEL_DIR, "Cervix_Classifier_best.pth"),
    map_location=Config.DEVICE,
    weights_only=False,
)
best_model.eval()

# Run the misclassification analysis.
# NOTE(review): this rebinds `test_metrics`, which is also assigned by the test
# cell and read by the final comparison table.
print("开始进行错误预测分析...")
test_metrics, error_results = test_model_and_analyze_errors(
    model=best_model,
    test_loader=full_loader,
    criterion=criterion
)


加载最佳模型...


NameError: name 'torch' is not defined

In [None]:
# 此单元格仅用于对MobileNet模型进行测试和CAM可视化
import torch.nn as nn
from torchvision import models, transforms
from torch.utils.data import Subset
import pandas as pd
from tqdm import tqdm
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import show_cam_on_image
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
import cv2

# MobileNetV2 wrapper that also exposes the layer used for CAM.
class MobileNetForCAM(nn.Module):
    """MobileNetV2 classifier whose final linear layer is resized to `num_classes`.

    The torchvision network is stored as `self.model`, so its parameters appear
    under the `model.` prefix in this module's state dict.
    """

    def __init__(self, num_classes=2):
        """Build an untrained MobileNetV2 and replace its classification head.

        Args:
            num_classes: Number of output logits of the new final linear layer.
        """
        super().__init__()
        # Build the architecture without pretrained weights. `weights=None`
        # replaces the deprecated `pretrained=False` flag.
        self.model = models.mobilenet_v2(weights=None)
        # Swap the last classifier layer for one with `num_classes` outputs.
        in_features = self.model.classifier[1].in_features
        self.model.classifier[1] = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return the wrapped network's output for input batch `x`."""
        return self.model(x)

    def get_cam_layer(self):
        """Return the last module of `features`, used as the CAM target layer."""
        return self.model.features[-1]

# Instantiate the MobileNet wrapper on the configured device.
print("加载MobileNet模型...")
mobilenet_model = MobileNetForCAM(num_classes=2).to(Config.DEVICE)

# Load the saved weights. A failure is reported and then re-raised: continuing
# would evaluate a randomly initialised network and report meaningless metrics.
try:
    state_dict = torch.load('./saved_models/model_mobilenetv2_Classify_Mix.pth', map_location=Config.DEVICE)
    print(f"状态字典中的键数量: {len(state_dict.keys())}")

    # The wrapper stores the torchvision network as `self.model`, so checkpoint
    # keys starting with 'features.'/'classifier.' need a 'model.' prefix.
    # All other keys are passed through unchanged.
    new_state_dict = {
        ('model.' + key if key.startswith(('features.', 'classifier.')) else key): value
        for key, value in state_dict.items()
    }

    # strict=False tolerates key mismatches; they are reported below.
    load_result = mobilenet_model.load_state_dict(new_state_dict, strict=False)

    if load_result.missing_keys:
        print(f"缺少的键: {load_result.missing_keys}")
    if load_result.unexpected_keys:
        print(f"多余的键: {load_result.unexpected_keys}")

    if load_result.missing_keys:
        # Parameters missing from the checkpoint keep their random initial
        # values, so do not claim a successful load.
        print("警告: 部分权重未加载，相关层保持随机初始化")
    else:
        print("成功加载模型权重")
except Exception as e:
    print(f"加载模型时出错: {e}")
    raise

mobilenet_model.eval()  # switch to evaluation mode

# Evaluate the MobileNet model.
print("\n开始测试MobileNet模型...")
# NOTE(review): the original comment asks for the same loader as used during
# training/validation; the loader passed here is full_loader — confirm.
mobilenet_metrics = validate(mobilenet_model, full_loader, criterion)

# Print the headline metrics, one per line.
print("\nMobileNet测试结果:")
for metric_label, metric_key in (
    ("准确率", 'acc'),
    ("精确率", 'precision'),
    ("召回率", 'recall'),
    ("F1分数", 'f1'),
    ("AUC", 'auc'),
):
    print(f"{metric_label}: {mobilenet_metrics[metric_key]:.4f}")

# Prepare the output directory for the MobileNet CAM images.
print("\n生成MobileNet CAM可视化结果...")
mobilenet_cam_dir = os.path.join(Config.MODEL_DIR, "mobilenet_cam_results")
os.makedirs(mobilenet_cam_dir, exist_ok=True)

# Layer whose activations drive the CAM.
target_layer = mobilenet_model.get_cam_layer()
print(f"使用 {type(target_layer).__name__} 作为CAM目标层")

# Grad-CAM helper bound to the model and the chosen target layer.
cam_target_layers = [target_layer]
cam = GradCAM(model=mobilenet_model, target_layers=cam_target_layers)

# Generate CAM visualisations for a small sample of the dataset.
print("开始测试并生成CAM可视化...")
num_samples = 20
# NOTE(review): index -> name mapping is assumed to match the one used during
# training — confirm.
class_names = ['Lesion', 'Normal']

# Restrict CAM generation to the first 2 * num_samples items of the dataset
# (headroom for samples that fail below) and visit them in random order.
indices = list(range(min(num_samples * 2, len(full_loader.dataset))))
subset_dataset = Subset(full_loader.dataset, indices)
cam_loader = torch.utils.data.DataLoader(subset_dataset, batch_size=1, shuffle=True, num_workers=0)

# Rows are collected in a list and converted to a DataFrame once after the
# loop, instead of concatenating a new frame on every iteration.
result_columns = ['image_path', 'true_class', 'pred_class', 'probability', 'correct', 'cam_path']
result_records = []

# Per-channel statistics used to undo input normalisation for display.
# NOTE(review): assumes the dataset normalises with these ImageNet values — confirm.
channel_mean = np.array([0.485, 0.456, 0.406])
channel_std = np.array([0.229, 0.224, 0.225])

with tqdm(total=min(num_samples, len(cam_loader))) as pbar:
    for i, (inputs, labels) in enumerate(cam_loader):
        if len(result_records) >= num_samples:
            break

        # Prepare the input and ground-truth label (batch size is 1).
        input_tensor = inputs.to(Config.DEVICE)
        label_idx = labels.item()

        # Forward pass: predicted class and its softmax probability.
        with torch.no_grad():
            outputs = mobilenet_model(input_tensor)
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            prob, pred_idx = torch.max(probabilities, 1)
            prob = prob.item()
            pred_idx = pred_idx.item()

        try:
            # Rebuild a displayable HxWxC image in [0, 1] from the input tensor.
            img_tensor = inputs[0].cpu()
            rgb_img = img_tensor.numpy().transpose(1, 2, 0)
            rgb_img = np.clip(rgb_img * channel_std + channel_mean, 0, 1)

            # CAM for the predicted class. Gradients are needed here, so this
            # runs outside torch.no_grad().
            targets = [ClassifierOutputTarget(pred_idx)]
            grayscale_cam = cam(input_tensor=input_tensor, targets=targets)

            # Overlay the heat map of the single batch item on the image.
            cam_image = show_cam_on_image(rgb_img, grayscale_cam[0], use_rgb=True)

            cam_filename = f"sample_{i}_true_{class_names[label_idx]}_pred_{class_names[pred_idx]}_prob_{prob:.4f}.jpg"
            cam_path = os.path.join(mobilenet_cam_dir, cam_filename)

            # cv2.imwrite reports failure through its return value rather than
            # an exception; check it so a failed write is not recorded as a result.
            if not cv2.imwrite(cam_path, cv2.cvtColor(cam_image, cv2.COLOR_RGB2BGR)):
                raise OSError(f"无法写入图像: {cam_path}")

            # Save the de-normalised original for side-by-side comparison.
            orig_filename = f"sample_{i}_original.jpg"
            orig_path = os.path.join(mobilenet_cam_dir, orig_filename)
            if not cv2.imwrite(orig_path, cv2.cvtColor((rgb_img*255).astype(np.uint8), cv2.COLOR_RGB2BGR)):
                raise OSError(f"无法写入图像: {orig_path}")

            result_records.append({
                'image_path': f"sample_{i}",
                'true_class': class_names[label_idx],
                'pred_class': class_names[pred_idx],
                'probability': prob,
                'correct': bool(pred_idx == label_idx),
                'cam_path': cam_path,
            })

            pbar.update(1)

        except Exception as e:
            # Best effort: report the failing sample and move on to the next one.
            print(f"处理图像时出错: {e}")
            continue

# Build the results table once; the column list keeps the schema when empty.
results_df = pd.DataFrame(result_records, columns=result_columns)

# Write the per-sample results table to CSV inside the CAM output directory.
results_csv_path = os.path.join(mobilenet_cam_dir, "results.csv")
results_df.to_csv(results_csv_path, index=False)
print(f"CAM可视化结果已保存到: {mobilenet_cam_dir}")
print(f"结果摘要已保存到: {results_csv_path}")

# Summary statistics over the visualised samples. Any failure (for example an
# empty results table) is reported instead of stopping the cell.
try:
    correct_samples = results_df.loc[results_df['correct'] == True]
    incorrect_samples = results_df.loc[results_df['correct'] == False]

    total_count = len(results_df)
    print("\n结果摘要:")
    print(f"总样本数: {total_count}")

    correct_pct = len(correct_samples) / total_count * 100
    print(f"正确分类样本: {len(correct_samples)} ({correct_pct:.2f}%)")
    incorrect_pct = len(incorrect_samples) / total_count * 100
    print(f"错误分类样本: {len(incorrect_samples)} ({incorrect_pct:.2f}%)")

    # Per-class breakdown: rows are true classes, columns are correct / incorrect.
    per_class_counts = results_df.groupby(['true_class', 'correct']).size()
    class_stats = per_class_counts.unstack(fill_value=0)
    print("\n按类别统计:")
    print(class_stats)
except Exception as e:
    print(f"计算统计信息时出错: {e}")

# Side-by-side comparison of the two models' headline metrics.
# NOTE(review): `test_metrics` is assigned both by the test cell and by the
# error-analysis cell; the row below shows whichever of them ran last.
def _metric_row(row_label, metrics):
    """Format one table row: the label followed by the five metrics to 4 decimals."""
    metric_keys = ('acc', 'precision', 'recall', 'f1', 'auc')
    return row_label + "     ".join(f"{metrics[key]:.4f}" for key in metric_keys)

print("\n模型性能比较:")
print("                准确率      精确率      召回率      F1分数      AUC")
print(_metric_row("MobileNetV2:   ", mobilenet_metrics))
print(_metric_row("EfficientNetV2: ", test_metrics))
