In [9]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image

class UltrasoundDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.images = os.listdir(image_dir)

    def __len__(self):
        return len(self.images)

class UltrasoundDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.images = os.listdir(image_dir)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        try:
            img_name = self.images[idx]
            img_path = os.path.join(self.image_dir, img_name)
            mask_path = os.path.join(self.mask_dir, img_name.replace(".png", ".png"))
            
            if not os.path.exists(img_path):
                raise FileNotFoundError(f"图像文件不存在: {img_path}")
            if not os.path.exists(mask_path):
                raise FileNotFoundError(f"掩码文件不存在: {mask_path}")
            
            image = Image.open(img_path).convert("L")  # 转换为灰度图
            mask = Image.open(mask_path).convert("L")  # 确保掩码也是灰度图
            
            if self.transform:
                image = self.transform(image)
                mask = self.transform(mask)
            
            # 将掩码转换为整数类型，并确保值非负
            mask = torch.from_numpy(np.array(mask)).long()
            mask = torch.clamp(mask, min=0, max=5)  # 假设类别范围是0-5
            
            # 将掩码转换为one-hot编码
            mask = torch.nn.functional.one_hot(mask.squeeze().long(), num_classes=6)
            mask = mask.permute(2, 0, 1).float()  # 更改为[C, H, W]格式
            
            return image, mask
        except Exception as e:
            print(f"处理索引 {idx} 的图像时出错: {str(e)}")
            raise e

# 定义转换
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

# 创建数据集和数据加载器
train_dataset = UltrasoundDataset(image_dir=r"C:\Users\Ryan\Desktop\標記測試\unet\dataset\left\img", mask_dir=r"C:\Users\Ryan\Desktop\標記測試\unet\dataset\left\mask", transform=transform)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=0)  # 将num_workers设置为0

# 创建验证数据集和数据加载器
val_dataset = UltrasoundDataset(image_dir=r"C:\Users\Ryan\Desktop\標記測試\unet\dataset\right\img", mask_dir=r"C:\Users\Ryan\Desktop\標記測試\unet\dataset\right\mask", transform=transform)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, num_workers=0)

# 示例用法
for batch_images, batch_masks in train_loader:
    print("批次图像形状:", batch_images.shape)
    print("批次掩码形状:", batch_masks.shape)
    break  # 仅检查第一个批次

批次图像形状: torch.Size([4, 1, 256, 256])
批次掩码形状: torch.Size([4, 6, 256, 256])


In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms

# Define the U-Net model
class UNet(nn.Module):
    def __init__(self, num_classes):
        super(UNet, self).__init__()
        
        def double_conv(in_channels, out_channels):
            return nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, 3, padding=1),
                nn.ReLU(inplace=True)
            )
        
        self.down1 = double_conv(1, 64)
        self.down2 = double_conv(64, 128)
        self.down3 = double_conv(128, 256)
        self.down4 = double_conv(256, 512)
        
        self.maxpool = nn.MaxPool2d(2)
        
        self.up1 = double_conv(512+256, 256)
        self.up2 = double_conv(256+128, 128)
        self.up3 = double_conv(128+64, 64)
        self.up4 = double_conv(64, 64)
        
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        
        self.outconv = nn.Conv2d(64, num_classes, 1)

    def forward(self, x):
        conv1 = self.down1(x)
        x = self.maxpool(conv1)
        
        conv2 = self.down2(x)
        x = self.maxpool(conv2)
        
        conv3 = self.down3(x)
        x = self.maxpool(conv3)
        
        x = self.down4(x)
        
        x = self.upsample(x)
        x = torch.cat([x, conv3], dim=1)
        x = self.up1(x)
        
        x = self.upsample(x)
        x = torch.cat([x, conv2], dim=1)
        x = self.up2(x)
        
        x = self.upsample(x)
        x = torch.cat([x, conv1], dim=1)
        x = self.up3(x)
        
        x = self.up4(x)
        x = self.outconv(x)
        return x

# Dice loss function
def dice_loss(pred, target, smooth = 1.):
    pred = pred.contiguous()
    target = target.contiguous()    
    intersection = (pred * target).sum(dim=2).sum(dim=2)
    loss = (1 - ((2. * intersection + smooth) / (pred.sum(dim=2).sum(dim=2) + target.sum(dim=2).sum(dim=2) + smooth)))
    return loss.mean()

# Training function
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        
        for images, masks in train_loader:
            images, masks = images.to(device), masks.to(device)
            
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, masks)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item() * images.size(0)
        
        train_loss = train_loss / len(train_loader.dataset)
        
        # Validation
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for images, masks in val_loader:
                images, masks = images.to(device), masks.to(device)
                outputs = model(images)
                loss = criterion(outputs, masks)
                val_loss += loss.item() * images.size(0)
        
        val_loss = val_loss / len(val_loader.dataset)
        
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

# Set up the model, loss function, and optimizer
num_classes = 6  # Adjust based on your dataset
model = UNet(num_classes)
criterion = dice_loss
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Assuming you have already set up your dataset and data loaders
# train_dataset, val_dataset = ...
# train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, num_workers=0)

# Train the model
num_epochs = 50
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs)

# Save the trained model
torch.save(model.state_dict(), 'unet_model.pth')

KeyboardInterrupt: 

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def dice_coefficient(pred, target, smooth=1e-6):
    pred = pred.contiguous()
    target = target.contiguous()    
    intersection = (pred * target).sum(dim=2).sum(dim=2)
    dice = (2. * intersection + smooth) / (pred.sum(dim=2).sum(dim=2) + target.sum(dim=2).sum(dim=2) + smooth)
    return dice.mean()

def iou_score(pred, target, smooth=1e-6):
    pred = pred.contiguous()
    target = target.contiguous()
    intersection = (pred * target).sum(dim=2).sum(dim=2)
    total = (pred + target).sum(dim=2).sum(dim=2)
    union = total - intersection 
    iou = (intersection + smooth) / (union + smooth)
    return iou.mean()

def evaluate_model(model, data_loader, device):
    model.eval()
    dice_scores = []
    iou_scores = []
    accuracy_scores = []
    precision_scores = []
    recall_scores = []
    f1_scores = []

    with torch.no_grad():
        for images, masks in data_loader:
            images, masks = images.to(device), masks.to(device)
            outputs = model(images)
            
            # Convert outputs to binary predictions
            preds = torch.argmax(outputs, dim=1)
            
            # Calculate Dice coefficient and IoU
            dice = dice_coefficient(preds, masks)
            iou = iou_score(preds, masks)
            
            # Convert tensors to numpy arrays for sklearn metrics
            preds_np = preds.cpu().numpy().flatten()
            masks_np = masks.cpu().numpy().flatten()
            
            # Calculate additional metrics
            accuracy = accuracy_score(masks_np, preds_np)
            precision = precision_score(masks_np, preds_np, average='weighted')
            recall = recall_score(masks_np, preds_np, average='weighted')
            f1 = f1_score(masks_np, preds_np, average='weighted')
            
            dice_scores.append(dice.item())
            iou_scores.append(iou.item())
            accuracy_scores.append(accuracy)
            precision_scores.append(precision)
            recall_scores.append(recall)
            f1_scores.append(f1)

    return {
        'dice': np.mean(dice_scores),
        'iou': np.mean(iou_scores),
        'accuracy': np.mean(accuracy_scores),
        'precision': np.mean(precision_scores),
        'recall': np.mean(recall_scores),
        'f1': np.mean(f1_scores)
    }

# Load the trained model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = UNet(num_classes=6)  # Adjust num_classes as needed
model.load_state_dict(torch.load('unet_model.pth'))
model.to(device)

# Assuming you have already set up your validation and test datasets and data loaders
# val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)
# test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

# Evaluate on validation set
val_results = evaluate_model(model, val_loader, device)
print("Validation Results:")
for metric, value in val_results.items():
    print(f"{metric.capitalize()}: {value:.4f}")

# Evaluate on test set
test_results = evaluate_model(model, test_loader, device)
print("\nTest Results:")
for metric, value in test_results.items():
    print(f"{metric.capitalize()}: {value:.4f}")

# Optionally, you can save the results to a file
import json
with open('evaluation_results.json', 'w') as f:
    json.dump({'validation': val_results, 'test': test_results}, f, indent=4)