In [None]:
import os
import pickle
import time

import albumentations as A
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from PIL import Image, ImageDraw, ImageFont
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
from torchsummary import summary
from torchvision import transforms as T
from tqdm import tqdm

In [None]:
!pip install segmentation_models_pytorch



In [None]:
import segmentation_models_pytorch as smp

In [None]:
class DroneDataset(Dataset):
    """Image/mask dataset for semantic-segmentation training and validation.

    Each item is read from ``img_path/<id><img_post>`` and
    ``mask_path/<id><mask_post>``, optionally augmented, and returned as a
    normalised float image tensor together with a ``long`` mask tensor.

    Args:
        img_path: Directory containing the images.
        mask_path: Directory containing the label masks.
        X: Indexable collection of file stems (file names without extension).
        mean: Per-channel mean handed to ``T.Normalize``.
        std: Per-channel standard deviation handed to ``T.Normalize``.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
        patch: If true, every item is cut into tiles by :meth:`tiles`.
        img_post: File extension of the images.
        mask_post: File extension of the masks.
    """

    def __init__(self, img_path, mask_path, X, mean, std, transform=None, patch=False, img_post='.jpg', mask_post='.png'):
        self.img_path = img_path
        self.mask_path = mask_path
        self.X = X
        self.transform = transform
        self.patches = patch
        self.mean = mean
        self.std = std
        self.img_post = img_post
        self.mask_post = mask_post

    def __len__(self):
        """Number of samples, i.e. the number of ids in ``X``."""
        return len(self.X)

    def __getitem__(self, idx):
        """Load, augment and tensorise the sample at position ``idx``.

        Returns:
            ``(img, mask)``: the normalised image tensor and the ``long`` mask
            tensor; when ``patch`` was set both are stacks of tiles.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        img_file = self.img_path + '/' + self.X[idx] + self.img_post
        mask_file = self.mask_path + '/' + self.X[idx] + self.mask_post

        # cv2.imread signals an unreadable file by returning None instead of raising,
        # so fail here with the offending path rather than deep inside a later call.
        img = cv2.imread(img_file)
        if img is None:
            raise FileNotFoundError(f'Could not read image file: {img_file}')
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f'Could not read mask file: {mask_file}')

        if self.transform is not None:
            aug = self.transform(image=img, mask=mask)
            img, mask = aug['image'], aug['mask']

        # ToTensor expects a PIL image (or ndarray); Normalize then standardises each channel.
        img = Image.fromarray(img)
        t = T.Compose([T.ToTensor(), T.Normalize(self.mean, self.std)])
        img = t(img)
        mask = torch.from_numpy(mask).long()

        if self.patches:
            img, mask = self.tiles(img, mask)

        return img, mask

    def tiles(self, img, mask, tile_size=(512, 768)):
        """Cut an image tensor and its mask into non-overlapping tiles.

        Args:
            img: Image tensor laid out as ``(C, H, W)``.
            mask: Mask tensor laid out as ``(H, W)``.
            tile_size: ``(height, width)`` of one tile; defaults to the
                512 x 768 tiles used so far.

        Returns:
            ``(img_patches, mask_patches)`` with shapes
            ``(n_tiles, C, tile_h, tile_w)`` and ``(n_tiles, tile_h, tile_w)``.
            Tiles are ordered row by row; rows/columns that do not fill a
            complete tile are dropped by ``unfold``.
        """
        tile_h, tile_w = tile_size

        img_patches = img.unfold(1, tile_h, tile_h).unfold(2, tile_w, tile_w)
        img_patches = img_patches.contiguous().view(img.size(0), -1, tile_h, tile_w)
        # move the tile axis to the front: (C, n_tiles, h, w) -> (n_tiles, C, h, w)
        img_patches = img_patches.permute(1, 0, 2, 3)

        mask_patches = mask.unfold(0, tile_h, tile_h).unfold(1, tile_w, tile_w)
        mask_patches = mask_patches.contiguous().view(-1, tile_h, tile_w)

        return img_patches, mask_patches


class DroneTestDataset(Dataset):
    """Image/mask dataset for inference and evaluation.

    Unlike ``DroneDataset`` the image is returned as an un-normalised PIL
    image, so the caller decides how to convert it to a tensor; the mask is
    returned as a ``long`` tensor.

    Args:
        img_path: Directory containing the images.
        mask_path: Directory containing the label masks.
        X: Indexable collection of file stems (file names without extension).
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
        img_post: File extension of the images.
        mask_post: File extension of the masks.
    """

    def __init__(self, img_path, mask_path, X, transform=None, img_post='.jpg', mask_post='.png'):
        self.img_path = img_path
        self.mask_path = mask_path
        self.X = X
        self.transform = transform
        self.img_post = img_post
        self.mask_post = mask_post

    def __len__(self):
        """Number of samples, i.e. the number of ids in ``X``."""
        return len(self.X)

    def __getitem__(self, idx):
        """Load the sample at position ``idx``.

        Returns:
            ``(img, mask)``: a PIL image in RGB order and a ``long`` mask tensor.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        img_file = self.img_path + '/' + self.X[idx] + self.img_post
        mask_file = self.mask_path + '/' + self.X[idx] + self.mask_post

        # cv2.imread returns None for unreadable files; report the path right away.
        img = cv2.imread(img_file)
        if img is None:
            raise FileNotFoundError(f'Could not read image file: {img_file}')
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f'Could not read mask file: {mask_file}')

        if self.transform is not None:
            aug = self.transform(image=img, mask=mask)
            img, mask = aug['image'], aug['mask']

        img = Image.fromarray(img)
        mask = torch.from_numpy(mask).long()

        return img, mask

In [None]:
# Metric calculation helper functions

def pixel_accuracy(output, mask):
    """Return the fraction of pixels whose predicted class equals the label.

    Args:
        output: Raw model scores with the class axis at ``dim=1``.
        mask: Integer class labels, one per predicted pixel.

    Returns:
        Python ``float`` between 0 and 1.
    """
    with torch.no_grad():
        # the class with the highest probability is the prediction for each pixel
        predicted = F.softmax(output, dim=1).argmax(dim=1)
        hits = (predicted == mask).int()
        n_hits = float(hits.sum())
        n_pixels = float(hits.numel())
        return n_hits / n_pixels


def mIoU(pred_mask, mask, smooth=1e-10, n_classes=23):
    """Mean intersection-over-union across the classes present in ``mask``.

    Args:
        pred_mask: Raw model scores with the class axis at ``dim=1``.
        mask: Integer class labels, one per predicted pixel.
        smooth: Small constant added to intersection and union.
        n_classes: Class ids ``0 .. n_classes - 1`` are evaluated.

    Returns:
        The ``np.nanmean`` of the per-class IoU values; classes that never
        occur in ``mask`` are recorded as NaN and therefore ignored.
    """
    with torch.no_grad():
        predicted = torch.argmax(F.softmax(pred_mask, dim=1), dim=1).contiguous().view(-1)
        target = mask.contiguous().view(-1)

        def class_iou(class_id):
            in_prediction = predicted == class_id
            in_target = target == class_id
            if in_target.long().sum().item() == 0:
                # class does not occur in the ground truth -> excluded from the mean
                return np.nan
            overlap = torch.logical_and(in_prediction, in_target).sum().float().item()
            combined = torch.logical_or(in_prediction, in_target).sum().float().item()
            return (overlap + smooth) / (combined + smooth)

        return np.nanmean([class_iou(class_id) for class_id in range(0, n_classes)])



In [None]:
# Plotting helper functions

def plot_loss(history):
    """Plot validation and training loss per epoch from a ``fit`` history dict.

    Reads ``history['val_loss']`` and ``history['train_loss']`` and shows the figure.
    """
    for key, legend_name in (('val_loss', 'val'), ('train_loss', 'train')):
        plt.plot(history[key], label=legend_name, marker='o')
    plt.title('Loss per epoch')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend()
    plt.grid()
    plt.show()


def plot_score(history):
    """Plot training and validation mean IoU per epoch from a ``fit`` history dict.

    Reads ``history['train_miou']`` and ``history['val_miou']`` and shows the figure.
    """
    for key, legend_name in (('train_miou', 'train_mIoU'), ('val_miou', 'val_mIoU')):
        plt.plot(history[key], label=legend_name, marker='*')
    plt.title('Score per epoch')
    plt.ylabel('mean IoU')
    plt.xlabel('epoch')
    plt.legend()
    plt.grid()
    plt.show()


def plot_acc(history):
    """Plot training and validation pixel accuracy per epoch from a ``fit`` history dict.

    Reads ``history['train_acc']`` and ``history['val_acc']`` and shows the figure.
    """
    for key, legend_name in (('train_acc', 'train_accuracy'), ('val_acc', 'val_accuracy')):
        plt.plot(history[key], label=legend_name, marker='*')
    plt.title('Accuracy per epoch')
    plt.ylabel('Accuracy')
    plt.xlabel('epoch')
    plt.legend()
    plt.grid()
    plt.show()

In [None]:
# Training helper functions


def get_lr(optimizer):
    """Return the ``'lr'`` entry of the optimizer's first parameter group.

    Returns ``None`` when ``optimizer.param_groups`` is empty.
    """
    missing = object()
    first_group = next(iter(optimizer.param_groups), missing)
    if first_group is missing:
        return None
    return first_group['lr']


def _flatten_tiles(image_tiles, mask_tiles):
    """Fold the tile axis into the batch axis so every tile is fed to the model as its own sample."""
    _, _, c, h, w = image_tiles.size()
    return image_tiles.view(-1, c, h, w), mask_tiles.view(-1, h, w)


def fit(epochs, model, train_loader, val_loader, criterion, optimizer, scheduler, device, model_name, model_save_folder, patch=False):
    """Train ``model`` and validate it after every epoch.

    Args:
        epochs: Maximum number of epochs.
        model: Network to train; it is moved to ``device``.
        train_loader: Iterable of ``(image, mask)`` training batches; must support ``len``.
        val_loader: Iterable of ``(image, mask)`` validation batches; must support ``len``.
        criterion: Loss callable invoked as ``criterion(output, mask)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Learning-rate scheduler stepped once per training batch.
        device: Device the model and batches are moved to.
        model_name: Prefix of the checkpoint file names.
        model_save_folder: Folder the checkpoints are written to.
        patch: Set when the loaders yield tiled samples shaped
            ``(batch, n_tiles, C, H, W)``; the tile axis is then merged into the batch.

    Returns:
        Dict with the per-epoch lists ``train_loss``, ``val_loss``, ``train_miou``,
        ``val_miou``, ``train_acc``, ``val_acc`` (all of equal length) and ``lrs``
        (learning rate recorded before every scheduler step).

    Notes:
        * ``min_loss`` always holds the best validation loss seen so far.
        * Training stops after 7 consecutive epochs whose validation loss is
          above that best value.
        * A checkpoint (whole model and state dict) is written on every fifth
          increment of the improvement counter, which starts at 1 - i.e. on the
          4th, 9th, ... improvement.
    """
    torch.cuda.empty_cache()
    train_losses, test_losses = [], []
    train_iou, val_iou = [], []
    train_acc, val_acc = [], []
    lrs = []
    min_loss = np.inf  # best (lowest) mean validation loss so far
    decrease = 1       # improvement counter that throttles checkpoint saving
    not_improve = 0    # consecutive epochs without a new best validation loss
    patience = 7

    model.to(device)
    fit_time = time.time()
    for e in range(epochs):
        since = time.time()
        running_loss = 0
        iou_score = 0
        accuracy = 0

        # ---- training phase ----
        model.train()
        for image_tiles, mask_tiles in tqdm(train_loader):
            if patch:
                image_tiles, mask_tiles = _flatten_tiles(image_tiles, mask_tiles)

            image = image_tiles.to(device)
            mask = mask_tiles.to(device)
            # forward
            output = model(image)
            loss = criterion(output, mask)
            # evaluation metrics
            iou_score += mIoU(output, mask)
            accuracy += pixel_accuracy(output, mask)
            # backward
            loss.backward()
            optimizer.step()  # update weight
            optimizer.zero_grad()  # reset gradient

            # record the learning rate used for this batch, then advance the schedule
            lrs.append(get_lr(optimizer))
            scheduler.step()

            running_loss += loss.item()

        # ---- validation phase ----
        model.eval()
        test_loss = 0
        test_accuracy = 0
        val_iou_score = 0
        with torch.no_grad():
            for image_tiles, mask_tiles in tqdm(val_loader):
                if patch:
                    image_tiles, mask_tiles = _flatten_tiles(image_tiles, mask_tiles)

                image = image_tiles.to(device)
                mask = mask_tiles.to(device)
                output = model(image)
                # evaluation metrics
                val_iou_score += mIoU(output, mask)
                test_accuracy += pixel_accuracy(output, mask)
                # loss
                loss = criterion(output, mask)
                test_loss += loss.item()

        # ---- epoch summary: mean over the batches of each loader ----
        n_train, n_val = len(train_loader), len(val_loader)
        epoch_train_loss = running_loss / n_train
        epoch_val_loss = test_loss / n_val
        epoch_train_miou = iou_score / n_train
        epoch_val_miou = val_iou_score / n_val
        epoch_train_acc = accuracy / n_train
        epoch_val_acc = test_accuracy / n_val

        # Record everything before the early-stopping decision so that all
        # history lists keep the same length even for the final epoch.
        train_losses.append(epoch_train_loss)
        test_losses.append(epoch_val_loss)
        train_iou.append(epoch_train_miou)
        val_iou.append(epoch_val_miou)
        train_acc.append(epoch_train_acc)
        val_acc.append(epoch_val_acc)

        stop_training = False
        if epoch_val_loss < min_loss:
            print('Loss Decreasing.. {:.3f} >> {:.3f} '.format(min_loss, epoch_val_loss))
            min_loss = epoch_val_loss
            not_improve = 0
            decrease += 1
            if decrease % 5 == 0:
                print('saving model...')
                # three decimals of the validation mIoU, e.g. 0.572 -> '572', 0.5 -> '500'
                miou_tag = '{:.3f}'.format(epoch_val_miou).split('.')[-1]
                torch.save(model, f'{model_save_folder}/{model_name}_mIOU{miou_tag}_whole_model.pt')
                torch.save(model.state_dict(), f'{model_save_folder}/{model_name}_mIOU{miou_tag}_state_dict.pt')
        elif epoch_val_loss > min_loss:
            # min_loss is intentionally left untouched: it must stay the best value
            not_improve += 1
            print(f'Loss Not Decrease for {not_improve} time')
            stop_training = not_improve == patience

        print("Epoch:{}/{}..".format(e + 1, epochs),
              "Train Loss: {:.3f}..".format(epoch_train_loss),
              "Val Loss: {:.3f}..".format(epoch_val_loss),
              "Train mIoU:{:.3f}..".format(epoch_train_miou),
              "Val mIoU: {:.3f}..".format(epoch_val_miou),
              "Train Acc:{:.3f}..".format(epoch_train_acc),
              "Val Acc:{:.3f}..".format(epoch_val_acc),
              "Time: {:.2f}m".format((time.time() - since) / 60))

        if stop_training:
            print('Loss not decrease for 7 times, Stop Training')
            break

    history = {'train_loss': train_losses, 'val_loss': test_losses,
               'train_miou': train_iou, 'val_miou': val_iou,
               'train_acc': train_acc, 'val_acc': val_acc,
               'lrs': lrs}
    print('Total time: {:.2f} m'.format((time.time() - fit_time) / 60))
    return history


def predict_image_mask_miou(model, image, mask, device, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
    """Predict the class mask of one image and score it with ``mIoU``.

    Args:
        model: Segmentation network; switched to eval mode and moved to ``device``.
        image: Single image accepted by ``T.ToTensor``.
        mask: Ground-truth label tensor of that image (without batch axis).
        device: Device used for the forward pass.
        mean: Per-channel mean for ``T.Normalize``.
        std: Per-channel standard deviation for ``T.Normalize``.

    Returns:
        ``(masked, score)``: the arg-max class map on the CPU with the batch
        axis removed, and the value returned by ``mIoU``.
    """
    model.eval()
    to_normalised_tensor = T.Compose([T.ToTensor(), T.Normalize(mean, std)])
    tensor_image = to_normalised_tensor(image)
    model.to(device)
    tensor_image = tensor_image.to(device)
    target = mask.to(device)
    with torch.no_grad():
        # the model and the metric both expect a leading batch axis
        batch = tensor_image.unsqueeze(0)
        target = target.unsqueeze(0)

        logits = model(batch)
        score = mIoU(logits, target)
        masked = torch.argmax(logits, dim=1).cpu().squeeze(0)
    return masked, score


def predict_image_mask_pixel(model, image, mask, device, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
    """Predict the class mask of one image and score it with ``pixel_accuracy``.

    Args:
        model: Segmentation network; switched to eval mode and moved to ``device``.
        image: Single image accepted by ``T.ToTensor``.
        mask: Ground-truth label tensor of that image (without batch axis).
        device: Device used for the forward pass.
        mean: Per-channel mean for ``T.Normalize``.
        std: Per-channel standard deviation for ``T.Normalize``.

    Returns:
        ``(masked, acc)``: the arg-max class map on the CPU with the batch
        axis removed, and the value returned by ``pixel_accuracy``.
    """
    model.eval()
    to_normalised_tensor = T.Compose([T.ToTensor(), T.Normalize(mean, std)])
    tensor_image = to_normalised_tensor(image)
    model.to(device)
    tensor_image = tensor_image.to(device)
    target = mask.to(device)
    with torch.no_grad():
        # the model and the metric both expect a leading batch axis
        batch = tensor_image.unsqueeze(0)
        target = target.unsqueeze(0)

        logits = model(batch)
        acc = pixel_accuracy(logits, target)
        masked = torch.argmax(logits, dim=1).cpu().squeeze(0)
    return masked, acc


def miou_score(model, test_set, device):
    """Return the list of mIoU scores, one per sample of ``test_set``.

    Every sample is fetched by index and scored with ``predict_image_mask_miou``.
    """
    def score_of(index):
        img, mask = test_set[index]
        _, score = predict_image_mask_miou(model, img, mask, device)
        return score

    return [score_of(index) for index in tqdm(range(len(test_set)))]


def pixel_acc_score(model, test_set, device):
    """Return the list of pixel accuracies, one per sample of ``test_set``.

    Every sample is fetched by index and scored with ``predict_image_mask_pixel``.
    """
    def accuracy_of(index):
        img, mask = test_set[index]
        _, acc = predict_image_mask_pixel(model, img, mask, device)
        return acc

    return [accuracy_of(index) for index in tqdm(range(len(test_set)))]

In [None]:
# Utility functions

def create_df(image_path):
    """Collect the file stems found below ``image_path`` into a DataFrame.

    The directory tree is walked recursively and the extension is stripped
    from every file name. The ids are sorted so that the result - and any
    ``random_state``-seeded split derived from it - does not depend on the
    order in which the file system lists the files.

    NOTE: because of the sorting, a seeded split may assign ids differently
    than a split that was made from the unsorted listing.

    Args:
        image_path: Root folder to scan.

    Returns:
        DataFrame with a single ``id`` column and a 0..n-1 integer index
        (empty when nothing is found).
    """
    names = [
        # splitext removes only the last extension, so stems containing dots stay intact
        os.path.splitext(filename)[0]
        for _dirname, _, filenames in os.walk(image_path)
        for filename in filenames
    ]
    names.sort()

    return pd.DataFrame({'id': names}, index=np.arange(0, len(names)))


def make_folder(dp):
    """Create the folder ``dp`` (including missing parent folders) unless the path already exists."""
    if not os.path.exists(dp):
        # makedirs also creates intermediate folders; exist_ok covers a folder
        # that appears between the check and the call
        os.makedirs(dp, exist_ok=True)
    return


def open_class_csv(filepath):
    """Read the class/colour table and drop the ``conflicting`` class.

    The CSV's columns are renamed to ``name``, ``r``, ``g``, ``b`` (so the
    file must have exactly four columns); the original row index is kept.
    """
    table = pd.read_csv(filepath)
    table.columns = ['name', 'r', 'g', 'b']

    # keep every row except the one labelled "conflicting"
    is_regular_class = table['name'].ne('conflicting')
    return table[is_regular_class]

def create_image_legend(class_to_rgb, class_labels):
    """Draw a colour legend and save it as ``colored_squares_with_legend.png``.

    One row per entry of ``class_to_rgb``: a filled square in the class colour
    followed by the class name taken from ``class_labels.loc[class_id]['name']``.

    Args:
        class_to_rgb: Mapping ``class_id -> colour`` used as fill for the squares.
        class_labels: Table indexed by class id with a ``name`` column.
    """
    swatch = 50          # edge length of one colour square
    label_width = 200    # horizontal space reserved for the class names
    canvas_size = (swatch + label_width, len(class_to_rgb) * swatch)

    img = Image.new('RGB', canvas_size, color='white')
    draw = ImageDraw.Draw(img)

    for row, (class_id, rgb) in enumerate(class_to_rgb.items()):
        y_top = row * swatch
        # colour square on the left ...
        draw.rectangle([(0, y_top), (swatch, y_top + swatch)], fill=rgb)
        # ... class name to its right, slightly inset
        draw.text((swatch + 10, y_top + 10), class_labels.loc[class_id]['name'], fill='black')

    img.save('colored_squares_with_legend.png')



In [None]:
# Data access: mount Google Drive at /content/drive (Colab only) so the
# dataset, output and model folders configured below can be reached.
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#!ls
# Project root on the mounted Google Drive; every other path is derived from it.
BASE = '/content/drive/MyDrive/ethiopia_project'

DATA_BASE = os.path.join(BASE, 'data/uav_graz/dataset/semantic_drone_dataset')
OUTPUT_BASE = os.path.join(BASE, 'output/uav_graz')

IMAGE_PATH = os.path.join(DATA_BASE, 'original_images')
MASK_PATH = os.path.join(DATA_BASE, 'label_images_semantic')

# Root folder of the saved checkpoints. The inference cell reads
# MODEL_BASE/<model_folder>/<model_name>; the training cell writes below BASE/models.
MODEL_BASE = os.path.join(BASE, 'models')

# Alternative local (Windows) setup - uncomment to run outside Colab:
# DATA_BASE = r'C:\Users\PC\Coding\ethiopia_uav_segmentation\data\uav_addis_01'
# OUTPUT_BASE = r'C:\Users\PC\Coding\ethiopia_uav_segmentation\output\uav_addis_01'

# IMAGE_PATH = DATA_BASE
# MASK_PATH = DATA_BASE

# MODEL_BASE = r'C:\Users\PC\Coding\ethiopia_uav_segmentation\models'

print(DATA_BASE, OUTPUT_BASE)
print(IMAGE_PATH, MASK_PATH)

/content/drive/MyDrive/ethiopia_project/data/uav_graz/dataset/semantic_drone_dataset /content/drive/MyDrive/ethiopia_project/output/uav_graz
/content/drive/MyDrive/ethiopia_project/data/uav_graz/dataset/semantic_drone_dataset/original_images /content/drive/MyDrive/ethiopia_project/data/uav_graz/dataset/semantic_drone_dataset/label_images_semantic


In [None]:
# Training set-up, adapted from
# https://www.kaggle.com/code/ligtfeather/semantic-segmentation-is-easy-with-pytorch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f'Training on device: {device}')

show_example_image = False

# create datasets from the provided imagery
n_classes = 23
df = create_df(image_path=IMAGE_PATH)
print('Total Images: ', len(df))

# Split the ids: 10% are held out for testing, then 15% of the remainder is used
# for validation -> training 76.5%, validation 13.5%, testing 10%.
X_trainval, X_test = train_test_split(df['id'].values, test_size=0.1, random_state=19)
X_train, X_val = train_test_split(X_trainval, test_size=0.15, random_state=19)

print('Train Size   : ', len(X_train))
print('Val Size     : ', len(X_val))
print('Test Size    : ', len(X_test))

if show_example_image:
    img = Image.open(IMAGE_PATH + '/' + df['id'][100] + '.jpg')
    mask = Image.open(MASK_PATH + '/' + df['id'][100] + '.png')
    print('Image Size', np.asarray(img).shape)
    print('Mask Size', np.asarray(mask).shape)

    plt.imshow(img)
    plt.imshow(mask, alpha=0.6)
    plt.title('Picture with Mask Appplied')
    plt.show()

# Per-channel mean / standard deviation used to normalise the input images.
# These are the ImageNet statistics, matching the 'imagenet' encoder weights below.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Augmentation pipelines. Training images are resized and randomly flipped,
# distorted, brightened and noised.
t_train = A.Compose([A.Resize(704, 1056, interpolation=cv2.INTER_NEAREST), A.HorizontalFlip(), A.VerticalFlip(),
                      A.GridDistortion(p=0.2), A.RandomBrightnessContrast((0, 0.5), (0, 0.5)),
                      A.GaussNoise()])

# Validation images are only resized: random augmentations here would make the
# validation loss (which drives checkpointing and early stopping) noisy.
t_val = A.Compose([A.Resize(704, 1056, interpolation=cv2.INTER_NEAREST)])

# apply drone dataset configurations
train_set = DroneDataset(IMAGE_PATH, MASK_PATH, X_train, mean, std, t_train, patch=False)
val_set = DroneDataset(IMAGE_PATH, MASK_PATH, X_val, mean, std, t_val, patch=False)

# dataloader
batch_size = 4

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
# validation needs no shuffling; a fixed order keeps the epochs comparable
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

model_name = 'Unet-Mobilenet_v2_29102024'

# define model and hyperparameters; the architecture is selected by a tag inside model_name
if 'Mobilenet' in model_name:
    model = smp.Unet('mobilenet_v2', encoder_weights='imagenet', classes=n_classes, activation=None,
                      encoder_depth=5, decoder_channels=[256, 128, 64, 32, 16])
elif 'Resnet34' in model_name:
    model = smp.Unet('resnet34', encoder_weights='imagenet', classes=n_classes, activation=None,
                      encoder_depth=5)
elif 'Resnext50' in model_name:
    model = smp.Unet('resnext50_32x4d', encoder_weights='imagenet', classes=n_classes, activation=None,
                      encoder_depth=5)
else:
    # without this branch a stale `model` from an earlier run would silently be trained
    raise ValueError(f'No architecture configured for model name: {model_name}')

max_lr = 1e-3
epoch = 50  #1  #6 #15
weight_decay = 1e-4

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=max_lr, weight_decay=weight_decay)
# one-cycle schedule that is stepped once per training batch inside fit()
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epoch,
                                            steps_per_epoch=len(train_loader))

model_save_folder = os.path.join(BASE, 'models/mobilenet_showcase')
make_folder(model_save_folder)

history = fit(epoch, model, train_loader, val_loader, criterion, optimizer, sched, device,
              model_name=model_name, model_save_folder=model_save_folder)


Total Images:  400
Train Size   :  306
Val Size     :  54
Test Size    :  40


 92%|█████████▏| 71/77 [59:42<04:55, 49.27s/it]

In [None]:
# Inference of the model

# https://pytorch.org/tutorials/beginner/saving_loading_models.html#saving-loading-model-across-devices

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

models_folder = os.listdir(MODEL_BASE)
print(f'available models: {models_folder}')

# Kind of checkpoint to load: 'whole_model' (pickled nn.Module) or 'state_dict' (weights only).
mode = 'whole_model'

model_name = f'Unet-resnext50_32x4d_211024_mIOU572_{mode}.pt'
model_folder = 'resnext50'
# Other checkpoints referenced so far:
# model_name = f'Unet-Mobilenet_v2_161024_mIoU385_{mode}.pt'
# model_name = f'Unet-Resnet34_181024_mIoU454_{mode}.pt'
MODEL_DICT_PATH = os.path.join(MODEL_BASE, model_folder, model_name)

# create saving directory
output_folder = os.path.join(OUTPUT_BASE, model_name.split('.')[0])
make_folder(output_folder)
make_folder(os.path.join(output_folder, 'predictions'))
make_folder(os.path.join(output_folder, 'predictions_plots'))

# NOTE(review): assumes class_dict_seg.csv lies in <BASE>/data/uav_graz, mirroring the
# local layout (...\data\uav_graz\class_dict_seg.csv) that was hard-coded here - confirm.
class_dict = open_class_csv(os.path.join(BASE, 'data/uav_graz/class_dict_seg.csv'))
# class id (row index of the table) -> RGB colour used to paint the predictions
class_colors = {i: (row['r'], row['g'], row['b']) for i, row in class_dict.iterrows()}

#create_image_legend(class_colors, class_dict)

# derived from `mode` so that the file name and the loading strategy cannot disagree
load_state_dict = mode == 'state_dict'
load_whole_model = not load_state_dict

plot_image = True
save_predictions = True
save_plots = True

print('Creating a DataSet')
n_classes = 23
image_hw = (704, 1056)
df = create_df(image_path=IMAGE_PATH)
print('Total Images: ', len(df))

# By default 10% of the ids form the test set (same seed as in the training cell);
# with predict_all the share is raised to 90%, which then also contains images
# that were used for training.
test_size = 0.1
predict_all = True
if predict_all:
    test_size = 0.9

X_trainval, X_test = train_test_split(df['id'].values, test_size=test_size, random_state=19)

# load model
print('Loading model')
if load_state_dict:
    # A state dict needs a matching architecture. The tag is matched
    # case-insensitively because checkpoint names use both 'Resnext50' and 'resnext50'.
    # encoder_weights=None: every weight is overwritten by the state dict anyway.
    name_lower = model_name.lower()
    if 'mobilenet' in name_lower:
        model = smp.Unet('mobilenet_v2', encoder_weights=None, classes=n_classes, activation=None,
                          encoder_depth=5, decoder_channels=[256, 128, 64, 32, 16])
    elif 'resnet34' in name_lower:
        model = smp.Unet('resnet34', encoder_weights=None, classes=n_classes, activation=None,
                          encoder_depth=5)
    elif 'resnext50' in name_lower:
        model = smp.Unet('resnext50_32x4d', encoder_weights=None, classes=n_classes, activation=None,
                          encoder_depth=5)
    else:
        raise ValueError(f'No architecture configured for model name: {model_name}')

    print('Loading weights')
    # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine
    model.load_state_dict(torch.load(MODEL_DICT_PATH, weights_only=True, map_location=device))
else:
    print('Loading weights')
    # SECURITY: weights_only=False unpickles arbitrary objects - only load checkpoints you created yourself.
    model = torch.load(MODEL_DICT_PATH, weights_only=False, map_location=device)
model.eval()

# create test dataset (masks use the dataset's default '.png' extension)
t_test = A.Resize(768, 1152, interpolation=cv2.INTER_NEAREST)
test_set = DroneTestDataset(IMAGE_PATH, MASK_PATH, X_test, transform=t_test)
#prediction_set = DroneTestDataset(IMAGE_PATH, MASK_PATH, X_trainval, transform=t_test)
pred_image_nr = None #[236] # None
if pred_image_nr is not None:
    test_set = DroneTestDataset(IMAGE_PATH, MASK_PATH, X_trainval, transform=t_test)

# The confusion matrix is accumulated image by image over the fixed label set
# 0..n_classes-1, so the flattened masks never have to be held in memory together
# and the matrix always matches the class names used as tick labels.
class_ids = np.arange(n_classes)
cm = np.zeros((n_classes, n_classes), dtype=np.int64)

color_ = True

print('Doing inference...')
for i in tqdm(range(len(test_set))):
    if pred_image_nr is not None and i not in pred_image_nr:
        continue
    image, mask = test_set[i]
    pred_mask, score = predict_image_mask_miou(model, image, mask, device)
    img_index = test_set.X[i]

    # color prediction mask (fresh canvas per image so nothing leaks from the previous one)
    pred_mask_np = pred_mask.numpy()
    colored_image_np = np.zeros(pred_mask_np.shape + (3,), dtype=np.uint8)
    for class_value, rgb in class_colors.items():
        colored_image_np[pred_mask_np == class_value] = rgb

    colored_image = Image.fromarray(colored_image_np)

    # calculate confusion values (rows: ground truth, columns: prediction)
    cm += confusion_matrix(mask.reshape(-1).cpu().numpy(), pred_mask.reshape(-1).cpu().numpy(),
                           labels=class_ids)

    if save_predictions:
        if color_:
            colored_image.save(os.path.join(output_folder, 'predictions', f'{img_index}.png'))
        else:
            pred_mask_img = np.array(pred_mask, dtype=np.uint8)
            xx = Image.fromarray(pred_mask_img)
            xx.save(os.path.join(output_folder, 'predictions', f'{img_index}.png'))

    if plot_image:
        fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(20, 10))
        ax1.imshow(image)
        ax1.set_title('Picture')

        ax2.imshow(mask)
        ax2.set_title('Ground truth')
        ax2.set_axis_off()

        ax3.imshow(pred_mask)
        # label the panel with the model that produced the prediction
        ax3.set_title('{} | mIoU {:.3f}'.format(model_name.split('_')[0], score))
        ax3.set_axis_off()
        if save_plots:
            fig.savefig(os.path.join(output_folder, 'predictions_plots', f'{img_index}.png'))
        else:
            plt.show()

        plt.close(fig=fig)

# share of all evaluated pixels per (ground truth, prediction) pair, in percent
cm_percentage = cm.astype('float') / cm.sum() * 100

annot = np.empty_like(cm_percentage, dtype=object)
for i in range(cm_percentage.shape[0]):
    for j in range(cm_percentage.shape[1]):
        if cm_percentage[i, j] == 0:
            annot[i, j] = '0'  # Display '0' for zero values
        else:
            annot[i, j] = f'{cm_percentage[i, j]:.1f}'  # Display one decimal for non-zero values

# classes: tree, gras, other vegetation, dirt, gravel, rocks, water, paved area, pool, person, dog, car, bicycle,
#          roof, wall, fence, fence-pole, window, door, obstacle
figcm, axcm = plt.subplots(1, 1, figsize=(10, 10))
sns.heatmap(cm_percentage, annot=annot, fmt='', cbar=True, cmap='Blues', xticklabels=class_dict['name'],
            yticklabels=class_dict['name'], ax=axcm)
axcm.set_xlabel('Predicted class')
axcm.set_ylabel('Ground-truth class')
figcm.savefig('cm.png')
