In [1]:
# Mount Google Drive at /content/drive so later cells can read and write files there.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pandas as pd

# Source Excel workbook on the mounted Drive.
input_path = "/content/drive/MyDrive/PCC2/have_birads/GDPH_SYSUCC/BIRADS&FOLD.xlsx"

# Destination of the CSV copy.
output_path = "/content/drive/MyDrive/PCC2/have_birads/GDPH_SYSUCC/BIRADS_FOLD_renamed.csv"

# Load the workbook and rename the "BIRADS-reader1" column to "type" in one chain.
df = pd.read_excel(input_path).rename(columns={"BIRADS-reader1": "type"})

# Write the frame as CSV without the index column.
df.to_csv(output_path, index=False)

print("✅ 文件已成功保存为 CSV 格式，路径为：", output_path)


✅ 文件已成功保存为 CSV 格式，路径为： /content/drive/MyDrive/PCC2/have_birads/GDPH_SYSUCC/BIRADS_FOLD_renamed.csv


birads转换

In [4]:
import pandas as pd

# Path of the CSV whose "type" column holds the raw BI-RADS categories.
input_path = "/content/drive/MyDrive/PCC2/have_birads/GDPH_SYSUCC/BIRADS_FOLD_renamed.csv"

# Load the CSV.
df = pd.read_csv(input_path)

# Normalise "type" to lowercase strings without surrounding whitespace.
# A purely numeric column is read back as float ("4.0"), so a trailing ".0"
# is removed to let such values match the integer-like mapping keys.
df['type'] = (
    df['type']
    .astype(str)
    .str.lower()
    .str.strip()
    .str.replace(r"\.0$", "", regex=True)
)

# Four-class mapping from BI-RADS category to label.
mapping = {
    "1": 0,   # Normal
    "2": 1,   # Benign
    "3": 1,   # Benign
    "4": 2,   # Suspicious
    "4a": 2,  # Suspicious
    "4b": 2,  # Suspicious
    "4c": 2,  # Suspicious
    "5": 3    # Malignant
}

# Map every category to its label; categories missing from `mapping` become NaN.
df['label'] = df['type'].map(mapping)

# Fail with an explicit message instead of the opaque NaN-to-int cast error
# that `astype(int)` would raise for unmapped categories.
unmapped = df.loc[df['label'].isna(), 'type']
if not unmapped.empty:
    raise ValueError(
        "Unmapped BI-RADS values in 'type' column: "
        f"{sorted(unmapped.unique())} ({len(unmapped)} rows)"
    )

# Safe now: every row has a label.
df['label'] = df['label'].astype(int)

# Replace the raw "type" column by the numeric label, keeping the name "type".
df = df.drop(columns=['type']).rename(columns={'label': 'type'})

# Save the four-class CSV.
output_path = "/content/drive/MyDrive/PCC2/have_birads/GDPH_SYSUCC/BIRADS_FOLD_4class.csv"
df.to_csv(output_path, index=False)

print("✅ 四分类处理完成，新文件已保存到：", output_path)


✅ 四分类处理完成，新文件已保存到： /content/drive/MyDrive/PCC2/have_birads/GDPH_SYSUCC/BIRADS_FOLD_4class.csv


Data Import

In [2]:
import pandas as pd
import os
import numpy as np
import torch
import matplotlib.image as mpimg
import numpy as np
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import cv2
from skimage import io, color
from skimage import color as skcolor
from torchvision import transforms
import torchvision.transforms as T
from PIL import Image
import torchvision.transforms.functional as F
!pip install tensorboardX

Collecting tensorboardX
  Downloading tensorboardX-2.6.2.2-py2.py3-none-any.whl.metadata (5.8 kB)
Downloading tensorboardX-2.6.2.2-py2.py3-none-any.whl (101 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/101.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━[0m [32m92.2/101.7 kB[0m [31m8.4 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.7/101.7 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tensorboardX
Successfully installed tensorboardX-2.6.2.2


5 折交叉验证代码

*DBTDATA*旧的

In [6]:
# resnet50_5fold_training.py

import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
import matplotlib.image as mpimg

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau

from torch.utils.tensorboard import SummaryWriter

# --- Dataset class with 5-fold support ---
class DBTData(Dataset):
    """In-memory grayscale image dataset with fold-based train/validation splits.

    The CSV at ``csv_path`` must provide the columns ``ID`` (image file stem),
    ``type`` (integer class label) and ``fold``. All images that exist under
    ``image_root`` (``<ID>.png`` first, then ``<ID>.jpg``) are decoded eagerly
    in ``__init__`` and kept in memory as 2-D ``uint8`` arrays.

    Args:
        csv_path: Path of the label/fold CSV.
        image_root: Directory containing the image files.
        fold: Fold index held out for validation.
        mode: ``'train'`` keeps rows whose fold differs from ``fold``,
            ``'valid'`` keeps rows whose fold equals ``fold``; any other
            value keeps every row.
        crop_to: Accepted for interface compatibility; no crop is applied.
        resize_to: Target ``(height, width)`` passed to ``T.Resize``.
        color: When true, the gray image is replicated to three channels.
    """

    def __init__(self, csv_path, image_root, fold=0, mode='train', crop_to=(224, 224), resize_to=(256, 256), color=True):
        self._crop_to = crop_to
        self._resize_to = resize_to
        self._color = color
        self.mode = mode

        df = pd.read_csv(csv_path)
        df["label"] = df["type"]  # the "type" column already holds the four-class label

        if mode == "train":
            df = df[df["fold"] != fold].reset_index(drop=True)
        elif mode == "valid":
            df = df[df["fold"] == fold].reset_index(drop=True)

        self.df = df
        self.image_root = image_root
        self.x = []
        self.y = []

        # Built once instead of on every __getitem__ call.
        self._transform = T.Compose([
            T.Resize(self._resize_to),
            T.ToTensor()
        ])

        missing = 0
        for i in range(len(df)):
            # str() keeps numeric IDs (parsed as integers by pandas) usable as file stems.
            img_id = str(df.at[i, "ID"])
            label = df.at[i, "label"]
            image_path = os.path.join(image_root, img_id + ".png")
            if not os.path.exists(image_path):
                image_path = os.path.join(image_root, img_id + ".jpg")

            if not os.path.exists(image_path):
                missing += 1
                continue

            self.x.append(self._to_uint8_gray(mpimg.imread(image_path)))
            self.y.append(int(label))

        if missing:
            # Rows without an image are still skipped, but no longer silently.
            print(f"[DBTData] mode={mode!r}, fold={fold}: skipped {missing} of {len(df)} rows (image file not found)")

        self.y = torch.tensor(self.y, dtype=torch.long)

    @staticmethod
    def _to_uint8_gray(img):
        """Convert an array returned by ``matplotlib.image.imread`` to 2-D uint8.

        ``imread`` yields RGB(A) channel order and returns PNG data as floats
        in [0, 1]; casting those floats straight to ``uint8`` would collapse
        the image to the values 0 and 1, so float data is rescaled to
        [0, 255] first.
        """
        img = np.asarray(img)
        if img.ndim == 3:
            code = cv2.COLOR_RGBA2GRAY if img.shape[2] == 4 else cv2.COLOR_RGB2GRAY
            img = cv2.cvtColor(img, code)
        if np.issubdtype(img.dtype, np.floating):
            img = np.clip(np.rint(img * 255.0), 0, 255)
        return img.astype(np.uint8)

    def __getitem__(self, idx):
        """Return ``(image_tensor, label)`` for sample ``idx``."""
        x = self.x[idx]
        y = self.y[idx]
        if self._color:
            x = np.stack([x] * 3, axis=-1)  # grayscale -> 3 identical channels

        x = Image.fromarray(x)
        return self._transform(x), y

    def __len__(self):
        """Number of samples whose image file was found."""
        return len(self.y)

# --- Training and validation ---
def train_fold(fold, csv_path, image_root, num_classes=4, batch_size=32, num_workers=0, device='cuda', num_epochs=100):
    """Train and validate an ImageNet-pretrained ResNet-50 on one fold.

    Logs per-epoch loss/accuracy to TensorBoard under ``./runs/fold{fold}``
    and saves the weights with the lowest validation loss to
    ``best_model_fold{fold}.pth``.

    Args:
        fold: Index of the fold held out for validation.
        csv_path: CSV with ``ID``, ``type`` and ``fold`` columns.
        image_root: Directory containing the images.
        num_classes: Size of the replaced classification head.
        batch_size: Batch size of both loaders.
        num_workers: DataLoader worker count.
        device: Device the model and batches are moved to.
        num_epochs: Number of training epochs (previously fixed at 100).

    Raises:
        ValueError: If the training or validation split contains no samples.
    """
    print(f"\n======== Fold {fold} ========")

    train_set = DBTData(csv_path, image_root, fold=fold, mode='train')
    valid_set = DBTData(csv_path, image_root, fold=fold, mode='valid')
    if len(train_set) == 0 or len(valid_set) == 0:
        raise ValueError(
            f"Fold {fold}: empty split (train={len(train_set)}, valid={len(valid_set)}); "
            "check the 'fold' column and the image directory."
        )

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    # NOTE(review): newer torchvision releases deprecate `pretrained=` in favour of
    # `weights=`; kept as-is so older installations keep working.
    model = models.resnet50(pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-8)
    scheduler = ReduceLROnPlateau(optimizer, patience=5)
    criterion = nn.CrossEntropyLoss()

    def run_epoch(loader, training):
        """Run one pass over ``loader``; return sample-weighted (loss, accuracy)."""
        model.train(training)
        loss_sum, correct, seen = 0.0, 0, 0
        with torch.set_grad_enabled(training):
            for x, y in loader:
                x, y = x.to(device), y.to(device)
                if training:
                    optimizer.zero_grad()
                y_pred = model(x)
                loss = criterion(y_pred, y)
                if training:
                    loss.backward()
                    optimizer.step()

                # Weight by batch size so a short final batch does not skew the averages.
                n = y.size(0)
                loss_sum += loss.item() * n
                correct += (y_pred.argmax(dim=1) == y).sum().item()
                seen += n
        return loss_sum / seen, correct / seen

    writer = SummaryWriter(log_dir=f'./runs/fold{fold}')
    best_val_loss = float('inf')

    try:
        for epoch in range(num_epochs):
            avg_train_loss, avg_train_acc = run_epoch(train_loader, training=True)
            avg_val_loss, avg_val_acc = run_epoch(valid_loader, training=False)
            scheduler.step(avg_val_loss)

            print(f"Epoch {epoch}: Train Loss {avg_train_loss:.4f}, Acc {avg_train_acc:.4f} | Val Loss {avg_val_loss:.4f}, Acc {avg_val_acc:.4f}")

            writer.add_scalar('Loss/train', avg_train_loss, epoch)
            writer.add_scalar('Loss/valid', avg_val_loss, epoch)
            writer.add_scalar('Acc/train', avg_train_acc, epoch)
            writer.add_scalar('Acc/valid', avg_val_acc, epoch)

            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                torch.save(model.state_dict(), f"best_model_fold{fold}.pth")
    finally:
        # Flush and release the event file even when training is interrupted.
        writer.close()

# --- Entry Point ---
if __name__ == '__main__':
    # Four-class label CSV and the image directory, both on the mounted Drive.
    csv_path = "/content/drive/MyDrive/PCC2/have_birads/GDPH_SYSUCC/BIRADS_FOLD_4class.csv"
    image_root = "/content/drive/MyDrive/PCC2/have_birads/GDPH_SYSUCC/data"

    # Use the GPU when available, otherwise fall back to the CPU.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Train one model per fold index 0..4; each call validates on its own fold.
    for fold in range(5):
        train_fold(fold, csv_path, image_root, device=device)




KeyboardInterrupt: 

In [None]:
utils.py

In [3]:
import torch
from torch.utils.data.dataset import Dataset
import torchvision.transforms as transforms
from torchvision.transforms.transforms import CenterCrop, Grayscale
import pandas as pd
from glob import glob
from PIL import Image
import numpy as np
import random
import os
import cv2
import sys
sys.path.append("/content/drive/MyDrive/PCC2/HoVerTrans-main/HoVerTrans-main")


class AddGaussianNoise(object):
    """Randomly add Gaussian noise to an image and return it as mode 'L'.

    Args:
        mean: Mean of the noise distribution.
        variance: Passed to ``np.random.normal`` as ``scale``, i.e. it acts
            as the standard deviation despite its name.
        amplitude: Factor the sampled noise is multiplied by.
        p: Probability of applying the noise; otherwise the input is
            returned unchanged.
    """

    def __init__(self, mean=0.0, variance=1.0, amplitude=1.0, p=1):
        self.mean = mean
        self.variance = variance
        self.amplitude = amplitude
        self.p = p

    def _noisy_array(self, arr):
        """Return ``arr`` plus scaled noise, clipped to [0, 255] as uint8."""
        noise = self.amplitude * np.random.normal(loc=self.mean, scale=self.variance, size=arr.shape)
        # Clip both ends: negative values would otherwise wrap around to
        # bright pixels when cast to uint8.
        return np.clip(arr + noise, 0, 255).astype('uint8')

    def __call__(self, img):
        if random.uniform(0, 1) < self.p:
            arr = np.array(img)
            return Image.fromarray(self._noisy_array(arr)).convert('L')
        else:
            return img

class AddBlur(object):
    """Randomly apply a ``kernel`` x ``kernel`` box blur and return a mode 'L' image.

    With probability ``1 - p`` the input is returned untouched.
    """

    def __init__(self, kernel=3, p=1):
        self.kernel, self.p = kernel, p

    def __call__(self, img):
        # Guard clause: skip the augmentation when the draw is not below p.
        if not random.uniform(0, 1) < self.p:
            return img

        ksize = (self.kernel, self.kernel)
        blurred = cv2.blur(np.array(img), ksize)
        return Image.fromarray(blurred.astype('uint8')).convert('L')

class BIRADSDataset(Dataset):
    """Image dataset driven by a CSV with ``ID`` and ``type`` columns.

    Args:
        root: Directory containing the image files.
        transform: Callable applied to the loaded PIL image, or ``None``.
        csv_path: CSV file listing one sample per row.
    """

    # Extensions tried, in order, when resolving an ID to a file.
    _EXTENSIONS = ('.jpg', '.png', '.jpeg')

    def __init__(self, root, transform, csv_path):
        super().__init__()
        self.data_root = root
        self.transform = transform
        self.data = pd.read_csv(csv_path)

    def __getitem__(self, index):
        """Return ``{'imgs', 'labels', 'names'}`` for the row at ``index``.

        Raises:
            FileNotFoundError: If no file ``<ID><ext>`` exists for any known extension.
        """
        row = self.data.iloc[index]

        # str() keeps numeric IDs (parsed as integers by pandas) usable as
        # file stems instead of failing on `int + str`.
        base_name = str(row['ID'])
        for ext in self._EXTENSIONS:
            file_path = os.path.join(self.data_root, base_name + ext)
            if os.path.exists(file_path):
                break
        else:
            raise FileNotFoundError(f"{base_name} with any known extension not found")

        label = int(row['type'])

        # Decode inside the context manager so the file handle is released
        # deterministically; the pixel data stays available afterwards.
        with Image.open(file_path) as img:
            img.load()
        if self.transform is not None:
            img = self.transform(img)

        return {'imgs': img, 'labels': label, 'names': base_name}

    def __len__(self):
        """Number of rows in the CSV."""
        return len(self.data)


def get_dataset(imgpath, csvpath, img_size, mode='train'):
    """Build a ``BIRADSDataset`` with the transform pipeline for ``mode``.

    ``mode == 'train'`` selects the augmenting pipeline; every other value
    selects the plain resize/grayscale/tensor pipeline. Both pipelines are
    constructed on every call, and the noise amplitude is drawn once here,
    not once per image.
    """
    target_size = (img_size, img_size)

    pipelines = {
        'train': transforms.Compose([
            transforms.Resize(target_size),
            transforms.Grayscale(),
            transforms.CenterCrop(target_size),
            AddGaussianNoise(amplitude=random.uniform(0, 1), p=0.5),
            AddBlur(kernel=3, p=0.5),
            transforms.RandomHorizontalFlip(),
            transforms.ColorJitter(brightness=(0.5, 2), contrast=(0.5, 2)),
            transforms.ToTensor(),
        ]),
        'test': transforms.Compose([
            transforms.Resize(target_size),
            transforms.Grayscale(),
            transforms.ToTensor(),
        ]),
    }

    chosen = pipelines['train'] if mode == 'train' else pipelines['test']
    return BIRADSDataset(imgpath, chosen, csvpath)

def confusion_matrix(preds, labels, conf_matrix):
    """Accumulate prediction/label pairs into ``conf_matrix`` in place.

    Rows are indexed by the predicted class and columns by the true class.
    The updated ``conf_matrix`` is also returned.
    """
    flat_preds = torch.flatten(preds).tolist()
    flat_labels = torch.flatten(labels).tolist()
    for predicted, actual in zip(flat_preds, flat_labels):
        conf_matrix[int(predicted), int(actual)] += 1
    return conf_matrix


config.py

修改

In [4]:
import argparse
import sys
sys.path.append("/content/drive/MyDrive/PCC2/HoVerTrans-main/HoVerTrans-main")
def config(args=None):
    """Parse the training configuration.

    Args:
        args: Optional list of argument strings. ``None`` makes argparse
            read ``sys.argv``; passing a list allows use from a notebook.

    Returns:
        argparse.Namespace holding all options.

    Boolean and list options use dedicated parsers: ``type=bool`` would turn
    every non-empty string (including "False") into ``True`` and
    ``type=list`` would split the argument into single characters.
    """

    def str2bool(text):
        """Parse common true/false spellings into a bool."""
        lowered = text.strip().lower()
        if lowered in ('1', 'true', 't', 'yes', 'y', 'on'):
            return True
        if lowered in ('0', 'false', 'f', 'no', 'n', 'off'):
            return False
        raise argparse.ArgumentTypeError(f"expected a boolean value, got {text!r}")

    def int_list(text):
        """Parse '2,2,2,2', '2 2 2 2' or '[2, 2, 2, 2]' into a list of ints."""
        tokens = text.replace('[', ' ').replace(']', ' ').replace(',', ' ').split()
        if not tokens:
            raise argparse.ArgumentTypeError(f"expected a list of integers, got {text!r}")
        try:
            return [int(token) for token in tokens]
        except ValueError:
            raise argparse.ArgumentTypeError(f"expected a list of integers, got {text!r}")

    parser = argparse.ArgumentParser()

    # Paths
    parser.add_argument('--data_path', type=str, required=True, help='Directory containing image files')
    parser.add_argument('--csv_path', type=str, required=True, help='CSV file path with ID and type columns')

    # Output
    parser.add_argument('--model_name', type=str, default='hovertrans')
    parser.add_argument('--model_path', type=str, default='./weight')
    parser.add_argument('--writer_comment', type=str, default='BIRADS4CLASS')
    parser.add_argument('--save_model', type=str2bool, default=True)

    # Basic training config
    parser.add_argument('--img_size', type=int, default=256)
    parser.add_argument('--batch_size', type=int, default=32)
    parser.add_argument('--class_num', type=int, default=4)  # Four-class classification
    parser.add_argument('--fold', type=int, default=5)
    parser.add_argument('--epochs', type=int, default=150)
    parser.add_argument('--log_step', type=int, default=5)
    parser.add_argument('--lr', type=float, default=0.0001)

    # Model architecture (HoVer-Transformer); one value per stage
    parser.add_argument('--patch_size', type=int_list, default=[2, 2, 2, 2])
    parser.add_argument('--hover_size', type=int_list, default=[2, 2, 2, 2])
    parser.add_argument('--dim', type=int_list, default=[4, 8, 16, 32])
    parser.add_argument('--depth', type=int_list, default=[2, 4, 4, 2])
    parser.add_argument('--num_heads', type=int_list, default=[2, 4, 8, 16])
    parser.add_argument('--num_inner_head', type=int_list, default=[2, 4, 8, 16])

    # Optimization setup
    parser.add_argument('--loss_function', type=str, default='CE')
    parser.add_argument('--optimizer', type=str, default='AdamW', choices=['SGD', 'Adam', 'AdamW'])
    parser.add_argument('--scheduler', type=str, default='cosine', choices=['cosine', 'step'])
    parser.add_argument('--warmup_epochs', type=int, default=10)
    parser.add_argument('--warmup_decay', type=float, default=0.01)
    parser.add_argument('--min_lr', type=float, default=1e-6)
    parser.add_argument('--step', type=int, default=5)

    return parser.parse_args(args)


hovertrans.py

In [5]:
import torch
import torch.nn as nn
from torch.nn.init import trunc_normal_
import sys
sys.path.append("/content/drive/MyDrive/PCC2/HoVerTrans-main/HoVerTrans-main")


def drop_path(x, drop_prob: float = 0., training: bool = False):
    """Zero whole samples with probability ``drop_prob`` and rescale the rest.

    Returns ``x`` itself when not training or when ``drop_prob`` is 0.
    Surviving samples are divided by ``1 - drop_prob``.
    """
    if not training or drop_prob == 0.:
        return x

    survive = 1 - drop_prob
    # One random draw per sample, broadcast over all remaining dimensions.
    mask_shape = (x.shape[0],) + (1,) * (x.ndim - 1)
    mask = torch.rand(mask_shape, dtype=x.dtype, device=x.device)
    mask = (mask + survive).floor_()  # 1 with probability `survive`, else 0
    return x.div(survive) * mask


class DropPath(nn.Module):
    """Module wrapper around ``drop_path`` (per-sample stochastic depth).

    The drop is only active while the module is in training mode.
    """

    def __init__(self, drop_prob=None):
        super().__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        return drop_path(x, drop_prob=self.drop_prob, training=self.training)


class Attention(nn.Module):
    """Multi-head self-attention with a fused query/key projection.

    Queries and keys come from one linear layer (``qk``), values from a
    separate one (``v``). ``hidden_dim`` is stored but not used in the
    computation. An optional ``relative_pos`` tensor is added to the
    attention logits before the softmax.
    """

    def __init__(self, dim, hidden_dim, num_heads=8, qkv_bias=False, attn_drop=0., proj_drop=0.):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads
        self.head_dim = dim // num_heads
        self.scale = self.head_dim ** -0.5

        self.qk = nn.Linear(dim, dim * 2, bias=qkv_bias)
        self.v = nn.Linear(dim, dim, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_drop, inplace=False)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop, inplace=True)

    def forward(self, x, relative_pos=None):
        batch, tokens, _ = x.shape
        heads = self.num_heads

        # (2, batch, heads, tokens, head_dim) -> separate query and key tensors
        qk = self.qk(x).reshape(batch, tokens, 2, heads, self.head_dim).permute(2, 0, 3, 1, 4)
        query, key = qk.unbind(0)
        value = self.v(x).reshape(batch, tokens, heads, -1).permute(0, 2, 1, 3)

        scores = (query @ key.transpose(-2, -1)) * self.scale
        if relative_pos is not None:
            scores += relative_pos
        weights = self.attn_drop(scores.softmax(dim=-1))

        merged = (weights @ value).transpose(1, 2).reshape(batch, tokens, -1)
        return self.proj_drop(self.proj(merged))

class Mlp(nn.Module):
    """Two-layer feed-forward network: linear, activation, dropout, linear, dropout.

    ``hidden_features`` and ``out_features`` default to ``in_features``;
    both dropout layers use the same probability ``drop``.
    """

    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.):
        super().__init__()
        hidden_width = hidden_features or in_features
        out_width = out_features or in_features

        self.fc1 = nn.Linear(in_features, hidden_width)
        self.act = act_layer()
        self.drop1 = nn.Dropout(drop)
        self.fc2 = nn.Linear(hidden_width, out_width)
        self.drop2 = nn.Dropout(drop)

    def forward(self, x):
        hidden = self.drop1(self.act(self.fc1(x)))
        return self.drop2(self.fc2(hidden))

class Merge(nn.Module):
    """Fold two token streams back into images and fuse them with convolutions.

    Each stream is reshaped to ``(B, in_dim, H, W)``; the two images are
    concatenated along the channel axis and passed through a 1x1 / 3x3 / 1x1
    convolution stack that outputs ``out_dim`` channels. ``norm_in`` and
    ``norm_out`` are registered but not used in ``forward``; ``in_chans``
    is unused.
    """

    def __init__(self, in_dim, out_dim, patch_size, in_chans=3):
        super().__init__()
        wide = in_dim * 4
        self.conv = nn.Sequential(
            nn.Conv2d(in_dim * 2, wide, kernel_size=1, stride=1),
            nn.BatchNorm2d(wide),
            nn.ReLU(inplace=True),
            nn.Conv2d(wide, wide, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(wide),
            nn.ReLU(inplace=True),
            nn.Conv2d(wide, out_dim, kernel_size=1, stride=1),
            nn.BatchNorm2d(out_dim),
            nn.ReLU(inplace=True),
        )
        self.in_dim = in_dim
        self.patch_size = patch_size
        self.norm_in = nn.LayerNorm(in_dim)
        self.norm_out = nn.LayerNorm(out_dim)

    def forward(self, pixel_embed1, pixel_embed2):
        patch = self.patch_size
        cols = pixel_embed1.size(1)
        flat_rows, rows, _ = pixel_embed2.size()
        batch = flat_rows // cols
        assert rows == cols

        def as_image(tokens):
            # (B*rows, cols, patch*patch*in_dim) -> (B, in_dim, rows*patch, cols*patch)
            grid = tokens.reshape(batch, rows, cols, patch, patch, self.in_dim)
            return grid.permute(0, 5, 1, 3, 2, 4).reshape(batch, self.in_dim, rows * patch, cols * patch)

        stacked = torch.cat([as_image(pixel_embed1), as_image(pixel_embed2)], dim=1)
        return self.conv(stacked)

class Block(nn.Module):
    """One transformer block updating row, column and two pixel-level embeddings.

    The "outer" part runs attention + MLP over the row embeddings
    (``attn_out1``/``mlp1``) and over the column embeddings
    (``attn_out2``/``mlp2``). The "inner" part injects the projected row and
    column embeddings into two pixel-level streams and runs attention over
    each stream.

    ``norm_out``/``norm_mlp`` are shared by the row and column branches;
    ``norm_in``/``norm_mlp_in``/``norm_proj`` are shared by both pixel
    streams. The ``sentences`` and ``in_chans`` arguments are not used by the
    active code, and the inner MLPs use a fixed expansion factor of 4
    regardless of ``mlp_ratio``.
    """
    def __init__(self, dim, words_in_sentence, patch_size, sentences, in_chans=3, num_heads=2, num_inner_heads=4, mlp_ratio=4.,
            qkv_bias=False, drop=0., attn_drop=0., drop_path=0., act_layer=nn.GELU, norm_layer=nn.LayerNorm):
        super().__init__()
        # Inner transformer
        self.patch_size = patch_size
        # Feature width of one pixel-level token is dim * patch_size**2.
        words = patch_size * patch_size
        self.norm_in = norm_layer(dim*words)
        self.attn_in1 = Attention(
            dim*words, dim*words, num_heads=num_inner_heads, qkv_bias=qkv_bias,
            attn_drop=attn_drop, proj_drop=drop)
        self.attn_in2 = Attention(
            dim*words, dim*words, num_heads=num_inner_heads, qkv_bias=qkv_bias,
            attn_drop=attn_drop, proj_drop=drop)
        self.norm_mlp_in = norm_layer(dim*words)
        self.mlp_in1 = Mlp(in_features=dim*words, hidden_features=int(dim*words * 4),
            out_features=dim*words, act_layer=act_layer, drop=drop)
        self.mlp_in2 = Mlp(in_features=dim*words, hidden_features=int(dim*words * 4),
            out_features=dim*words, act_layer=act_layer, drop=drop)

        # Projections applied to the reshaped row (proj1) and column (proj2)
        # embeddings before they are added to the pixel-level streams.
        self.norm_proj = norm_layer(dim*words)
        self.proj1 = nn.Linear(dim*words, dim*words, bias=True)
        self.proj2 = nn.Linear(dim*words, dim*words, bias=True)

        # Outer transformer
        self.norm_out = norm_layer(dim * words_in_sentence)
        self.attn_out1 = Attention(
            dim * words_in_sentence, dim * words_in_sentence, num_heads=num_heads, qkv_bias=qkv_bias,
            attn_drop=attn_drop, proj_drop=drop)
        self.attn_out2 = Attention(
            dim * words_in_sentence, dim * words_in_sentence, num_heads=num_heads, qkv_bias=qkv_bias,
            attn_drop=attn_drop, proj_drop=drop)
        # A single DropPath instance is reused for every residual branch of the block.
        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()

        self.norm_mlp = norm_layer(dim * words_in_sentence)
        self.mlp1 = Mlp(in_features=dim * words_in_sentence, hidden_features=int(dim * words_in_sentence * mlp_ratio),
            out_features=dim * words_in_sentence, act_layer=act_layer, drop=drop)
        self.mlp2 = Mlp(in_features=dim * words_in_sentence, hidden_features=int(dim * words_in_sentence * mlp_ratio),
            out_features=dim * words_in_sentence, act_layer=act_layer, drop=drop)
        # self.relative_pos1 = nn.Parameter(torch.randn(1, num_heads, sentences, sentences))
        # self.relative_pos2 = nn.Parameter(torch.randn(1, num_heads, sentences, sentences))

    def forward(self, pixel_embed1, pixel_embed2, row_embed, column_embed, relative_pos=None):
        """Update and return ``(pixel_embed1, pixel_embed2, row_embed, column_embed)``.

        ``relative_pos`` is accepted but not used. The grid is treated as
        square: the row count is taken from ``pixel_embed1.size(1)``.
        """
        _, W_grid, _ = pixel_embed1.size()
        H_grid = W_grid
        H_p = W_p = self.patch_size
        B, N, C = row_embed.size()

        # outer
        # Row branch: pre-norm attention and MLP, each with a residual connection.
        assert N == H_grid
        row_embed = row_embed + self.drop_path(self.attn_out1(self.norm_out(row_embed)))
        row_embed = row_embed + self.drop_path(self.mlp1(self.norm_mlp(row_embed)))

        # Column branch: same structure with its own attention/MLP weights.
        assert N == W_grid
        column_embed = column_embed + self.drop_path(self.attn_out2(self.norm_out(column_embed)))
        column_embed = column_embed + self.drop_path(self.mlp2(self.norm_mlp(column_embed)))

        # inner
        # Stream 1 order: row injection (proj1) -> attn_in1 -> column injection (proj2) -> attn_in2 -> mlp_in1.
        pixel_embed1 = pixel_embed1 + self.proj1(self.norm_proj(row_embed.reshape(B*H_grid, H_p, W_grid, W_p, -1).transpose(1, 2).reshape(B*H_grid, W_grid, -1)))
        attn_patch1 = self.attn_in1(self.norm_in(pixel_embed1.reshape(B, H_grid*W_grid, -1)))
        pixel_embed1 = pixel_embed1 + self.drop_path(attn_patch1.reshape(B*H_grid, W_grid, -1))
        pixel_embed1 = pixel_embed1 + self.proj2(self.norm_proj(column_embed.reshape(B, W_grid, H_grid, -1).transpose(1, 2).reshape(B*H_grid, W_grid, -1)))
        attn_patch2 = self.attn_in2(self.norm_in(pixel_embed1.reshape(B, H_grid*W_grid, -1)))
        pixel_embed1 = pixel_embed1 + self.drop_path(attn_patch2.reshape(B*H_grid, W_grid, -1))
        pixel_embed1 = pixel_embed1 + self.drop_path(self.mlp_in1(self.norm_mlp_in(pixel_embed1)))

        # Stream 2 mirrors stream 1 in the opposite order:
        # column injection (proj2) -> attn_in2 -> row injection (proj1) -> attn_in1 -> mlp_in2.
        pixel_embed2 = pixel_embed2 + self.proj2(self.norm_proj(column_embed.reshape(B, W_grid, H_grid, -1).transpose(1, 2).reshape(B*H_grid, W_grid, -1)))
        attn_patch3 = self.attn_in2(self.norm_in(pixel_embed2.reshape(B, H_grid*W_grid, -1)))
        pixel_embed2 = pixel_embed2 + self.drop_path(attn_patch3.reshape(B*H_grid, W_grid, -1))
        pixel_embed2 = pixel_embed2 + self.proj1(self.norm_proj(row_embed.reshape(B*H_grid, H_p, W_grid, W_p, -1).transpose(1, 2).reshape(B*H_grid, W_grid, -1)))
        attn_patch4 = self.attn_in1(self.norm_in(pixel_embed2.reshape(B, H_grid*W_grid, -1)))
        pixel_embed2 = pixel_embed2 + self.drop_path(attn_patch4.reshape(B*H_grid, W_grid, -1))
        pixel_embed2 = pixel_embed2 + self.drop_path(self.mlp_in2(self.norm_mlp_in(pixel_embed2)))

        return pixel_embed1, pixel_embed2, row_embed, column_embed
class ToEmbed(nn.Module):
    """Split a feature map into pixel-level, row and column embeddings.

    The input is unfolded into non-overlapping ``patch_size`` x ``patch_size``
    patches. ``forward`` returns the pixel-level embedding twice (the same
    tensor object) followed by the layer-normalised row and column
    embeddings. ``proj1``/``proj2`` are registered but not used in
    ``forward``; ``in_chans`` is unused.
    """

    def __init__(self, img_size=256, in_chans=3, patch_size=2, dim=8):
        super().__init__()
        self.img_size = (img_size, img_size)
        self.row_patch_size = (patch_size, img_size)
        self.grid_size = (
            self.img_size[0] // self.row_patch_size[0],
            self.img_size[1] // self.row_patch_size[1],
        )
        self.num_patches = self.grid_size[0] * self.grid_size[1]
        self.patch_size = patch_size
        self.dim = dim

        # Feature width of one full row (or column) strip of patches.
        strip_features = self.row_patch_size[0] * self.row_patch_size[1] * dim

        self.unfold = nn.Unfold(kernel_size=patch_size, stride=patch_size)
        self.norm_proj = nn.LayerNorm(strip_features)
        self.proj1 = nn.Linear(strip_features, strip_features)
        self.proj2 = nn.Linear(strip_features, strip_features)

    def forward(self, x, pixel_pos=None):
        batch, _, height, width = x.shape
        assert height == self.img_size[0]
        assert width == self.img_size[1]

        count, patch = self.num_patches, self.patch_size

        patches = self.unfold(x)
        if pixel_pos is not None:
            patches = patches + pixel_pos
        patches = patches.transpose(1, 2).reshape(batch, count, count, self.dim, patch, patch)

        pixel_embed = patches.permute(0, 1, 2, 4, 5, 3).reshape(batch * count, -1, patch * patch * self.dim)
        row_tokens = patches.permute(0, 1, 4, 2, 5, 3).reshape(batch, count, -1)
        column_tokens = patches.permute(0, 2, 1, 4, 5, 3).reshape(batch, count, -1)

        row_embed = self.norm_proj(row_tokens)
        column_embed = self.norm_proj(column_tokens)

        return pixel_embed, pixel_embed, row_embed, column_embed

class Stage(nn.Module):
    """One network stage: embedding, ``depth`` transformer blocks, then merge.

    The drop-path rate rises linearly from 0 to ``drop_path_rate`` across
    the blocks of the stage. ``forward`` returns the merged feature map
    produced by ``Merge``.
    """

    def __init__(self, img_size, patch_size, in_chans, dim, out_dim, num_heads=2, num_inner_head=2, depth=1,
                     mlp_ratio=4., qkv_bias=False, drop_rate=0., attn_drop_rate=0., drop_path_rate=0.1,
                     norm_layer=nn.LayerNorm):
        super().__init__()

        self.pixel_embed = ToEmbed(img_size=img_size, patch_size=patch_size, in_chans=in_chans, dim=dim)
        strip_h, strip_w = self.pixel_embed.row_patch_size
        self.row_pixel = strip_h * strip_w
        self.patch_pixel = patch_size * patch_size
        self.num_patches = self.pixel_embed.num_patches

        # Stochastic depth decay rule: one rate per block.
        drop_rates = torch.linspace(0, drop_path_rate, depth).tolist()
        self.blocks = nn.ModuleList([
            Block(
                dim=dim, words_in_sentence=self.row_pixel, num_heads=num_heads, num_inner_heads=num_inner_head,
                mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, drop=drop_rate, attn_drop=attn_drop_rate, in_chans=in_chans,
                drop_path=rate, norm_layer=norm_layer, patch_size=patch_size, sentences=self.num_patches)
            for rate in drop_rates
        ])
        self.merge = Merge(in_dim=dim, out_dim=out_dim, patch_size=patch_size, in_chans=in_chans)

        self.pos_drop = nn.Dropout(p=drop_rate)

    def forward(self, x, pixel_pos=None, row_pos=None, column_pos=None):
        pixel_a, pixel_b, rows, cols = self.pixel_embed(x, pixel_pos)

        # Optional positional embeddings are added before the dropout.
        rows = self.pos_drop(rows if row_pos is None else rows + row_pos)
        cols = self.pos_drop(cols if column_pos is None else cols + column_pos)

        for block in self.blocks:
            pixel_a, pixel_b, rows, cols = block(pixel_a, pixel_b, rows, cols)

        return self.merge(pixel_a, pixel_b)

class HoverTrans(nn.Module):
    """Four-stage HoVer-Trans classifier.

    Each stage is preceded by a downsampling module: a three-convolution stem
    (two of them with stride 2) before stage 0 and average pooling before
    stages 1-3. Learned positional parameters exist for stage 0 only. The
    final feature map is globally average-pooled, layer-normalised and fed to
    a linear head with ``num_classes`` outputs.

    NOTE(review): ``patch_size``, ``dim``, ``depth``, ``num_heads`` and
    ``num_inner_head`` are indexed per stage (``[i]`` for i in 0..3), so
    callers must pass sequences with at least four entries; the integer
    defaults in the signature cannot be indexed this way.
    NOTE(review): stage i > 0 is built with ``dim[i]`` input channels while the
    previous stage outputs ``dim[i-1]*2`` channels - presumably this requires
    ``dim[i] == 2 * dim[i-1]``; confirm.
    ``embed_dim`` is stored but not otherwise used in this class.
    """
    def __init__(self, img_size=224, patch_size=32, in_chans=3, num_classes=4, embed_dim=768, dim=48, depth=12,
                 num_heads=12, num_inner_head=4, mlp_ratio=4., drop_rate=0., attn_drop_rate=0.,
                 drop_path_rate=0.1, norm_layer=nn.LayerNorm):
        super().__init__()
        self.num_classes = num_classes
        self.embed_dim = embed_dim

        # Spatial reduction applied before each stage.
        stride = [4, 2, 2, 2]
        self.stage = nn.ModuleList([])
        self.downsample = nn.ModuleList([])
        for i in range(4):
            if i == 0:
                self.stage.append(Stage(img_size=img_size//stride[i], patch_size=patch_size[i], in_chans=in_chans, dim=dim[i], out_dim=dim[i]*2,
                            depth=depth[i], num_heads=num_heads[i], num_inner_head=num_inner_head[i], mlp_ratio=mlp_ratio, qkv_bias=False,
                            drop_rate=drop_rate, attn_drop_rate=attn_drop_rate, drop_path_rate=drop_path_rate, norm_layer=nn.LayerNorm))
                num_patches = self.stage[i].num_patches
                row_pixel = self.stage[i].row_pixel
                patch_pixel = self.stage[i].patch_pixel
                # Positional parameters for stage 0; re-initialised with
                # trunc_normal_ further below.
                self.row_pos = nn.Parameter(torch.zeros(1, num_patches, row_pixel * dim[i]))
                self.column_pos = nn.Parameter(torch.zeros(1, num_patches, row_pixel * dim[i]))
                self.pixel_pos = nn.Parameter(torch.zeros(1, dim[i]*patch_pixel, num_patches * num_patches))

                # Convolutional stem: two stride-2 convolutions followed by a
                # stride-1 convolution that outputs dim[0] channels.
                self.downsample.append(nn.Sequential(
                            nn.Conv2d(in_chans, in_chans*2, 3, stride=2, padding=1),
                            nn.BatchNorm2d(in_chans*2),
                            nn.ReLU(inplace=True),
                            nn.Conv2d(in_chans*2, in_chans*4, 3, stride=2, padding=1),
                            nn.BatchNorm2d(in_chans*4),
                            nn.ReLU(inplace=True),
                            nn.Conv2d(in_chans*4, dim[i], 3, stride=1, padding=1),
                            nn.BatchNorm2d(dim[i]),
                            nn.ReLU(inplace=True),
                        ))
            else:
                # Later stages work on img_size // 2**(i+2) inputs and are fed
                # through average pooling.
                self.stage.append(Stage(img_size=img_size//(2**(i+2)), patch_size=patch_size[i], in_chans=dim[i], dim=dim[i], out_dim=dim[i]*2,
                            depth=depth[i], num_heads=num_heads[i], num_inner_head=num_inner_head[i], mlp_ratio=mlp_ratio, qkv_bias=False,
                            drop_rate=drop_rate, attn_drop_rate=attn_drop_rate, drop_path_rate=drop_path_rate, norm_layer=nn.LayerNorm))
                self.downsample.append(nn.AvgPool2d(kernel_size=stride[i]))

        # Classification head operating on the last stage's dim[3]*2 channels.
        self.norm = norm_layer(dim[3]*2)
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.head = nn.Linear(dim[3]*2, num_classes)

        trunc_normal_(self.row_pos, std=.02)
        trunc_normal_(self.pixel_pos, std=.02)
        trunc_normal_(self.column_pos, std=.02)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Initialise Linear, LayerNorm and Conv2d sub-modules.

        NOTE(review): Conv2d weights use ``trunc_normal_`` with its default
        std (not .02) and are drawn a second time when a bias is present -
        confirm this is intended.
        """
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)
        if isinstance(m, nn.Conv2d):
            trunc_normal_(m.weight)
            if isinstance(m, nn.Conv2d) and m.bias is not None:
                trunc_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

    def forward_features(self, x):
        """Run the four downsample/stage pairs and return the last feature map."""
        img_ds = self.downsample[0](x)
        # Only stage 0 receives the learned positional parameters.
        img_merge = self.stage[0](img_ds, self.pixel_pos, self.row_pos, self.column_pos)
        for i in range(3):
            img_ds = self.downsample[i+1](img_merge)
            img_merge = self.stage[i+1](img_ds)

        return img_merge

    def forward(self, x):
        """Return class logits: pooled features -> LayerNorm -> linear head."""
        output = self.forward_features(x)
        output_flat = self.avgpool(output).flatten(1)
        output_flat = self.norm(output_flat)
        output_flat = self.head(output_flat)

        return output_flat

# ✅ Default number of classes changed to 4

def create_model(embed_dim=640, num_classes=4, **kwargs):
    """Build a ``HoverTrans`` model, forwarding any extra keyword arguments."""
    return HoverTrans(embed_dim=embed_dim, num_classes=num_classes, **kwargs)

train.py

In [6]:
import sys
import importlib

sys.path.append('/content/drive/MyDrive/PCC2/HoVerTrans-main/HoVerTrans-main')

# 强制重新加载 config 模块
import config
importlib.reload(config)
from config import config  # ✅ 此时 config 是函数，不是变量


In [7]:
import importlib
import hovertrans
importlib.reload(hovertrans)


<module 'hovertrans' from '/content/drive/MyDrive/PCC2/HoVerTrans-main/HoVerTrans-main/hovertrans.py'>

In [8]:
import importlib
import utils
importlib.reload(utils)

<module 'utils' from '/content/drive/MyDrive/PCC2/HoVerTrans-main/HoVerTrans-main/utils.py'>

In [14]:
import importlib
import valid
importlib.reload(valid)

<module 'valid' from '/content/drive/MyDrive/PCC2/HoVerTrans-main/HoVerTrans-main/valid.py'>

In [15]:
import torch
import os
import torch.nn as nn
import utils
from config import config
import numpy as np
import random
from torch.utils.tensorboard import SummaryWriter
from sklearn.model_selection import KFold
from torch.utils.data import DataLoader, SubsetRandomSampler
from valid import valid
from hovertrans import create_model
from utils import confusion_matrix
import math



def _make_optimizer(config, model):
    """Return the optimizer named by ``config.optimizer`` ('Adam', 'AdamW' or 'SGD')."""
    if config.optimizer == 'Adam':
        return torch.optim.Adam(model.parameters(), lr=config.lr)
    if config.optimizer == 'AdamW':
        return torch.optim.AdamW(model.parameters(), lr=config.lr)
    if config.optimizer == 'SGD':
        return torch.optim.SGD(model.parameters(), lr=config.lr, momentum=0.9, weight_decay=5e-4)
    # Fail early with a clear message instead of an UnboundLocalError later on.
    raise ValueError("Unsupported optimizer %r (expected 'Adam', 'AdamW' or 'SGD')" % (config.optimizer,))


def _make_lr_scheduler(config, optimizer):
    """Return the LR scheduler named by ``config.scheduler`` ('cosine' or 'step')."""
    if config.scheduler == 'cosine':
        def lr_lambda(epoch):
            # Linear warm-up from `warmup_decay` to 1 over the first `warmup_epochs` epochs.
            if epoch < config.warmup_epochs:
                return epoch * (1 - config.warmup_decay) / config.warmup_epochs + config.warmup_decay
            # Cosine decay from 1 down to min_lr / lr over the remaining epochs.
            # max(..., 1) avoids a division by zero when epochs == warmup_epochs
            # (the scheduler is stepped once more after the final epoch).
            span = max(config.epochs - config.warmup_epochs, 1)
            floor = config.min_lr / config.lr
            return (1 - floor) * 0.5 * (math.cos((epoch - config.warmup_epochs) / span * math.pi) + 1) + floor

        return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
    if config.scheduler == 'step':
        return torch.optim.lr_scheduler.StepLR(optimizer, step_size=config.step, gamma=0.9)
    raise ValueError("Unsupported scheduler %r (expected 'cosine' or 'step')" % (config.scheduler,))


def train(config, train_loader, test_loader, fold, test_idx):
    """Train one cross-validation fold, validating and checkpointing along the way.

    Args:
        config: settings object. Attributes read here: img_size, class_num,
            patch_size, dim, depth, num_heads, num_inner_head, optimizer, lr,
            scheduler (plus warmup_decay, warmup_epochs, min_lr for 'cosine' or
            step for 'step'), epochs, log_step, save_model, model_path,
            model_name, writer_comment.
        train_loader: iterable of dicts with keys 'imgs' and 'labels'.
        test_loader: loader handed to ``valid`` for evaluation.
        fold: fold index; used in the TensorBoard run name and as the name of
            the per-fold output directory.
        test_idx: accepted for interface compatibility; not used.

    Raises:
        ValueError: if ``config.optimizer`` or ``config.scheduler`` is not supported.

    Side effects:
        Writes TensorBoard scalars, ``result.txt`` and (when ``config.save_model``
        is set) ``bestmodel.pth`` / ``last_epoch_model.pth`` under
        ``model_path/model_name/writer_comment/<fold>``.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # Model
    model = create_model(
        img_size=config.img_size,
        num_classes=config.class_num,
        drop_rate=0.1,
        attn_drop_rate=0.1,
        patch_size=config.patch_size,
        dim=config.dim,
        depth=config.depth,
        num_heads=config.num_heads,
        num_inner_head=config.num_inner_head
    ).to(device)

    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = _make_optimizer(config, model)
    lr_scheduler = _make_lr_scheduler(config, optimizer)

    ckpt_path = os.path.join(config.model_path, config.model_name, config.writer_comment)
    model_save_path = os.path.join(ckpt_path, str(fold))
    # Create the output directory up front: the last-epoch save must work even
    # if no "best" checkpoint was ever written for this fold.
    os.makedirs(model_save_path, exist_ok=True)
    result_file = os.path.join(model_save_path, 'result.txt')
    result_fmt = 'Acc: %f, Spe: %f, Sen: %f, AUC: %f, Pre: %f, F1score: %f'

    writer = SummaryWriter(comment='_' + config.model_name + '_' + config.writer_comment + '_' + str(fold))

    print("START TRAINING")
    best_acc = 0

    try:
        for epoch in range(config.epochs):
            model.train()
            cm = torch.zeros((config.class_num, config.class_num))
            epoch_loss = 0

            for pack in train_loader:
                images = pack['imgs'].to(device)
                # Repeat a single-channel image across 3 channels.
                if images.shape[1] == 1:
                    images = images.expand((-1, 3, -1, -1))
                labels = pack['labels'].to(device)

                output = model(images)
                loss = criterion(output, labels)

                pred = output.argmax(dim=1)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()
                cm = confusion_matrix(pred.detach(), labels.detach(), cm)

            # Read the LR that was actually used this epoch before the scheduler advances it.
            epoch_lr = optimizer.param_groups[0]['lr']
            lr_scheduler.step()

            validated = False
            if (epoch + 1) % config.log_step == 0:
                print('[epoch %d]' % epoch)
                with torch.no_grad():
                    val_loss, val_acc, sen, spe, auc, pre, f1score = valid(config, model, test_loader, criterion)
                validated = True
                writer.add_scalar('Val/F1score', f1score, epoch)
                writer.add_scalar('Val/Pre', pre, epoch)
                writer.add_scalar('Val/Spe', spe, epoch)
                writer.add_scalar('Val/Sen', sen, epoch)
                writer.add_scalar('Val/AUC', auc, epoch)
                writer.add_scalar('Val/Acc', val_acc, epoch)
                writer.add_scalar('Val/Val_loss', val_loss, epoch)

                # Only start tracking the best model after the first quarter of training.
                if epoch > config.epochs // 4 and val_acc > best_acc:
                    best_acc = val_acc
                    print("=> saved best model")
                    if config.save_model:
                        torch.save(model.state_dict(), os.path.join(model_save_path, 'bestmodel.pth'))
                    # 'w' mode: each new best result replaces the previous file content.
                    with open(result_file, 'w') as f:
                        f.write('Best Result:\n')
                        f.write(result_fmt % (val_acc, spe, sen, auc, pre, f1score))

            if epoch + 1 == config.epochs:
                # Re-use the metrics from this epoch's validation when it already ran.
                if not validated:
                    with torch.no_grad():
                        val_loss, val_acc, sen, spe, auc, pre, f1score = valid(config, model, test_loader, criterion)
                if config.save_model:
                    torch.save(model.state_dict(), os.path.join(model_save_path, 'last_epoch_model.pth'))
                with open(result_file, 'a') as f:
                    f.write('\nLast Result:\n')
                    f.write(result_fmt % (val_acc, spe, sen, auc, pre, f1score))

            avg_epoch_loss = epoch_loss / len(train_loader)
            writer.add_scalar('Train/LR', epoch_lr, epoch)
            writer.add_scalar('Train/Acc', cm.diag().sum() / cm.sum(), epoch)
            writer.add_scalar('Train/Avg_epoch_loss', avg_epoch_loss, epoch)
    finally:
        # Flush and release the event file even if training is interrupted.
        writer.close()

def seed_torch(seed=1):
    """Seed every random number generator used here and configure cuDNN.

    Seeds Python's ``random``, NumPy and PyTorch (CPU, current CUDA device and
    all CUDA devices), stores the seed in the ``PYTHONHASHSEED`` environment
    variable, and sets the cuDNN flags: deterministic on, benchmark off,
    cuDNN itself disabled.

    Args:
        seed: integer seed shared by all generators (default 1).
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
    cudnn.enabled = False

# Entry point: run K-fold cross-validation training, one train() call per fold.
if __name__ == '__main__':
    # Fix all RNG seeds before any data splitting or model creation.
    seed_torch(42)
    # Add the data and CSV paths: config() is called with an explicit
    # argv-style list instead of reading the command line.
    args = config([
    '--data_path', '/content/drive/MyDrive/PCC2/have_birads/GDPH_SYSUCC/data',
    '--csv_path', '/content/drive/MyDrive/PCC2/have_birads/GDPH_SYSUCC/BIRADS_FOLD_4class.csv'
])

    # Shuffled K-fold split with a fixed random_state; the number of folds comes from args.fold.
    cv = KFold(n_splits=args.fold, random_state=42, shuffle=True)
    fold = 0

    # Two datasets are built from the same data/CSV, differing only in `mode`.
    # NOTE(review): the fold indices computed on train_set are also applied to
    # test_set, so both presumably hold the same samples in the same order —
    # confirm in utils.get_dataset.
    train_set = utils.get_dataset(args.data_path, args.csv_path, args.img_size, mode='train')
    test_set = utils.get_dataset(args.data_path, args.csv_path, args.img_size, mode='test')

    # Record the full run configuration next to the checkpoints.
    save_path = os.path.join(args.model_path, args.model_name, args.writer_comment)
    os.makedirs(save_path, exist_ok=True)
    with open(os.path.join(save_path, 'model_info.txt'), 'w') as f:
        f.write(str(args))

    for train_idx, test_idx in cv.split(train_set):
        print("\nCross validation fold %d" % fold)
        # Each loader samples only the indices belonging to its side of the split.
        train_loader = DataLoader(train_set, batch_size=args.batch_size, sampler=SubsetRandomSampler(train_idx), num_workers=6)
        # The held-out fold is evaluated one sample per batch.
        test_loader = DataLoader(test_set, batch_size=1, sampler=SubsetRandomSampler(test_idx))
        train(args, train_loader, test_loader, fold, test_idx)
        fold += 1


Cross validation fold 0
START TRAINING
[epoch 4]
START VALIDING
[epoch 9]
START VALIDING
[epoch 14]
START VALIDING
[epoch 19]
START VALIDING
[epoch 24]
START VALIDING
[epoch 29]
START VALIDING
[epoch 34]
START VALIDING
[epoch 39]
START VALIDING
=> saved best model
[epoch 44]
START VALIDING
[epoch 49]
START VALIDING
=> saved best model
[epoch 54]
START VALIDING
[epoch 59]
START VALIDING
[epoch 64]
START VALIDING
[epoch 69]
START VALIDING
=> saved best model
[epoch 74]
START VALIDING
[epoch 79]
START VALIDING
[epoch 84]
START VALIDING
[epoch 89]
START VALIDING
=> saved best model
[epoch 94]
START VALIDING
[epoch 99]
START VALIDING
[epoch 104]
START VALIDING
=> saved best model
[epoch 109]
START VALIDING
[epoch 114]
START VALIDING
[epoch 119]
START VALIDING
[epoch 124]
START VALIDING
[epoch 129]
START VALIDING
[epoch 134]
START VALIDING
[epoch 139]
START VALIDING
=> saved best model
[epoch 144]
START VALIDING
[epoch 149]
START VALIDING
START VALIDING

Cross validation fold 1
START TRAINI

把模型数据存到本地

In [16]:
# Step 1: Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Step 2: Import shutil for file operations
import shutil
import os

# Step 3: Define source and target paths
source_path = '/content/weight/hovertrans/BIRADS4CLASS'
target_path = '/content/drive/MyDrive/HoVerTrans_Results/BIRADS4CLASS'

# Step 4: Copy the entire folder to Google Drive
# Verify the source BEFORE touching the target: otherwise a missing source
# would delete the existing backup and then fail, leaving nothing on Drive.
if not os.path.isdir(source_path):
    raise FileNotFoundError("Source folder not found: " + source_path)

if os.path.exists(target_path):
    print("Target already exists. Overwriting...")
    shutil.rmtree(target_path)

shutil.copytree(source_path, target_path)
print("✅ Model files successfully saved to your Google Drive!")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Model files successfully saved to your Google Drive!


valid.py

In [None]:
import sys
import importlib

# The project module `utils` is loaded from this Drive folder, so the folder
# must be on sys.path before the `from utils import ...` line below runs.
# The membership check avoids adding a duplicate entry on re-runs.
project_dir = '/content/drive/MyDrive/PCC2/HoVerTrans-main/HoVerTrans-main'
if project_dir not in sys.path:
    sys.path.append(project_dir)

import torch
from sklearn.metrics import roc_auc_score
from utils import confusion_matrix

def valid(config, net, val_loader, criterion):
    """Evaluate ``net`` on ``val_loader`` and return multi-class metrics.

    Args:
        config: settings object; only ``config.class_num`` is read.
        net: model to evaluate. It is switched to eval mode and left there.
        val_loader: iterable of dicts with keys 'imgs' and 'labels'. Labels must
            be integer class indices in ``[0, class_num)``. Any batch size works.
        criterion: loss function called as ``criterion(output, labels)``.

    Returns:
        ``[avg_loss, acc, sen, spe, auc, pre, f1score]`` where
        - avg_loss: mean per-batch loss,
        - acc: overall accuracy,
        - sen: macro-averaged sensitivity (recall),
        - spe: macro-averaged specificity, TN / (TN + FP) per class,
        - auc: ROC AUC (macro one-vs-rest when ``class_num > 2``), or 0.0 when
          scikit-learn cannot compute it for the given labels,
        - pre: macro-averaged precision,
        - f1score: harmonic mean of ``pre`` and ``sen``.
    """
    device = next(net.parameters()).device
    net.eval()

    print("START VALIDING")
    num_classes = config.class_num
    eps = 1e-6
    epoch_loss = 0
    y_true, y_pred, y_prob = [], [], []

    for pack in val_loader:
        images = pack['imgs'].to(device)
        # Repeat a single-channel image across 3 channels.
        if images.shape[1] == 1:
            images = images.expand((-1, 3, -1, -1))
        labels = pack['labels'].to(device)

        output = net(images)
        loss = criterion(output, labels)

        # Collect per-sample results; extend() keeps this correct for any batch size.
        y_true.extend(labels.detach().cpu().tolist())
        y_pred.extend(output.argmax(dim=1).detach().cpu().tolist())
        # Keep the full probability vector: multi-class AUC needs one score per class.
        y_prob.extend(output.softmax(dim=1).detach().cpu().tolist())

        epoch_loss += loss.detach().cpu()

    avg_epoch_loss = epoch_loss / len(val_loader)

    # Confusion matrix with an explicit layout: rows = true class, columns = predicted class.
    cm = torch.zeros((num_classes, num_classes))
    for true_cls, pred_cls in zip(y_true, y_pred):
        cm[true_cls, pred_cls] += 1

    total = cm.sum()
    tp = cm.diag()
    support = cm.sum(dim=1)        # number of samples whose true class is i
    predicted = cm.sum(dim=0)      # number of samples predicted as class i
    fp = predicted - tp
    fn = support - tp
    tn = total - tp - fp - fn

    acc = tp.sum() / total

    # Macro averages over classes; eps keeps classes without samples at 0 instead of NaN.
    sen = (tp / (support + eps)).mean().item()
    spe = (tn / (tn + fp + eps)).mean().item()
    pre = (tp / (predicted + eps)).mean()
    rec = sen
    f1score = 2 * pre * rec / (pre + rec + eps)

    try:
        if num_classes == 2:
            # Binary case: score is the probability of class 1.
            auc = roc_auc_score(y_true, [row[1] for row in y_prob], multi_class='ovr')
        else:
            # One-vs-rest AUC needs the (n_samples, n_classes) probability matrix;
            # `labels` fixes the column order even if a class is absent from y_true.
            auc = roc_auc_score(y_true, y_prob, multi_class='ovr', labels=list(range(num_classes)))
    except ValueError:
        # Raised by scikit-learn when the score is undefined for these labels
        # (e.g. a class has no samples in this fold).
        auc = 0.0

    return [avg_epoch_loss, acc.item(), sen, spe, auc, pre, f1score]
