In [1]:
import copy
import itertools
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch as t
import torchvision.transforms as transforms
import torchvision.transforms.functional as ff
from PIL import Image
from torch.utils.data import DataLoader
from torch.utils.data import Dataset

先验知识：
    classmethod和staticmethod有什么不同？
    classmethod必须将类对象的引用作为第一个参数，而staticmethod可以根本没有参数

In [2]:
class LabelProcessor:
    """Translate RGB-coded segmentation masks into class-index maps.

    The class table is read from a CSV file with ``r``, ``g`` and ``b``
    columns; the row number of a colour is its class index.  A flat lookup
    table with one slot per possible 24-bit colour is built once so that a
    whole mask can be converted with a single fancy-indexing operation.

    Attributes:
        colormap: list of ``[r, g, b]`` triples, one per class, in CSV order.
        cm2lbl:   integer array of length ``256 ** 3`` mapping a packed colour
                  ``(r * 256 + g) * 256 + b`` to its class index.  Colours
                  that are not in the CSV stay at 0, i.e. they are
                  indistinguishable from class 0.
    """

    def __init__(self, file_path):
        """Load the colour table from ``file_path`` and build the lookup table."""
        self.colormap = self.read_color_map(file_path)
        self.cm2lbl = self.encode_label_pix(self.colormap)

    # The two helpers below need no instance state, so they are static methods
    # and can be called directly on the class (e.g. before an instance exists).
    @staticmethod
    def read_color_map(file_path):
        """Return the class colours stored in the CSV file.

        Args:
            file_path: path of a comma-separated file with ``r``, ``g``, ``b``
                columns.

        Returns:
            A list of ``[r, g, b]`` lists, one per CSV row, in file order.
        """
        pd_label_color = pd.read_csv(file_path, sep=',')
        # Select the three colour columns at once instead of walking the rows.
        return pd_label_color[['r', 'g', 'b']].values.tolist()

    @staticmethod
    def encode_label_pix(colormap):
        """Build the packed-colour -> class-index lookup table.

        Args:
            colormap: iterable of ``(r, g, b)`` triples; position = class index.

        Returns:
            An ``int32`` array of length ``256 ** 3``.  An integer dtype is
            used because the entries are class ids: it halves the memory of
            the former float64 table and avoids a float round trip.
        """
        cm2lbl = np.zeros(256 ** 3, dtype=np.int32)
        for index, (r, g, b) in enumerate(colormap):
            # Cast to plain ints so narrow integer inputs (e.g. uint8) cannot
            # overflow while the colour is being packed.
            cm2lbl[(int(r) * 256 + int(g)) * 256 + int(b)] = index
        return cm2lbl

    def encode_label_img(self, img):
        """Convert an RGB mask into a 2-D array of class indices.

        Args:
            img: anything ``np.array`` turns into an ``(H, W, >=3)`` array;
                only the first three channels are read.

        Returns:
            An ``int64`` array of shape ``(H, W)`` holding the class index of
            every pixel.
        """
        data = np.array(img, dtype='int32')
        idx = (data[:, :, 0] * 256 + data[:, :, 1]) * 256 + data[:, :, 2]
        return np.array(self.cm2lbl[idx], dtype='int64')

In [3]:
class CamvidDataset(Dataset):
    """Paired image / label dataset read from two directories.

    Images and labels are matched by position after sorting the file names of
    each directory, so both directories must contain the same number of files
    with names that sort in the same order.
    """

    def __init__(self, file_path=(), crop_size=None, label_processor=None):
        """
        Args:
            file_path: two-element sequence ``[image_dir, label_dir]``
                (image directory first).
            crop_size: size handed to ``center_crop``; ``None`` disables
                cropping.
            label_processor: object with an ``encode_label_img(label)`` method
                used to turn RGB masks into class-index maps.  When omitted,
                the module-level ``label_processor`` is looked up lazily at
                item-access time, which keeps older call sites working.

        Raises:
            ValueError: if ``file_path`` does not hold exactly two paths or the
                two directories contain a different number of files.
        """
        # 1. Validate and store the two directory paths.
        if len(file_path) != 2:
            raise ValueError("同时需要图片拟合标签文件夹的路径，图片路径在前")
        self.img_path = file_path[0]
        self.label_path = file_path[1]

        # 2. Expand each directory into a sorted list of file paths.
        self.imgs = self.read_file(self.img_path)
        self.labels = self.read_file(self.label_path)
        if len(self.imgs) != len(self.labels):
            # Pairs are matched by index, so unequal counts would silently
            # misalign images and labels (or fail much later with IndexError).
            raise ValueError(
                "Image and label directories hold a different number of files: "
                "{} vs {}".format(len(self.imgs), len(self.labels)))
        self.crop_size = crop_size
        self.label_processor = label_processor

    def __getitem__(self, index):
        """Return ``{'img': normalised image tensor, 'label': class-index tensor}``."""
        # convert() forces the pixel data to be loaded, so the file handles can
        # be closed right away.  The image is converted to RGB as well because
        # the normalisation below uses three-channel statistics.
        with Image.open(self.imgs[index]) as raw_img:
            img = raw_img.convert('RGB')
        with Image.open(self.labels[index]) as raw_label:
            label = raw_label.convert('RGB')
        img, label = self.center_crop(img, label, self.crop_size)
        img, label = self.img_transform(img, label)
        return {'img': img, 'label': label}

    def __len__(self):
        """Number of image/label pairs."""
        return len(self.imgs)

    def read_file(self, path):
        """Return the sorted full paths of the regular files inside ``path``."""
        candidates = (os.path.join(path, name) for name in os.listdir(path))
        # Sub-directories cannot be opened as images, so they are skipped.
        return sorted(p for p in candidates if os.path.isfile(p))

    def center_crop(self, data, label, crop_size):
        """Centre-crop image and label identically; no-op when ``crop_size`` is None."""
        if crop_size is None:
            return data, label
        data = ff.center_crop(data, crop_size)
        label = ff.center_crop(label, crop_size)
        return data, label

    def img_transform(self, img, label):
        """Turn a PIL image / RGB mask pair into model-ready tensors.

        The image becomes a normalised float tensor; the mask is converted from
        colours to class indices and returned as an integer tensor.

        Raises:
            NameError: if no label processor was passed to the constructor and
                no module-level ``label_processor`` exists either.
        """
        transform_img = transforms.Compose(
            [
                transforms.ToTensor(),
                # Per-channel mean / std (the commonly used ImageNet statistics).
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ]
        )
        img = transform_img(img)

        processor = getattr(self, 'label_processor', None)
        if processor is None:
            # Fall back to the notebook-level object.  The lookup happens at
            # call time, so the global may be created after the dataset.
            processor = globals().get('label_processor')
            if processor is None:
                raise NameError(
                    "No label processor available: pass label_processor=... to "
                    "CamvidDataset or define a module-level `label_processor`.")
        label = t.from_numpy(processor.encode_label_img(label))

        return img, label
                                          
                                          
                                          

In [4]:
# Locations of the CamVid splits and of the class/colour table.
TRAIN_ROOT = '/kaggle/input/camvid/CamVid/train'
TRAIN_LABEL = '/kaggle/input/camvid/CamVid/train_labels'
VAL_ROOT = '/kaggle/input/camvid/CamVid/val'
VAL_LABEL = '/kaggle/input/camvid/CamVid/val_labels'
TEST_ROOT = '/kaggle/input/camvid/CamVid/test'
TEST_LABEL = '/kaggle/input/camvid/CamVid/test_labels'
CLASS_DICT = '/kaggle/input/camvid/CamVid/class_dict.csv'

# Size every image/label pair is centre-cropped to.
crop_size = (352, 480)

# Created before the datasets because CamvidDataset.img_transform looks this
# object up when items are fetched.
label_processor = LabelProcessor(CLASS_DICT)

# One dataset per split; each takes [image_dir, label_dir] plus the crop size.
Cam_train = CamvidDataset([TRAIN_ROOT, TRAIN_LABEL], crop_size)
Cam_val = CamvidDataset([VAL_ROOT, VAL_LABEL], crop_size)
Cam_test = CamvidDataset([TEST_ROOT, TEST_LABEL], crop_size)

Example: loading the data

Example: displaying an image and its label before and after cropping

In [5]:
# Show the first training image and its colour-coded label, before and after
# the centre crop.  Each stage gets its own row of axes: re-using
# plt.subplot(1, 2, k) for the cropped pair would draw over the originals and
# the "before" view would never be visible.
img = Image.open(Cam_train.imgs[0])
label = Image.open(Cam_train.labels[0])

fig, axes = plt.subplots(2, 2, figsize=(10, 8))

# Row 0: before cropping.
axes[0, 0].imshow(img)
axes[0, 0].set_title('image (original)')
axes[0, 1].imshow(label)
axes[0, 1].set_title('label (original)')

# Row 1: after cropping.
img, label = Cam_train.center_crop(img, label, crop_size)
axes[1, 0].imshow(img)
axes[1, 0].set_title('image (center crop)')
axes[1, 1].imshow(label)
axes[1, 1].set_title('label (center crop)')

fig.tight_layout()
plt.show()

In [6]:
def bilinear_kernel(in_channels, out_channels, kernel_size):
    """Create transposed-convolution weights that perform bilinear upsampling.

    Args:
        in_channels: size of the first weight dimension.
        out_channels: size of the second weight dimension.
        kernel_size: side length of the square kernel.

    Returns:
        A float32 tensor of shape
        ``(in_channels, out_channels, kernel_size, kernel_size)``.  The
        positions ``[i, j]`` obtained by pairing ``range(in_channels)`` with
        ``range(out_channels)`` hold the 2-D bilinear filter; every other
        entry is zero.
    """
    half = (kernel_size + 1) // 2
    # Odd kernels peak on a sample; even kernels peak between two samples.
    midpoint = half - 1 if kernel_size % 2 == 1 else half - 0.5

    # 1-D triangular profile; the 2-D filter is its outer product with itself.
    taps = 1 - np.abs(np.arange(kernel_size) - midpoint) / half
    kernel_2d = np.outer(taps, taps)

    weight = np.zeros(
        (in_channels, out_channels, kernel_size, kernel_size), dtype=np.float32)
    weight[range(in_channels), range(out_channels)] = kernel_2d
    return t.from_numpy(weight)

In [7]:
from torchvision import models
from torch import nn
# VGG16 (batch-norm variant) whose `features` are used by FCN as the encoder.
# NOTE(review): pretrained=False asks for no pretrained weights, so despite
# its name this network presumably starts from random initialisation —
# confirm that this is intended.
pretrained_net = models.vgg16_bn(pretrained=False)

Upsample（上采样）只能把特征图的尺寸变大，而不能把通道数还原为较小的通道数，所以需要另写 1×1 卷积（conv_trans1）来缩小通道数

In [None]:
#pretrained_net.features
#-----------model-------------------------- 

In [8]:
class FCN(nn.Module):
    """FCN-8s style segmentation network on top of the VGG16-BN feature stack.

    The encoder is split into five stages, each ending with a pooling layer.
    The decoder upsamples the deepest features twice by a factor of two,
    adding the stage-4 and stage-3 features on the way, projects the result to
    ``num_classes`` channels and upsamples by eight to the input resolution.

    Args:
        num_classes: number of output channels (one score map per class).
    """

    def __init__(self, num_classes):
        super().__init__()

        # Work on a private copy of the backbone.  Slicing the shared global
        # directly would make every FCN instance (and `pretrained_net` itself)
        # use the very same layer objects, so training one model would change
        # the weights and batch-norm statistics of all the others.
        backbone = copy.deepcopy(pretrained_net.features)
        self.stage1 = backbone[:7]
        self.stage2 = backbone[7:14]
        self.stage3 = backbone[14:24]
        self.stage4 = backbone[24:34]
        self.stage5 = backbone[34:]

        # 1x1 score layers (in_channels, out_channels, kernel_size).  They are
        # not used by forward(); they are kept so that the parameter names in
        # state_dict() stay compatible with checkpoints saved earlier.
        self.scores1 = nn.Conv2d(512, num_classes, 1)
        self.scores2 = nn.Conv2d(512, num_classes, 1)
        self.scores3 = nn.Conv2d(128, num_classes, 1)

        # 1x1 convolutions that reduce the channel count; the transposed
        # convolutions below keep the channel count and only change the size.
        self.conv_trans1 = nn.Conv2d(512, 256, 1)
        self.conv_trans2 = nn.Conv2d(256, num_classes, 1)

        # Transposed convolutions (kernel, stride, padding) initialised with
        # bilinear-interpolation weights.
        self.upsample_8x = nn.ConvTranspose2d(num_classes, num_classes, 16, 8, 4, bias=False)
        self.upsample_2x_1 = nn.ConvTranspose2d(512, 512, 4, 2, 1, bias=False)
        self.upsample_2x_2 = nn.ConvTranspose2d(256, 256, 4, 2, 1, bias=False)
        with t.no_grad():
            self.upsample_8x.weight.copy_(bilinear_kernel(num_classes, num_classes, 16))
            self.upsample_2x_1.weight.copy_(bilinear_kernel(512, 512, 4))
            self.upsample_2x_2.weight.copy_(bilinear_kernel(256, 256, 4))

    def forward(self, x):
        """Return per-pixel class scores with the spatial size of ``x``.

        The shape comments give (height, width, channels) for a 352 x 480
        input, the crop size used in this notebook.
        """
        s1 = self.stage1(x)    # 176 x 240 x 64
        s2 = self.stage2(s1)   # 88 x 120 x 128
        s3 = self.stage3(s2)   # 44 x 60 x 256
        s4 = self.stage4(s3)   # 22 x 30 x 512
        s5 = self.stage5(s4)   # 11 x 15 x 512

        s5 = self.upsample_2x_1(s5)        # 22 x 30 x 512
        add1 = s4 + s5

        # Reduce the channels first, then enlarge the feature map.
        add1 = self.conv_trans1(add1)      # 22 x 30 x 256
        add1 = self.upsample_2x_2(add1)    # 44 x 60 x 256
        add2 = add1 + s3

        add2 = self.conv_trans2(add2)      # 44 x 60 x num_classes
        output = self.upsample_8x(add2)    # 352 x 480 x num_classes
        return output
        

In [9]:
from __future__ import division

import six  #python2 -python3
def calc_semantic_segmentation_confusion(pred_labels, gt_labels, n_class=12):
    """Accumulate a confusion matrix over pairs of label maps.

    Args:
        pred_labels: iterable of 2-D integer arrays with predicted class ids.
        gt_labels: iterable of 2-D integer arrays with ground-truth class ids;
            negative values mark pixels that are ignored.
        n_class: initial number of classes (rows/columns of the matrix).  The
            matrix grows automatically when a larger label is encountered, so
            this is only a lower bound on the returned size.

    Returns:
        An ``int64`` array ``confusion`` where ``confusion[i, j]`` counts the
        pixels whose ground truth is ``i`` and whose prediction is ``j``.

    Raises:
        ValueError: if the two iterables differ in length, a label map is not
            two-dimensional, or a prediction / ground-truth pair differs in
            shape.
    """
    confusion = np.zeros((n_class, n_class), dtype=np.int64)

    # zip() would silently swallow one extra prediction when the ground truth
    # runs out first; zip_longest with a sentinel detects every mismatch.
    missing = object()
    for pred_label, gt_label in itertools.zip_longest(
            pred_labels, gt_labels, fillvalue=missing):
        if pred_label is missing or gt_label is missing:
            raise ValueError('Length of input iterables need to be same')

        pred_label = np.asarray(pred_label)
        gt_label = np.asarray(gt_label)
        if pred_label.ndim != 2 or gt_label.ndim != 2:
            raise ValueError('ndim of labels should be two.')
        if pred_label.shape != gt_label.shape:
            raise ValueError('Shape of ground truth and prediction should'
                             ' be same.')
        if pred_label.size == 0:
            # Nothing to count, and np.max would fail on an empty array.
            continue

        # bincount needs integer input, so both maps are cast explicitly.
        pred_label = pred_label.flatten().astype(np.int64)
        gt_label = gt_label.flatten().astype(np.int64)

        # Dynamically expand the confusion matrix if necessary.
        lb_max = int(max(pred_label.max(), gt_label.max()))
        if lb_max >= n_class:
            expanded_confusion = np.zeros(
                (lb_max + 1, lb_max + 1), dtype=np.int64)
            expanded_confusion[0:n_class, 0:n_class] = confusion
            n_class = lb_max + 1
            confusion = expanded_confusion

        # Count statistics from valid pixels.  n_class * gt + pred gives every
        # (gt, pred) pair its own bin, so a single bincount fills the matrix.
        mask = gt_label >= 0
        confusion += np.bincount(
            n_class * gt_label[mask] + pred_label[mask],
            minlength=n_class ** 2).reshape((n_class, n_class))

    return confusion


def calc_semantic_segmentation_iou(confusion):
    """Per-class intersection-over-union from a confusion matrix.

    Args:
        confusion: square array whose entry ``[i, j]`` counts pixels of
            ground-truth class ``i`` predicted as class ``j``.

    Returns:
        The IoU of every class except the last one (the final entry is
        dropped).  A class that occurs in neither the ground truth nor the
        prediction yields ``nan`` (0 / 0).
    """
    intersection = np.diag(confusion)
    # |gt| + |pred| - |gt and pred| for every class.
    union = confusion.sum(axis=1) + confusion.sum(axis=0) - intersection
    per_class_iou = intersection / union
    return per_class_iou[:-1]


def eval_semantic_segmentation(pred_labels, gt_labels):
    """Compute segmentation metrics for a collection of label maps.

    Args:
        pred_labels: iterable of 2-D arrays with predicted class ids.
        gt_labels: iterable of 2-D arrays with ground-truth class ids.

    Returns:
        A dict with the keys
        ``iou`` (per-class IoU as returned by calc_semantic_segmentation_iou),
        ``miou`` (nan-ignoring mean of ``iou``),
        ``pixel_accuracy`` (correct pixels / all counted pixels),
        ``class_accuracy`` (per-class recall for every class) and
        ``mean_class_accuracy`` (nan-ignoring mean of ``class_accuracy``
        without its last entry).
    """
    confusion = calc_semantic_segmentation_confusion(pred_labels, gt_labels)
    iou = calc_semantic_segmentation_iou(confusion)

    correct = np.diag(confusion)
    pixel_accuracy = correct.sum() / confusion.sum()
    # The small epsilon keeps classes without ground-truth pixels at 0
    # instead of dividing by zero.
    class_accuracy = correct / (np.sum(confusion, axis=1) + 1e-10)

    metrics = {}
    metrics['iou'] = iou
    metrics['miou'] = np.nanmean(iou)
    metrics['pixel_accuracy'] = pixel_accuracy
    metrics['class_accuracy'] = class_accuracy
    metrics['mean_class_accuracy'] = np.nanmean(class_accuracy[:-1])
    return metrics

In [30]:
import torch.nn.functional as F
from torch import optim 
from torch.autograd import Variable
from datetime import datetime

# 1. Data: mini-batches of two samples, reshuffled every epoch.
_loader_options = dict(batch_size=2, shuffle=True, num_workers=4)
train_data = DataLoader(Cam_train, **_loader_options)
val_data = DataLoader(Cam_val, **_loader_options)

# 2. Model, loss and optimiser.
device = t.device('cuda' if t.cuda.is_available() else 'cpu')
# NOTE(review): the network is built with 32 output classes while
# calc_semantic_segmentation_confusion starts from 12 classes and only grows
# on demand — confirm the metric sizes are what is intended.
net = FCN(32).to(device)
# NLLLoss expects log-probabilities, so the training loop applies log_softmax.
criterion = nn.NLLLoss().to(device)
optimizer = optim.Adam(net.parameters(), lr=1e-4)

# 3. Book-keeping for the validation mIoU.
eval_miou_list = []
best = [0]

------------------Train---------------------------

In [None]:
# Train for 50 epochs; validate after every epoch and save the weights
# whenever the validation mIoU is at least as good as the best seen so far.
prec_time = datetime.now()
for epoch in range(50):
    # Step decay: multiply the learning rate by 0.25 every 10 epochs.
    if epoch % 10 == 0 and epoch != 0:
        for group in optimizer.param_groups:
            group['lr'] *= 0.25

    # Running sums over the batches of this epoch.
    train_loss = 0
    train_acc = 0
    train_miou = 0

    # ---------------------------- training ----------------------------
    net = net.train()
    for i, sample in enumerate(train_data):
        imgdata = sample['img'].to(device)
        imglabel = sample['label'].long().to(device)

        optimizer.zero_grad()
        out = F.log_softmax(net(imgdata), dim=1)
        loss = criterion(out, imglabel)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

        # One 2-D label map per sample, as expected by the metric helpers.
        pre_label = list(out.detach().max(dim=1)[1].cpu().numpy())
        true_label = list(imglabel.cpu().numpy())

        eval_metrix = eval_semantic_segmentation(pre_label, true_label)
        train_acc += eval_metrix['mean_class_accuracy']
        train_miou += eval_metrix['miou']

    # --------------------------- validation ---------------------------
    net = net.eval()
    eval_loss = 0
    eval_acc = 0
    eval_miou = 0
    # No gradients are needed here; this avoids building autograd graphs.
    with t.no_grad():
        for j, sample in enumerate(val_data):
            valImg = sample['img'].to(device)
            valLabel = sample['label'].long().to(device)

            out = F.log_softmax(net(valImg), dim=1)
            loss = criterion(out, valLabel)
            eval_loss += loss.item()

            pre_label = list(out.max(dim=1)[1].cpu().numpy())
            true_label = list(valLabel.cpu().numpy())

            eval_metrics = eval_semantic_segmentation(pre_label, true_label)
            eval_acc += eval_metrics['mean_class_accuracy']
            eval_miou += eval_metrics['miou']

    # Time elapsed since training started; total_seconds() does not wrap
    # around after 24 hours the way `.seconds` does.
    cur_time = datetime.now()
    h, remainder = divmod(int((cur_time - prec_time).total_seconds()), 3600)
    m, s = divmod(remainder, 60)

    # ------------------------------ log -------------------------------
    # Every validation figure is averaged over the validation batches (the
    # validation loss used to be divided by the number of training batches).
    epoch_str = ('|Epoch|: {}\n|Train Loss|: {:.5f}\n|Train Acc|: {:.5f}\n|Train Mean IU|: {:.5f}\n'
                 '|Valid Loss|: {:.5f}\n|Valid Acc|: {:.5f}\n|Valid Mean IU|: {:.5f}\n'.format(
                     epoch, train_loss / len(train_data), train_acc / len(train_data),
                     train_miou / len(train_data),
                     eval_loss / len(val_data), eval_acc / len(val_data),
                     eval_miou / len(val_data)))
    time_str = 'Time: {:.0f}:{:.0f}:{:.0f}'.format(h, m, s)
    print(epoch_str + "\n" + time_str)

    # ------------------------------ save ------------------------------
    # Higher validation mIoU is better.
    if max(best) <= eval_miou / len(val_data):
        best.append(eval_miou / len(val_data))
        t.save(net.state_dict(), 'version1.pth')

----------------------------TEST--------------------------------------

In [None]:
ls

In [31]:
import torch as t
import torch.nn.functional as F
from torch.autograd import Variable
from torch.utils.data import DataLoader


device = t.device('cuda') if t.cuda.is_available() else t.device('cpu')

BATCH_SIZE = 4
miou_list = [0]
# The metrics are averaged per batch, so the batch composition matters;
# keeping the dataset order makes the reported numbers reproducible.
test_data = DataLoader(Cam_test, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
net = FCN(32)
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only host.
net.load_state_dict(t.load('version1.pth', map_location=device))
net.to(device)
net.eval()

# Running sums over the evaluated test batches (historical `train_` names kept).
train_acc = 0
train_miou = 0
train_class_acc = np.zeros(0)
train_mpa = 0
error = 0  # number of batches skipped below

with t.no_grad():
    for i, sample in enumerate(test_data):
        data = sample['img'].to(device)
        label = sample['label'].to(device)
        out = F.log_softmax(net(data), dim=1)

        pre_label = list(out.max(dim=1)[1].cpu().numpy())
        true_label = list(label.cpu().numpy())

        eval_metrix = eval_semantic_segmentation(pre_label, true_label)
        class_acc = np.asarray(eval_metrix['class_accuracy'], dtype=np.float64)

        if class_acc.size < 12:
            # Skip the batch entirely.  Its other metrics must not be added
            # either, because the averages below exclude skipped batches.
            error += 1
            print(0, '================', i)
            continue

        train_acc += eval_metrix['mean_class_accuracy']
        train_miou += eval_metrix['miou']
        train_mpa += eval_metrix['pixel_accuracy']

        # The per-class vector is as long as the largest label seen in the
        # batch, so it can differ between batches.  Grow the accumulator with
        # zeros: a class absent from a batch has accuracy 0 for that batch.
        if class_acc.size > train_class_acc.size:
            train_class_acc = np.pad(
                train_class_acc, (0, class_acc.size - train_class_acc.size))
        train_class_acc[:class_acc.size] += class_acc

        print(eval_metrix['class_accuracy'], '================', i)

counted = len(test_data) - error
if counted > 0:
    epoch_str = ('test_acc :{:.5f} ,test_miou:{:.5f}, test_mpa:{:.5f}, test_class_acc :{:}'.format(
        train_acc / counted, train_miou / counted, train_mpa / counted,
        train_class_acc / counted))

    if train_miou / counted > max(miou_list):
        miou_list.append(train_miou / counted)
        print(epoch_str + '==========last')
else:
    print('No test batch produced usable metrics.')

Predict

In [32]:
device = t.device('cuda') if t.cuda.is_available() else t.device('cpu')

# Keep the dataset order so that "<i>.png" is the prediction for
# Cam_test.imgs[i]; with shuffling the file index would be meaningless.
test_data = DataLoader(Cam_test, batch_size=1, shuffle=False, num_workers=4)

net = FCN(32).to(device)
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only host.
net.load_state_dict(t.load("version1.pth", map_location=device))
net.eval()

# Class names and colours; row k of `cm` is the RGB colour of class k.
pd_label_color = pd.read_csv(CLASS_DICT, sep=',')
name_value = pd_label_color['name'].values
num_class = len(name_value)
colormap = pd_label_color[['r', 'g', 'b']].values.tolist()
cm = np.array(colormap).astype('uint8')

# Not called `dir`, which would shadow the builtin.  The directory is created
# up front because Image.save does not create missing folders.
output_dir = "/kaggle/working/output/"
os.makedirs(output_dir, exist_ok=True)

with t.no_grad():
    for i, sample in enumerate(test_data):
        valImg = sample['img'].to(device)
        out = F.log_softmax(net(valImg), dim=1)
        # Batch size is 1, so squeeze() leaves a single (H, W) class-index map.
        pre_label = out.max(1)[1].squeeze().cpu().numpy()
        # Fancy indexing turns the class ids into an (H, W, 3) colour image.
        pre = cm[pre_label]
        Image.fromarray(pre).save(os.path.join(output_dir, str(i) + '.png'))
        print('Done')