In [4]:
!pip install torchviz -q
!pip install -q segmentation-model-pytorch
!pip install -q torchsummary

In [2]:
import os
import time

import albumentations as A
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import segmentation_models_pytorch as smp
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
from torchsummary import summary
from torchvision import transforms as T
from torchviz import make_dot
from tqdm.notebook import tqdm

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Data Processing

In [8]:
# Directories (written without a trailing separator) that hold the input
# images and their label masks.
IMAGE_PATH = './mydata/images'
MASK_PATH = './mydata/labels'

In [None]:
# Number of segmentation classes.
n_class=23

In [None]:
# List all pictures in order of id
def create_df():
    """Build a DataFrame with one row per image file found under ``IMAGE_PATH``.

    The directory tree is walked recursively and every file name is reduced to
    its id (the name without its extension). Ids are sorted so the row order,
    and therefore any seeded split made from it, does not depend on the order
    in which the file system happens to list the files.

    Returns:
        pandas.DataFrame: a single column ``'id'`` with a 0..n-1 integer index.
    """
    names = []
    for _dirname, _subdirs, filenames in os.walk(IMAGE_PATH):
        for filename in filenames:
            # splitext strips only the final extension, so an id that itself
            # contains dots is kept intact.
            names.append(os.path.splitext(filename)[0])
    names.sort()
    return pd.DataFrame({'id': names}, index=np.arange(0, len(names)))


df = create_df()

In [None]:
# Data split: hold out 10% of the ids for testing, then carve 15% of the
# remainder off as the validation set. Both splits use a fixed seed.
X_trainval, X_test = train_test_split(df['id'].values, test_size=0.1, random_state=19)
X_train, X_val = train_test_split(X_trainval, test_size=0.15, random_state=19)

for split_label, split_ids in (('Train Size : ', X_train),
                               ('Val Size : ', X_val),
                               ('Test Size : ', X_test)):
    print(split_label, len(split_ids))

In [None]:
for ii in range(3)
    imag = Iamge.open(IMAGE_PATH +df['id'][ii] + '.ipg')
    mask = Iamge.open(MASK_PATH +df['id'][ii] + '.png')
    
    print(np.unique(mask)) # Returns an array of all distinct pixel values in the mask image
    
    # 创建一个包含两个子图的图像窗口（1行，2列）
    fig, axs = plt.subplots(1, 2, figsize=(16,8))
    
    # 在子图中显示原始图像
    axs[0].imshow(img)
    axs[0].set_title('Original Image')
    
    axs[1].imshow(mask,cmap='gray', alpha=0.6)
    axs[1].set_title("Mask Image")
    
    # 移除子图中的 x 和 y 刻度
    for ax in axs:
        ax.set_xticks([])
        ax.set_yticks([])
    plt.show()

In [None]:
# Human-readable class names; looked up by class id when the legend is drawn.
# NOTE(review): the `...` entries in this cell are placeholders. The complete
# name list and colour table must be filled in before the cell can run.
LABEL_NAMES = ['***', '***', ... ]

# Maps a colour triple found in the label images to its integer class id.
COLOR_MAPPING = {
    (0, 0, 0): 0,            # person
    (192, 128, 128): 1,      # Bike
    (0, 128, 0): 2,          # Car
    ...
}

def plot_color_mapping(COLOR_MAPPING, LABEL_NAMES):
    """Draw a legend with one coloured square per class and its label.

    Args:
        COLOR_MAPPING: dict mapping an ``(r, g, b)`` tuple of 0-255 values to a
            class id.
        LABEL_NAMES: sequence indexed by class id giving each class name.
    """
    fig, ax = plt.subplots(1, 1, figsize=(5, 5))

    for i, ((r, g, b), class_id) in enumerate(COLOR_MAPPING.items()):
        # Matplotlib expects colour components in [0, 1]; 8-bit channels
        # therefore have to be divided by 255.
        color = [r / 255, g / 255, b / 255]
        ax.add_patch(plt.Rectangle((0, i), 1, 1, color=color))

        text = f"{class_id}: {LABEL_NAMES[class_id]}"
        ax.text(1.2, i + 0.5, text, verticalalignment='center')

    # Configure the axes once, after every row has been added. Doing this
    # inside the loop would flip the y axis on each pass and show the figure
    # after only the first row.
    ax.axis('off')
    ax.set_aspect('equal')
    ax.set_xlim([0, 2])
    ax.set_ylim([0, len(COLOR_MAPPING)])
    ax.invert_yaxis()

    plt.show()


plot_color_mapping(COLOR_MAPPING, LABEL_NAMES)

In [None]:
# Convert a colour-coded mask image (mask_rgb) into a single-channel integer mask.
def convert_mask(mask_rgb, color_mapping=None):
    """Translate a colour-coded label image into a 2-D array of class ids.

    Args:
        mask_rgb: array of shape (H, W, 3) whose channel order matches the
            keys of the colour table.
        color_mapping: optional dict mapping a colour triple to a class id.
            Defaults to the module-level ``COLOR_MAPPING``.

    Returns:
        numpy.ndarray: uint8 array of shape (H, W). Pixels whose colour is not
        in the table keep the value 0.
    """
    if color_mapping is None:
        color_mapping = COLOR_MAPPING

    mask = np.zeros((mask_rgb.shape[0], mask_rgb.shape[1]), dtype=np.uint8)

    for rgb, idx in color_mapping.items():
        # A pixel belongs to the class only if all three channels match.
        mask[(mask_rgb == np.asarray(rgb)).all(axis=2)] = idx

    return mask

In [None]:
class Aeroscape_DroneDataset(Dataset):
    """Training / validation dataset of images and colour-coded label masks.

    Each sample is read from ``<img_path>/<id>.jpg`` and ``<msk_path>/<id>.png``.
    The colour mask is converted to class ids with ``convert_mask``, optional
    albumentations-style augmentation is applied to both, and the image is
    turned into a normalised tensor.

    Args:
        img_path: directory containing the ``.jpg`` images.
        msk_path: directory containing the ``.png`` label masks.
        X: sequence of sample ids (file names without extension).
        mean: per-channel mean used for normalisation.
        std: per-channel standard deviation used for normalisation.
        transform: optional callable invoked as ``transform(image=..., mask=...)``
            and returning a dict with ``'image'`` and ``'mask'`` entries.
        patch: when True each sample is cut into tiles (see ``tiles``).
        tile_size: ``(height, width)`` of one tile; required when ``patch`` is True.

    Raises:
        ValueError: if ``patch`` is True but no ``tile_size`` is given.
    """

    def __init__(self, img_path, msk_path, X, mean, std, transform=None, patch=False,
                 tile_size=None):
        if patch and tile_size is None:
            raise ValueError('patch=True requires tile_size=(height, width)')
        self.img_path = img_path
        self.mask_path = msk_path
        self.X = X                  # list of sample ids
        self.transform = transform  # augmentation pipeline
        self.patches = patch        # whether to cut samples into tiles
        self.mean = mean            # normalisation mean
        self.std = std              # normalisation standard deviation
        self.tile_size = tile_size

    def __len__(self):
        """Return the number of samples in the dataset."""
        return len(self.X)

    @staticmethod
    def _read_rgb(path):
        """Read an image file with OpenCV and return it in RGB channel order."""
        bgr = cv2.imread(path)
        if bgr is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(path)
        return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

    def __getitem__(self, idx):
        """Return ``(image_tensor, mask_tensor)`` for sample ``idx``."""
        img = self._read_rgb(os.path.join(self.img_path, self.X[idx] + '.jpg'))
        # The colour table is keyed by RGB triples, so the mask must be
        # converted from OpenCV's BGR order before the lookup.
        mask_rgb = self._read_rgb(os.path.join(self.mask_path, self.X[idx] + '.png'))
        mask = convert_mask(mask_rgb)

        if self.transform is not None:
            aug = self.transform(image=img, mask=mask)
            img = Image.fromarray(aug['image'])
            mask = aug['mask']
        else:
            img = Image.fromarray(img)

        # Convert the image to a tensor and normalise it.
        t = T.Compose([T.ToTensor(), T.Normalize(self.mean, self.std)])
        img = t(img)
        mask = torch.from_numpy(mask).long()

        if self.patches:
            img, mask = self.tiles(img, mask)  # cut into tiles

        return img, mask

    def tiles(self, img, mask):
        """Cut an image tensor and its mask into non-overlapping tiles.

        Args:
            img: tensor of shape (C, H, W).
            mask: tensor of shape (H, W).

        Returns:
            tuple: ``(img_tiles, mask_tiles)`` of shapes (n, C, th, tw) and
            (n, th, tw), tiles ordered row by row. Rows or columns that do not
            fill a whole tile are dropped.
        """
        tile_h, tile_w = self.tile_size
        img_tiles = img.unfold(1, tile_h, tile_h).unfold(2, tile_w, tile_w)
        img_tiles = img_tiles.reshape(img.shape[0], -1, tile_h, tile_w).permute(1, 0, 2, 3)
        mask_tiles = mask.unfold(0, tile_h, tile_h).unfold(1, tile_w, tile_w)
        mask_tiles = mask_tiles.reshape(-1, tile_h, tile_w)
        return img_tiles, mask_tiles

In [None]:
# Per-channel mean / std used to normalise the images.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Training augmentation: resize, random flips, mild geometric distortion,
# brightness / contrast jitter and Gaussian noise.
t_train = A.Compose([
    A.Resize(704, 1056, interpolation=cv2.INTER_NEAREST),
    A.HorizontalFlip(),
    A.VerticalFlip(),
    A.GridDistortion(p=0.2),
    # Each limit is a (low, high) pair.
    A.RandomBrightnessContrast((0, 0.5), (0, 0.5)),
    A.GaussNoise(),
])
# Validation transform.
t_val = A.Compose([
    A.Resize(704, 1056, interpolation=cv2.INTER_NEAREST),
    A.HorizontalFlip(),
    A.GridDistortion(p=0.2),
])

# Datasets. The validation set gets its own name so it does not overwrite the
# t_val transform it was built from.
train_set = Aeroscape_DroneDataset(IMAGE_PATH, MASK_PATH, X_train, mean, std, t_train, patch=False)
val_set = Aeroscape_DroneDataset(IMAGE_PATH, MASK_PATH, X_val, mean, std, t_val, patch=False)

# dataloader
batch_size = 3

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True)

## Model Creating

In [None]:
# U-Net with a MobileNetV2 encoder initialised from ImageNet weights. No final
# activation is applied, so the network outputs raw per-class scores.
model = smp.Unet(
    encoder_name='mobilenet_v2',
    encoder_depth=5,
    encoder_weights='imagenet',
    decoder_channels=[256, 128, 64, 32, 16],
    classes=23,
    activation=None,
)

## Model Training

In [None]:
def pixel_accuracy(output, mask):
    """Return the fraction of pixels whose predicted class equals the label.

    Args:
        output: model scores with the class dimension at index 1.
        mask: integer labels with the same shape as ``output`` minus the
            class dimension.

    Returns:
        float: number of matching pixels divided by the total pixel count.
    """
    with torch.no_grad():
        predicted = F.softmax(output, dim=1).argmax(dim=1)
        hits = (predicted == mask).int()
        return float(hits.sum()) / float(hits.numel())

In [None]:
def mIoU(pred_mask, mask, smooth=1e-10, n_classes=23):
    """Mean intersection-over-union across the classes present in ``mask``.

    Args:
        pred_mask: model scores with the class dimension at index 1.
        mask: integer ground-truth labels.
        smooth: small constant added to numerator and denominator.
        n_classes: number of class ids to evaluate (0 .. n_classes - 1).

    Returns:
        The NaN-ignoring mean of the per-class IoU values. Classes that never
        occur in ``mask`` are excluded. If no class occurs at all, the result
        is NaN.
    """
    with torch.no_grad():
        # Turn the per-class scores into predicted labels.
        pred_labels = torch.argmax(F.softmax(pred_mask, dim=1), dim=1)

        # Flatten prediction and ground truth to 1-D.
        pred_labels = pred_labels.contiguous().view(-1)
        true_labels = mask.contiguous().view(-1)

        iou_per_class = []
        for clas in range(0, n_classes):
            pred_is_class = pred_labels == clas
            true_is_class = true_labels == clas

            if true_is_class.long().sum().item() == 0:
                # Class absent from the ground truth: skip it in the mean.
                iou_per_class.append(np.nan)
                continue

            intersect = torch.logical_and(pred_is_class, true_is_class).sum().float().item()
            union = torch.logical_or(pred_is_class, true_is_class).sum().float().item()
            iou_per_class.append((intersect + smooth) / (union + smooth))

        return np.nanmean(iou_per_class)

In [None]:
def get_lr(optimizer):
    """Return the ``'lr'`` of the optimizer's first parameter group.

    Returns None when the optimizer has no parameter groups.
    """
    return next((group['lr'] for group in optimizer.param_groups), None)
    
def _merge_tiles(image_tiles, mask_tiles):
    """Fold the per-sample tile dimension of a tiled batch into the batch dimension."""
    bs, n_tiles, c, h, w = image_tiles.size()
    return image_tiles.view(-1, c, h, w), mask_tiles.view(-1, h, w)


def fit(epochs, model, train_loader, val_loader, criterion, optimizer, scheduler, patch=False):
    """Train ``model`` and validate it after every epoch.

    Args:
        epochs: maximum number of epochs to run.
        model: network to train; moved to the module-level ``device``.
        train_loader: iterable of ``(images, masks)`` training batches.
        val_loader: iterable of ``(images, masks)`` validation batches.
        criterion: loss called as ``criterion(output, mask)``.
        optimizer: optimizer stepped once per training batch.
        scheduler: learning-rate scheduler, also stepped once per batch.
        patch: set True when batches carry an extra tile dimension
            ``(batch, tiles, C, H, W)`` that must be folded into the batch.

    Returns:
        dict: per-epoch lists ``train_loss``, ``val_loss``, ``train_miou``,
        ``val_miou``, ``train_acc``, ``val_acc`` and the per-step list ``lrs``.

    Side effects:
        Prints progress. Saves the whole model to
        ``Unet-Mobilenet_v2_mIoU-<val mIoU>.pt`` whenever the improvement
        counter reaches a multiple of five. Stops early once the validation
        loss has failed to beat the best value seven times.
    """
    torch.cuda.empty_cache()
    train_losses = []
    test_losses = []
    val_iou = []; val_acc = []
    train_iou = []; train_acc = []
    lrs = []
    min_loss = np.inf
    # decrease counts validation-loss improvements (it starts at 1);
    # not_improve counts epochs whose loss was worse than the best so far.
    decrease = 1; not_improve = 0

    model.to(device)
    fit_time = time.time()
    for e in range(epochs):
        since = time.time()
        running_loss = 0
        iou_score = 0
        accuracy = 0

        # ---- training pass ----
        model.train()
        for image_tiles, mask_tiles in tqdm(train_loader):
            if patch:
                image_tiles, mask_tiles = _merge_tiles(image_tiles, mask_tiles)

            image = image_tiles.to(device); mask = mask_tiles.to(device)
            # Forward pass.
            output = model(image)
            loss = criterion(output, mask)
            # Evaluation metrics.
            iou_score += mIoU(output, mask)
            accuracy += pixel_accuracy(output, mask)
            # Backward pass and optimisation.
            loss.backward()
            optimizer.step()       # update the weights
            optimizer.zero_grad()  # clear the gradients
            # Record the current learning rate, then advance the schedule.
            lrs.append(get_lr(optimizer))
            scheduler.step()

            running_loss += loss.item()

        # ---- validation pass ----
        model.eval()
        test_loss = 0
        test_accuracy = 0
        val_iou_score = 0
        with torch.no_grad():
            for image_tiles, mask_tiles in tqdm(val_loader):
                if patch:
                    image_tiles, mask_tiles = _merge_tiles(image_tiles, mask_tiles)

                image = image_tiles.to(device); mask = mask_tiles.to(device)
                output = model(image)
                # Evaluation metrics.
                val_iou_score += mIoU(output, mask)
                test_accuracy += pixel_accuracy(output, mask)
                # Loss.
                loss = criterion(output, mask)
                test_loss += loss.item()

        # Per-epoch averages over the number of batches.
        epoch_train_loss = running_loss / len(train_loader)
        epoch_val_loss = test_loss / len(val_loader)
        epoch_train_iou = iou_score / len(train_loader)
        epoch_val_iou = val_iou_score / len(val_loader)
        epoch_train_acc = accuracy / len(train_loader)
        epoch_val_acc = test_accuracy / len(val_loader)

        train_losses.append(epoch_train_loss)
        test_losses.append(epoch_val_loss)

        stop_training = False
        if min_loss > epoch_val_loss:
            # New best validation loss: decide whether to save the model.
            print('Loss Decreasing.. {:.3f} >> {:.3f} '.format(min_loss, epoch_val_loss))
            min_loss = epoch_val_loss
            decrease += 1
            if decrease % 5 == 0:
                print('saving model...')
                torch.save(model, 'Unet-Mobilenet_v2_mIoU-{:.3f}.pt'.format(epoch_val_iou))
        elif epoch_val_loss > min_loss:
            # Worse than the best: count it, but keep min_loss at the best
            # value. Overwriting it here would let later, still-worse epochs
            # be reported as improvements.
            not_improve += 1
            print(f'Loss Not Decrease for {not_improve} time')
            if not_improve == 7:
                print('Loss not decrease for 7 times, Stop Training')
                stop_training = True

        # Record the metrics before any early exit so every history list has
        # one entry per completed epoch.
        val_iou.append(epoch_val_iou)
        train_iou.append(epoch_train_iou)
        train_acc.append(epoch_train_acc)
        val_acc.append(epoch_val_acc)
        print("Epoch:{}/{}..".format(e+1, epochs),
              "Train Loss: {:.3f}..".format(epoch_train_loss),
              "Val Loss: {:.3f}..".format(epoch_val_loss),
              "Train mIou:{:.3f}..".format(epoch_train_iou),
              "Val mIoU: {:.3f}..".format(epoch_val_iou),
              "Train Acc:{:.3f}..".format(epoch_train_acc),
              "Val Acc:{:.3f}..".format(epoch_val_acc),
              "Time: {:.2f}m".format((time.time()-since)/60))

        if stop_training:
            break

    history = {'train_loss': train_losses, 'val_loss': test_losses,
               'train_miou': train_iou, 'val_miou': val_iou,
               'train_acc': train_acc, 'val_acc': val_acc,
               'lrs': lrs}
    print('Total time: {:.2f} m' .format((time.time()- fit_time)/60))
    return history


In [None]:
# Training hyper-parameters.
max_lr = 1e-3
epoch = 20
weight_decay = 1e-4

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=max_lr, weight_decay=weight_decay)
# One-cycle schedule. It is stepped once per batch, so it needs to know the
# number of batches per epoch.
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epoch,
                                            steps_per_epoch=len(train_loader))

history = fit(epoch, model, train_loader, val_loader, criterion, optimizer, sched)

In [None]:
# Save the whole trained model object (not just its state_dict) to disk.
torch.save(model, 'Unet-Mobilenet.pt')

In [None]:
def plot_loss(history):
    """Plot the training and validation loss stored in ``history`` per epoch.

    Args:
        history: dict with ``'train_loss'`` and ``'val_loss'`` sequences.
    """
    plt.plot(history['val_loss'], label='val', marker='o')
    plt.plot(history['train_loss'], label='train', marker='o')
    plt.title('Loss per epoch')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend()
    plt.grid()
    plt.show()

def plot_score(history):
    """Plot the training and validation mean IoU stored in ``history`` per epoch.

    Args:
        history: dict with ``'train_miou'`` and ``'val_miou'`` sequences.
    """
    plt.plot(history['train_miou'], label='train_miou', marker='*')
    # Label the validation curve as such; it was previously tagged 'train'.
    plt.plot(history['val_miou'], label='val_miou', marker='*')
    plt.title('Score per epoch')
    plt.ylabel('mean IoU')
    plt.xlabel('epoch')
    plt.legend()
    plt.grid()
    plt.show()

def plot_acc(history):
    """Plot the training and validation pixel accuracy stored in ``history`` per epoch.

    Args:
        history: dict with ``'train_acc'`` and ``'val_acc'`` sequences.
    """
    plt.plot(history['train_acc'], label='train_accuracy', marker='*')
    # Give the validation curve its own label so the legend separates the two.
    plt.plot(history['val_acc'], label='val_accuracy', marker='*')
    plt.title('Accuracy per epoch')
    plt.ylabel('Accuracy')
    plt.xlabel('epoch')
    plt.legend()
    plt.grid()
    plt.show()

## Model Evaluation

In [None]:
class DroneTestDataset(Dataset):
    """Test dataset yielding a PIL image and an integer class-id mask tensor.

    Unlike the training dataset, the image is NOT converted to a tensor or
    normalised here. It is returned as a PIL image.

    Args:
        img_path: directory containing the ``.jpg`` images.
        mask_path: directory containing the ``.png`` label masks.
        X: sequence of sample ids (file names without extension).
        transform: optional callable invoked as ``transform(image=..., mask=...)``
            and returning a dict with ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, img_path, mask_path, X, transform=None):
        self.img_path = img_path
        self.mask_path = mask_path
        self.X = X
        self.transform = transform

    def __len__(self):
        """Return the number of samples in the dataset."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(PIL image, mask tensor)`` for sample ``idx``."""
        img_file = os.path.join(self.img_path, self.X[idx] + '.jpg')
        mask_file = os.path.join(self.mask_path, self.X[idx] + '.png')

        img = cv2.imread(img_file)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(img_file)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        mask_bgr = cv2.imread(mask_file)
        if mask_bgr is None:
            raise FileNotFoundError(mask_file)
        # The colour table is keyed by RGB triples, so convert from OpenCV's
        # BGR order before the lookup.
        mask = convert_mask(cv2.cvtColor(mask_bgr, cv2.COLOR_BGR2RGB))

        if self.transform is not None:
            aug = self.transform(image=img, mask=mask)
            img = Image.fromarray(aug['image'])
            mask = aug['mask']
        else:
            img = Image.fromarray(img)

        mask = torch.from_numpy(mask).long()

        return img, mask

# Test-time preprocessing: resize only, no random augmentation.
t_test = A.Resize(768, 1152, interpolation=cv2.INTER_NEAREST)
test_set = DroneTestDataset(IMAGE_PATH, MASK_PATH, X_test, transform=t_test)

## Results Visualization

In [None]:
# Prediction helper
def predict_image_mask_miou(model, image, mask, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
    """Predict the class mask for one image and score it with mIoU.

    Args:
        model: segmentation network; switched to eval mode and moved to the
            module-level ``device``.
        image: input accepted by ``T.ToTensor`` (e.g. a PIL image).
        mask: ground-truth label tensor without a batch dimension.
        mean: per-channel normalisation mean.
        std: per-channel normalisation standard deviation.

    Returns:
        tuple: ``(predicted_mask, score)``. ``predicted_mask`` is a CPU tensor
        of class ids without the batch dimension and ``score`` is the value
        returned by ``mIoU``.
    """
    model.eval()
    t = T.Compose([T.ToTensor(), T.Normalize(mean, std)])
    image = t(image)
    model.to(device)
    # The image must live on the same device as the model.
    image = image.to(device)
    mask = mask.to(device)
    with torch.no_grad():
        # Add the batch dimension the model and the metric expect.
        image = image.unsqueeze(0)
        mask = mask.unsqueeze(0)

        output = model(image)
        score = mIoU(output, mask)
        masked = torch.argmax(output, dim=1)
        masked = masked.cpu().squeeze(0)
    return masked, score

In [None]:
def predict_image_mask_pixel(model, image, mask, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
    """Predict the class mask for one image and score it with pixel accuracy.

    Args:
        model: segmentation network; switched to eval mode and moved to the
            module-level ``device``.
        image: input accepted by ``T.ToTensor`` (e.g. a PIL image).
        mask: ground-truth label tensor without a batch dimension.
        mean: per-channel normalisation mean.
        std: per-channel normalisation standard deviation.

    Returns:
        tuple: ``(predicted_mask, acc)``. ``predicted_mask`` is a CPU tensor of
        class ids without the batch dimension and ``acc`` is the value
        returned by ``pixel_accuracy``.
    """
    model.eval()
    t = T.Compose([T.ToTensor(), T.Normalize(mean, std)])
    image = t(image)
    model.to(device)
    image = image.to(device)
    mask = mask.to(device)
    with torch.no_grad():
        # Add the batch dimension the model and the metric expect.
        image = image.unsqueeze(0)
        mask = mask.unsqueeze(0)

        output = model(image)
        acc = pixel_accuracy(output, mask)
        masked = torch.argmax(output, dim=1)
        masked = masked.cpu().squeeze(0)
    return masked, acc

In [None]:
# Predict the test sample at index 1 and compute its mIoU.
image,mask = test_set[1]
pred_mask, score = predict_image_mask_miou(model, image, mask)

In [None]:
def miou_score(model, test_set):
    """Return the mIoU of ``model`` on every sample of ``test_set``, in order.

    Args:
        model: segmentation network passed to ``predict_image_mask_miou``.
        test_set: indexable dataset yielding ``(image, mask)`` pairs.

    Returns:
        list: one mIoU score per sample.
    """
    scores = []
    for sample_idx in tqdm(range(len(test_set))):
        sample_img, sample_mask = test_set[sample_idx]
        _predicted, sample_score = predict_image_mask_miou(model, sample_img, sample_mask)
        scores.append(sample_score)
    return scores

In [None]:
# Per-sample mIoU scores over the whole test set.
mob_miou = miou_score(model,test_set)

In [None]:
def pixel_acc(model, test_set):
    """Return the pixel accuracy of ``model`` on every sample of ``test_set``.

    Args:
        model: segmentation network passed to ``predict_image_mask_pixel``.
        test_set: indexable dataset yielding ``(image, mask)`` pairs.

    Returns:
        list: one pixel-accuracy value per sample, in dataset order.
    """
    accuracies = []
    for i in tqdm(range(len(test_set))):
        img, mask = test_set[i]
        _pred_mask, acc = predict_image_mask_pixel(model, img, mask)
        # Collect into the list that is returned.
        accuracies.append(acc)
    return accuracies

In [None]:
# Per-sample pixel accuracy over the whole test set.
mob_acc = pixel_acc(model,test_set)

In [None]:
# Show the input picture, its ground-truth mask and the predicted mask side by side.
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(20, 10))
ax1.imshow(image)
ax1.set_title('Picture');

ax2.imshow(mask)
ax2.set_title('Ground truth')
ax2.set_axis_off()

ax3.imshow(pred_mask)
ax3.set_title('UNet-MobileNet| mIoU {:.3f}'.format(score))
ax3.set_axis_off()

In [None]:
# Predict the test sample at index 1 again, this time scoring it with pixel accuracy.
image2, mask2 = test_set[1]
pred_mask2, score2 = predict_image_mask_pixel(model, image2, mask2)

fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(20, 10))
ax1.imshow(image2)
ax1.set_title('Picture');

ax2.imshow(mask2)
ax2.set_title('Ground truth')
ax2.set_axis_off()

ax3.imshow(pred_mask2)
# score2 comes from predict_image_mask_pixel, so the title names that metric.
ax3.set_title('UNet-MobileNet| Pixel Acc {:.3f}'.format(score2))
ax3.set_axis_off()

In [None]:
# Mean of the per-sample mIoU scores on the test set.
print('Test Set mioU', np.mean(mob_miou))

In [None]:
# Mean of the per-sample pixel accuracies on the test set.
print('Test Set Pixel Accuracy', np.mean(mob_acc))