# 使用 data_package 做图像分割示例（DRIVE）

本 Notebook 演示如何使用 `ImageSegmentationDataset` 和 `transforms` 来完成：

1. 构建图像与 mask 路径；
2. train / val / test 划分；
3. 定义适合分割的 transforms（包括随机翻转）；
4. 构建 DataLoader，并检查 image / mask 的 dtype 和 shape。

模型部分这里只写一个骨架，你可以接上自己的 UNet / DeepLabV3。


In [1]:
import torch
from torch.utils.data import DataLoader
from pathlib import Path

from datasets import ImageSegmentationDataset
from transforms import (
    Compose, Resize, RandomHorizontalFlip, RandomVerticalFlip,RandomRotate90,
    ToTensor, Normalize,
)
from utils import train_val_test_split,compute_channel_mean_std

In [2]:
# Locate the DRIVE training images and their first-observer vessel masks.
img_dir = Path("../data/DRIVE/training/images")
mask_dir = Path("../data/DRIVE/training/1st_manual")

# Both listings are sorted so that image i and mask i are paired by position.
image_paths = sorted(str(p) for p in img_dir.glob("*.tif"))
mask_paths = sorted(str(p) for p in mask_dir.glob("*.gif"))

# Fail early with a readable message instead of an IndexError on `[0]` below.
if not image_paths or not mask_paths:
    raise FileNotFoundError(
        f"no DRIVE data found: {len(image_paths)} images in {img_dir}, "
        f"{len(mask_paths)} masks in {mask_dir}"
    )
if len(image_paths) != len(mask_paths):
    raise ValueError(
        f"image/mask count mismatch: {len(image_paths)} images vs {len(mask_paths)} masks"
    )

# File names start with the case id ("21_training.tif" / "21_manual1.gif");
# positional pairing is only valid if those ids agree for every pair.
mismatched = [
    (img, msk)
    for img, msk in zip(image_paths, mask_paths)
    if Path(img).name.split("_")[0] != Path(msk).name.split("_")[0]
]
if mismatched:
    raise ValueError(f"image/mask ids do not line up, e.g. {mismatched[0]}")

print("num images:", len(image_paths))
print("num masks :", len(mask_paths))
print("first image:", image_paths[0])
print("first mask :", mask_paths[0])


num images: 20
num masks : 20
first image: ../data/DRIVE/training/images/21_training.tif
first mask : ../data/DRIVE/training/1st_manual/21_manual1.gif


In [3]:
# Split the paired image/mask paths 70 / 15 / 15 with a fixed seed.
split_result = train_val_test_split(
    image_paths,
    mask_paths,
    train_ratio=0.7,
    val_ratio=0.15,
    test_ratio=0.15,
    shuffle=True,
    seed=42,
)
# The helper yields the three image lists first, then the three mask lists.
train_imgs, val_imgs, test_imgs, train_masks, val_masks, test_masks = split_result

print("train / val / test:", len(train_imgs), len(val_imgs), len(test_imgs))


train / val / test: 14 3 3


In [4]:
# Step 1: a temporary dataset whose pipeline stops *before* Normalize.
# Important: the statistics below must be measured on un-normalized images.
tmp_tf = Compose(
    [
        Resize((224, 224)),
        ToTensor(mask_mode="none"),
    ]
)
tmp_train_ds = ImageSegmentationDataset(
    transform=tmp_tf,
    image_paths=train_imgs,
    mask_paths=train_masks,
)

# Step 2: estimate mean / std on the training split only.
# NOTE(review): `max_batches` presumably caps how many batches are scanned -- confirm in utils.
channel_stats = compute_channel_mean_std(tmp_train_ds, batch_size=16, max_batches=50)
mean, std = channel_stats
for label, value in (("mean:", mean), ("std :", std)):
    print(label, value)


mean: tensor([0.4877, 0.2693, 0.1641])
std : tensor([0.3408, 0.1859, 0.1067])


In [5]:
# DRIVE is a two-class task (vessel / non-vessel) and its mask pixels are
# typically 0 and 255, so mask_mode="binary" is sufficient here.
def _build_transform(augment):
    """Assemble the resize -> (optional random flips/rotation) -> tensor -> normalize pipeline."""
    steps = [Resize((224, 224))]
    if augment:
        # Random geometric augmentation is applied to the training pipeline only.
        steps += [
            RandomHorizontalFlip(p=0.5),
            RandomVerticalFlip(p=0.5),
            RandomRotate90(p=0.5),
        ]
    steps.append(ToTensor(mask_mode="binary"))
    # Normalize with the statistics estimated on the training split.
    steps.append(Normalize(mean=mean.tolist(), std=std.tolist()))
    return Compose(steps)


train_transform = _build_transform(augment=True)
eval_transform = _build_transform(augment=False)


In [6]:
# One dataset per split; only the training split uses the augmenting transform.
train_ds = ImageSegmentationDataset(
    image_paths=train_imgs,
    mask_paths=train_masks,
    transform=train_transform,
)

val_ds = ImageSegmentationDataset(
    image_paths=val_imgs,
    mask_paths=val_masks,
    transform=eval_transform,
)

test_ds = ImageSegmentationDataset(
    image_paths=test_imgs,
    mask_paths=test_masks,
    transform=eval_transform,
)

batch_size = 2

# Dedicated seeded generator so the shuffled sample order is the same after
# every "Restart Kernel -> Run All" (42 mirrors the seed used for the split).
# NOTE(review): the random flips/rotations inside `transforms` may draw from a
# different RNG; this only fixes the order in which samples are visited -- confirm.
shuffle_rng = torch.Generator().manual_seed(42)

train_loader = DataLoader(train_ds, batch_size, shuffle=True, generator=shuffle_rng)
val_loader = DataLoader(val_ds, batch_size, shuffle=False)
test_loader = DataLoader(test_ds, batch_size, shuffle=False)

# Sanity-check a single training batch: dtype, shape and label values.
for batch in train_loader:
    print("image:", batch["image"].dtype, batch["image"].shape)  # [B, 3, 224, 224]
    print("mask :", batch["mask"].dtype, batch["mask"].shape)    # [B, 224, 224]
    print("mask unique:", batch["mask"].unique())
    print("meta:", batch["meta"])

    # Invariants the class-index loss used later relies on.
    assert batch["mask"].dtype == torch.int64, "mask must hold integer class indices"
    assert batch["image"].shape[-2:] == batch["mask"].shape[-2:], "image/mask size mismatch"
    break


image: torch.float32 torch.Size([2, 3, 224, 224])
mask : torch.int64 torch.Size([2, 224, 224])
mask unique: tensor([0, 1])
meta: {'image_path': ['../data/DRIVE/training/images/32_training.tif', '../data/DRIVE/training/images/37_training.tif'], 'mask_path': ['../data/DRIVE/training/1st_manual/32_manual1.gif', '../data/DRIVE/training/1st_manual/37_manual1.gif'], 'index': tensor([12, 10])}


In [7]:
import torch.nn as nn

class DummySegModel(nn.Module):
    """Tiny encoder-decoder placeholder for a real segmentation network.

    The encoder halves the spatial size twice (two 2x2 max-pools) and the
    decoder doubles it twice (two stride-2 transposed convolutions). This is
    only a stand-in; swap in a real UNet / DeepLabV3 for actual experiments.

    Args:
        num_classes: number of output channels (one logit map per class).
        in_channels: number of input image channels; defaults to 3 (RGB).
    """

    def __init__(self, num_classes: int, in_channels: int = 3):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, 16, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(16, num_classes, kernel_size=2, stride=2),
        )

    def forward(self, x):
        """Return per-pixel class logits of shape [B, num_classes, H, W] for input [B, C, H, W]."""
        in_size = tuple(x.shape[-2:])
        x = self.encoder(x)
        x = self.decoder(x)
        # Max-pooling floors odd sizes, so when H or W is not divisible by 4 the
        # decoder output is slightly smaller than the input. Resize the logits
        # back so they always line up with the mask for a pixel-wise loss.
        if tuple(x.shape[-2:]) != in_size:
            x = nn.functional.interpolate(
                x, size=in_size, mode="bilinear", align_corners=False
            )
        return x  # [B, num_classes, H, W], suitable for CrossEntropyLoss


# DRIVE is binary segmentation; change this to the class count for multi-class tasks.
num_classes = 2

# Prefer the GPU when one is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Build the model first, then move it, so the optimizer sees the on-device parameters.
model = DummySegModel(num_classes=num_classes)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)


In [8]:
def train_seg_one_epoch(model, loader, optimizer, criterion, device):
    """Run one training pass over `loader` and return the mean loss per sample.

    Args:
        model: network mapping an image batch to logits ([B, C, H, W] expected).
        loader: iterable of dict batches with an "image" and a "mask" entry.
        optimizer: optimizer that updates `model`'s parameters.
        criterion: loss called as `criterion(logits, masks)`.
        device: device the image and mask tensors are moved to.

    Returns:
        The batch losses averaged with each batch weighted by its size, as a
        float. Returns NaN when `loader` yields no samples, rather than
        raising ZeroDivisionError.
    """
    model.train()
    total_loss = 0.0
    total_samples = 0  # counts images (samples), not pixels

    for batch in loader:
        imgs = batch["image"].to(device)   # expected [B, 3, H, W]
        masks = batch["mask"].to(device)   # expected [B, H, W], long, values 0..C-1

        optimizer.zero_grad()
        logits = model(imgs)               # expected [B, C, H, W]
        loss = criterion(logits, masks)

        loss.backward()
        optimizer.step()

        # Weight by batch size so a smaller final batch does not skew the average.
        num_in_batch = imgs.size(0)
        total_loss += loss.item() * num_in_batch
        total_samples += num_in_batch

    if total_samples == 0:
        # Nothing was processed: report "no value" instead of dividing by zero.
        return float("nan")
    return total_loss / total_samples


# Short smoke-test run: just enough epochs to confirm the loss goes down.
num_epochs = 2
for epoch in range(num_epochs):
    train_loss = train_seg_one_epoch(
        model=model,
        loader=train_loader,
        optimizer=optimizer,
        criterion=criterion,
        device=device,
    )
    print(f"Epoch {epoch+1}: train_loss={train_loss:.4f}")


Epoch 1: train_loss=0.5875
Epoch 2: train_loss=0.5530
