# Semantic Segmentation with FCN


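## 1. What is semantic segmentation?

Semantic segmentation means recognizing an image at the **pixel level**: every pixel in the image is labeled with the class of the object it belongs to.

Take this classic example:

![example1](./images/fcn_example.png)

Besides recognizing the bicycle and the person, the result also traces their boundaries.

So **semantic segmentation can be viewed as classifying every single pixel**.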
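
To make "classifying every pixel" concrete, here is a minimal NumPy sketch. It assumes a network has already produced a score for every class at every pixel. The array `scores`, its shape `(num_classes, height, width)`, and the helper name `scores_to_label_map` are illustrative assumptions, not part of any library.

```python
import numpy as np

def scores_to_label_map(scores):
    """Pick the highest-scoring class at every pixel.

    scores: array of shape (num_classes, height, width).
    Returns an integer array of shape (height, width).
    """
    return np.argmax(scores, axis=0)

# Toy example: 3 classes on a 2x2 image (the values are made up)
scores = np.array([
    [[0.90, 0.1], [0.2, 0.3]],  # class 0
    [[0.05, 0.8], [0.1, 0.3]],  # class 1
    [[0.05, 0.1], [0.7, 0.4]],  # class 2
])
label_map = scores_to_label_map(scores)
print(label_map)
```

The output has one class index per pixel, which is exactly the form of the label images used for training later on.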


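## 2. A brief introduction to FCN

Why is it called FCN (Fully Convolutional Networks)?

In earlier work (before 2015), CNNs typically attached several fully connected layers after the convolutional layers, mapping the feature map produced by the convolutions to a fixed-length feature vector (which discards spatial information). Classic CNN architectures such as AlexNet suit image-level classification and regression, because they ultimately produce a single numerical description (a probability) of the whole input image. For example, the ImageNet AlexNet model outputs a 1000-dimensional vector giving the softmax-normalized probability that the input image belongs to each class.

FCN is different: it classifies the image pixel by pixel, which solves image segmentation at the semantic level (semantic segmentation). Unlike a classic CNN, which uses fully connected layers after the convolutions to obtain a fixed-length feature vector for classification (fully connected layers + softmax output), **FCN accepts input images of any size and uses deconvolution layers to upsample the feature map of the last convolutional layer back to the size of the input image**. It can therefore produce a prediction for every pixel while preserving the spatial information of the original input, and it finally performs per-pixel classification on the upsampled feature map.


To summarize:

**A CNN takes an image as input and outputs a single result: a value, namely a probability.**

**FCN aims for an image as input and an image as output, learning a pixel-to-pixel mapping.**

This property is exactly what semantic segmentation needs: classifying pixels.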

## 3. The FCN network architecture


![network](./images/FCN_network.png)


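The whole model is end-to-end.

The pipeline is: first **downsample (convolution, pooling)**, then **upsample (deconvolution)**.

Downsampling is what extracts semantic information: without a stack of several convolutional layers, the network cannot classify correctly.

After convolution and pooling, however, the feature map has become very small and must be restored to the size of the original image, so upsampling (deconvolution) is used to bring the feature map back to the input resolution.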
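
The size bookkeeping behind "downsample, then upsample" can be sketched in a few lines. This is only an illustration under simplifying assumptions: the convolutions are taken to preserve the spatial size, each of the five pooling stages halves it (rounding up), and the helper name `downsampled_sizes` is hypothetical.

```python
import math

def downsampled_sizes(size, num_pools=5):
    """Spatial size after each of `num_pools` 2x2, stride-2 poolings (rounding up)."""
    sizes = [size]
    for _ in range(num_pools):
        size = math.ceil(size / 2)
        sizes.append(size)
    return sizes

sizes = downsampled_sizes(224)
restored = sizes[-1] * 32  # five halvings are undone by a total upsampling factor of 32
print(sizes)
print(restored)
```

When the input size is not a multiple of 32, the upsampled map comes out larger than the input and has to be cropped back, which is why the network code further down slices its outputs.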


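## 4. Key techniques in FCN

### 4.1 Deconvolution, also called transposed convolution (conv_transpose)

In deep-learning frameworks the two terms refer to the same operation; only the name differs.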
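
As a rough illustration of what the operation does, here is a minimal one-dimensional transposed convolution in plain Python. It is a sketch, not the library implementation: there is no padding, and the function name `conv_transpose1d` is made up for this example. Each input value "stamps" a scaled copy of the kernel into the output at offset `i * stride`, so the output length is `(len(x) - 1) * stride + len(kernel)`.

```python
def conv_transpose1d(x, kernel, stride):
    """1D transposed convolution without padding.

    Every input value is multiplied by the whole kernel and added into the
    output starting at position i * stride; overlapping contributions sum up.
    """
    out_len = (len(x) - 1) * stride + len(kernel)
    out = [0.0] * out_len
    for i, value in enumerate(x):
        for k, weight in enumerate(kernel):
            out[i * stride + k] += value * weight
    return out

y = conv_transpose1d([1.0, 2.0], [1.0, 1.0, 1.0], stride=2)
print(len(y), y)
```

With a stride greater than 1 the output is longer than the input, which is how the layer upsamples a small feature map.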






In [1]:
# Import the required packages
import os
import time
import torch
import warnings
import numpy as np
from torch import nn
from PIL import Image

import matplotlib.pyplot as plt
%matplotlib inline

import torch.nn.functional as F
from torch.autograd import Variable
from torch.optim import lr_scheduler
import torchvision.transforms as tfs
from torchvision import datasets, models
from torch.utils.data import Dataset, DataLoader

warnings.filterwarnings("ignore")

In [2]:
# Path to the dataset root (replace 'XXX' with your VOC directory)
voc_root = 'XXX'
def read_images(root=voc_root, train=True):
    txt_fname = root + '/ImageSets/Segmentation/' + ('train.txt' if train else 'val.txt')
    with open(txt_fname, 'r') as f:
        images = f.read().split()
    data = [os.path.join(root, 'JPEGImages', i+'.jpg') for i in images]
    label = [os.path.join(root, 'SegmentationClass', i+'.png') for i in images]
    return data, label

In [3]:
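# Resize an image to a target height while keeping the aspect ratio
def scale_high(img, target_high):
    ow, oh = img.size
    # Nothing to do if the height already matches the target
    if oh == target_high:
        return img
    h = target_high
    w = int(target_high * ow / oh)
    return img.resize((w, h), Image.BICUBIC)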

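# Center crop, applied identically to the image and its label
# (despite its name, random_crop is deterministic: it uses CenterCrop)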
def random_crop(data, label, crop_size):
    transforms = tfs.CenterCrop(crop_size)
    data = transforms(data)
    label = transforms(label)
    return data, label

In [4]:
classes = ['background','aeroplane','bicycle','bird','boat',
           'bottle','bus','car','cat','chair','cow','diningtable',
           'dog','horse','motorbike','person','potted plant',
           'sheep','sofa','train','tv/monitor']

colormap = [[0,0,0],[128,0,0],[0,128,0], [128,128,0], [0,0,128],
            [128,0,128],[0,128,128],[128,128,128],[64,0,0],[192,0,0],
            [64,128,0],[192,128,0],[64,0,128],[192,0,128],
            [64,128,128],[192,128,128],[0,64,0],[128,64,0],
            [0,192,0],[128,192,0],[0,64,128]]

len(classes), len(colormap)

(21, 21)

In [5]:
# Each RGB channel takes a value in 0 ~ 255, so there are 256**3 possible colors
cm2lbl = np.zeros(256**3)
for i,cm in enumerate(colormap):
    # Build the lookup table: packed RGB value -> class index
    cm2lbl[(cm[0]*256+cm[1])*256+cm[2]] = i

def image2label(img):
    data = np.array(img, dtype='int32')
    idx = (data[:, :, 0] * 256 + data[:, :, 1]) * 256 + data[:, :, 2]
    # Look up the class index of every pixel to obtain the label matrix
    return np.array(cm2lbl[idx], dtype='int64')
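
The lookup-table trick above can be tried on a tiny example. The sketch below uses a hypothetical three-color palette (`small_colormap`) and made-up helper names. It packs each RGB triple into a single integer and uses that integer to index a table of class ids. Any color that is not in the palette falls back to index 0.

```python
import numpy as np

# Hypothetical 3-color palette for illustration (the first entries of the VOC colormap)
small_colormap = [[0, 0, 0], [128, 0, 0], [0, 128, 0]]

def rgb_key(r, g, b):
    """Pack an RGB triple into one integer in [0, 256**3)."""
    return (r * 256 + g) * 256 + b

# uint8 is enough for a handful of classes and keeps the table small
lookup = np.zeros(256 ** 3, dtype=np.uint8)
for index, (r, g, b) in enumerate(small_colormap):
    lookup[rgb_key(r, g, b)] = index

# A 1x4 "image": background, class 1, class 2, and a color outside the palette
pixels = np.array([[[0, 0, 0], [128, 0, 0], [0, 128, 0], [1, 2, 3]]], dtype=np.int32)
keys = rgb_key(pixels[:, :, 0], pixels[:, :, 1], pixels[:, :, 2])
labels = lookup[keys]
print(labels)
```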

In [6]:
def img_transforms(img, label, crop_size):
    img, label = random_crop(img, label, crop_size)
    # Halve the resolution of the image
    img = scale_high(img,int(img.size[1]/2))
    # Resize the label to the same size with nearest-neighbor interpolation,
    # so that no interpolated colors appear that are missing from the colormap
    label = label.resize(img.size, Image.NEAREST)
    img_tfs = tfs.Compose([
        tfs.ToTensor(),
        tfs.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    img = img_tfs(img)
    label = image2label(label)
    label = torch.from_numpy(label)
    return img, label

# Build the dataset
class VOCSegDataset(Dataset):
    def __init__(self, train, crop_size, transforms):
        self.crop_size = crop_size
        self.transforms = transforms
        # Keep the file lists on the instance; __getitem__ and __len__ use them
        self.data_list, self.label_list = read_images(train=train)
        print('Read ' + str(len(self.data_list)) + ' images')
        
        
    def __getitem__(self, idx):
        img = self.data_list[idx]
        label = self.label_list[idx]
        img = Image.open(img)
        label = Image.open(label).convert('RGB')
        img, label = self.transforms(img, label, self.crop_size)
        return img, label
    
    def __len__(self):
        return len(self.data_list)

In [7]:
# Instantiate the datasets and the data loaders
batch_size = 16
input_shape = (512, 512)
train_data = VOCSegDataset(True, input_shape, img_transforms)
testdata = VOCSegDataset(False, input_shape, img_transforms)
trainloader = DataLoader(train_data, batch_size = batch_size, shuffle=True)
testloader = DataLoader(testdata, batch_size = batch_size, shuffle=True)

In [8]:
# Weight initialization: bilinear kernel for the upsampling layers
def get_upsampling_weight(in_channels, out_channels, kernel_size):
    """Make a 2D bilinear kernel suitable for upsampling"""
    factor = (kernel_size + 1) // 2
    if kernel_size % 2 == 1:
        center = factor - 1
    else:
        center = factor - 0.5
    og = np.ogrid[:kernel_size, :kernel_size]
    filt = (1 - abs(og[0] - center) / factor) * \
           (1 - abs(og[1] - center) / factor)
    weight = np.zeros((in_channels, out_channels, kernel_size, kernel_size),
                      dtype=np.float64)
    weight[range(in_channels), range(out_channels), :, :] = filt
    return torch.from_numpy(weight).float()
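
To see what this initialization produces, the following sketch evaluates the same formula in one dimension. The helper name `bilinear_profile` is made up for this example. The 2D filter built above is the outer product of this profile with itself.

```python
def bilinear_profile(kernel_size):
    """1D bilinear weights, using the same formula as get_upsampling_weight."""
    factor = (kernel_size + 1) // 2
    center = factor - 1 if kernel_size % 2 == 1 else factor - 0.5
    return [1 - abs(i - center) / factor for i in range(kernel_size)]

profile = bilinear_profile(4)
# The 2D kernel is the outer product of the profile with itself
kernel = [[a * b for b in profile] for a in profile]
print(profile)
print(kernel[1][1])
```

For `kernel_size=4` (the size used by the 2x upsampling layers) the weights peak in the middle and fall off linearly toward the edges, so a transposed convolution initialized this way starts out as plain bilinear interpolation.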

In [9]:
# Definition of the FCN network
n_class = len(classes)
class FCN(nn.Module):
    def __init__(self, n_class=21):
        super(FCN, self).__init__()
        # conv1
        self.conv1_1 = nn.Conv2d(3, 64, 3, padding=100)
        self.relu1_1 = nn.ReLU(inplace=True)
        self.conv1_2 = nn.Conv2d(64, 64, 3, padding=1)
        self.relu1_2 = nn.ReLU(inplace=True)
        self.pool1 = nn.MaxPool2d(2, stride=2, ceil_mode=True)  # 1/2

        # conv2
        self.conv2_1 = nn.Conv2d(64, 128, 3, padding=1)
        self.relu2_1 = nn.ReLU(inplace=True)
        self.conv2_2 = nn.Conv2d(128, 128, 3, padding=1)
        self.relu2_2 = nn.ReLU(inplace=True)
        self.pool2 = nn.MaxPool2d(2, stride=2, ceil_mode=True)  # 1/4

        # conv3
        self.conv3_1 = nn.Conv2d(128, 256, 3, padding=1)
        self.relu3_1 = nn.ReLU(inplace=True)
        self.conv3_2 = nn.Conv2d(256, 256, 3, padding=1)
        self.relu3_2 = nn.ReLU(inplace=True)
        self.conv3_3 = nn.Conv2d(256, 256, 3, padding=1)
        self.relu3_3 = nn.ReLU(inplace=True)
        self.pool3 = nn.MaxPool2d(2, stride=2, ceil_mode=True)  # 1/8

        # conv4
        self.conv4_1 = nn.Conv2d(256, 512, 3, padding=1)
        self.relu4_1 = nn.ReLU(inplace=True)
        self.conv4_2 = nn.Conv2d(512, 512, 3, padding=1)
        self.relu4_2 = nn.ReLU(inplace=True)
        self.conv4_3 = nn.Conv2d(512, 512, 3, padding=1)
        self.relu4_3 = nn.ReLU(inplace=True)
        self.pool4 = nn.MaxPool2d(2, stride=2, ceil_mode=True)  # 1/16

        # conv5
        self.conv5_1 = nn.Conv2d(512, 512, 3, padding=1)
        self.relu5_1 = nn.ReLU(inplace=True)
        self.conv5_2 = nn.Conv2d(512, 512, 3, padding=1)
        self.relu5_2 = nn.ReLU(inplace=True)
        self.conv5_3 = nn.Conv2d(512, 512, 3, padding=1)
        self.relu5_3 = nn.ReLU(inplace=True)
        self.pool5 = nn.MaxPool2d(2, stride=2, ceil_mode=True)  # 1/32

        # fc6
        self.fc6 = nn.Conv2d(512, 4096, 7)
        self.relu6 = nn.ReLU(inplace=True)
        self.drop6 = nn.Dropout2d()

        # fc7
        self.fc7 = nn.Conv2d(4096, 4096, 1)
        self.relu7 = nn.ReLU(inplace=True)
        self.drop7 = nn.Dropout2d()

        self.score_fr = nn.Conv2d(4096, n_class, 1)
        self.score_pool3 = nn.Conv2d(256, n_class, 1)
        self.score_pool4 = nn.Conv2d(512, n_class, 1)

        self.upscore2 = nn.ConvTranspose2d(
            n_class, n_class, 4, stride=2, bias=False)
        self.upscore8 = nn.ConvTranspose2d(
            n_class, n_class, 16, stride=8, bias=False)
        self.upscore_pool4 = nn.ConvTranspose2d(
            n_class, n_class, 4, stride=2, bias=False)

        self._initialize_weights()

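    def _initialize_weights(self):
        # NOTE: zeroing every Conv2d appears to follow reference FCN code,
        # which afterwards copies pretrained VGG16 weights into the backbone.
        # That copy step is not part of this notebook. With all-zero
        # convolutions the backbone outputs zeros and receives no gradient,
        # so load pretrained weights (or skip the zeroing) before training.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                m.weight.data.zero_()
                if m.bias is not None:
                    m.bias.data.zero_()
            if isinstance(m, nn.ConvTranspose2d):
                # Upsampling layers start out as bilinear interpolation
                assert m.kernel_size[0] == m.kernel_size[1]
                initial_weight = get_upsampling_weight(
                    m.in_channels, m.out_channels, m.kernel_size[0])
                m.weight.data.copy_(initial_weight)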

    def forward(self, x):
        h = x
        h = self.relu1_1(self.conv1_1(h))
        h = self.relu1_2(self.conv1_2(h))
        h = self.pool1(h)

        h = self.relu2_1(self.conv2_1(h))
        h = self.relu2_2(self.conv2_2(h))
        h = self.pool2(h)

        h = self.relu3_1(self.conv3_1(h))
        h = self.relu3_2(self.conv3_2(h))
        h = self.relu3_3(self.conv3_3(h))
        h = self.pool3(h)
        pool3 = h  # 1/8

        h = self.relu4_1(self.conv4_1(h))
        h = self.relu4_2(self.conv4_2(h))
        h = self.relu4_3(self.conv4_3(h))
        h = self.pool4(h)
        pool4 = h  # 1/16

        h = self.relu5_1(self.conv5_1(h))
        h = self.relu5_2(self.conv5_2(h))
        h = self.relu5_3(self.conv5_3(h))
        h = self.pool5(h)

        h = self.relu6(self.fc6(h))
        h = self.drop6(h)

        h = self.relu7(self.fc7(h))
        h = self.drop7(h)

        h = self.score_fr(h)
        h = self.upscore2(h)
        upscore2 = h  # 1/16

        h = self.score_pool4(pool4)
        h = h[:, :, 5:5 + upscore2.size()[2], 5:5 + upscore2.size()[3]]
        score_pool4c = h  # 1/16

        h = upscore2 + score_pool4c  # 1/16
        h = self.upscore_pool4(h)
        upscore_pool4 = h  # 1/8

        h = self.score_pool3(pool3)
        h = h[:, :,
              9:9 + upscore_pool4.size()[2],
              9:9 + upscore_pool4.size()[3]]
        score_pool3c = h  # 1/8

        h = upscore_pool4 + score_pool3c  # 1/8

        h = self.upscore8(h)
        h = h[:, :, 31:31 + x.size()[2], 31:31 + x.size()[3]].contiguous()

        return h
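
The crop offsets 5, 9 and 31 in `forward` only work because of the `padding=100` in `conv1_1`. The sketch below traces one spatial dimension through the layers defined above to check that every cropped window fits. It is a bookkeeping aid written for this explanation (the function name `trace_sizes` is hypothetical) and it assumes the layer hyperparameters exactly as listed in the class.

```python
def trace_sizes(n):
    """Follow one spatial dimension of size n through the FCN defined above."""
    def ceil_half(v):
        # 2x2 max pooling with stride 2 and ceil_mode=True
        return -(-v // 2)

    def upsample(v, kernel, stride):
        # Transposed convolution without padding
        return (v - 1) * stride + kernel

    conv1 = n + 2 * 100 - 2      # conv1_1: 3x3 kernel, padding=100
    pool1 = ceil_half(conv1)     # all other 3x3 convs use padding=1 and keep the size
    pool2 = ceil_half(pool1)
    pool3 = ceil_half(pool2)
    pool4 = ceil_half(pool3)
    pool5 = ceil_half(pool4)
    fc6 = pool5 - 6              # fc6: 7x7 kernel, no padding (fc7 and score_fr are 1x1)
    upscore2 = upsample(fc6, 4, 2)
    upscore_pool4 = upsample(upscore2, 4, 2)
    upscore8 = upsample(upscore_pool4, 16, 8)
    return {
        "pool3": pool3, "pool4": pool4, "fc6": fc6,
        "upscore2": upscore2, "upscore_pool4": upscore_pool4, "upscore8": upscore8,
    }

sizes = trace_sizes(256)
print(sizes)
```

For a 256-pixel input, the window `5:5 + upscore2` fits inside the `pool4` scores, `9:9 + upscore_pool4` fits inside the `pool3` scores, and `31:31 + 256` fits inside the final upsampled map.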

In [10]:
# Instantiate the model and move it to the GPU when one is available
model = FCN(n_class)
use_gpu = torch.cuda.is_available()
if use_gpu:
    model = model.cuda()

In [11]:
model

FCN (
  (conv1_1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(100, 100))
  (relu1_1): ReLU (inplace)
  (conv1_2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu1_2): ReLU (inplace)
  (pool1): MaxPool2d (size=(2, 2), stride=(2, 2), dilation=(1, 1))
  (conv2_1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu2_1): ReLU (inplace)
  (conv2_2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu2_2): ReLU (inplace)
  (pool2): MaxPool2d (size=(2, 2), stride=(2, 2), dilation=(1, 1))
  (conv3_1): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu3_1): ReLU (inplace)
  (conv3_2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu3_2): ReLU (inplace)
  (conv3_3): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu3_3): ReLU (inplace)
  (pool3): MaxPool2d (size=(2, 2), stride=(2, 2), dilation=(1, 1))
  (conv4_1): Conv2d(256, 512, ke

In [12]:
num_epoches = 100
learning_rate = 0.001
weight_decay=1e-4
# nn.NLLLoss accepts (N, C, H, W) inputs; the older nn.NLLLoss2d is deprecated
criterion = nn.NLLLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

In [13]:
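# IoU (intersection over union): the ratio of the intersection to the union of
# the predicted region and the ground-truth region. In detection, an IoU below
# 0.5 between a region proposal and the ground-truth box usually means the
# object counts as not detected. Here IoU is computed per class over pixels.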
def _fast_hist(label_true, label_pred, n_class):
    mask = (label_true >= 0) & (label_true < n_class)
    hist = np.bincount(
        n_class * label_true[mask].astype(int) +
        label_pred[mask], minlength=n_class ** 2).reshape(n_class, n_class)
    return hist

def label_accuracy_score(label_trues, label_preds, n_class):
    """Returns accuracy score evaluation result.
      - overall accuracy
      - mean accuracy
      - mean IU
      - fwavacc
    """
    hist = np.zeros((n_class, n_class))
    for lt, lp in zip(label_trues, label_preds):
        hist += _fast_hist(lt.flatten(), lp.flatten(), n_class)
    acc = np.diag(hist).sum() / hist.sum()
    acc_cls = np.diag(hist) / hist.sum(axis=1)
    acc_cls = np.nanmean(acc_cls)
    
    iu = np.diag(hist) / (hist.sum(axis=1) + hist.sum(axis=0) - np.diag(hist))
    mean_iu = np.nanmean(iu)
    freq = hist.sum(axis=1) / hist.sum()
    fwavacc = (freq[freq > 0] * iu[freq > 0]).sum()
    return acc, acc_cls, mean_iu, fwavacc
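
A tiny worked example may help when reading these metrics. The sketch below rebuilds the confusion matrix with the same `np.bincount` trick on four made-up pixels and two classes, then derives the per-class IoU. The helper name `confusion_matrix` and the data are assumptions for illustration only.

```python
import numpy as np

def confusion_matrix(label_true, label_pred, n_class):
    """Rows are ground-truth classes, columns are predicted classes."""
    index = n_class * label_true + label_pred
    return np.bincount(index, minlength=n_class ** 2).reshape(n_class, n_class)

# Four pixels, two classes: one class-0 pixel is wrongly predicted as class 1
label_true = np.array([0, 0, 1, 1])
label_pred = np.array([0, 1, 1, 1])
hist = confusion_matrix(label_true, label_pred, n_class=2)

intersection = np.diag(hist)
union = hist.sum(axis=1) + hist.sum(axis=0) - intersection
iou = intersection / union
print(hist)
print(iou, iou.mean())
```

Class 0 has one correctly labeled pixel out of a union of two, and class 1 has two out of a union of three, so the mean IoU is the average of 1/2 and 2/3.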

In [14]:
# Learning-rate schedule
def exp_lr_scheduler(optimizer, epoch, init_lr=0.001, lr_decay_epoch=30):
    """Decay learning rate by a factor of 0.5 every lr_decay_epoch epochs."""
    lr = init_lr * (0.5**(epoch // lr_decay_epoch))

    if epoch % lr_decay_epoch == 0:
        print('LR is set to {}'.format(lr))

    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

    return optimizer
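
To get a feel for this schedule, the sketch below evaluates the same formula at a few epochs without touching an optimizer. The function name `step_decay_lr` is hypothetical; the defaults mirror the values used in the code above (halving every 30 epochs, starting from 0.001).

```python
def step_decay_lr(epoch, init_lr=0.001, lr_decay_epoch=30, factor=0.5):
    """Learning rate used at a given epoch under step decay."""
    return init_lr * factor ** (epoch // lr_decay_epoch)

schedule = {epoch: step_decay_lr(epoch) for epoch in (0, 29, 30, 60, 90)}
for epoch, lr in schedule.items():
    print(epoch, lr)
```

Over the 100 epochs configured above, the rate is halved three times, at epochs 30, 60 and 90.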

In [16]:
# Train and evaluate the model
def train_test(optimizer):
    # Training
    minibatch_y=[]
    epoch_y=[]
    epochacc_y=[]
    wrong_all=[]
    testacc_y=[]
    testloss_y=[]
    
    for epoch in range(num_epoches):
        print ('- - - - - epoch',epoch+1,'- - - - - - - -')
        train_acc_cls = 0
        epoch_mean_IoU = 0
        train_fwavacc = 0
        
        batch_idx_sum = 0 
        minibatch_loss=0
        epoch_loss_sum=0
        epoch_acc=0
        
        optimizer = exp_lr_scheduler(optimizer, epoch)
        # Switch back to training mode (the evaluation phase below calls model.eval())
        model.train()
        
        for batch_idx, data in enumerate(trainloader , 1):
            
            batch_idx_sum += data[0].size()[0]
            
            if use_gpu:
                im = Variable(data[0]).cuda()
                label = Variable(data[1]).cuda()
            else:
                im = Variable(data[0])
                label = Variable(data[1])
            
            out = model(im)

            # Log-probabilities over the class dimension
            out = F.log_softmax(out, dim=1)
        
            minibatch_loss = criterion(out, label)
            
            optimizer.zero_grad()
            
            minibatch_loss.backward()
            
            optimizer.step()
            
            # Record the loss of every mini-batch
            minibatch_y.append(minibatch_loss.item())
            
            epoch_loss_sum += minibatch_loss.item()

            label_pred = out.max(dim=1)[1].data.cpu().numpy()
            
            label_true = label.data.cpu().numpy()
            
            for lbt, lbp in zip(label_true, label_pred):
                acc, acc_cls, mean_iu, fwavacc = label_accuracy_score(lbt, lbp, n_class)
                epoch_acc += acc
                train_acc_cls += acc_cls
                epoch_mean_IoU += mean_iu
                train_fwavacc += fwavacc
                        
            if batch_idx % 20 == 0 :
                print("Train Epoch: {} [{}/{} ({:.0f}%)] ,Loss: {:.6f} ,Accuracy:{:.2f}%, Train Mean IoU:{:.5f}".format(
                    epoch+1,
                    batch_idx_sum,
                    len(train_data),
                    100. * batch_idx_sum / len(train_data),
                    minibatch_loss.item(),
                    100.*epoch_acc / batch_idx_sum,    
                    epoch_mean_IoU / batch_idx_sum     
                    ))
                
        epoch_loss_average = epoch_loss_sum/len(trainloader)

        epoch_y.append(epoch_loss_average)
        
        epoch_acc_average=epoch_acc/len(train_data)

        epochacc_y.append(epoch_acc_average)
    #############################################################
    # Evaluation
        model.eval()
        step_sum = 0
        test_loss = 0
        test_acc = 0
        test_acc_cls = 0
        test_mean_IoU = 0
        test_fwavacc = 0
        for step, data in enumerate(testloader , 1):
            
            step_sum += data[0].size()[0]
            
            if use_gpu:
                im = data[0].cuda()
                label = data[1].cuda()
            else:
                im = data[0]
                label = data[1]
            
            # No gradients are needed during evaluation
            with torch.no_grad():
                out = model(im)
                out = F.log_softmax(out, dim=1)
                test_loss = criterion(out, label)
            testloss_y.append(test_loss.item())

            label_pred = out.max(dim=1)[1].data.cpu().numpy()
            label_true = label.data.cpu().numpy()
            
            for lbt, lbp in zip(label_true, label_pred):
                acc, acc_cls, mean_IoU, fwavacc = label_accuracy_score(lbt, lbp, n_class)
                test_acc += acc
                test_acc_cls += acc_cls
                test_mean_IoU += mean_IoU
                test_fwavacc += fwavacc
        
        testacc=test_acc/len(testdata)
        
        test_mean_IoU = test_mean_IoU/len(testdata)
        
        testacc_y.append(testacc)
        
        print("Test_accuracy: {:.2f}% , Test Mean IoU: {:.5f} ".format(100.*testacc, test_mean_IoU))
    return minibatch_y, epoch_y, testloss_y, epochacc_y ,testacc_y

In [17]:
# Run training and evaluation, and time the whole run
train_start = time.time()
minibatch_y, epoch_y, testloss_y, epochacc_y ,testacc_y = train_test(optimizer)
train_stop = time.time()
train_total = train_stop - train_start
print('Training complete in {:.0f}m {:.0f}s'.format(train_total // 60, train_total % 60))

In [19]:
# Save the model parameters
torch.save(model.state_dict(), './FCN_params.pkl')  

In [20]:
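# Visualize the predictions
cm = np.array(colormap).astype('uint8')
def predict(im, label): # Predict one image and colorize both label maps
    im = im.unsqueeze(0)
    # Keep the input on the same device as the model
    if use_gpu:
        im = im.cuda()
    model.eval()
    with torch.no_grad():
        out = model(im)
    pred = out.max(1)[1].squeeze().cpu().numpy()
    pred = cm[pred]
    return pred, cm[label.numpy()]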
_, figs = plt.subplots(5, 3, figsize=(12, 10))
for i in range(5):
    test_data, test_label = testdata[i]
    pred, label = predict(test_data, test_label)
    figs[i, 0].imshow(Image.open(testdata.data_list[i]))
    figs[i, 0].axes.get_xaxis().set_visible(False)
    figs[i, 0].axes.get_yaxis().set_visible(False)
    figs[i, 1].imshow(label)
    figs[i, 1].axes.get_xaxis().set_visible(False)
    figs[i, 1].axes.get_yaxis().set_visible(False)
    figs[i, 2].imshow(pred)
    figs[i, 2].axes.get_xaxis().set_visible(False)
    figs[i, 2].axes.get_yaxis().set_visible(False)