In [7]:
import os
from load_img import load_dcm, load_nii 
import numpy as np
import cv2

def load_jpg(jpg_path, target_size=(512, 512)):
    # 加载 DICOM 图像
    jpg_image = cv2.imread(jpg_path, cv2.IMREAD_GRAYSCALE)
    jpg_image = jpg_image.astype(np.float32)

    jpg_image_normalized = (jpg_image - np.min(jpg_image)) / (np.max(jpg_image) - np.min(jpg_image))  

    # 转换尺寸（512x512）
    jpg_image_resized = cv2.resize(jpg_image_normalized, target_size)


    return jpg_image_resized

# 遍历病人的姓名，找到对应的文件夹并加载图像
def process_images_for_patients(base_path, target_size=(512, 512)):

    # 获取所有病人文件夹
    all_folders = sorted([f for f in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, f))])

    # 按病人分组文件夹，每两个文件夹为一个病人
    patient_folders = []
    for i in range(0, len(all_folders), 2):
        if i + 1 < len(all_folders):
            patient_folders.append([all_folders[i], all_folders[i + 1]])


    # for _, row in labels_df.iterrows():
    #     label = row['N分期']  # 获取病人标签（N分期）
        
    # 加载每个文件夹中的图像
    patient_images = []

    for folder_pair in patient_folders:
        all_images = []
        for folder in folder_pair:
            dcm_file = None
            nii_file = None
            jpg_file = None

            # print(folder)

            # 查找对应的 .dcm 和 .nii 文件
            for file in os.listdir(os.path.join(base_path, folder)):
                if file.endswith('.dcm'):
                    dcm_file = os.path.join(base_path, folder, file)
                elif file.endswith('.nii.gz'):
                    nii_file = os.path.join(base_path, folder, file)
                elif file.endswith('.jpg'):
                    jpg_file = os.path.join(base_path, folder, file)
                           
            # print(f"dcm_file: {dcm_file}, nii_file: {nii_file}")

            if dcm_file and nii_file:
                # 读取并处理 .dcm 和 .nii 图像
                dcm_image = load_dcm(dcm_file, target_size)
                nii_mask = load_nii(nii_file, target_size)

                # 将两个图像相乘
                focused_dcm_image = dcm_image * nii_mask
                all_images.append(focused_dcm_image)
            
            elif jpg_file and nii_file:
                # print(f"folder {folder} jpg")
                jpg_image = load_jpg(jpg_file, target_size)
                nii_mask = load_nii(nii_file, target_size)

                # 将两个图像相乘
                focused_jpg_image = jpg_image * nii_mask
                all_images.append(focused_jpg_image)

            elif dcm_file:
                # print(f"folder {folder} no nii")
                all_images.append(load_dcm(dcm_file, target_size))
                        
        if len(all_images) == 2:  # 确保每个病人有 2 张图像

            # print(all_images[0].shape)
            # 将两个图像堆叠在一起
            patient_input = np.stack(all_images, axis=0)  # 形状为 (2, 512, 512)
            # 追加至列表中
            patient_images.append(patient_input)

        else:
            print(f"Skipping patient {folder_pair} due to missing images")

    return patient_images

In [8]:
# from process_data import process_images_for_patients

base_path = './chaoyang_qianzhan_190'  # 图像数据的根目录

target_size = (512, 512)  # 目标图像尺寸

patient_images = process_images_for_patients(base_path, target_size)

  _warn_about_invalid_encoding(encoding)


In [9]:
import pandas as pd

def load_labels(excel_path):
    labels_df = pd.read_excel(excel_path)
    return labels_df

excel_path = './beiyou_excel/chaoyang_prospective_190.xlsx'  # 包含病人姓名和标签的Excel文件路径
labels_df = load_labels(excel_path)
# 补全标签并构建 images_with_labels 列表
images_with_labels = []
for i, patient_input in enumerate(patient_images):
    label = labels_df.iloc[i]['N分期']  # 按顺序获取对应的标签

    # 如果标签为 NaN，则用均值填充
    if pd.isna(label):
        label = 1.0

    images_with_labels.append((patient_input, label))

In [10]:
from torchvision import transforms
from torch.utils.data import DataLoader

from dataset import ImageDataset

# 测试数据预处理，与验证集一致
test_transform = transforms.Compose([
    transforms.Normalize([0.5], [0.5]),  # 标准化
])

# 创建测试数据集
test_dataset = ImageDataset(images_with_labels, transform=test_transform)

# 创建测试数据加载器
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)



In [18]:
from model import Resnet18_cbam
import torch
from sklearn.metrics import roc_auc_score
import torch.nn.functional as F

model = Resnet18_cbam(num_classes=4)
model_path = './epoch100_model.pth'

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

print(f"使用设备: {device}")

model.load_state_dict(torch.load(model_path))
model.eval()  # 设置模型为评估模式


# 初始化变量
correct = 0
total = 0
all_labels = []
all_probs = []

with torch.no_grad():  # 禁用梯度计算
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)  # 数据移动到设备

        outputs = model(inputs)  # 模型推理
        probs = F.softmax(outputs, dim=1)  # 计算每个类别的概率
        _, predicted = torch.max(outputs, 1)  # 获取预测结果

        # 统计正确数和总数
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

        # 收集所有标签和概率
        all_labels.append(labels.cpu().numpy())
        all_probs.append(probs.cpu().numpy())

# 计算准确率
accuracy = correct / total
print(f"测试集准确率: {accuracy:.4f}")

# 计算 AUC
all_labels = np.concatenate(all_labels)  # 合并所有批次标签
all_probs = np.concatenate(all_probs)    # 合并所有批次概率

# 计算每个类别的 AUC
try:
    auc = roc_auc_score(all_labels, all_probs, multi_class='ovr')
    print(f"测试集 AUC 值: {auc:.4f}")
except ValueError as e:
    print(f"AUC 计算失败: {e}")

  model.load_state_dict(torch.load(model_path))


使用设备: cuda
测试集准确率: 0.5000
测试集 AUC 值: 0.5678
