In [None]:
!pip install ptflops

In [None]:
!pip install albumentations

In [None]:
!pip install -U fvcore

Image preprocessing for training and validation datasets.
The transformations include:
1. Resizing all input images to 512x1024 pixels.
2. Normalizing pixel values using ImageNet statistics
 (mean and standard deviation per channel).
3. Converting images to PyTorch tensors

In [None]:
## Transform ##

# Imported in this cell as well because it runs before the main import cell;
# without these lines a fresh "Restart & Run All" fails with a NameError on `A`.
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Network input resolution and the ImageNet per-channel statistics.
INPUT_HEIGHT, INPUT_WIDTH = 512, 1024
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)


def _build_transform():
    """Return a new resize -> normalize -> to-tensor Albumentations pipeline."""
    return A.Compose([
        A.Resize(height=INPUT_HEIGHT, width=INPUT_WIDTH),
        A.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
        ToTensorV2(),
    ])


# Training and validation use the same deterministic preprocessing; each gets
# its own Compose instance so one can later be changed without affecting the other.
train_transform = _build_transform()
val_transform = _build_transform()

Set Up and Environment

In [None]:
## Import and setup ##
import os, time, zipfile, gc, glob
import numpy as np
from PIL import Image
from collections import deque

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms
import albumentations as A
from albumentations.pytorch import ToTensorV2

from ptflops import get_model_complexity_info
from google.colab import drive
from torchvision import models
import warnings

from fvcore.nn import FlopCountAnalysis, flop_count_table


# Pick the compute device: CUDA when available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Drop any previous Google Drive mount, then mount it again at /content/drive.
drive.flush_and_unmount()
drive.mount("/content/drive", force_remount=True)


Drive not mounted, so nothing to flush and unmount.
Mounted at /content/drive


Definition of the BiSeNet model with a ResNet18 or ResNet101 backbone

In [None]:
## Bisenet model with backbone ResNet18 or ResNet101 ##

class resnet18(nn.Module):
    """ResNet-18 feature extractor used as the BiSeNet context path.

    The torchvision network is stored whole in ``self.features`` and its stem
    and four residual stages are additionally exposed as direct attributes, so
    each of those sub-modules appears under two prefixes in ``state_dict()``.
    This layout is kept as-is so existing checkpoints still load.

    Args:
        pretrained: when True, load the ImageNet weights
            (``ResNet18_Weights.IMAGENET1K_V1``, the checkpoint the legacy
            ``pretrained=True`` flag selected); when False, use random init.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        # `pretrained=` is deprecated in torchvision; `weights=` is the
        # supported way to request the same ImageNet checkpoint.
        weights = models.ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
        self.features = models.resnet18(weights=weights)
        self.conv1 = self.features.conv1
        self.bn1 = self.features.bn1
        self.relu = self.features.relu
        self.maxpool1 = self.features.maxpool
        self.layer1 = self.features.layer1
        self.layer2 = self.features.layer2
        self.layer3 = self.features.layer3
        self.layer4 = self.features.layer4

    def forward(self, input):
        """Return ``(feature3, feature4, tail)`` for an image batch.

        ``feature3`` and ``feature4`` are the outputs of ``layer3`` and
        ``layer4``; ``tail`` is ``feature4`` averaged over its two spatial
        dimensions (kept as size-1 axes).
        """
        x = self.conv1(input)
        x = self.relu(self.bn1(x))
        x = self.maxpool1(x)
        feature1 = self.layer1(x)
        feature2 = self.layer2(feature1)
        feature3 = self.layer3(feature2)
        feature4 = self.layer4(feature3)
        # Global average pooling, done as two successive means (width, then height).
        tail = torch.mean(feature4, 3, keepdim=True)
        tail = torch.mean(tail, 2, keepdim=True)
        return feature3, feature4, tail


class resnet101(nn.Module):
    """ResNet-101 feature extractor used as the BiSeNet context path.

    The torchvision network is stored whole in ``self.features`` and its stem
    and four residual stages are additionally exposed as direct attributes, so
    each of those sub-modules appears under two prefixes in ``state_dict()``.
    This layout is kept as-is so existing checkpoints still load.

    Args:
        pretrained: when True, load the ImageNet weights
            (``ResNet101_Weights.IMAGENET1K_V1``, the checkpoint the legacy
            ``pretrained=True`` flag selected); when False, use random init.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        # `pretrained=` is deprecated in torchvision; `weights=` is the
        # supported way to request the same ImageNet checkpoint.
        weights = models.ResNet101_Weights.IMAGENET1K_V1 if pretrained else None
        self.features = models.resnet101(weights=weights)
        self.conv1 = self.features.conv1
        self.bn1 = self.features.bn1
        self.relu = self.features.relu
        self.maxpool1 = self.features.maxpool
        self.layer1 = self.features.layer1
        self.layer2 = self.features.layer2
        self.layer3 = self.features.layer3
        self.layer4 = self.features.layer4

    def forward(self, input):
        """Return ``(feature3, feature4, tail)`` for an image batch.

        ``feature3`` and ``feature4`` are the outputs of ``layer3`` and
        ``layer4``; ``tail`` is ``feature4`` averaged over its two spatial
        dimensions (kept as size-1 axes).
        """
        x = self.conv1(input)
        x = self.relu(self.bn1(x))
        x = self.maxpool1(x)
        feature1 = self.layer1(x)
        feature2 = self.layer2(feature1)
        feature3 = self.layer3(feature2)
        feature4 = self.layer4(feature3)
        # Global average pooling, done as two successive means (width, then height).
        tail = torch.mean(feature4, 3, keepdim=True)
        tail = torch.mean(tail, 2, keepdim=True)
        return feature3, feature4, tail


def build_contextpath(name):
    """Build the pretrained context-path backbone selected by ``name``.

    Args:
        name: ``'resnet18'`` or ``'resnet101'``.

    Returns:
        An instance of the matching backbone wrapper, created with
        ``pretrained=True``.

    Raises:
        KeyError: if ``name`` is not one of the supported backbones.
    """
    # Map names to constructors (not instances) so that only the requested
    # backbone is built; instantiating both would load two sets of pretrained
    # weights and throw one of them away on every call.
    builders = {
        'resnet18': resnet18,
        'resnet101': resnet101,
    }
    return builders[name](pretrained=True)


class ConvBlock(nn.Module):
    """Bias-free 2-D convolution followed by batch normalisation and ReLU.

    With the defaults (3x3 kernel, stride 2, padding 1) the block halves the
    spatial resolution of its input.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=2, padding=1):
        super().__init__()
        conv_options = dict(kernel_size=kernel_size, stride=stride,
                            padding=padding, bias=False)
        self.conv1 = nn.Conv2d(in_channels, out_channels, **conv_options)
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, input):
        """Apply conv -> batch norm -> ReLU to ``input``."""
        normalised = self.bn(self.conv1(input))
        return self.relu(normalised)


class Spatial_path(nn.Module):
    """Spatial path: three stacked ConvBlocks mapping 3 -> 64 -> 128 -> 256 channels.

    Each ConvBlock uses its default stride of 2, so the output is 1/8 of the
    input resolution.
    """

    def __init__(self):
        super().__init__()
        self.convblock1 = ConvBlock(3, 64)
        self.convblock2 = ConvBlock(64, 128)
        self.convblock3 = ConvBlock(128, 256)

    def forward(self, input):
        """Run the input through the three ConvBlocks in order."""
        x = input
        for stage in (self.convblock1, self.convblock2, self.convblock3):
            x = stage(x)
        return x


class AttentionRefinementModule(nn.Module):
    """Channel attention: re-weights a feature map with a per-channel gate.

    The gate is computed from the globally average-pooled input through a
    1x1 convolution, batch normalisation and a sigmoid, then broadcast over
    the spatial dimensions of the input.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.bn = nn.BatchNorm2d(out_channels)
        self.sigmoid = nn.Sigmoid()
        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
        # Remembered only to validate the channel count of incoming tensors.
        self.in_channels = in_channels

    def forward(self, input):
        """Return ``input`` scaled channel-wise by the attention gate."""
        pooled = self.avgpool(input)  # global average pooling -> (N, C, 1, 1)
        assert self.in_channels == pooled.size(1), 'in_channels and out_channels should all be {}'.format(pooled.size(1))
        gate = self.sigmoid(self.bn(self.conv(pooled)))
        # The gate must have as many channels as `input` for the product to broadcast.
        return torch.mul(input, gate)


class FeatureFusionModule(nn.Module):
    """Fuse spatial-path and context-path features into ``num_classes`` channels.

    The two inputs are concatenated along the channel axis, projected by a
    stride-1 ConvBlock, and then re-weighted by a channel gate computed from
    the pooled projection; the gated map is added back to the projection.

    Expected ``in_channels`` (sum of the concatenated inputs):
        resnet101: 3328 = 256 (spatial path) + 1024 + 2048 (context path)
        resnet18:  1024 = 256 (spatial path) + 256 + 512 (context path)
    """

    def __init__(self, num_classes, in_channels):
        super().__init__()
        self.in_channels = in_channels

        self.convblock = ConvBlock(in_channels, num_classes, stride=1)
        self.conv1 = nn.Conv2d(num_classes, num_classes, kernel_size=1)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(num_classes, num_classes, kernel_size=1)
        self.sigmoid = nn.Sigmoid()
        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))

    def forward(self, input_1, input_2):
        """Return the fused feature map for the two inputs."""
        stacked = torch.cat((input_1, input_2), dim=1)
        assert self.in_channels == stacked.size(1), 'in_channels of ConvBlock should be {}'.format(stacked.size(1))
        feature = self.convblock(stacked)

        # Channel gate: pool -> 1x1 conv -> ReLU -> 1x1 conv -> sigmoid.
        gate = self.avgpool(feature)
        gate = self.relu(self.conv1(gate))
        gate = self.sigmoid(self.conv2(gate))

        # Residual form: gated features plus the ungated projection.
        attended = torch.mul(feature, gate)
        return torch.add(attended, feature)


class BiSeNet(nn.Module):
    """BiSeNet segmentation network with a ResNet18 or ResNet101 context path.

    In training mode ``forward`` returns ``(result, cx1_sup, cx2_sup)``: the
    main prediction plus two auxiliary predictions taken from the context
    path. In eval mode it returns only ``result``.

    NOTE: the attribute name ``saptial_path`` (sic) is kept because it is part
    of the state_dict keys.
    """

    # Channels of (feature3, feature4) returned by each supported backbone.
    _CONTEXT_CHANNELS = {
        'resnet18': (256, 512),
        'resnet101': (1024, 2048),
    }
    # Channels produced by the spatial path.
    _SPATIAL_CHANNELS = 256

    def __init__(self, num_classes, context_path='resnet18'):
        super().__init__()
        # Spatial path, then context path (backbone).
        self.saptial_path = Spatial_path()
        self.context_path = build_contextpath(context_path)

        if context_path in self._CONTEXT_CHANNELS:
            mid_ch, deep_ch = self._CONTEXT_CHANNELS[context_path]
            # Attention refinement on the two context features.
            self.attention_refinement_module1 = AttentionRefinementModule(mid_ch, mid_ch)
            self.attention_refinement_module2 = AttentionRefinementModule(deep_ch, deep_ch)
            # Auxiliary (deep-supervision) classifiers.
            self.supervision1 = nn.Conv2d(mid_ch, num_classes, kernel_size=1)
            self.supervision2 = nn.Conv2d(deep_ch, num_classes, kernel_size=1)
            # Fusion of spatial + both context features:
            # 1024 channels for resnet18, 3328 for resnet101.
            self.feature_fusion_module = FeatureFusionModule(
                num_classes, self._SPATIAL_CHANNELS + mid_ch + deep_ch)

        # Final 1x1 classifier.
        self.conv = nn.Conv2d(num_classes, num_classes, kernel_size=1)
        self.init_weight()

    def init_weight(self):
        """Initialise every Conv2d/BatchNorm2d outside the context path.

        Modules whose qualified name contains ``'context_path'`` are skipped,
        leaving the backbone weights untouched.
        """
        for name, module in self.named_modules():
            if 'context_path' in name:
                continue
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_in', nonlinearity='relu')
            elif isinstance(module, nn.BatchNorm2d):
                module.eps = 1e-5
                module.momentum = 0.1
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

    def forward(self, input):
        """Segment ``input``; see the class docstring for the return value."""
        upsample = torch.nn.functional.interpolate

        spatial = self.saptial_path(input)
        cx1, cx2, tail = self.context_path(input)

        # Refine both context features; the deeper one is also scaled by the
        # globally pooled tail.
        cx1 = self.attention_refinement_module1(cx1)
        cx2 = torch.mul(self.attention_refinement_module2(cx2), tail)

        # Bring the context features to the spatial path's resolution.
        target_hw = spatial.size()[-2:]
        cx1 = upsample(cx1, size=target_hw, mode='bilinear')
        cx2 = upsample(cx2, size=target_hw, mode='bilinear')
        context = torch.cat((cx1, cx2), dim=1)

        # Fuse, upsample by 8 and classify.
        result = self.feature_fusion_module(spatial, context)
        result = upsample(result, scale_factor=8, mode='bilinear')
        result = self.conv(result)

        if not self.training:
            return result

        # Auxiliary predictions, resized to the input resolution.
        full_hw = input.size()[-2:]
        cx1_sup = upsample(self.supervision1(cx1), size=full_hw, mode='bilinear')
        cx2_sup = upsample(self.supervision2(cx2), size=full_hw, mode='bilinear')
        return result, cx1_sup, cx2_sup


def get_bisenet_model(num_classes, context_path='resnet18'):
    """Create a BiSeNet with ``num_classes`` outputs and the given backbone name."""
    network = BiSeNet(num_classes=num_classes, context_path=context_path)
    return network

Cityscapes Dataset and DataLoader Setup.

This section defines a custom PyTorch Dataset class for loading the Cityscapes
dataset.
DataLoaders are then created for both training and validation.

In [None]:
## Dataset Cityscapes ##

class CityscapesDataset(Dataset):
    """Cityscapes image / label-map pairs read from disk.

    Images are collected from ``{root}/images/{split}/**/*.png`` and label
    maps from ``{root}/gtFine/{split}/**/*_labelTrainIds.png``. Both lists are
    sorted and paired by position, so the pairing is only correct when the two
    sorted listings line up one-to-one.

    Args:
        root: dataset root directory.
        split: sub-directory name of the split (e.g. ``"train"``, ``"val"``).
        transform: optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, root, split="train", transform=None):
        self.transform = transform
        self.images = sorted(glob.glob(f"{root}/images/{split}/**/*.png", recursive=True))
        self.labels = sorted(glob.glob(f"{root}/gtFine/{split}/**/*_labelTrainIds.png", recursive=True))

        n_images, n_labels = len(self.images), len(self.labels)
        n_pairs = min(n_images, n_labels)
        if n_images != n_labels:
            # Truncation is kept (best effort), but it is no longer silent:
            # a count mismatch usually means positional pairs are shifted.
            warnings.warn(
                f"CityscapesDataset[{split}]: found {n_images} images but "
                f"{n_labels} label maps under {root!r}; keeping the first "
                f"{n_pairs} of each, so image/label pairs may be misaligned.",
                RuntimeWarning,
                stacklevel=2,
            )
        self.images = self.images[:n_pairs]
        self.labels = self.labels[:n_pairs]

    def __len__(self):
        """Number of (image, label) pairs."""
        return len(self.images)

    def __getitem__(self, idx):
        """Load pair ``idx`` and return ``(img, label)`` with ``label`` as a LongTensor."""
        img = np.array(Image.open(self.images[idx]).convert("RGB"))
        label = np.array(Image.open(self.labels[idx]), dtype=np.int64)

        if self.transform:
            # Transform image and mask together so they stay aligned.
            transformed = self.transform(image=img, mask=label)
            img = transformed['image']
            label = transformed['mask']

        # The mask may come back as a NumPy array or as a tensor, depending
        # on the transform; either way, return a LongTensor.
        if isinstance(label, np.ndarray):
            label = torch.from_numpy(label).long()
        else:
            label = label.long()

        return img, label


## Create datasets for training and validation ##

# Both splits live under the same dataset root on Google Drive.
CITYSCAPES_ROOT = "/content/drive/MyDrive/Cityscapes/Cityspaces"

train_dataset = CityscapesDataset(root=CITYSCAPES_ROOT, split="train", transform=train_transform)
val_dataset = CityscapesDataset(root=CITYSCAPES_ROOT, split="val", transform=val_transform)


## DataLoader setup ##

# Settings shared by both loaders; only the shuffling differs.
_loader_kwargs = dict(batch_size=4, num_workers=2, pin_memory=True)

train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)  # fixed order for validation


Defining the function for the learning rate: polynomial learning rate.

In [None]:
## learning rate ##

def poly_lr_scheduler(optimizer, init_lr, iter, lr_decay_iter=1,
                      max_iter=300, power=0.9):

    """Polynomial decay of learning rate

        lr = init_lr * (1 - iter / max_iter) ** power

        The new value is written to the optimizer's first parameter group only.

        :param optimizer is the optimizer whose param_groups[0]['lr'] is updated
        :param init_lr is base learning rate
        :param iter is a current iteration; values above max_iter are treated
               as max_iter, so the learning rate bottoms out at 0
        :param lr_decay_iter how frequently decay occurs, default is 1
               (accepted for interface compatibility; currently not used)
        :param max_iter is number of maximum iterations
        :param power is a polynomial power
      Returns the scalar learning rate"""

    # Clamp the progress: past max_iter the base (1 - iter/max_iter) would be
    # negative, and a negative float raised to a fractional power is a complex
    # number in Python, which is not a valid learning rate.
    progress = min(iter, max_iter) / max_iter
    lr = init_lr*(1 - progress)**power
    optimizer.param_groups[0]['lr'] = lr
    return lr


This section sets up the BiSeNet model with a pre-trained ResNet18 backbone,
defines the loss function, and configures the optimizer.

In [None]:
## Model, loss function, and optimizer ##

# BiSeNet with a pre-trained ResNet18 context path and 19 output classes
model = get_bisenet_model(num_classes=19, context_path='resnet18')
model = model.to(device)

# Cross-entropy loss; pixels labelled 255 are ignored
criterion = torch.nn.CrossEntropyLoss(ignore_index=255)

# SGD with momentum and weight decay. The lr given here is only a starting
# value: poly_lr_scheduler rewrites param_groups[0]['lr'] whenever it is called.
sgd_settings = dict(lr=0.0025, momentum=0.9, weight_decay=5e-4)
optimizer = torch.optim.SGD(model.parameters(), **sgd_settings)

Checkpoint Management, Device Setup, and AMP Initialization.

This section prepares the training environment by:
   - Creating a directory to save model checkpoints
   - Selecting the computation device (GPU if available, else CPU).
   - Initializing PyTorch’s Automatic Mixed Precision GradScaler

In [None]:
## Setup directories, device, and AMP scaler ##

# Directory to save model checkpoints
checkpoint_dir = "/content/drive/MyDrive/2bMachineLearning/checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)
max_checkpoints = 2   # how many periodic checkpoints to keep on disk


# Select device (GPU if available, else CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Automatic Mixed Precision: GradScaler (only created when running on CUDA)
scaler = torch.amp.GradScaler(device='cuda') if device.type == 'cuda' else None

## Discover periodic checkpoints already on disk ##
# Only files named exactly "checkpoint_epoch<digits>.pt" are considered, so a
# stray file such as "checkpoint_epoch_best.pt" is skipped instead of crashing
# the integer conversion.
_CKPT_PREFIX, _CKPT_SUFFIX = "checkpoint_epoch", ".pt"
existing_checkpoints = []   # (epoch number from the filename, full path)
for fname in os.listdir(checkpoint_dir):
    if fname.startswith(_CKPT_PREFIX) and fname.endswith(_CKPT_SUFFIX):
        epoch_str = fname[len(_CKPT_PREFIX):-len(_CKPT_SUFFIX)]
        if epoch_str.isdigit():
            existing_checkpoints.append((int(epoch_str), os.path.join(checkpoint_dir, fname)))
existing_checkpoints.sort()

# Seed the rotation queue (oldest first) with what is already on disk, so that
# checkpoints written before a resume are still pruned by the
# max_checkpoints policy instead of accumulating forever.
saved_checkpoints = deque(path for _, path in existing_checkpoints)

## Restore from latest checkpoint if available ##
if existing_checkpoints:
    latest_epoch, latest_checkpoint = existing_checkpoints[-1]
else:
    latest_epoch, latest_checkpoint = -1, None

start_epoch = 0
if latest_checkpoint:
    # weights_only=False: the checkpoint stores more than tensors (optimizer
    # and scaler state, metrics). Only load files written by this notebook.
    checkpoint = torch.load(latest_checkpoint, map_location=device, weights_only=False)     # Load checkpoint

    # Restore model and optimizer state:
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    # Restore AMP scaler if used
    if scaler and checkpoint.get('scaler_state_dict'):
        scaler.load_state_dict(checkpoint['scaler_state_dict'])

    # Resume from next epoch (the stored 'epoch' is the last completed one)
    start_epoch = checkpoint['epoch'] + 1
    print(f"Restored from {latest_checkpoint}")
else:
    print(" No checkpoint found. Starting from scratch")


Restored from /content/drive/MyDrive/2bMachineLearning/checkpoints/checkpoint_epoch48.pt


This section defines two helper functions that:
- Compute the per-class IoU and the mean IoU (mIoU)
- Measure model latency and FPS

In [None]:
def compute_miou(preds, labels, num_classes=19, device="cuda"):
    """
    Compute the mean Intersection over Union (mIoU) for semantic segmentation.

    For every class in ``range(num_classes)`` the true positives, false
    positives and false negatives are counted over all pixels, and
    IoU = TP / (TP + FP + FN). Pixels labelled 255 ("void") are excluded from
    the false-positive count.

    Args:
        preds: integer tensor of predicted class ids.
        labels: integer tensor of ground-truth class ids, same shape as ``preds``.
        num_classes: number of classes to evaluate.
        device: device on which the counters are kept. If a CUDA device is
            requested but CUDA is not available, the labels' device is used
            instead so the function also works on CPU-only machines.

    Returns:
        ``(mean_iou, iou_per_class)``. ``iou_per_class`` only contains classes
        with TP + FP + FN > 0, so it may be shorter than ``num_classes`` and
        its positions are not class ids. ``mean_iou`` is 0.0 when the list is
        empty.

    Side effects:
        Prints one line of statistics per class.
    """

    # Resolve the accumulator device, with a CPU fallback (see docstring).
    counter_device = torch.device(device)
    if counter_device.type == "cuda" and not torch.cuda.is_available():
        counter_device = labels.device

    # Initialize variables for storing: True Positives, False Positives and False Negatives
    tp = torch.zeros(num_classes, dtype=torch.int64, device=counter_device)
    fp = torch.zeros(num_classes, dtype=torch.int64, device=counter_device)
    fn = torch.zeros(num_classes, dtype=torch.int64, device=counter_device)

    # Loop-invariant mask, computed once instead of once per class.
    not_void = labels != 255

    # TP, FP, FN for each class
    for cls in range(num_classes):
        label_is_cls = labels == cls
        pred_is_cls = preds == cls
        # True Positive
        tp[cls] += (label_is_cls & pred_is_cls).sum()
        # False Positive (void pixels are not held against the prediction)
        fp[cls] += (~label_is_cls & not_void & pred_is_cls).sum()
        # False Negative
        fn[cls] += (label_is_cls & ~pred_is_cls).sum()

    iou_per_class = []

    # Compute IoU for each class and store in a list
    for cls in range(num_classes):
        denom = tp[cls] + fp[cls] + fn[cls]
        iou = tp[cls].float() / (denom.float() + 1e-10)  # epsilon avoids 0/0
        print(f"Class {cls}: TP={tp[cls].item()}, FP={fp[cls].item()}, FN={fn[cls].item()}, IoU={iou.item():.4f}")
        if denom > 0:  # only include classes with at least one pixel
            iou_per_class.append(iou.item())

    mean_iou = np.mean(iou_per_class) if iou_per_class else 0.0
    return mean_iou, iou_per_class


def measure_latency_and_fps(model, device, input_size=(3, 512, 1024), iterations=1000, warmup=10):
    """
    Measure inference latency and FPS of a PyTorch model.

    The model is put in eval mode, moved to ``device`` and run on a random
    tensor of shape ``(1, *input_size)``. Mean and standard deviation of the
    per-pass latency (ms) and of the FPS are printed.

    Args:
        model: the ``nn.Module`` to benchmark (switched to eval mode).
        device: ``torch.device`` or device string to run on.
        input_size: shape of one input sample, without the batch dimension.
        iterations: number of timed forward passes.
        warmup: number of untimed forward passes run first, so that one-off
            start-up costs do not skew the statistics.
    """
    # Accept both strings and torch.device. Comparing a torch.device to the
    # string "cuda" is never true, so the check must go through `.type`;
    # otherwise the synchronisation below is silently skipped and only the
    # asynchronous kernel launch is timed.
    device = torch.device(device)
    on_cuda = device.type == "cuda"

    model.eval().to(device)

    # Create a random tensor
    image = torch.randn(1, *input_size).to(device)

    latencies = []
    fps_list = []

    with torch.no_grad():
        for _ in range(warmup):
            _ = model(image)

        for _ in range(iterations):
            if on_cuda:
                torch.cuda.synchronize(device)   # drain pending work before timing
            start = time.perf_counter()
            _ = model(image)    # forward pass
            if on_cuda:
                torch.cuda.synchronize(device)   # wait for the pass to really finish
            elapsed = time.perf_counter() - start

            latencies.append(elapsed)
            # Guard against a zero reading from a coarse clock.
            fps_list.append(1.0 / max(elapsed, 1e-12))

    mean_latency = np.mean(latencies) * 1000
    std_latency = np.std(latencies) * 1000
    mean_fps = np.mean(fps_list)
    std_fps = np.std(fps_list)

    print(f"Latency : {mean_latency:.2f} ms ± {std_latency:.2f} ms")
    print(f"FPS: {mean_fps:.2f} ± {std_fps:.2f}")


Training of the model

In [None]:

## Variables for training progress ##

num_epochs = 50    # Number of epochs
num_classes = 19    # Number of segmentation classes

# Best mIoU, corresponding checkpoint and list for storing evaluation of loss function during epochs
best_miou = 0.0
best_epoch_ckpt = None
epoch_loss_list = []

# When resuming, carry over the best mIoU recorded so far; otherwise the first
# epoch after a restart would overwrite the best checkpoint even if it is worse.
best_ckpt_path = os.path.join(checkpoint_dir, "2b_best_epoch.pt")
if start_epoch > 0 and os.path.exists(best_ckpt_path):
    previous_best = torch.load(best_ckpt_path, map_location="cpu", weights_only=False)
    best_miou = float(previous_best.get('miou', 0.0))
    best_epoch_ckpt = best_ckpt_path
    del previous_best

## hyperparameters ##

initial_lr = 0.025   # Initial learning rate
alpha = 0.4     # Weight coefficient for auxiliary loss

# Mixed precision is only used on CUDA. Reuse the GradScaler from the setup
# cell (it may hold state restored from the checkpoint) instead of replacing
# it; only create a disabled scaler when the setup cell left it as None (CPU).
use_amp = device.type == 'cuda'
if scaler is None:
    scaler = torch.amp.GradScaler(device=device.type, enabled=use_amp)


def _make_checkpoint(epoch, epoch_loss, epoch_miou):
    """Collect model, optimizer, scaler state and metrics for saving after `epoch`."""
    return {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scaler_state_dict': scaler.state_dict(),
        'loss': epoch_loss,
        'miou': epoch_miou,
    }


## training loop ##

for epoch in range(start_epoch,num_epochs):
    model.train()
    total_loss = 0.0
    # NOTE(review): poly_lr_scheduler is called with its default max_iter=300
    # while only num_epochs=50 epochs are run, so the rate never decays to 0.
    # Confirm whether max_iter=num_epochs was intended.
    lr = poly_lr_scheduler(optimizer, initial_lr, epoch)

    for batch_idx, (images, labels) in enumerate(train_loader):
        images = images.to(device, non_blocking=True)
        labels = labels.long().to(device,non_blocking=True)

        optimizer.zero_grad()

        # Forward step
        with torch.amp.autocast(device_type=device.type, enabled=use_amp):
            outputs = model(images)

            if isinstance(outputs, (list, tuple)):
                main_pred, *aux_preds = outputs
                main_pred = F.interpolate(main_pred, size=labels.shape[1:],mode='bilinear', align_corners=False)

                # Main loss
                loss = criterion(main_pred, labels)

                # Auxiliary losses
                for aux in aux_preds:
                    aux =F.interpolate(aux, size=labels.shape[1:], mode='bilinear', align_corners=False)
                    loss += alpha * criterion(aux, labels)
            else:
                main_pred = F.interpolate(outputs,size=labels.shape[1:], mode='bilinear', align_corners=False)
                loss = criterion(main_pred, labels)

        # Backward step
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item()

        # Drop references to this batch's tensors
        del images,labels, outputs, loss

    # Release cached GPU memory once per epoch; doing it after every batch
    # forces the allocator to re-request memory at each step and slows training.
    torch.cuda.empty_cache()
    gc.collect()

    epoch_loss = total_loss /len(train_loader)
    epoch_loss_list.append(epoch_loss)

    ## Miou on validation set ##

    all_predictions =[]
    all_labels = []

    model.eval()
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device, non_blocking=True)
            labels = labels.long()   # labels stay on the CPU; they are only compared later

            outputs = model(images)
            if isinstance(outputs, (list,tuple)):
                main_pred = outputs[0]
            else:
                main_pred = outputs

            main_pred = F.interpolate(main_pred, size=labels.shape[1:], mode="bilinear", align_corners=False)

            # argmax over the logits; softmax is monotonic so it would not
            # change which class wins.
            predictions = torch.argmax(main_pred, dim=1)

            # Move each batch to the CPU right away so the whole validation
            # set is never held in GPU memory at once.
            all_predictions.append(predictions.cpu())
            all_labels.append(labels)

    all_predictions = torch.cat(all_predictions, dim=0)
    all_labels =torch.cat(all_labels, dim=0)

    epoch_miou, IoU_per_class = compute_miou(all_predictions, all_labels, num_classes=num_classes)
    print(f"📊 End epoch {epoch+1} — Loss: {epoch_loss:.4f}, mIoU: {epoch_miou:.4f}, LR at the end of epoch: {lr:.6f}")


    ## Saving checkpoints every 3 epochs ##
    if (epoch + 1) % 3 == 0:
        checkpoint_filename = f"checkpoint_epoch{epoch+1}.pt"
        checkpoint_path = os.path.join(checkpoint_dir, checkpoint_filename)

        # Save model, optimizer, scaler, loss, and mIoU
        torch.save(_make_checkpoint(epoch, epoch_loss, epoch_miou), checkpoint_path)
        saved_checkpoints.append(checkpoint_path)
        print(f"Checkpoint saved: {checkpoint_filename}")

        # Keep at most max_checkpoints periodic checkpoints on disk
        while len(saved_checkpoints) > max_checkpoints:
            old_ckpt =saved_checkpoints.popleft()
            if os.path.exists(old_ckpt):
                os.remove(old_ckpt)
                print(f"Removed old checkpoint: {os.path.basename(old_ckpt)}")

    ## Saving the best model ##
    if epoch_miou > best_miou:
        best_miou =epoch_miou
        best_epoch_ckpt = best_ckpt_path

        # Save model, optimizer, scaler, loss, and mIoU as the best checkpoint
        torch.save(_make_checkpoint(epoch, epoch_loss, epoch_miou), best_epoch_ckpt)
        print(f"New best model saved:{best_epoch_ckpt} con mIoU {best_miou:.4f}")


print(f"epoch_loss_list: {epoch_loss_list}")


## FLOPs, latency and FPS on the trained model ##
model.eval()

# One all-zero image at the training resolution, created directly on the target device
image = torch.zeros(1, 3, 512, 1024, device=device)

# Count FLOPs with fvcore and print the per-module table
flops = FlopCountAnalysis(model, image)
flop_table = flop_count_table(flops)
print(flop_table)

# Report mean/std latency and FPS
measure_latency_and_fps(model, device=device)