### Se importan las librerías:

In [None]:
import numpy as np 
import pandas as pd

import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.patches import Rectangle

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
import torchvision
import torch.nn.functional as F
from torch.autograd import Variable

from PIL import Image
import cv2
import albumentations as A

from torchgeo.datasets import LoveDA

from GPUtil import showUtilization as gpu_usage

import time
import os

import segmentation_models_pytorch as smp

In [None]:
# Pick the compute device once: CUDA when PyTorch can see a GPU, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("PyTorch utiliza " + ("GPU" if torch.cuda.is_available() else "CPU"))

### Constantes:

In [None]:
# Root directory handed to the dataset constructors (train/val/test splits).
ROOT_PATH = './segmentation-loveDA/'

### Funciones Útiles:

In [None]:
def free_gpu_cache(verbose=False):
    """Release the memory cached by PyTorch's CUDA allocator.

    Args:
        verbose: When truthy, print the GPU utilisation table (via
            ``gpu_usage``) before and after the cache is emptied.
    """

    def _report(header):
        # Print a heading followed by the current GPU utilisation table.
        print(header)
        gpu_usage()

    if verbose:
        _report("Initial GPU Usage")

    torch.cuda.empty_cache()

    if verbose:
        _report("GPU Usage after emptying the cache")

In [None]:
class CustomDataset(LoveDA):
    """LoveDA dataset variant that reads samples with OpenCV.

    Each item is a dict with an ``'image'`` entry (the RGB image converted to
    a tensor by ``torchvision``'s ``ToTensor``) and, for every split except
    ``"test"``, a ``'mask'`` entry (the grayscale label image as an int64
    tensor). ``self.transforms``, when set, is called with the raw arrays as
    keyword arguments before the tensor conversion.
    """

    def __getitem__(self, index: int) -> dict:
        """Load, optionally transform and tensorise the sample at ``index``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or mask file.
        """
        files = self.files[index]
        has_mask = self.split != "test"

        # cv2.imread signals failure by returning None instead of raising,
        # so check explicitly to get an error that names the offending file.
        image = cv2.imread(files["image"])
        if image is None:
            raise FileNotFoundError(f"Could not read image file: {files['image']}")
        # OpenCV loads colour images as BGR; convert to RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        sample = {"image": image}

        if has_mask:
            mask = cv2.imread(files["mask"], cv2.IMREAD_GRAYSCALE)
            if mask is None:
                raise FileNotFoundError(f"Could not read mask file: {files['mask']}")
            sample["mask"] = mask

        if self.transforms is not None:
            sample = self.transforms(**sample)

        # Array -> PIL image -> tensor, as in the original pipeline.
        sample['image'] = T.ToTensor()(Image.fromarray(sample['image']))

        if has_mask:
            sample['mask'] = torch.from_numpy(sample['mask']).long()

        return sample

In [None]:
def pixel_accuracy(output, mask):
    """Return the fraction of positions whose predicted class equals ``mask``.

    Args:
        output: Per-class scores with the class axis at dimension 1.
        mask: Integer class labels, comparable element-wise with the argmax
            of ``output`` over dimension 1.

    Returns:
        A Python float in ``[0, 1]``.
    """
    with torch.no_grad():
        predicted = F.softmax(output, dim=1).argmax(dim=1)
        hits = (predicted == mask).int()
        return float(hits.sum()) / float(hits.numel())

In [None]:
def mIoU(pred_mask, mask, smooth=1e-10, n_classes=8):
    """Compute the mean intersection-over-union across classes.

    Args:
        pred_mask: Per-class scores with the class axis at dimension 1.
        mask: Integer class labels; flattened and compared against the
            flattened argmax of ``pred_mask``.
        smooth: Small constant added to numerator and denominator.
        n_classes: Class ids ``0 .. n_classes - 1`` are evaluated.

    Returns:
        The mean IoU over the classes that occur in ``mask``; classes absent
        from ``mask`` are recorded as NaN and ignored by ``np.nanmean``.
    """
    with torch.no_grad():
        predicted = torch.argmax(F.softmax(pred_mask, dim=1), dim=1).contiguous().view(-1)
        target = mask.contiguous().view(-1)

        scores = []
        for class_id in range(n_classes):
            predicted_is_class = predicted == class_id
            target_is_class = target == class_id

            # A class that never appears in the labels does not count.
            if target_is_class.long().sum().item() == 0:
                scores.append(np.nan)
                continue

            overlap = torch.logical_and(predicted_is_class, target_is_class).sum().float().item()
            combined = torch.logical_or(predicted_is_class, target_is_class).sum().float().item()
            scores.append((overlap + smooth) / (combined + smooth))

        return np.nanmean(scores)

In [None]:
def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Returns ``None`` when the optimizer has no parameter groups.
    """
    first_group = next(iter(optimizer.param_groups), None)
    return None if first_group is None else first_group['lr']

In [None]:
def fit(epochs, model, train_loader, val_loader, criterion, optimizer, scheduler, patience=7):
    """Train ``model`` and validate it after every epoch.

    Args:
        epochs: Maximum number of epochs to run.
        model: Network to optimise; moved to the module-level ``device``.
        train_loader: Iterable of batches, each a dict with ``'image'`` and
            ``'mask'`` entries.
        val_loader: Same batch format, used for validation.
        criterion: Loss called as ``criterion(output, mask)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Learning-rate scheduler stepped once per training batch.
        patience: Stop after this many consecutive epochs without a new best
            validation loss. ``None`` disables early stopping.

    Returns:
        A dict with per-epoch lists ``train_loss``, ``val_loss``,
        ``train_miou``, ``val_miou``, ``train_acc``, ``val_acc`` and the
        per-batch list ``lrs``.
    """
    free_gpu_cache()
    train_losses = []
    val_losses = []
    val_iou = []
    val_acc = []
    train_iou = []
    train_acc = []
    lrs = []
    min_loss = np.inf
    not_improve = 0

    model.to(device)
    fit_time = time.time()
    for e in range(epochs):
        since = time.time()
        running_loss = 0
        iou_score = 0
        accuracy = 0

        # Training phase.
        model.train()
        for data in train_loader:
            image = data['image'].to(device)
            mask = data['mask'].to(device)

            output = model(image)
            loss = criterion(output, mask)

            # Both metric helpers run under no_grad, so they do not affect
            # the backward pass.
            iou_score += mIoU(output, mask)
            accuracy += pixel_accuracy(output, mask)

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            # Record the rate used for this step, then advance the schedule.
            lrs.append(get_lr(optimizer))
            scheduler.step()

            running_loss += loss.item()

        # Validation phase.
        model.eval()
        val_loss = 0
        val_accuracy = 0
        val_iou_score = 0
        with torch.no_grad():
            for data in val_loader:
                image = data['image'].to(device)
                mask = data['mask'].to(device)
                output = model(image)
                val_iou_score += mIoU(output, mask)
                val_accuracy += pixel_accuracy(output, mask)
                val_loss += criterion(output, mask).item()

        # Per-epoch averages over the number of batches.
        epoch_train_loss = running_loss / len(train_loader)
        epoch_val_loss = val_loss / len(val_loader)

        # Record every metric before any early-stop decision so that all
        # history lists keep the same length.
        train_losses.append(epoch_train_loss)
        val_losses.append(epoch_val_loss)
        val_iou.append(val_iou_score / len(val_loader))
        train_iou.append(iou_score / len(train_loader))
        train_acc.append(accuracy / len(train_loader))
        val_acc.append(val_accuracy / len(val_loader))
        print("Epoch:{}/{}..".format(e+1, epochs),
              "Train Loss: {:.3f}..".format(epoch_train_loss),
              "Val Loss: {:.3f}..".format(epoch_val_loss),
              "Train mIoU:{:.3f}..".format(iou_score/len(train_loader)),
              "Val mIoU: {:.3f}..".format(val_iou_score/len(val_loader)),
              "Train Acc:{:.3f}..".format(accuracy/len(train_loader)),
              "Val Acc:{:.3f}..".format(val_accuracy/len(val_loader)),
              "Time: {:.2f}m".format((time.time()-since)/60))

        # Early stopping. min_loss starts at infinity, so the best loss must
        # be updated on improvement; comparing only with ``>`` never fired.
        if epoch_val_loss < min_loss:
            min_loss = epoch_val_loss
            not_improve = 0
        else:
            not_improve += 1
            print(f'Loss Not Decrease for {not_improve} time')
            if patience is not None and not_improve >= patience:
                print(f'Loss not decrease for {patience} times, Stop Training')
                break

    history = {'train_loss' : train_losses, 'val_loss': val_losses,
               'train_miou' :train_iou, 'val_miou':val_iou,
               'train_acc' :train_acc, 'val_acc':val_acc,
               'lrs': lrs}
    print('Total time: {:.2f} m' .format((time.time()- fit_time)/60))
    return history

In [None]:
def predict_image_mask_miou(model, image, mask):
    """Predict the mask of one image and score it with ``mIoU``.

    Args:
        model: Network called on a single-image batch.
        image: Unbatched image tensor; a batch axis is added at position 0.
        mask: Unbatched label tensor; a batch axis is added at position 0.

    Returns:
        ``(masked, score)``: the per-pixel argmax over dimension 1 moved to
        the CPU with the batch axis removed, and the mIoU value.
    """
    model.eval()
    model.to(device)
    with torch.no_grad():
        batch = image.to(device).unsqueeze(0)
        target = mask.to(device).unsqueeze(0)

        logits = model(batch)
        score = mIoU(logits, target)
        masked = torch.argmax(logits, dim=1).cpu().squeeze(0)
    return masked, score

In [None]:
def predict_image_mask_pixel(model, image, mask):
    """Predict the mask of one image and score it with ``pixel_accuracy``.

    Args:
        model: Network called on a single-image batch.
        image: Unbatched image tensor; a batch axis is added at position 0.
        mask: Unbatched label tensor; a batch axis is added at position 0.

    Returns:
        ``(masked, acc)``: the per-pixel argmax over dimension 1 moved to the
        CPU with the batch axis removed, and the pixel accuracy.
    """
    model.eval()
    model.to(device)
    with torch.no_grad():
        batch = image.to(device).unsqueeze(0)
        target = mask.to(device).unsqueeze(0)

        logits = model(batch)
        acc = pixel_accuracy(logits, target)
        masked = torch.argmax(logits, dim=1).cpu().squeeze(0)
    return masked, acc

In [None]:
def predict_image_mask(model, image):
    """Predict the segmentation mask of a single image.

    Args:
        model: Network called on a single-image batch.
        image: Either an unbatched image tensor, or any input accepted by
            torchvision's ``ToTensor`` (for example a PIL image), which is
            converted to a tensor first.

    Returns:
        The per-pixel argmax over dimension 1, moved to the CPU with the
        batch axis removed.
    """
    model.eval()
    model.to(device)
    # PIL images have no .to(), so convert non-tensor inputs up front.
    if not torch.is_tensor(image):
        image = T.ToTensor()(image)
    # Keep the forward pass inside no_grad so that no autograd graph is
    # built during inference.
    with torch.no_grad():
        output = model(image.to(device).unsqueeze(0))
        masked = torch.argmax(output, dim=1).cpu().squeeze(0)
    return masked

In [None]:
def miou_score(model, test_set):
    """Compute the mIoU of ``model`` on every sample of ``test_set``.

    Args:
        model: Network passed on to ``predict_image_mask_miou``.
        test_set: Indexable dataset with ``len()``. Samples may be dicts with
            ``'image'`` and ``'mask'`` entries or ``(image, mask)`` pairs.

    Returns:
        A list with one mIoU value per sample.

    Raises:
        ValueError: If a dict sample has no ``'mask'`` entry.
    """
    score_iou = []
    for i in range(len(test_set)):
        sample = test_set[i]
        if isinstance(sample, dict):
            # Tuple-unpacking a dict would yield its keys, not its values.
            if 'mask' not in sample:
                raise ValueError(
                    f"Sample {i} has no 'mask'; mIoU needs a labelled dataset")
            img, mask = sample['image'], sample['mask']
        else:
            img, mask = sample
        _, score = predict_image_mask_miou(model, img, mask)
        score_iou.append(score)
    return score_iou

In [None]:
def pixel_acc(model, test_set):
    """Compute the pixel accuracy of ``model`` on every sample of ``test_set``.

    Args:
        model: Network passed on to ``predict_image_mask_pixel``.
        test_set: Indexable dataset with ``len()``. Samples may be dicts with
            ``'image'`` and ``'mask'`` entries or ``(image, mask)`` pairs.

    Returns:
        A list with one accuracy value per sample.

    Raises:
        ValueError: If a dict sample has no ``'mask'`` entry.
    """
    accuracy = []
    for i in range(len(test_set)):
        sample = test_set[i]
        if isinstance(sample, dict):
            # Tuple-unpacking a dict would yield its keys, not its values.
            if 'mask' not in sample:
                raise ValueError(
                    f"Sample {i} has no 'mask'; accuracy needs a labelled dataset")
            img, mask = sample['image'], sample['mask']
        else:
            img, mask = sample
        _, acc = predict_image_mask_pixel(model, img, mask)
        accuracy.append(acc)
    return accuracy

In [None]:
def plot_colortable(colors, title, sort_colors=True, emptycols=0):
    """Draw a table of named colour swatches and return the figure.

    Args:
        colors: Mapping from a display name to a Matplotlib colour.
        title: Title drawn at the top left of the table.
        sort_colors: When exactly ``True``, entries are ordered by
            (hue, saturation, value, name); otherwise mapping order is kept.
        emptycols: Subtracted from the 4 columns when computing the number
            of rows.

    Returns:
        The Matplotlib figure holding the table.
    """
    cell_width, cell_height = 212, 22
    swatch_width = 48
    margin, topmargin = 12, 40

    if sort_colors is True:
        hsv_and_name = [
            (tuple(mcolors.rgb_to_hsv(mcolors.to_rgb(value))), label)
            for label, value in colors.items()
        ]
        hsv_and_name.sort()
        names = [label for _, label in hsv_and_name]
    else:
        names = list(colors)

    # Rows needed to fit every name in the available columns (rounded up).
    ncols = 4 - emptycols
    full_rows, leftover = divmod(len(names), ncols)
    nrows = full_rows + int(leftover > 0)

    width = cell_width * 4 + 2 * margin
    height = cell_height * nrows + margin + topmargin
    dpi = 72

    fig, ax = plt.subplots(figsize=(width / dpi, height / dpi), dpi=dpi)
    fig.subplots_adjust(left=margin / width,
                        bottom=margin / height,
                        right=(width - margin) / width,
                        top=(height - topmargin) / height)
    ax.set_xlim(0, cell_width * 4)
    # Inverted y limits so that row 0 is drawn at the top.
    ax.set_ylim(cell_height * (nrows - 0.5), -cell_height / 2.)
    ax.yaxis.set_visible(False)
    ax.xaxis.set_visible(False)
    ax.set_axis_off()
    ax.set_title(title, fontsize=24, loc="left", pad=10)

    # Entries fill each column top to bottom before moving to the next one.
    for position, name in enumerate(names):
        col, row = divmod(position, nrows)
        y = row * cell_height
        swatch_start_x = cell_width * col

        ax.text(swatch_start_x + swatch_width + 7, y, name, fontsize=14,
                horizontalalignment='left',
                verticalalignment='center')

        swatch = Rectangle(xy=(swatch_start_x, y - 9), width=swatch_width,
                           height=18, facecolor=colors[name], edgecolor='0.7')
        ax.add_patch(swatch)

    return fig

### Preprocesamiento:

In [None]:
# Number of output classes requested from the segmentation model below.
# NOTE(review): presumably the LoveDA label values 0-7 — confirm against the dataset.
n_classes = 8

Crear los datasets:

In [None]:
# Random augmentations are applied to the training split only.
transform_train = A.Compose([A.HorizontalFlip(), A.VerticalFlip(), A.GridDistortion(p=0.2),
                             A.RandomBrightnessContrast((0,0.5),(0,0.5)), A.GaussNoise()])
# Validation is left un-augmented so that metrics are comparable between
# epochs and early stopping is not driven by augmentation noise.
transform_val = None


#datasets
train_set = CustomDataset(root=ROOT_PATH, split="train", transforms=transform_train)
val_set = CustomDataset(root=ROOT_PATH, split="val", transforms=transform_val)
test_set = CustomDataset(root=ROOT_PATH, split="test")

#dataloader
batch_size= 1

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
# Shuffling does not change averaged validation metrics; keep a fixed order.
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

In [None]:
# Report each split's size and its share of all samples.
total_len = sum(len(split) for split in (train_set, val_set, test_set))
for _label, _split in (('Train', train_set), ('Val', val_set), ('Test', test_set)):
    print(f'{_label} Size:', len(_split), '({}%)'.format(round((len(_split)/total_len)*100, 2)))

In [None]:
# Fetch one training sample (a dict with 'image' and 'mask' entries) and
# show it; the training transforms are random, so the result may differ
# between runs.
data = train_set[5]

# Convert the image tensor back to a PIL image for display.
t = T.Compose([T.ToPILImage()])
data['image'] = t(data['image'])

print('Tamaño de imagen:', np.asarray(data['image']).shape)
print('Tamaño de máscara:', np.asarray(data['mask']).shape)

# Image on the left, label mask on the right.
fig, (ax1, ax2) = plt.subplots(1,2, figsize=(20,10))
ax1.imshow(data['image'])
ax1.set_title('Imagen')

ax2.imshow(data['mask'])
ax2.set_title('Máscara')
ax2.set_axis_off()

### Modelo:

In [None]:
# U-Net with a MobileNetV2 encoder initialised from ImageNet weights.
# activation=None leaves the output as raw scores, one channel per class.
model = smp.Unet('mobilenet_v2', encoder_weights='imagenet', classes=n_classes, activation=None, 
    encoder_depth=5, decoder_channels=[256, 128, 64, 32, 16])

In [None]:
# Bare expression: the notebook displays the model's repr as the cell output.
model

### Entrenamiento:

In [None]:
# Training hyper-parameters.
max_lr = 1e-3
epoch = 15
weight_decay = 1e-4

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=max_lr, weight_decay=weight_decay)
# fit() steps the scheduler once per training batch, so the one-cycle
# schedule is sized as epochs * batches-per-epoch.
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epoch,
                                            steps_per_epoch=len(train_loader))

history = fit(epoch, model, train_loader, val_loader, criterion, optimizer, sched)

In [None]:
# torch.save does not create missing directories; make sure the target exists.
os.makedirs('./models', exist_ok=True)
# The whole model object is saved (not just its state_dict) because the
# loading cell expects torch.load to return a ready-to-use model.
torch.save(model, './models/segmentation-unet-2.pt')

In [None]:
# One figure per metric: (title, y label, curves as (history key, label, marker)).
for _title, _ylabel, _curves in (
    ('Loss per epoch', 'loss',
     (('val_loss', 'val', 'o'), ('train_loss', 'train', 'o'))),
    ('Score per epoch', 'mean IoU',
     (('train_miou', 'train_mIoU', '*'), ('val_miou', 'val_mIoU', '*'))),
    ('Accuracy per epoch', 'Accuracy',
     (('train_acc', 'train_accuracy', '*'), ('val_acc', 'val_accuracy', '*'))),
):
    for _key, _label, _marker in _curves:
        plt.plot(history[_key], label=_label, marker=_marker)
    plt.title(_title)
    plt.ylabel(_ylabel)
    plt.xlabel('epoch')
    plt.legend()
    plt.grid()
    plt.show()

### Test:

Cargar el modelo:

In [None]:
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
# NOTE(review): the file holds a pickled model object; torch.load unpickles it,
# so only load checkpoints from a trusted source. Recent PyTorch releases may
# also require weights_only=False for full-model checkpoints — confirm for the
# installed version.
model = torch.load('./models/segmentation-unet-2.pt', map_location=device)

In [None]:
# CustomDataset returns dicts, and the "test" split carries no mask, so a
# labelled sample is taken from the validation split and unpacked by key.
sample = val_set[5]
image, mask = sample['image'], sample['mask']
pred_mask, score = predict_image_mask_miou(model, image, mask)

In [None]:
# Input image, ground-truth mask and predicted mask side by side.
fig, (ax1, ax2, ax3) = plt.subplots(1,3, figsize=(20,10))
# The dataset yields a channel-first tensor; imshow expects channel-last.
ax1.imshow(image.permute(1, 2, 0))
ax1.set_title('Imagen')

ax2.imshow(mask)
ax2.set_title('Máscara')
ax2.set_axis_off()

ax3.imshow(pred_mask)
# The original title called .format(score) on a string with no placeholder,
# so the score was never shown.
ax3.set_title('Predicción de la máscara | mIoU {:.3f}'.format(score))
ax3.set_axis_off()

In [None]:
# Mean IoU over the whole dataset passed in.
# NOTE(review): CustomDataset returns no 'mask' for split="test" — confirm
# that test_set can actually be scored, or evaluate on a labelled split.
mob_miou = miou_score(model, test_set)
print('Test Set mIoU', np.mean(mob_miou))

In [None]:
# Mean pixel accuracy over the whole dataset passed in.
# NOTE(review): CustomDataset returns no 'mask' for split="test" — confirm
# that test_set can actually be scored, or evaluate on a labelled split.
mob_acc = pixel_acc(model, test_set)
print('Test Set Pixel Accuracy', np.mean(mob_acc))

### Cargar imagen del proyecto:

In [None]:
# Se lee la imagen según su indice
image = cv2.imread('./data/02/ORTO.tif')
# Convierte los colores de la imagen de BGR a RGB  
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

transform = A.Resize(768, 1152, interpolation=cv2.INTER_NEAREST)

aug = transform(image=image)
image = Image.fromarray(aug['image'])

In [None]:
# `image` is a PIL image at this point (kept that way for plotting); the
# prediction helper calls .to(device) on its input, so hand it a tensor.
pred_mask = predict_image_mask(model, T.ToTensor()(image))

In [None]:
# Project image on the left, predicted mask on the right.
fig, (ax1, ax2) = plt.subplots(1,2, figsize=(20,10))
ax1.imshow(image)
ax1.set_title('Imagen')

ax2.imshow(pred_mask)
# No score exists for this unlabelled image. The original .format(score) was
# a no-op on a placeholder-free string and only added a dependency on a
# global left over from an earlier cell.
ax2.set_title('Predicción de la máscara')
ax2.set_axis_off()

In [None]:
# Build a {class name: (r, g, b)} table with channels scaled to [0, 1] and
# draw it. The colour column names in the CSV carry a leading space.
data = pd.read_csv("./segmentation/class_dict_seg.csv").set_index('name').to_dict()
colors = {
    key: (data[' r'][key]/255, data[' g'][key]/255, data[' b'][key]/255)
    for key in data[' r']
}
plot_colortable(colors, "Colores de la segmentación:")
plt.show()