# **Semantic Segmentation of Aerial Imagery for Building Footprint Extraction Using U-Net: Part II**

## Install required packages
Before starting the building footprint extraction workflow using U-Net, we need to install a few essential Python libraries that are not included by default in Google Colab. These installations are done using the pip package manager, and the --quiet flag suppresses verbose output to keep the notebook clean.

In [None]:
# Install required packages
!pip install torchgeo --quiet
!pip install buildingregulariser --quiet
!pip install torchsummary --quiet

## Import required libraries

In [None]:
# Import libraries
import os
import numpy as np
import pandas as pd
import geopandas as gpd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchsummary import summary
import matplotlib.pyplot as plt
import seaborn as sns
import rasterio
from rasterio import features
import shapely.geometry as shp_geom
from buildingregulariser import regularize_geodataframe
from sklearn.metrics import f1_score, jaccard_score
import scipy.ndimage as nd

## Mount Google Drive and set the working directory

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive/')
os.chdir('/content/drive/MyDrive/Chit_TR9207')

## Load the raster image and rasterize building footprints

In [None]:
# Load raster and geojson

# Define the path to the aerial image (GeoTIFF file)
img_path = 'TR9207a.tif'

# Open the raster image using rasterio
with rasterio.open(img_path) as src:
    transform = src.transform               # Get the affine transformation (pixel-to-coordinate mapping)
    height, width = src.height, src.width   # Extract image dimensions (rows and columns)
    raster_crs = src.crs                    # Get the coordinate reference system (CRS) of the raster
    image = src.read()                      # Read the raster data as a NumPy array (shape: [bands, height, width])

# Read building footprint polygons from the GeoJSON file into a GeoDataFrame
gdf = gpd.read_file('Bld_DSG_TR9207.geojson')

# Rasterize the vector geometries into a binary mask
# Each building polygon is assigned a value of 1, and the background is filled with 0
# Rasterize the vector geometries into a binary mask
# Each building polygon is assigned a value of 1, and the background is filled with 0
mask = features.rasterize(
    ((geom, 1) for geom in gdf.geometry),   # Generator of (geometry, value) pairs
    out_shape=(height, width),             # Output mask will match the raster's dimensions
    transform=transform,                   # Use the same affine transform as the raster
    fill=0,                                # Set background (non-building) pixels to 0
    dtype=np.uint8                         # Use 8-bit unsigned integer type for the mask
)

## Define the custom dataset for patch extraction

In [None]:
# Define a custom PyTorch dataset for aerial image building segmentation
class AerialBuildingDataset(torch.utils.data.Dataset):
    def __init__(self, image, mask):
        self.image = image                  # Multi-band aerial image (NumPy array of shape: [bands, height, width])
        self.mask = mask                    # Corresponding binary mask (NumPy array of shape: [height, width])
        self.patch_size = 256               # Define the patch size (e.g., 256x256 pixels)
        self.bands, self.height, self.width = image.shape  # Extract dimensions from the image

        # Generate top-left coordinates for each non-overlapping patch in the image
        self.patch_coords = [
            (i, j)
            for i in range(0, self.height - self.patch_size + 1, self.patch_size)
            for j in range(0, self.width - self.patch_size + 1, self.patch_size)
        ]

    def __len__(self):
        # Return the total number of patches available
        return len(self.patch_coords)

    def __getitem__(self, idx):
        # Get top-left corner (i, j) of the patch based on index
        i, j = self.patch_coords[idx]

        # Extract image patch and corresponding mask patch
        img_patch = self.image[:, i:i+self.patch_size, j:j+self.patch_size]   # Shape: [bands, patch_size, patch_size]
        mask_patch = self.mask[i:i+self.patch_size, j:j+self.patch_size]      # Shape: [patch_size, patch_size]

        # Convert both to PyTorch tensors
        img_patch = torch.tensor(img_patch, dtype=torch.float32)              # Float tensor for image
        mask_patch = torch.tensor(mask_patch, dtype=torch.long)              # Long tensor for mask (for classification)

        return img_patch, mask_patch

## Create the dataset and data loaders

In [None]:
# Create an instance of the custom dataset using the full image and mask
dataset = AerialBuildingDataset(image, mask)

# Define the training set size as 80% of the total dataset
train_size = int(0.8 * len(dataset))

# Define the test set size as the remaining 20%
test_size = len(dataset) - train_size

# Randomly split the dataset into training and testing subsets
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Create a DataLoader for the training set
# - batch_size=4: feed 4 patches at a time
# - shuffle=True: randomly shuffle data each epoch to improve generalization
# - drop_last=True: drop the last batch if it's smaller than the batch size
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, drop_last=True)

# Create a DataLoader for the test set
# - shuffle=False: maintain original order for evaluation consistency
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

## Define the U-Net architecture

In [None]:
# Define U-Net model for semantic segmentation
class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()

        # Helper function: Convolution -> ReLU -> Convolution -> ReLU
        def CBR(in_ch, out_ch):
            return nn.Sequential(
                nn.Conv2d(in_ch, out_ch, 3, padding=1),  # 1st Conv layer
                nn.ReLU(),                              # ReLU activation
                nn.Conv2d(out_ch, out_ch, 3, padding=1), # 2nd Conv layer
                nn.ReLU()                               # ReLU activation
            )

        # Encoder (Downsampling path)
        self.enc1 = CBR(3, 64)            # First encoder block (input: RGB image)
        self.pool1 = nn.MaxPool2d(2)      # Downsample by factor of 2
        self.enc2 = CBR(64, 128)          # Second encoder block
        self.pool2 = nn.MaxPool2d(2)
        self.enc3 = CBR(128, 256)         # Third encoder block
        self.pool3 = nn.MaxPool2d(2)
        self.enc4 = CBR(256, 512)         # Fourth encoder block
        self.pool4 = nn.MaxPool2d(2)

        # Bottleneck layer (deepest representation)
        self.middle = CBR(512, 1024)      # Captures most abstract features

        # Decoder (Upsampling path)
        self.up4 = nn.ConvTranspose2d(1024, 512, 2, 2)  # Upsample to match encoder-4
        self.dec4 = CBR(1024, 512)                      # Decoder block with skip connection from enc4

        self.up3 = nn.ConvTranspose2d(512, 256, 2, 2)   # Upsample to match encoder-3
        self.dec3 = CBR(512, 256)                       # Decoder block with skip connection from enc3

        self.up2 = nn.ConvTranspose2d(256, 128, 2, 2)   # Upsample to match encoder-2
        self.dec2 = CBR(256, 128)                       # Decoder block with skip connection from enc2

        self.up1 = nn.ConvTranspose2d(128, 64, 2, 2)    # Upsample to match encoder-1
        self.dec1 = CBR(128, 64)                        # Decoder block with skip connection from enc1

        # Final 1x1 convolution to produce 2-channel output (background vs building)
        self.final = nn.Conv2d(64, 2, kernel_size=1)

    def forward(self, x):
        # Encoder path (contracting)
        e1 = self.enc1(x)                # Output shape: [B, 64, H, W]
        e2 = self.enc2(self.pool1(e1))   # [B, 128, H/2, W/2]
        e3 = self.enc3(self.pool2(e2))   # [B, 256, H/4, W/4]
        e4 = self.enc4(self.pool3(e3))   # [B, 512, H/8, W/8]

        # Bottleneck
        m = self.middle(self.pool4(e4))  # [B, 1024, H/16, W/16]

        # Decoder path (expanding) with skip connections
        d4 = self.dec4(torch.cat([self.up4(m), e4], dim=1))  # [B, 512, H/8, W/8]
        d3 = self.dec3(torch.cat([self.up3(d4), e3], dim=1)) # [B, 256, H/4, W/4]
        d2 = self.dec2(torch.cat([self.up2(d3), e2], dim=1)) # [B, 128, H/2, W/2]
        d1 = self.dec1(torch.cat([self.up1(d2), e1], dim=1)) # [B, 64, H, W]

        # Final prediction map
        return self.final(d1)  # Output shape: [B, 2, H, W] (logits for 2 classes)

## Initialize the U-Net model, optimizer, and loss function

In [None]:
# Instantiate the U-Net model and move it to GPU if available, otherwise use CPU
model = UNet().to('cuda' if torch.cuda.is_available() else 'cpu')

# Automatically detect and store the device used (GPU or CPU) for future reference
device = next(model.parameters()).device

# Initialize the Adam optimizer to update model weights during training
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Define the loss function: CrossEntropyLoss is used for pixel-wise classification
criterion = nn.CrossEntropyLoss()

# Print a summary of the model architecture: layers, output shapes, and parameters
summary(model, input_size=(3, 256, 256))

## Training loop with early stopping

In [None]:
# Training loop with early stopping
# Initialize tracking metrics and early stopping parameters
train_losses = []
test_losses = []
train_accuracies = []
test_accuracies = []

best_val_loss = float('inf')  # Start with worst possible loss
early_stopping_counter = 0
patience = 5  # Allowed consecutive epochs without improvement
best_model_path = 'best_unet_model.pth'  # Path to save best model

for epoch in range(20):
    # Training phase
    model.train()
    total_loss, correct_train, total_train = 0, 0, 0
    for imgs, masks in train_loader:
        # Keep only RGB channels and move to device
        imgs, masks = imgs[:, :3, :, :].to(device), masks.to(device)

        # Forward pass and loss calculation
        optimizer.zero_grad()
        preds = model(imgs)
        loss = criterion(preds, masks)

        # Backpropagation and weight update
        loss.backward()
        optimizer.step()

        # Accumulate metrics
        total_loss += loss.item()
        pred_labels = torch.argmax(preds, dim=1)  # Get class indices
        correct_train += (pred_labels == masks).sum().item()
        total_train += masks.numel()  # Total pixels in batch

    # Calculate epoch training metrics
    train_acc = correct_train / total_train
    train_losses.append(total_loss / len(train_loader))
    train_accuracies.append(train_acc)

    # Evaluation phase
    model.eval()
    test_loss, correct_test, total_test = 0, 0, 0
    with torch.no_grad():  # Disable gradient computation
        for imgs, masks in test_loader:
            imgs, masks = imgs[:, :3, :, :].to(device), masks.to(device)
            preds = model(imgs)
            test_loss += criterion(preds, masks).item()

            # Calculate validation accuracy
            pred_labels = torch.argmax(preds, dim=1)
            correct_test += (pred_labels == masks).sum().item()
            total_test += masks.numel()

    # Calculate epoch validation metrics
    test_acc = correct_test / total_test
    avg_test_loss = test_loss / len(test_loader)
    test_losses.append(avg_test_loss)
    test_accuracies.append(test_acc)

    # Print progress
    print(f"Epoch {epoch+1}: Train Loss = {train_losses[-1]:.4f}, "
          f"Test Loss = {avg_test_loss:.4f}, "
          f"Train Acc = {train_acc:.4f}, Test Acc = {test_acc:.4f}")

    # Early stopping check
    if avg_test_loss < best_val_loss:
        best_val_loss = avg_test_loss
        early_stopping_counter = 0
        torch.save(model.state_dict(), best_model_path)
        print("Model improved. Saving best model.")
    else:
        early_stopping_counter += 1
        print(f"No improvement. Early stopping counter: {early_stopping_counter}/{patience}")
        if early_stopping_counter >= patience:
            print("Early stopping triggered.")
            break

## Visualize training and test loss

In [None]:
# Create a new figure with a custom size for better readability
plt.figure(figsize=(12, 5))

# Plot training and test loss over epochs in the first subplot
plt.subplot(1, 2, 1)  # (1 row, 2 columns, 1st subplot)
plt.plot(train_losses, label='Train Loss')  # Plot training loss values
plt.plot(test_losses, label='Test Loss')    # Plot test loss values
plt.xlabel('Epoch')                         # Label x-axis
plt.ylabel('Loss')                          # Label y-axis
plt.title('Training and Test Loss')         # Set title of the plot
plt.legend()                                # Add legend to distinguish the curves

## Visualize training and test accuracy

In [None]:
# Create the second subplot to plot accuracy values
plt.subplot(1, 2, 2)  # (1 row, 2 columns, 2nd subplot)
plt.plot(train_accuracies, label='Train Accuracy')  # Plot training accuracy over epochs
plt.plot(test_accuracies, label='Test Accuracy')    # Plot test accuracy over epochs
plt.xlabel('Epoch')                                 # Label x-axis as 'Epoch'
plt.ylabel('Accuracy')                              # Label y-axis as 'Accuracy'
plt.title('Training and Test Accuracy')             # Title for the accuracy plot
plt.legend()                                        # Show legend to differentiate curves

# Adjust subplot layout to prevent overlap
plt.tight_layout()

# Display the plots
plt.show()