In [11]:
import os

import numpy as np
import torch
import torch.nn as nn
from torch.nn import functional as F
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchsummary import summary
from torchvision import transforms

In [2]:
# Check the dataset layout

Data_ROOT_PATH = './DataSet'

# One sub-directory per split, each holding an 'image' and a 'label' folder.
train_dir, test_dir, val_dir = (
    os.path.join(Data_ROOT_PATH, split) for split in ('train', 'test', 'val')
)

train_img_dir, train_label_dir = (
    os.path.join(train_dir, kind) for kind in ('image', 'label')
)
test_img_dir, test_label_dir = (
    os.path.join(test_dir, kind) for kind in ('image', 'label')
)
val_img_dir, val_label_dir = (
    os.path.join(val_dir, kind) for kind in ('image', 'label')
)

# Print the number of entries found in every directory
for dir_name, dir_path in (
    ('train_img_dir', train_img_dir),
    ('train_label_dir', train_label_dir),
    ('test_img_dir', test_img_dir),
    ('test_label_dir', test_label_dir),
    ('val_img_dir', val_img_dir),
    ('val_label_dir', val_label_dir),
):
    print(f'{dir_name}:', len(os.listdir(dir_path)))


train_img_dir: 9420
train_label_dir: 9420
test_img_dir: 3848
test_label_dir: 3848
val_img_dir: 1537
val_label_dir: 1537


In [12]:
class ImageDataset(Dataset):
    """Segmentation dataset pairing each image with the same-named mask file.

    Args:
        img_dir: Directory containing the input images.
        label_dir: Directory containing the masks. A mask is looked up under
            the same file name as its image.
        transform: Optional callable applied to every sample. Two calling
            conventions are supported:

            * paired -- ``transform(image=..., mask=...)`` returning a mapping
              with ``'image'`` and ``'mask'`` entries;
            * image-only -- ``transform(image)`` (e.g. a torchvision
              ``Compose``). The mask is then returned untouched, so such a
              pipeline must not contain random geometric operations or image
              and mask will no longer line up.

    Each item is an ``(image, label)`` pair. Without a transform, ``image`` is
    the RGB image as a NumPy array and ``label`` is the grayscale mask as a
    float32 NumPy array in which the value 255 has been replaced by 1.
    """

    def __init__(self, img_dir, label_dir, transform=None):
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.transform = transform
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # index -> file mapping identical across runs and machines.
        self.images = sorted(os.listdir(img_dir))

    def __len__(self):
        """Return the number of files found in ``img_dir``."""
        return len(self.images)

    def __getitem__(self, index):
        """Load, binarise and (optionally) transform the sample at ``index``."""
        file_name = self.images[index]
        img_path = os.path.join(self.img_dir, file_name)
        label_path = os.path.join(self.label_dir, file_name)
        image = np.array(Image.open(img_path).convert('RGB'))
        label = np.array(Image.open(label_path).convert('L'), dtype=np.float32)
        # Map the foreground value 255 to class index 1.
        label[label == 255.0] = 1.0
        if self.transform is not None:
            image, label = self._apply_transform(image, label)
        return image, label

    def _apply_transform(self, image, label):
        """Run ``self.transform`` using whichever calling convention it accepts."""
        try:
            augmented = self.transform(image=image, mask=label)
        except TypeError:
            # Single-input pipelines (torchvision style) reject the keyword
            # arguments; apply them to the image only and keep the mask as-is.
            return self.transform(image), label
        return augmented['image'], augmented['mask']


# Image pipeline: ToTensor turns an HxWxC uint8 array into a CxHxW float
# tensor scaled to [0, 1].
# NOTE(review): torchvision pipelines take a single positional input; confirm
# ImageDataset invokes `transform` in a compatible way.
transform = transforms.Compose([
    transforms.ToTensor(),
])


Batch_size = 16


# Training batches are reshuffled every epoch.
train_loader = DataLoader(
    ImageDataset(train_img_dir, train_label_dir, transform=transform),
    batch_size=Batch_size,
    shuffle=True,
)
# Evaluation loaders keep a fixed order so metrics and any inspected batches
# are reproducible from run to run.
val_loader = DataLoader(
    ImageDataset(val_img_dir, val_label_dir, transform=transform),
    batch_size=Batch_size,
    shuffle=False,
)
test_loader = DataLoader(
    ImageDataset(test_img_dir, test_label_dir, transform=transform),
    batch_size=Batch_size,
    shuffle=False,
)

In [13]:
class DoubleConv(nn.Module):
    """Two stacked 3x3 conv -> BatchNorm -> ReLU stages.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the second stage.
        mid_channels: Channels between the two stages; falls back to
            ``out_channels`` when omitted (or otherwise falsy).
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        hidden = mid_channels or out_channels
        stages = []
        for n_in, n_out in ((in_channels, hidden), (hidden, out_channels)):
            stages.extend([
                # padding=1 keeps the spatial size; the conv bias is disabled
                # and the BatchNorm that follows provides its own shift.
                nn.Conv2d(n_in, n_out, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(n_out),
                nn.ReLU(inplace=True),
            ])
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x`` and return the result."""
        out = self.double_conv(x)
        return out


class Down(nn.Module):
    """Encoder step: 2x2 max-pooling followed by a DoubleConv.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the DoubleConv.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pool = nn.MaxPool2d(2)
        conv = DoubleConv(in_channels, out_channels)
        self.maxpool_conv = nn.Sequential(pool, conv)

    def forward(self, x):
        """Pool ``x`` and convolve the pooled map."""
        out = self.maxpool_conv(x)
        return out


class Up(nn.Module):
    """Decoder step: upsample, align with the skip tensor, concatenate, DoubleConv.

    Args:
        in_channels: Channels entering the DoubleConv, i.e. skip channels plus
            upsampled channels after concatenation.
        out_channels: Channels produced by the DoubleConv.
        bilinear: Use bilinear interpolation when True, otherwise a transposed
            convolution that also halves the channel count.
    """

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()
        half = in_channels // 2

        if not bilinear:
            self.up = nn.ConvTranspose2d(in_channels, half, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)
        else:
            # Interpolation leaves the channel count alone, so the regular
            # convolutions do the reduction (via the narrower middle layer).
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, half)

    def forward(self, x1, x2):
        """Upsample ``x1``, pad it to the size of ``x2`` and fuse the two.

        Args:
            x1: Feature map from the deeper level.
            x2: Skip-connection feature map from the encoder.
        """
        upsampled = self.up(x1)
        # Tensors are indexed as (N, C, H, W): dim 2 is height, dim 3 is width.
        gap_h = x2.shape[2] - upsampled.shape[2]
        gap_w = x2.shape[3] - upsampled.shape[3]
        left, top = gap_w // 2, gap_h // 2

        # F.pad takes (left, right, top, bottom) for the last two dimensions.
        upsampled = F.pad(upsampled, [left, gap_w - left, top, gap_h - top])
        # if you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        fused = torch.cat([x2, upsampled], dim=1)
        return self.conv(fused)


class OutConv(nn.Module):
    """Final 1x1 convolution mapping ``in_channels`` feature maps to ``out_channels``."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Project ``x`` with the 1x1 convolution; no activation is applied."""
        projected = self.conv(x)
        return projected


class UNet(nn.Module):
    """U-Net: a four-level encoder/decoder with skip connections.

    Args:
        n_channels: Number of channels of the input image.
        n_classes: Number of channels of the returned logits.
        bilinear: If True the decoder upsamples with bilinear interpolation,
            otherwise with transposed convolutions.
    """

    def __init__(self, n_channels, n_classes, bilinear=False):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear
        # Switched on by use_checkpointing(). A plain bool attribute, so it
        # never shows up in the state_dict.
        self._use_checkpointing = False

        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        # Bilinear upsampling keeps the channel count, so the stage feeding a
        # bilinear Up emits half as many channels; after concatenation with
        # the skip tensor the total again equals that Up's in_channels.
        factor = 2 if bilinear else 1
        self.down4 = Down(512, 1024 // factor)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def _run_stage(self, stage, *inputs):
        """Call ``stage(*inputs)``, through activation checkpointing if enabled."""
        # getattr() keeps instances created before this flag existed working.
        if getattr(self, '_use_checkpointing', False):
            from torch.utils.checkpoint import checkpoint

            # The non-reentrant variant also yields parameter gradients when
            # the stage input itself does not require grad (the first stage).
            return checkpoint(stage, *inputs, use_reentrant=False)
        return stage(*inputs)

    def forward(self, x):
        """Return the raw (un-normalised) logits for input batch ``x``."""
        # Encoder: keep every level's output for the skip connections.
        x1 = self._run_stage(self.inc, x)
        x2 = self._run_stage(self.down1, x1)
        x3 = self._run_stage(self.down2, x2)
        x4 = self._run_stage(self.down3, x3)
        x5 = self._run_stage(self.down4, x4)
        # Decoder: fuse each upsampled map with the matching encoder output.
        x = self._run_stage(self.up1, x5, x4)
        x = self._run_stage(self.up2, x, x3)
        x = self._run_stage(self.up3, x, x2)
        x = self._run_stage(self.up4, x, x1)
        logits = self._run_stage(self.outc, x)
        return logits

    def use_checkpointing(self):
        """Enable activation checkpointing for every stage of the network.

        Subsequent forward passes run each stage through
        ``torch.utils.checkpoint.checkpoint``, which recomputes activations
        during backward instead of storing them. Submodules are left in place,
        so parameter names and the state_dict are unchanged.

        NOTE(review): recomputation re-runs each stage's forward, so layers
        with forward side effects (e.g. BatchNorm running statistics in train
        mode) may be updated more than once per step -- confirm acceptable.
        """
        self._use_checkpointing = True

In [14]:
# Prefer the GPU when CUDA is usable, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

device(type='cuda')

In [16]:

# Create the model instance: 3 input channels, 2 output classes
model = UNet(n_channels=3, n_classes=2)
# Move the model to the selected device
model = model.to(device)

summary(model, input_size=(3, 512, 512))

# Shape sanity check with a random input. Gradients are not needed here, so
# skip autograd bookkeeping to avoid holding every activation in memory.
with torch.no_grad():
    x = torch.randn(1, 3, 512, 512).to(device)
    torch_out = model(x)
print(torch_out.shape)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 512, 512]           1,728
       BatchNorm2d-2         [-1, 64, 512, 512]             128
              ReLU-3         [-1, 64, 512, 512]               0
            Conv2d-4         [-1, 64, 512, 512]          36,864
       BatchNorm2d-5         [-1, 64, 512, 512]             128
              ReLU-6         [-1, 64, 512, 512]               0
        DoubleConv-7         [-1, 64, 512, 512]               0
         MaxPool2d-8         [-1, 64, 256, 256]               0
            Conv2d-9        [-1, 128, 256, 256]          73,728
      BatchNorm2d-10        [-1, 128, 256, 256]             256
             ReLU-11        [-1, 128, 256, 256]               0
           Conv2d-12        [-1, 128, 256, 256]         147,456
      BatchNorm2d-13        [-1, 128, 256, 256]             256
             ReLU-14        [-1, 128, 2