In [1]:
import copy
import glob
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from PIL import Image
from torch.utils import data
from torchvision import transforms

In [2]:
# Data preview: show one sample image next to its trimap annotation.
SAMPLE_IMAGE_PATH = './data/images/images/Abyssinian_3.jpg'
SAMPLE_TRIMAP_PATH = './data/annotations/annotations/trimaps/Abyssinian_3.png'

# Context managers close the file handles once the pixels are copied to numpy.
with Image.open(SAMPLE_TRIMAP_PATH) as label_img:
    label_np_img = np.array(label_img)
with Image.open(SAMPLE_IMAGE_PATH) as img:
    np_img = np.array(img)

# Create the figure together with the axes that are drawn on, so the cell
# never leaves an empty "0 Axes" figure behind.
fig, (ax_img, ax_label) = plt.subplots(1, 2, figsize=(12, 8))
ax_img.imshow(np_img)
ax_img.set_title('image')
ax_label.imshow(label_np_img, cmap='gray')
ax_label.set_title('trimap')
plt.show()

print('原始图像尺寸', np_img.shape)
print('标签图像尺寸', label_np_img.shape)
print('标签图像分类值', np.unique(label_np_img))

# Same label conversion the dataset class applies: shift every value down by
# one (presumably 1..3 -> 0..2 to match the 3 output channels of the model;
# confirm against the unique values printed above).
torch.squeeze(torch.tensor(label_np_img, dtype=torch.int64)) - 1

FileNotFoundError: [Errno 2] No such file or directory: './data/annotations/annotations/trimaps/Abyssinian_3.png'

<Figure size 1200x800 with 0 Axes>

In [15]:
# Path handling: pair every image with its trimap annotation.
TRIMAP_DIR = './data/annotations/annotations/trimaps'


def annotation_path_for(img_path):
    """Return the trimap path that belongs to the image at ``img_path``.

    The directory part is stripped with ``os.path.basename`` (works with the
    path separator of the current platform, not only ``\\``) and only the file
    extension is swapped for ``.png``, so a "jpg" inside the file name itself
    is left untouched.
    """
    stem, _ = os.path.splitext(os.path.basename(img_path))
    return os.path.join(TRIMAP_DIR, stem + '.png')


# glob returns files in an unspecified order; sorting makes the seeded
# train/test split that follows reproducible across machines.
images = sorted(glob.glob('data/images/images/*.jpg'))
annotations = [annotation_path_for(img_name) for img_name in images]
len(images)

7390

In [4]:

# Dataset split: shuffle the image/annotation pairs with one shared, seeded
# permutation, then use the first 80% for training and the rest for testing.
np.random.seed(42)
index = np.random.permutation(len(images))
images, annotations = np.array(images)[index], np.array(annotations)[index]

sep = int(0.8 * len(images))
train_images, test_images = images[:sep], images[sep:]
train_label, test_label = annotations[:sep], annotations[sep:]

In [5]:
# Preprocessing pipeline applied to every input image.
transform = transforms.Compose(
    [
        transforms.Resize((256, 256)),  # resize to a fixed 256x256
        transforms.ToTensor(),          # convert to a tensor
    ]
)

# 数据处理工具

class ox_dataset(data.Dataset):
    """Dataset pairing images with their segmentation annotation masks.

    Each item is ``(img_tensor, label_tensor)``:

    * ``img_tensor``: the image converted to RGB and passed through the
      module-level ``transform``.
    * ``label_tensor``: the annotation resized to 256x256, converted to an
      int64 tensor, squeezed, and with every value decreased by 1.

    Args:
        img_paths: indexable collection of image file paths.
        anno_paths: indexable collection of annotation file paths, aligned
            index-by-index with ``img_paths``.
    """

    def __init__(self,img_paths,anno_paths):
        self.imgs = img_paths
        self.annos = anno_paths

    def __getitem__(self,index):
        """Load and preprocess the image/annotation pair at ``index``."""
        img = self.imgs[index]
        anno = self.annos[index]

        # Image: open, force 3-channel RGB, then apply the shared transform.
        # The context manager releases the file handle as soon as we are done.
        with Image.open(img) as raw_img:
            img_tensor = transform(raw_img.convert('RGB'))

        # Annotation: resize with NEAREST so that only label values already
        # present in the mask can appear. PIL's default filter interpolates
        # for non-palette images, which would blend neighbouring class ids
        # into labels that do not belong to either region.
        with Image.open(anno) as raw_anno:
            pil_anno = raw_anno.resize((256,256), resample=Image.NEAREST)
        anno_tensor = torch.tensor(np.array(pil_anno),dtype=torch.int64)

        # Shift labels down by one so they can be used as class indices
        # (presumably 1..3 -> 0..2; confirm against the annotation files).
        return img_tensor,torch.squeeze(anno_tensor) - 1

    def __len__(self):
        """Number of image/annotation pairs."""
        return len(self.imgs)



In [18]:
# Number of samples per batch.
BATCH_SIZE = 16

# Wrap the train/test path arrays in datasets.
train_dataset, test_dataset = (
    ox_dataset(train_images, train_label),
    ox_dataset(test_images, test_label),
)

# Batch both datasets; only the training loader shuffles.
train_dl = data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dl = data.DataLoader(test_dataset, batch_size=BATCH_SIZE)

# img_batch,anno_batch = next(iter(train_dl))
# img = img_batch[0].permute(1,2,0).numpy()
# plt.subplot(1,2,1)
# plt.imshow(img)

# anno = anno_batch[0]
# plt.subplot(1,2,2)
# plt.imshow(anno)

In [19]:
# 特征提取（下采样）
class Downsample(nn.Module):
    """Encoder stage: optional 2x2 max-pool, then two 3x3 conv + ReLU layers.

    Args:
        in_channels: channel count of the input feature map.
        out_channels: channel count produced by both convolutions.
    """

    def __init__(self,in_channels,out_channels):
        super().__init__()
        # padding=1 with a 3x3 kernel keeps the spatial size unchanged.
        conv_kwargs = dict(kernel_size=3, padding=1)
        layers = [
            nn.Conv2d(in_channels, out_channels, **conv_kwargs),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, **conv_kwargs),
            nn.ReLU(inplace=True),
        ]
        self.conv_relu = nn.Sequential(*layers)
        self.pool = nn.MaxPool2d(kernel_size=2)

    def forward(self,x,is_pool=True):
        """Apply the stage; ``is_pool=False`` skips the max-pool step."""
        features = self.pool(x) if is_pool else x
        return self.conv_relu(features)
        
            

In [20]:
# 上采样 （解码，还原）
class Upsample(nn.Module):
    """Decoder stage: two 3x3 conv + ReLU layers, then a stride-2 transposed conv.

    The convolutions reduce ``2*channels`` input channels to ``channels``; the
    transposed convolution then halves the channels again (``channels // 2``)
    while upsampling with stride 2.

    Args:
        channels: channel count after the two convolutions; the input must
            carry ``2 * channels`` channels.
    """

    def __init__(self,channels):
        super().__init__()
        self.conv_relu = nn.Sequential(
            nn.Conv2d(2 * channels, channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels, channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        )
        upconv = nn.ConvTranspose2d(
            channels, channels // 2,
            kernel_size=3, stride=2, padding=1, output_padding=1,
        )
        self.upconv_relu = nn.Sequential(upconv, nn.ReLU(inplace=True))

    def forward(self,x):
        """Refine the features, then upsample them."""
        return self.upconv_relu(self.conv_relu(x))

In [21]:
class Unet_model(nn.Module):
    """U-Net style encoder/decoder producing 3 output channels per pixel.

    The encoder widens 3 -> 64 -> 128 -> 256 -> 512 -> 1024 channels, pooling
    before every stage except the first. The decoder upsamples four times,
    concatenating the matching encoder output (skip connection) before each
    stage, and a final 1x1 convolution maps 64 channels to 3.
    """

    def __init__(self):
        super().__init__()
        # Encoder stages.
        self.down1 = Downsample(3, 64)
        self.down2 = Downsample(64, 128)
        self.down3 = Downsample(128, 256)
        self.down4 = Downsample(256, 512)
        self.down5 = Downsample(512, 1024)

        # First upsampling step, applied to the deepest encoder output.
        self.up = nn.Sequential(
            nn.ConvTranspose2d(
                1024, 512,
                kernel_size=3, stride=2, padding=1, output_padding=1,
            ),
            nn.ReLU(inplace=True),
        )
        # Decoder stages.
        self.up1 = Upsample(512)
        self.up2 = Upsample(256)
        self.up3 = Upsample(128)
        # Conv block (used without pooling) that fuses the last skip connection.
        self.conv_2 = Downsample(128, 64)
        # Output layer.
        self.last = nn.Conv2d(64, 3, kernel_size=1)

    def forward(self,x):
        """Run the encoder, then decode while concatenating skip connections."""
        # Encoder: keep every stage output for the skip connections.
        skip1 = self.down1(x, is_pool=False)
        skip2 = self.down2(skip1)
        skip3 = self.down3(skip2)
        skip4 = self.down4(skip3)
        out = self.up(self.down5(skip4))

        # Decoder: concatenate the matching encoder output on the channel
        # axis, then refine and upsample.
        for skip, stage in ((skip4, self.up1), (skip3, self.up2), (skip2, self.up3)):
            out = stage(torch.cat([skip, out], dim=1))

        out = torch.cat([skip1, out], dim=1)
        out = self.conv_2(out, is_pool=False)
        return self.last(out)

In [22]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Network, loss and optimizer shared by the training/evaluation functions.
model = Unet_model().to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)

In [46]:
# 定义训练函数
def train(dataloader):
    """Run one training epoch over ``dataloader``.

    Uses the module-level ``model``, ``loss_fn``, ``optimizer`` and ``device``.
    Prints the running count of correctly predicted pixels and the time spent
    on each batch.

    Returns:
        ``(mean_loss, pixel_accuracy)``: the loss averaged over batches and the
        share of correctly predicted pixels, assuming 256*256 pixels per sample.
    """
    sample_count = len(dataloader.dataset)
    batch_count = len(dataloader)
    loss_sum, hits = 0, 0

    model.train()
    for X, y in dataloader:
        tic = time.time()
        X, y = X.to(device), y.to(device)

        # Forward pass and loss.
        logits = model(X)
        loss = loss_fn(logits, y)

        # Backward pass: clear old gradients, backpropagate, update weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        toc = time.time()

        # Bookkeeping only, so no gradients are needed.
        with torch.no_grad():
            labels = torch.argmax(logits, dim=1)
            hits += (labels == y).type(torch.float).sum().item()
            loss_sum += loss.item()
            print('训练数据的预测对的像素个数为：{};每16个需消耗{}秒'.format(hits, round(toc - tic, 2)))

    return loss_sum / batch_count, hits / (sample_count * 256 * 256)

# 定义测试函数
def test(dataloader):
    """Evaluate the model on ``dataloader`` without updating any weights.

    Uses the module-level ``model``, ``loss_fn`` and ``device``.

    Args:
        dataloader: loader yielding ``(X, y)`` batches; its ``dataset`` length
            is used as the number of samples.

    Returns:
        ``(mean_loss, pixel_accuracy)``: the loss averaged over batches and the
        share of correctly predicted pixels, assuming 256*256 pixels per sample.
    """
    # Total number of samples in the evaluated dataset.
    size = len(dataloader.dataset)
    # Number of batches.
    num_batches = len(dataloader)
    # Evaluation mode.
    model.eval()
    test_loss,correct = 0,0
    with torch.no_grad():
        for X,y in dataloader:
            X,y = X.to(device),y.to(device)
            pred = model(X)
            # Accumulate: the sum is divided by the batch count below, so
            # overwriting here would report last_batch_loss / num_batches.
            test_loss += loss_fn(pred,y).item()
            pred = torch.argmax(pred,dim=1)
            correct += (pred == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size * 256 * 256 # samples * pixels per 256x256 image
    return test_loss,correct

In [47]:
# 定义训练函数
def fit(epochs,train_dl,test_dl):
    """Train for ``epochs`` epochs, evaluating on ``test_dl`` after each one.

    Calls the module-level ``train`` and ``test`` functions and prints a
    summary line per epoch. The weights of the epoch with the highest test
    accuracy are snapshotted and loaded back into the module-level ``model``
    before returning, so saving ``model.state_dict()`` afterwards stores the
    best epoch rather than the last one.

    Args:
        epochs: number of passes over ``train_dl``.
        train_dl: loader used for training.
        test_dl: loader used for evaluation.

    Returns:
        ``(train_loss, test_loss, train_acc, test_acc)``: four lists with one
        entry per epoch.
    """
    train_acc = []
    train_loss = []
    test_acc = []
    test_loss = []
    best_acc = 0.0
    best_model_wts = None
    for epoch in range(epochs):
        start  =  time.time()
        epoch_loss,epoch_acc = train(dataloader=train_dl)
        epoch_test_loss,epoch_test_acc = test(dataloader=test_dl)
        end = time.time()
        times = end - start
        train_acc.append(epoch_acc)
        train_loss.append(epoch_loss)
        test_acc.append(epoch_test_acc)
        test_loss.append(epoch_test_loss)
        print('训练epoch{},训练集损失值:{:.2f},训练集的准确率:{:.2f}%,测试集损失值:{:.2f},测试集的准确率:{:.2f}%,消耗时间：{:.2f}s'.
              format(epoch+1,epoch_loss,epoch_acc*100,epoch_test_loss,epoch_test_acc*100,times))
        if epoch_test_acc > best_acc:
            # New best test accuracy: remember it. deepcopy is required
            # because state_dict() returns references to the live tensors,
            # which later optimizer steps would overwrite.
            best_acc = epoch_test_acc
            best_model_wts = copy.deepcopy(model.state_dict())
    # Restore the best snapshot (None only when no epoch improved on 0.0).
    if best_model_wts is not None:
        model.load_state_dict(best_model_wts)
    return train_loss,test_loss,train_acc,test_acc

In [None]:
# Train for 50 epochs.
fit(epochs=50, train_dl=train_dl, test_dl=test_dl)

训练数据的预测对的像素个数为：527134.0;每16个需消耗24.44秒
训练数据的预测对的像素个数为：1182395.0;每16个需消耗25.28秒
训练数据的预测对的像素个数为：1811977.0;每16个需消耗25.09秒


In [None]:
# Write the model's current weights to disk.
torch.save(obj=model.state_dict(), f='Unet_best.pth')