# Libraries

In [None]:
pip install -U albumentations

In [None]:
pip install segmentation-models-pytorch

In [None]:
import pandas as pd
import os
from sklearn.model_selection import train_test_split
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from torch.utils.data import DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
import torch.optim as optim
from torch.optim import lr_scheduler
from torchinfo import summary
import cv2

# Data Preparation

In [None]:
# Load the dictionary that contains the classes for semantic segmentation
df = pd.read_csv('/kaggle/input/droneimages/class_dict_seg.csv')

## Collect the image paths and the mask paths of both datasets
# Directories for the original images
dataset1_original_images_dir = '/kaggle/input/droneimages/Dataset1/Dataset/original_images'
dataset2_original_images_dir = '/kaggle/input/droneimages/Dataset2/Dataset/original_images'

# Directories for the semantic label images
dataset1_label_images_semantic_dir = '/kaggle/input/droneimages/Dataset1/Dataset/label_images_semantic'
dataset2_label_images_semantic_dir = '/kaggle/input/droneimages/Dataset2/Dataset/label_images_semantic'


def _list_image_files(directory):
    """Return the sorted paths of every .jpg/.png file found under *directory*.

    The result is sorted because os.walk yields entries in an arbitrary,
    filesystem-dependent order; without sorting, the seeded train/validation
    split below would not be reproducible from one machine to another.
    """
    return sorted(
        os.path.join(root, file_name)
        for root, _dirs, file_names in os.walk(directory)
        for file_name in file_names
        if file_name.lower().endswith(('.jpg', '.png'))
    )


# Image paths, kept per dataset so images and masks can be paired per dataset
dataset1_image_paths = _list_image_files(dataset1_original_images_dir)
dataset2_image_paths = _list_image_files(dataset2_original_images_dir)
image_paths = dataset1_image_paths + dataset2_image_paths

# Same procedure for the masks (= labels)
dataset1_label_paths = _list_image_files(dataset1_label_images_semantic_dir)
dataset2_label_paths = _list_image_files(dataset2_label_images_semantic_dir)
label_paths = dataset1_label_paths + dataset2_label_paths

# Fail early with a clear message instead of a KeyError further down
if not image_paths or not label_paths:
    raise FileNotFoundError(
        'No .jpg/.png images or labels were found in the dataset directories.'
    )

# Convert the lists into DataFrames (single column named 0) and save them
df_images = pd.DataFrame(image_paths)
df_images.to_csv('/kaggle/working/image_paths.csv', index=False, header=False)

df_labels = pd.DataFrame(label_paths)
df_labels.to_csv('/kaggle/working/label_paths.csv', index=False, header=False)

## Create class_mapping and colors_list from the dictionary
num_classes = len(df)
class_mapping = {}
colors_list = []
for idx, row in df.iterrows():
    class_name = row['name']  # class name
    class_mapping[class_name] = idx  # class index
    # The colour columns of the CSV carry a leading space in their header
    rgb = [row[" r"], row[" g"], row[" b"]]
    rgb_normalized = [c / 255.0 for c in rgb]  # scale to [0, 1] for matplotlib
    colors_list.append(rgb_normalized)

## Build a single table that pairs every image with its mask
# File name without directory and extension; splitext keeps dots inside the name
df_images['file_name'] = df_images[0].apply(lambda x: os.path.splitext(os.path.basename(x))[0])
df_labels['file_name'] = df_labels[0].apply(lambda x: os.path.splitext(os.path.basename(x))[0])

# Tag each row with the dataset it came from, so that two files sharing a name
# in Dataset1 and Dataset2 are never paired across datasets
df_images['dataset'] = [1] * len(dataset1_image_paths) + [2] * len(dataset2_image_paths)
df_labels['dataset'] = [1] * len(dataset1_label_paths) + [2] * len(dataset2_label_paths)

# validate='one_to_one' raises pandas.errors.MergeError if an image could match
# several masks (or vice versa) instead of silently duplicating rows
data = pd.merge(
    df_images.rename(columns={0: 'image_path'}),
    df_labels.rename(columns={0: 'label_path'}),
    on=['dataset', 'file_name'],
    validate='one_to_one',
)

# Only the two path columns are needed once the pairing is done
data = data[['image_path', 'label_path']]

# Split the data into training set (90%) and validation set (10%)
train_data, val_data = train_test_split(data, test_size=0.10, random_state=42)

# Save these splits into CSV files
train_data.to_csv('/kaggle/working/train_set.csv', index=False, header=False)
val_data.to_csv('/kaggle/working/val_set.csv', index=False, header=False)

# Data Preprocessing

In [None]:
## This class loads the image/mask pairs listed in a CSV file
## and prepares them for training the model.
## It also applies the transformations.
class DroneDataset(Dataset):
    """Segmentation dataset backed by a header-less, two-column CSV file.

    Column 0 of every row is the path of an image and column 1 the path of
    its mask. Images are decoded as RGB and masks as single-channel ("L").

    Args:
        csv_file: path of the CSV file listing the (image, mask) pairs.
        transform: optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with the keys ``'image'`` and ``'mask'``.
    """

    def __init__(self, csv_file, transform=None):
        self.data = pd.read_csv(csv_file, header=None)
        self.transform = transform

    def __len__(self):
        """Return the number of (image, mask) pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, mask)`` for row *idx*.

        The mask is always returned as an int64 tensor. The image is whatever
        ``transform`` produces; without a transform it is the raw H x W x 3
        uint8 array.
        """
        img_path = self.data.iloc[idx, 0]
        label_path = self.data.iloc[idx, 1]

        # Close the files as soon as the pixels are copied; otherwise handles
        # stay open until garbage collection, which adds up across workers
        with Image.open(img_path) as img_file:
            image = np.array(img_file.convert("RGB"))
        with Image.open(label_path) as label_file:
            mask = np.array(label_file.convert("L"))

        if self.transform is not None:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        # as_tensor passes tensors through unchanged and converts the NumPy
        # mask when no transform was given (ndarray has no .long())
        mask = torch.as_tensor(mask).long()

        return image, mask

In [None]:
## Albumentations pipelines. Both resize to 128x128, normalize with the
## mean/std below and convert image and mask to tensors; only the training
## pipeline adds random augmentation.
_norm_mean = (0.485, 0.456, 0.406)
_norm_std = (0.229, 0.224, 0.225)

_train_steps = [
    A.Resize(height=128, width=128),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
    A.RandomBrightnessContrast(
        brightness_limit=(-0.2, 0.2),
        contrast_limit=(-0.2, 0.2),
        p=0.4,
    ),
    A.Perspective(scale=(0.02, 0.08), keep_size=True, p=0.3),
    A.Normalize(mean=_norm_mean, std=_norm_std),
    ToTensorV2(),
]
transform_train = A.Compose(_train_steps)

# Validation: deterministic preprocessing only, no augmentation
_val_steps = [
    A.Resize(height=128, width=128),
    A.Normalize(mean=_norm_mean, std=_norm_std),
    ToTensorV2(),
]
transform_val = A.Compose(_val_steps)

In [None]:
# Datasets built from the CSV splits written during data preparation
trainset = DroneDataset('/kaggle/working/train_set.csv', transform=transform_train)
valset = DroneDataset('/kaggle/working/val_set.csv', transform=transform_val)

batch_size = 20

# Worker/prefetch settings shared by both loaders
_loader_options = {'num_workers': 4, 'pin_memory': True, 'prefetch_factor': 4}

# Training batches are shuffled; validation is served one image at a time, in order
trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, **_loader_options)
valloader = DataLoader(valset, batch_size=1, shuffle=False, **_loader_options)

# U-Net

In [None]:
class DoubleConv(nn.Module):
    """Two consecutive (3x3 convolution -> batch norm -> ReLU) stages.

    ``mid_channels`` is the width between the two stages; when it is falsy
    (None or 0) it defaults to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        hidden = mid_channels if mid_channels else out_channels

        layers = []
        for c_in, c_out in ((in_channels, hidden), (hidden, out_channels)):
            # padding=1 keeps the spatial size; the conv bias is dropped because
            # the batch norm that follows has its own shift
            layers.append(nn.Conv2d(c_in, c_out, kernel_size=3, padding=1, bias=False))
            layers.append(nn.BatchNorm2d(c_out))
            layers.append(nn.ReLU(inplace=True))
        self.double_conv = nn.Sequential(*layers)

    def forward(self, x):
        out = self.double_conv(x)
        return out

In [None]:
class Down(nn.Module):
    """Encoder step: 2x2 max-pooling followed by a DoubleConv."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pool = nn.MaxPool2d(2)
        conv = DoubleConv(in_channels, out_channels)
        self.maxpool_conv = nn.Sequential(pool, conv)

    def forward(self, x):
        out = self.maxpool_conv(x)
        return out


In [None]:
class Up(nn.Module):
    """Decoder step: upsample, pad to the skip tensor's size, concatenate, DoubleConv."""

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()
        half = in_channels // 2

        if not bilinear:
            # Learned upsampling: the transposed conv itself halves the channels
            self.up = nn.ConvTranspose2d(in_channels, half, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)
        else:
            # Bilinear upsampling keeps the channel count, so the DoubleConv
            # narrows to half the channels in its middle stage instead
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, half)

    def forward(self, x1, x2):
        upsampled = self.up(x1)

        # Height/width gap between the skip tensor and the upsampled one
        # (dims 2 and 3 of the 4-D tensors)
        gap_h = x2.size(2) - upsampled.size(2)
        gap_w = x2.size(3) - upsampled.size(3)
        left = gap_w // 2
        top = gap_h // 2

        # F.pad order is [left, right, top, bottom]; any odd pixel goes right/bottom
        upsampled = F.pad(upsampled, [left, gap_w - left, top, gap_h - top])

        # if you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        merged = torch.cat([x2, upsampled], dim=1)
        return self.conv(merged)


In [None]:
class OutConv(nn.Module):
    """Final 1x1 convolution mapping feature channels to output channels."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        projected = self.conv(x)
        return projected


In [None]:
class UNet(nn.Module):
    """U-Net with four encoder steps, four decoder steps and a 1x1 output conv.

    Args:
        n_channels: number of channels of the input image.
        n_classes: number of output channels (one logit map per class).
        bilinear: passed to every ``Up`` block; also halves the bottleneck
            and decoder widths when True.
    """

    def __init__(self, n_channels, n_classes, bilinear=False):
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        # Encoder
        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        factor = 2 if bilinear else 1
        self.down4 = Down(512, 1024 // factor)

        # Decoder
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)

        # Per-pixel class logits
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        # Encoder: remember each step's input as the skip connection
        skips = []
        features = self.inc(x)
        for down in (self.down1, self.down2, self.down3, self.down4):
            skips.append(features)
            features = down(features)

        # Decoder: consume the skips from the deepest to the shallowest
        for up in (self.up1, self.up2, self.up3, self.up4):
            features = up(features, skips.pop())

        return self.outc(features)

## Ensemble Method

In [None]:
class EnsembleModel(nn.Module):
    """Averages the outputs of several models that are fed the same input."""

    def __init__(self, models):
        super().__init__()
        # ModuleList registers the members, so their parameters are tracked
        self.models = nn.ModuleList(models)

    def forward(self, x):
        # Stack the member outputs on a new leading axis and average over it
        stacked = torch.stack([member(x) for member in self.models])
        return torch.mean(stacked, dim=0)

# Model

In [None]:
# Number of classes = number of rows in the class dictionary CSV
num_classes = df.shape[0]

# Initialize the model: plain U-Net on 3-channel input, transposed-conv upsampling
model = UNet(n_channels=3, n_classes=num_classes, bilinear=False)

## Alternative: a U-Net with a pretrained encoder (transfer learning).
## The triple-quoted string below is an inert expression, i.e. disabled code.
## NOTE(review): it uses `smp`, which the import cell does not import;
## presumably `import segmentation_models_pytorch as smp` is needed - confirm.
"""
model = smp.Unet(
    encoder_name="mobilenet_v2",
    encoder_weights="imagenet",
    in_channels=3,
    classes=num_classes,
)
"""

## Alternative: the ensemble of three pretrained U-Nets (disabled as well;
## also needs `smp`)
"""
res_unet = smp.Unet(encoder_name="resnet18", encoder_weights="imagenet", classes=num_classes, activation=None)
effnet = smp.Unet(encoder_name="efficientnet-b3", encoder_weights="imagenet", classes=num_classes, activation=None)
vggnet = smp.Unet(encoder_name="vgg11", encoder_weights="imagenet", classes=num_classes, activation=None)
model = EnsembleModel([res_unet, effnet, vggnet])
"""

### Alternative: load weights from self-supervised pretraining (disabled).
### NOTE(review): the snippet reads `device`, which is only assigned in a
### later cell - run that cell first if this is enabled.
"""
# Load the pretrained weights
pretrained_dict = torch.load('/kaggle/input/unetweightsssl/unet_model_weights.pth', map_location=device, weights_only=True)

# Important to remove 'module.' prefix from keys
pretrained_dict = {k.replace('module.', ''): v for k, v in pretrained_dict.items()}

# Get the current model state dict
model_dict = model.state_dict()

# Only update the matching keys from the pretrained dict
for k, v in pretrained_dict.items():
    if k in model_dict:
        model_dict[k] = v
        print(f"Weights for layer '{k}' have been loaded.")
    else:
        print(f"Skipping layer '{k}' because it wasn't in the pretext task's UNet.")

# Load the updated state dict into the model
model.load_state_dict(model_dict)
"""

# Define the loss function: cross-entropy between logits and integer class masks
criterion = nn.CrossEntropyLoss()

## Alternative loss: IoU (Jaccard) loss - requires `smp`
#criterion = smp.losses.JaccardLoss(mode='multiclass')

# Define the optimizer
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Define the learning rate scheduler: multiply the LR by 0.1 when the monitored
# value (the validation loss passed to scheduler.step) has not decreased for 5 epochs
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)

In [None]:
# Choose the compute device: the first CUDA GPU when one is present, else the CPU
if not torch.cuda.is_available():
    print('GPU is not available. Using CPU.')
    device = torch.device('cpu')
else:
    num_gpus = torch.cuda.device_count()
    print(f'Number of GPUs available: {num_gpus}')
    if num_gpus < 2:
        print("There are less than 2 GPUs detected.")
    device = torch.device('cuda:0')

In [None]:
## Replicate the model over GPUs 0 and 1 when more than one GPU is visible
_use_multi_gpu = torch.cuda.is_available() and torch.cuda.device_count() > 1
if _use_multi_gpu:
    print("Using DataParallel for multi-GPU training.")
    model = nn.DataParallel(model, device_ids=[0, 1])

## Move the (possibly wrapped) model to the selected device
model.to(device)
print(f'Model is using device: {device}')

if isinstance(model, nn.DataParallel):
    print(f'Model is parallelized on devices: {model.device_ids}')

In [None]:
## Calculate pixel-wise accuracy
def calculate_accuracy(preds, masks):
    """Return the fraction of positions where ``preds`` equals ``masks`` as a float."""
    hits = preds.eq(masks).float()
    return (hits.sum() / hits.numel()).item()


In [None]:
## Calculate the IoU for each class and return the mean of it
def calculate_iou(preds, masks, num_classes):
    """Return the mean Intersection-over-Union over the classes that occur.

    A class contributes to the mean only when it appears in ``preds`` or in
    ``masks``; a class absent from both has an undefined IoU and is skipped.

    Args:
        preds: integer tensor of predicted class indices.
        masks: integer tensor of ground-truth class indices with the same
            number of elements as ``preds``.
        num_classes: classes ``0 .. num_classes - 1`` are evaluated.

    Returns:
        The mean IoU as a float, or NaN when no evaluated class occurs.
    """
    # reshape (unlike view) also accepts non-contiguous tensors
    preds = preds.reshape(-1)
    masks = masks.reshape(-1)

    ious = []
    for cls in range(num_classes):
        pred_inds = preds == cls
        target_inds = masks == cls

        # Intersection: positions where prediction and mask both equal cls
        intersection = (pred_inds & target_inds).sum().item()
        # Union: positions where either of them equals cls
        union = (pred_inds | target_inds).sum().item()

        if union == 0:
            # Class absent from both tensors: IoU undefined, leave it out
            continue
        ious.append(intersection / union)

    # Average over every class that occurred (none of them is dropped)
    return float(np.mean(ious)) if ious else float('nan')


# Training

In [None]:
## Training
## Trains `model` for up to `num_epochs` epochs, validating after every epoch,
## recording per-epoch loss / IoU / accuracy, saving the weights whenever the
## validation loss improves, and stopping early after `patience` epochs
## without improvement.

# Per-epoch history for the training and validation metrics
train_losses = []
val_losses = []

train_ious = []
val_ious = []

train_accuracies = []
val_accuracies = []

# Training parameters
num_epochs = 150
# Best validation loss so far; +inf so that the first epoch always improves on it
best_val_loss = float('inf')
# Number of epochs without improvement before stopping training
patience = 10
# Counter for early stopping
epochs_no_improve = 0
# Boolean for triggering early stopping
early_stop = False

for epoch in range(num_epochs):
    # The flag is set at the end of the previous iteration, so the loop is
    # left at the start of the next one
    if early_stop:
        print("Early stopping activated. Stopping training.")
        break

    print(f'Epoch {epoch+1}/{num_epochs}')
    print('-' * 10)

    # Training Phase
    model.train()
    running_loss = 0.0
    running_iou = 0.0
    running_accuracy = 0.0
    for images, masks in tqdm(trainloader, desc='Training'):
        images = images.to(device)
        masks = masks.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(images)

        # Calculate loss
        loss = criterion(outputs, masks)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Predicted class per pixel = index of the largest score along dim 1
        _, preds = torch.max(outputs, dim=1)
        # Calculate IoU
        # NOTE(review): a NaN returned for one batch makes the whole epoch mean NaN
        iou = calculate_iou(preds, masks, num_classes)
        running_iou += iou
        # Calculate Pixel-Wise Accuracy
        accuracy = calculate_accuracy(preds, masks)
        running_accuracy += accuracy

    # Epoch metrics are means of the per-batch values (every batch weighs the same)
    epoch_loss = running_loss / len(trainloader)
    epoch_iou = running_iou / len(trainloader)
    epoch_accuracy = running_accuracy / len(trainloader)
    train_losses.append(epoch_loss)
    train_ious.append(epoch_iou)
    train_accuracies.append(epoch_accuracy)
    print(f'Training - Loss: {epoch_loss:.4f}, IoU: {epoch_iou:.4f}, Accuracy: {epoch_accuracy:.4f}')


    # Validation Phase
    model.eval()
    val_running_loss = 0.0
    val_running_iou = 0.0
    val_running_accuracy = 0.0
    # No gradients are needed while evaluating
    with torch.no_grad():
        for images, masks in tqdm(valloader, desc='Validation'):
            images = images.to(device)
            masks = masks.to(device)

            # Forward pass
            outputs = model(images)

            # Calculate loss
            loss = criterion(outputs, masks)
            val_running_loss += loss.item()

            _, preds = torch.max(outputs, dim=1)
            # Calculate IoU
            iou = calculate_iou(preds, masks, num_classes)
            val_running_iou += iou
            # Calculate Pixel-Wise Accuracy
            accuracy = calculate_accuracy(preds, masks)
            val_running_accuracy += accuracy

    val_epoch_loss = val_running_loss / len(valloader)
    val_epoch_iou = val_running_iou / len(valloader)
    val_epoch_accuracy = val_running_accuracy / len(valloader)
    val_losses.append(val_epoch_loss)
    val_ious.append(val_epoch_iou)
    val_accuracies.append(val_epoch_accuracy)

    print(f'Validation - Loss: {val_epoch_loss:.4f}, IoU: {val_epoch_iou:.4f}, Accuracy: {val_epoch_accuracy:.4f}')

    # Update the scheduler with the validation loss
    scheduler.step(val_epoch_loss)

    # Early Stopping Phase: checkpoint on improvement, otherwise count the epoch
    if val_epoch_loss < best_val_loss:
        best_val_loss = val_epoch_loss
        epochs_no_improve = 0
        # Saved relative to the current working directory.
        # NOTE(review): the final cell loads '/kaggle/working/best_unet_model.pth',
        # so this presumably relies on the working directory being /kaggle/working.
        torch.save(model.state_dict(), 'best_unet_model.pth')
        print('Model saved!')
    else:
        epochs_no_improve += 1
        print(f'Validation loss did not improve for {epochs_no_improve} epoch(s).')

    # Early Stopping
    if epochs_no_improve >= patience:
        print(f'Early stopping activated. No improvement in validation loss for {patience} consecutive epochs.')
        early_stop = True

    print()

In [None]:
# Actual number of epochs that were run (early stopping may cut training short)
actual_epochs = len(train_losses)

epochs = range(1, actual_epochs + 1)
plt.figure(figsize=(18, 6))

# One panel per metric: (metric name, training history, validation history)
_metric_panels = [
    ('Loss', train_losses, val_losses),
    ('IoU', train_ious, val_ious),
    ('Accuracy', train_accuracies, val_accuracies),
]

for _position, (_metric, _train_values, _val_values) in enumerate(_metric_panels, start=1):
    plt.subplot(1, 3, _position)
    plt.plot(epochs, _train_values, 'b-', label=f'Training {_metric}')
    plt.plot(epochs, _val_values, 'r-', label=f'Validation {_metric}')
    plt.title(f'{_metric} per Epoch')
    plt.xlabel('Epoch')
    plt.ylabel(_metric)
    plt.legend()
    plt.grid(True)

plt.tight_layout()

# Save BEFORE showing: once plt.show() returns, the figure may already be
# closed, so a later savefig would write an empty image
plt.savefig('training_validation_metrics.png')
plt.show()


# Landing and Visualization

In [None]:
# Function that handles the visualization of the predicted mask and determines the landing point for each image
def visualize_and_landing_point(model, dataloader, device, num_classes, colors_list, class_mapping, non_sensitive_classes, output_dir):
    """Save a four-panel figure per batch and print the selected landing point.

    For the first sample of every batch the model is run, every pixel whose
    predicted class is not listed in ``non_sensitive_classes`` is marked as
    sensitive (together with a 3-pixel frame along the image border), and the
    landing point is the pixel with the largest Euclidean distance to the
    nearest sensitive pixel.

    Args:
        model: callable as ``model(batch)``; the class is taken as the argmax
            over dim 1 of its output.
        dataloader: yields ``(images, masks)`` batches; only index 0 of each
            batch is processed. Images are expected to be normalized with the
            mean/std that this function undoes for display.
        device: device on which the model is evaluated.
        num_classes: unused; kept so existing callers keep working.
        colors_list: per-class colours, indexed by class id, used to paint masks.
        class_mapping: dict mapping class name to class index.
        non_sensitive_classes: names of the classes that are safe to land on.
        output_dir: directory receiving ``prediction_<n>.png``; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Assigns colors to each class in the mask
    def decode_mask(mask, colors_list):
        color_mask = np.zeros((mask.shape[0], mask.shape[1], 3))
        for cls in range(len(colors_list)):
            color_mask[mask == cls] = colors_list[cls]
        return color_mask

    # Everything that is not explicitly non-sensitive counts as sensitive.
    # These values do not depend on the image, so compute them once.
    sensitive_classes = set(class_mapping.keys()) - set(non_sensitive_classes)
    sensitive_indices = [class_mapping[cls] for cls in sensitive_classes]

    # Normalization constants undone below to recover a displayable image
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    # Pixels this close to the image border are treated as sensitive too
    border_thickness = 3

    # Red "X" marker settings (the image is RGB)
    x_size = 5
    color_x = (255, 0, 0)
    thickness_x = 2

    for idx, (images, masks) in enumerate(dataloader):
        image = images[0].to(device)

        with torch.no_grad():
            output = model(image.unsqueeze(0))

        _, preds = torch.max(output, dim=1)

        image_np = image.cpu().numpy().transpose(1, 2, 0)
        mask_np = masks[0].cpu().numpy()
        preds_np = preds[0].cpu().numpy()

        # Undo the normalization and clamp to the displayable [0, 1] range
        image_np = np.clip(std * image_np + mean, 0, 1)

        mask_color = decode_mask(mask_np, colors_list)
        preds_color = decode_mask(preds_np, colors_list)

        # 1 = sensitive, 0 = safe
        sensitive_mask = np.isin(preds_np, sensitive_indices).astype(np.uint8)

        H, W = preds_np.shape
        sensitive_mask[0:border_thickness, :] = 1
        sensitive_mask[H - border_thickness:H, :] = 1
        sensitive_mask[:, 0:border_thickness] = 1
        sensitive_mask[:, W - border_thickness:W] = 1

        # distanceTransform measures the distance of every non-zero pixel to the
        # nearest zero pixel, so safe pixels must be 1 and sensitive pixels 0
        inverted_mask = 1 - sensitive_mask
        distance_map = cv2.distanceTransform(inverted_mask, distanceType=cv2.DIST_L2, maskSize=5)

        max_dist = np.max(distance_map)
        # (row, column) of the pixel farthest from any sensitive pixel
        max_pos = np.unravel_index(np.argmax(distance_map), distance_map.shape)

        landing_point_image = (image_np.copy() * 255).astype(np.uint8)

        if max_dist > 0:
            print(f"Image: {idx+1}/{len(dataloader)}, Landing Point: {max_pos}, Distance: {max_dist}")
            # OpenCV points are (x, y) = (column, row)
            x, y = int(max_pos[1]), int(max_pos[0])
            cv2.line(landing_point_image, (x - x_size, y - x_size), (x + x_size, y + x_size), color_x, thickness=thickness_x)
            cv2.line(landing_point_image, (x - x_size, y + x_size), (x + x_size, y - x_size), color_x, thickness=thickness_x)
        else:
            # Every pixel is sensitive: argmax would report (0, 0), which is
            # not a landing point, so no marker is drawn
            print(f"Image: {idx+1}/{len(dataloader)}, no safe landing area found")

        fig, axs = plt.subplots(1, 4, figsize=(24, 6))
        panels = [
            (image_np, 'Original Image'),
            (mask_color, 'Mask'),
            (preds_color, 'Prediction'),
            (landing_point_image, 'Landing Point'),
        ]
        for ax, (picture, title) in zip(axs, panels):
            ax.imshow(picture)
            ax.set_title(title)
            ax.axis('off')

        fig.tight_layout()

        output_path = os.path.join(output_dir, f'prediction_{idx+1}.png')
        fig.savefig(output_path)
        # Close the figure so figures do not pile up in memory across images
        plt.close(fig)

    print(f"The images are saved in {output_dir}")

In [None]:
# map_location lets a checkpoint written on a GPU be loaded on a CPU-only
# machine; weights_only=True restricts unpickling to tensors and plain containers
_checkpoint = torch.load('/kaggle/working/best_unet_model.pth', map_location=device, weights_only=True)

# A state dict saved from an nn.DataParallel model prefixes every key with
# 'module.'. Strip the prefix and load into the underlying network so the
# checkpoint fits whether or not the model is currently wrapped.
_checkpoint = {
    (key[len('module.'):] if key.startswith('module.') else key): value
    for key, value in _checkpoint.items()
}
_target_model = model.module if isinstance(model, nn.DataParallel) else model
_target_model.load_state_dict(_checkpoint)
model.eval()

# Classes considered safe to land on; every other class is treated as sensitive
non_sensitive_classes = ['grass','paved-area']
output_directory = '/kaggle/working/OutputImages'

visualize_and_landing_point(
    model=model,
    dataloader=valloader,
    device=device,
    num_classes=len(class_mapping),
    colors_list=colors_list,
    class_mapping=class_mapping,
    non_sensitive_classes=non_sensitive_classes,
    output_dir=output_directory
)