In [None]:
import os
import cv2
import numpy as np
from torch.utils.data import Dataset


创建自定义数据集类

In [None]:
class BreastCancerDataset(Dataset):
    """
    Breast-cancer image/mask segmentation dataset read from two folders.

    Sample ``i`` is loaded from ``<img_dir>/benign(i+1).png`` and
    ``<mask_dir>/benign(i+1)_mask.png``.

    Args:
        img_dir: Folder containing the images.
        mask_dir: Folder containing the masks (may be the same folder).
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
            Its outputs may be NumPy arrays (HWC image) or tensors (CHW image).
    """

    def __init__(self, img_dir, mask_dir, transform=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.transform = transform

    @staticmethod
    def _is_image_name(name):
        """Return True for file names of the form ``benign(<digits>).png``."""
        prefix, suffix = "benign(", ").png"
        return (
            name.startswith(prefix)
            and name.endswith(suffix)
            and name[len(prefix):-len(suffix)].isdigit()
        )

    @staticmethod
    def _as_numpy(value):
        """Return ``value`` as a NumPy array, unwrapping tensor-like objects."""
        if isinstance(value, np.ndarray):
            return value
        if hasattr(value, "detach"):
            # Tensor-like object (e.g. produced by a to-tensor transform).
            return value.detach().cpu().numpy()
        return np.asarray(value)

    @classmethod
    def _to_chw_float32(cls, image, mask):
        """
        Convert an image/mask pair to float32 arrays shaped (C, H, W) and (1, H, W).

        A NumPy image is taken to be HWC and is transposed; any other image
        type is taken to be CHW already. Only uint8 images are divided by 255:
        a float image has already been rescaled by the transform, and scaling
        it a second time would squash it towards zero.
        """
        image_is_hwc = isinstance(image, np.ndarray)
        image = cls._as_numpy(image)
        mask = cls._as_numpy(mask)

        if image_is_hwc:
            image = image.transpose(2, 0, 1)

        needs_scaling = image.dtype == np.uint8
        image = image.astype(np.float32)
        if needs_scaling:
            image = image / 255.0

        mask = mask.astype(np.float32)
        # Use the trailing two axes so both (H, W) and (1, H, W) masks work.
        mask = mask.reshape((1,) + tuple(mask.shape[-2:]))
        return image, mask

    def __len__(self):
        """Number of image files in ``img_dir`` that ``__getitem__`` can address."""
        # Count only `benign(<n>).png` entries: masks stored in the same folder
        # and unrelated files must not inflate the dataset length.
        return sum(1 for name in os.listdir(self.img_dir) if self._is_image_name(name))

    def __getitem__(self, index):
        """
        Load sample ``index``.

        Returns:
            ``(image, mask, 0)``: a float32 (C, H, W) image, a float32
            (1, H, W) mask, and the constant label ``0``.

        Raises:
            FileNotFoundError: If the image or the mask cannot be read.
        """
        img_path = os.path.join(self.img_dir, f"benign({index+1}).png")
        mask_path = os.path.join(self.mask_dir, f"benign({index+1})_mask.png")

        # Read the image and its mask. cv2.imread signals failure by returning
        # None instead of raising, so check explicitly.
        image = cv2.imread(img_path, cv2.IMREAD_COLOR)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {mask_path}")

        # Data augmentation.
        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        # Bring both into the channel-first float32 layout the model consumes.
        image, mask = self._to_chw_float32(image, mask)

        return image, mask, 0


使用自定义数据集类加载数据并进行数据增强

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Augmentation pipeline applied to every image/mask pair: a horizontal flip
# half of the time, a brightness/contrast jitter one time in five, then
# normalisation and conversion by ToTensorV2.
# NOTE(review): ToTensorV2 hands tensors to the dataset — confirm that
# BreastCancerDataset.__getitem__ accepts tensor inputs.
train_transform = A.Compose(
    transforms=[
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.2),
        A.Normalize(),
        ToTensorV2(),
    ]
)

# Replace the two empty strings with the paths of your image and mask folders.
dataset = BreastCancerDataset(img_dir="", mask_dir="", transform=train_transform)


划分数据集为训练集和验证集

In [None]:
from sklearn.model_selection import train_test_split

# Split sample *indices*, not the dataset object. Passing the Dataset itself to
# train_test_split makes scikit-learn index it element by element and return
# plain lists, so every image would be read and augmented once, up front, and
# the random augmentations would then be frozen for all epochs.
from torch.utils.data import Subset

train_indices, val_indices = train_test_split(
    list(range(len(dataset))), test_size=0.2, random_state=42
)

# Subset keeps loading lazy: a sample is read (and augmented) on each access.
# NOTE(review): both subsets share `dataset`, so validation samples also pass
# through the training transform — confirm that this is intended.
train_dataset = Subset(dataset, train_indices)
val_dataset = Subset(dataset, val_indices)


定义数据加载器

In [None]:
from torch.utils.data import DataLoader

# Both loaders use batches of 16 and 4 worker processes; only the training
# loader reshuffles its samples.
train_loader, val_loader = (
    DataLoader(split, batch_size=16, shuffle=reshuffle, num_workers=4)
    for split, reshuffle in ((train_dataset, True), (val_dataset, False))
)


实例化FCN-ResNet50分割模型

In [None]:
import torch
import torch.nn as nn
from torchvision.models.segmentation import fcn_resnet50

# Build an FCN-ResNet50 segmentation network with 3 output classes
# (pretrained=False) and move it to the GPU in the same statement.
model = fcn_resnet50(pretrained=False, num_classes=3).cuda()


定义损失函数和优化器

In [None]:
# Optimiser: Adam over all model parameters with a learning rate of 1e-4.
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4)

# Loss: cross-entropy between the model's class scores and the target classes.
criterion = nn.CrossEntropyLoss()


编写训练循环

In [None]:
from sklearn.metrics import f1_score

num_epochs = 20

# Follow whatever device the model already lives on instead of hard-coding CUDA.
device = next(model.parameters()).device

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    print("-" * 10)

    for phase in ["train", "val"]:
        is_training = phase == "train"
        if is_training:
            model.train()
            dataloader = train_loader
        else:
            model.eval()
            dataloader = val_loader

        running_loss = 0.0
        running_corrects = 0   # correctly classified pixels
        running_pixels = 0     # all pixels seen, the denominator for accuracy
        running_f1_score = 0.0

        # `labels` is part of each sample tuple but is not used here.
        for images, masks, labels in dataloader:
            images = images.to(device)

            # CrossEntropyLoss needs integer class indices shaped (N, H, W).
            # Assumes binary masks (0 = background, non-zero = lesion), so a
            # 0/255 PNG mask becomes classes 0/1 — TODO confirm for this data.
            targets = (masks > 0).long().to(device)
            if targets.dim() == 4:
                targets = targets.squeeze(1)  # drop the singleton channel axis

            if is_training:
                optimizer.zero_grad()

            with torch.set_grad_enabled(is_training):
                outputs = model(images)
                # torchvision segmentation models return a dict; the per-pixel
                # class scores are stored under "out".
                if isinstance(outputs, dict):
                    outputs = outputs["out"]

                preds = outputs.argmax(dim=1)
                loss = criterion(outputs, targets)

                if is_training:
                    loss.backward()
                    optimizer.step()

            running_loss += loss.item() * images.size(0)
            running_corrects += (preds == targets).sum().item()
            running_pixels += targets.numel()
            running_f1_score += f1_score(
                targets.cpu().numpy().ravel(),
                preds.cpu().numpy().ravel(),
                average="macro",
            )

        epoch_loss = running_loss / len(dataloader.dataset)
        # Pixel accuracy: correct pixels over all pixels (not over image count).
        epoch_acc = running_corrects / max(running_pixels, 1)
        epoch_f1_score = running_f1_score / len(dataloader)

        print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} F1: {epoch_f1_score:.4f}")

print("Training complete")


In [None]:
def evaluate(dataloader, model, criterion=None):
    """
    Compute the average loss, pixel accuracy and macro F1 of ``model`` on ``dataloader``.

    Args:
        dataloader: Iterable yielding ``(inputs, masks, labels)`` batches.
        model: Segmentation network. Its output may be a tensor of class
            scores shaped (N, C, H, W) or a dict holding that tensor under "out".
        criterion: Loss taking ``(scores, targets)``. Defaults to a plain
            ``nn.CrossEntropyLoss()``.

    Returns:
        ``(epoch_loss, epoch_acc, epoch_f1_score)`` as Python floats: the
        per-sample mean loss, the fraction of correctly classified pixels,
        and the macro F1 averaged over batches.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    if criterion is None:
        criterion = nn.CrossEntropyLoss()

    # Switch to evaluation mode.
    model.eval()

    # Run on the device the model already lives on (CPU if it has no parameters).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    running_loss = 0.0
    running_corrects = 0
    running_pixels = 0
    running_f1_score = 0.0
    num_samples = 0
    num_batches = 0

    # Iterate over the data without tracking gradients.
    with torch.no_grad():
        for inputs, masks, labels in dataloader:
            inputs = inputs.to(device)

            # Integer class indices shaped (N, H, W) for the loss. Assumes
            # binary masks (0 = background, non-zero = foreground) — TODO confirm.
            targets = (masks > 0).long().to(device)
            if targets.dim() == 4:
                targets = targets.squeeze(1)

            outputs = model(inputs)
            if isinstance(outputs, dict):
                outputs = outputs["out"]

            loss = criterion(outputs, targets)
            preds = outputs.argmax(dim=1)

            batch_size = inputs.size(0)
            running_loss += loss.item() * batch_size
            running_corrects += (preds == targets).sum().item()
            running_pixels += targets.numel()
            running_f1_score += f1_score(
                targets.cpu().numpy().ravel(),
                preds.cpu().numpy().ravel(),
                average="macro",
            )
            num_samples += batch_size
            num_batches += 1

    if num_samples == 0:
        raise ValueError("evaluate() received a dataloader that yields no samples")

    # Compute the epoch loss, accuracy and F1 score.
    epoch_loss = running_loss / num_samples
    epoch_acc = running_corrects / running_pixels
    epoch_f1_score = running_f1_score / num_batches

    return epoch_loss, epoch_acc, epoch_f1_score

def _train_one_epoch(dataloader, model, criterion, optimizer):
    """Run one optimisation pass over ``dataloader``; return (loss, pixel accuracy, macro F1)."""
    model.train()
    device = next(model.parameters()).device

    running_loss = 0.0
    running_corrects = 0
    running_pixels = 0
    running_f1_score = 0.0

    for images, masks, labels in dataloader:
        images = images.to(device)

        # Integer class indices shaped (N, H, W) for the loss. Assumes binary
        # masks (0 = background, non-zero = foreground) — TODO confirm.
        targets = (masks > 0).long().to(device)
        if targets.dim() == 4:
            targets = targets.squeeze(1)

        optimizer.zero_grad()
        outputs = model(images)
        # torchvision segmentation models return a dict; scores are under "out".
        if isinstance(outputs, dict):
            outputs = outputs["out"]
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        preds = outputs.detach().argmax(dim=1)
        running_loss += loss.item() * images.size(0)
        running_corrects += (preds == targets).sum().item()
        running_pixels += targets.numel()
        running_f1_score += f1_score(
            targets.cpu().numpy().ravel(),
            preds.cpu().numpy().ravel(),
            average="macro",
        )

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = running_corrects / max(running_pixels, 1)
    epoch_f1_score = running_f1_score / len(dataloader)
    return epoch_loss, epoch_acc, epoch_f1_score


for epoch in range(num_epochs):
    # Print the epoch header.
    print(f'Epoch {epoch + 1}/{num_epochs}')
    print('-' * 10)

    for phase in ['train', 'val']:
        if phase == 'train':
            # Training phase: actually optimise the model, so the metrics
            # printed below belong to this epoch rather than to leftover state.
            epoch_loss, epoch_acc, epoch_f1_score = _train_one_epoch(
                train_loader, model, criterion, optimizer
            )
        else:
            # Validation phase: compute loss, accuracy and F1 on the validation set.
            epoch_loss, epoch_acc, epoch_f1_score = evaluate(val_loader, model)

        # Print the training or validation metrics.
        print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc*100:.2f}% F1: {epoch_f1_score*100:.2f}%')

    # After the last epoch, score the model on the training set in evaluation
    # mode to check for over-fitting.
    if epoch == num_epochs - 1:
        epoch_loss, epoch_acc, epoch_f1_score = evaluate(train_loader, model)
        print(f'train Loss: {epoch_loss:.4f} Acc: {epoch_acc*100:.2f}% F1: {epoch_f1_score*100:.2f}%')

# Report completion and save the final parameters once, after all epochs
# (these statements used to sit inside the epoch loop and ran every epoch).
print('Training complete')
torch.save(model.state_dict(), 'model.pth')
