Import Libraries

In [1]:
import os
import numpy as np
import matplotlib.pyplot as plt
from glob import glob
from torch.utils.data import DataLoader
from torch.utils.data import Dataset as BaseDataset
from PIL import Image
import torch
import random 
import segmentation_models_pytorch as smp
from segmentation_models_pytorch import utils
import albumentations as albu
%matplotlib inline
import cv2
import py7zr

  from .autonotebook import tqdm as notebook_tqdm
INFO:albumentations.check_version:A new version of Albumentations is available: 1.4.13 (you have 1.4.11). Upgrade using: pip install --upgrade albumentations


Define Hyperparameters

In [2]:
# Encoder (backbone) architecture passed to the segmentation model
ENCODER = 'resnet18'

# Use ImageNet pretrained weights for the encoder initialization
ENCODER_WEIGHTS = 'imagenet'

# Activation function applied to the model's output layer
ACTIVATION = 'softmax2d'

# Determine device to use (GPU if available, otherwise CPU)
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Other hyperparameters
# batch_size: training batch size (also the batch dimension of the JIT trace input)
batch_size = 2
# EPOCHS: number of epochs run by training_loop for each model
EPOCHS = 100
# lr: initial learning rate given to the Adam optimizer
lr = 1e-4
# INFER_HEIGHT / INFER_WIDTH: spatial size produced by the augmentation pipelines
INFER_HEIGHT = 640
INFER_WIDTH = 640
# The learning rate is decreased every LR_DECREASE_STEP epochs (never at epoch 0)
LR_DECREASE_STEP = 15

# Factor by which the learning rate is divided every LR_DECREASE_STEP epochs
LR_DECREASE_COEF = 2

# Stride of the quality levels q = 0, step_size, 2*step_size, ..., 100;
# one model is trained and evaluated per quality level
step_size = 5

In [3]:
# Dataset class and visualization functions
import os
from PIL import Image
import json
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import Dataset as BaseDataset
from torchvision import transforms
import matplotlib.pyplot as plt
from glob import glob
import cv2
# Class names in class-code order: the list index of a name is the integer
# class code that _colorize_mask looks for in a single-channel mask.
CLASSES = [
            "background",    # Class 0: Background
            "bulldozer",     # Class 1: Bulldozer
            "car",           # Class 2: Car
            "caterpillar",   # Class 3: Caterpillar
            "crane",         # Class 4: Crane
            "crusher",       # Class 5: Crusher
            "driller",       # Class 6: Driller
            "excavator",     # Class 7: Excavator
            "human",         # Class 8: Human
            "roller",        # Class 9: Roller
            "tractor",       # Class 10: Tractor
            "truck"          # Class 11: Truck
         ] 
# Display palette: one 3-component color per class name, used only for
# plotting (the arrays are added into the image handed to plt.imshow).
colors_imshow = {
    "background": np.array([0, 0, 0]),
    "bulldozer": np.array([0, 183, 235]),
    "car": np.array([255, 255, 0]),
    "caterpillar": np.array([0, 16, 235]),
    "crane": np.array([199, 252, 0]),
    "crusher": np.array([255, 0, 140]),
    "driller": np.array([14, 122, 254]),
    "excavator": np.array([255, 171, 171]),
    "human": np.array([254, 0, 86]),
    "roller": np.array([255, 0, 255]),
    "tractor": np.array([128, 128, 0]),
    "truck": np.array([134, 34, 255]),
}
def _colorize_mask(mask: np.ndarray):
    """Render a class-code mask as a color image and measure per-class coverage.

    Args:
        mask (np.ndarray): Mask whose pixel values are class codes (indices
            into ``CLASSES``). Singleton dimensions are squeezed away first.

    Returns:
        np.ndarray: int64 array with a trailing axis of length 3 holding the
            ``colors_imshow`` color of each pixel's class.
        dict: Maps every class name to the fraction of pixels carrying its code.
    """
    flat_mask = mask.squeeze()
    canvas = np.zeros((*flat_mask.shape, 3), dtype=np.int64)
    area_fractions = {}

    for class_code, class_name in enumerate(CLASSES):
        # Boolean selection of the pixels that belong to this class
        selected = flat_mask == class_code
        area_fractions[class_name] = selected.sum() / selected.size
        # Outer product spreads the class color over the selected pixels
        painted = np.multiply.outer(selected, colors_imshow[class_name])
        canvas += painted.astype(np.int64)

    return canvas, area_fractions

def reverse_normalize(img, mean, std):
    """Undo a ``(x - mean) / std`` normalization.

    Args:
        img (np.ndarray): Normalized image; its last axis must broadcast
            against ``mean`` and ``std``.
        mean (list): Mean values that were subtracted during normalization.
        std (list): Standard deviations the image was divided by.

    Returns:
        np.ndarray: ``img * std + mean``.
    """
    scale = np.array(std)
    offset = np.array(mean)
    restored = img * scale + offset
    return restored

def visualize_predicts(img: np.ndarray, mask_gt: np.ndarray, mask_pred: np.ndarray, normalized=False):
    """Plot an image next to its ground-truth and predicted masks.

    Args:
        img (np.ndarray): Image in channels-first layout; it is transposed
            with ``(1, 2, 0)`` before display.
        mask_gt (np.ndarray): Ground-truth mask of class codes.
        mask_pred (np.ndarray): Predicted mask of class codes.
        normalized (bool, optional): If True, the image is de-normalized with
            mean=[0.485, 0.456, 0.406] / std=[0.229, 0.224, 0.225] before it
            is shown. Defaults to False.
    """
    _, axes = plt.subplots(1, 3, figsize=(10, 5))
    img = img.transpose(1, 2, 0)  # channels-first -> channels-last for matplotlib

    if normalized:
        img = reverse_normalize(img, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        # Round-off can push values slightly outside [0, 1]; clip explicitly so
        # imshow does not have to (it would emit a clipping warning).
        img = np.clip(img, 0.0, 1.0)

    axes[0].imshow(img)

    panels = ((axes[1], mask_gt, "GT mask\n"), (axes[2], mask_pred, "PRED mask\n"))
    for ax, mask, heading in panels:
        colored_mask, square_ratios = _colorize_mask(mask)
        title = "Areas:\n" + "\n".join([f"{cls}: {square_ratios[cls]*100:.1f}%" for cls in CLASSES])
        # The colorized mask is already RGB, so no colormap is passed (imshow
        # ignores cmap for RGB input); uint8 is the integer RGB dtype imshow expects.
        ax.imshow(colored_mask.astype(np.uint8))
        ax.set_title(heading + title)

    plt.tight_layout()  # Adjust layout to prevent overlap
    plt.show()

def visualize_multichannel_mask(image, multichannel_mask, class_names):
    """Show an image next to a color rendering of its multichannel mask.

    Args:
        image: Image in any form accepted by ``plt.imshow``.
        multichannel_mask (np.ndarray): Channels-last mask of shape
            (height, width, n_channels); channel ``i`` marks the pixels of
            ``class_names[i]`` (any value > 0 counts as set).
        class_names (list): Class name per mask channel; every name must be a
            key of the module-level ``colors_imshow`` palette.
    """

    def _convert_multichannel2singlechannel(mc_mask):
        """Collapse the per-class channels into one color image and build the title."""
        sc_mask = np.zeros((mc_mask.shape[0], mc_mask.shape[1], 3), dtype=np.int64)
        square_ratios = {}

        # Iterate channel by channel (channels-last -> channels-first view)
        for i, singlechannel_mask in enumerate(mc_mask.transpose(2, 0, 1)):
            cls = class_names[i]
            singlechannel_mask = singlechannel_mask.squeeze()

            # Sum of the channel divided by its pixel count (the covered
            # fraction when the channel is a 0/1 mask)
            square_ratios[cls] = singlechannel_mask.sum() / singlechannel_mask.size

            # Paint the class color onto every pixel where the channel is set.
            # The shared module-level palette is used so that both
            # visualizers always agree on colors.
            sc_mask += np.multiply.outer(singlechannel_mask > 0, colors_imshow[cls]).astype(np.int64)

        title = "Areas: " + "\n".join([f"{cls}: {square_ratios[cls]*100:.1f}%" for cls in class_names])
        return sc_mask, title

    _, axes = plt.subplots(1, 2, figsize=(10, 5))

    axes[0].imshow(image)

    mask_to_show, title = _convert_multichannel2singlechannel(multichannel_mask)
    # Overlapping channels add their colors and can exceed 255; clip before the
    # uint8 cast so such pixels saturate instead of wrapping around.
    axes[1].imshow(np.clip(mask_to_show, 0, 255).astype(np.uint8))
    axes[1].set_title(title)

    plt.tight_layout()
    plt.show()



            
# create class for dataset
class Excavators(BaseDataset):
    """Segmentation dataset that pairs images with color-coded mask images.

    Images and masks are paired by position after both file lists have been
    sorted by file name. Each mask image is converted into a multichannel
    float32 mask with one 0/1 channel per class, in the order of ``CLASSES``.
    """

    # Fixed class order; the position of a name is its channel index in the
    # multichannel mask built by __getitem__.
    CLASSES = [
        "background",    # Class 0: Background
        "bulldozer",     # Class 1: Bulldozer
        "car",           # Class 2: Car
        "caterpillar",   # Class 3: Caterpillar
        "crane",         # Class 4: Crane
        "crusher",       # Class 5: Crusher
        "driller",       # Class 6: Driller
        "excavator",     # Class 7: Excavator
        "human",         # Class 8: Human
        "roller",        # Class 9: Roller
        "tractor",       # Class 10: Tractor
        "truck"          # Class 11: Truck
    ]

    def __init__(self, images_dir, masks_dir, labels_dir, augmentation=None, preprocessing=None, compression=None):
        """
        Initializes the Dataset object.

        Args:
            images_dir (str): Directory containing the input images.
            masks_dir (str): Directory containing the corresponding masks.
            labels_dir (str): Path to the text file listing class colors
                (one ``B G R label`` entry per line).
            augmentation (callable, optional): Called as
                ``augmentation(image=..., mask=...)``; must return a mapping
                with "image" and "mask" keys.
            preprocessing (callable, optional): Same calling convention as
                ``augmentation``; applied after it.
            compression (optional): Stored on the instance; not used by this class.

        Raises:
            ValueError: If the number of images and masks differ, or a class
                has no color in the labels file.
        """
        # Load and sort paths to images and masks
        self.images_paths, self.masks_paths = self._get_sorted_paths(images_dir, masks_dir)
        print(f"Loaded {len(self.images_paths)} images from {images_dir}")
        print(f"Loaded {len(self.masks_paths)} masks from {masks_dir}")

        # Pairs are formed by position, so unequal counts would silently
        # misalign images and masks (or fail later with an IndexError).
        if len(self.images_paths) != len(self.masks_paths):
            raise ValueError(
                f"Number of images ({len(self.images_paths)}) in {images_dir} does not match "
                f"number of masks ({len(self.masks_paths)}) in {masks_dir}"
            )

        # Load class colors from a file
        self.cls_colors = self._get_classes_colors(labels_dir)

        # Store augmentation and preprocessing functions
        self.augmentation = augmentation
        self.preprocessing = preprocessing
        self.compression = compression

    def _get_sorted_paths(self, images_dir, masks_dir):
        """
        Retrieves and sorts paths to images and masks.

        Args:
            images_dir (str): Directory containing the input images.
            masks_dir (str): Directory containing the corresponding masks.

        Returns:
            tuple: Two lists with every entry of each directory, sorted by
                file name.
        """
        images_paths = sorted(glob(os.path.join(images_dir, "*")), key=os.path.basename)
        masks_paths = sorted(glob(os.path.join(masks_dir, "*")), key=os.path.basename)

        return images_paths, masks_paths

    def _get_classes_colors(self, label_colors_dir):
        """
        Loads class colors from a text file.

        Every non-blank line is read as four whitespace-separated fields
        ``B G R label``.

        Args:
            label_colors_dir (str): Path to the file containing class colors.

        Returns:
            dict: Class name -> (R, G, B) tuple, ordered like ``CLASSES``.
                "background" falls back to (0, 0, 0) when the file omits it.

        Raises:
            ValueError: If a non-blank line does not have exactly four fields,
                or a non-background class is missing from the file.
        """
        cls_colors = {}
        with open(label_colors_dir) as file:
            for line in file:
                fields = line.split()
                if not fields:
                    # Blank lines (e.g. a trailing newline) carry no entry
                    continue
                B, G, R, label = fields
                cls_colors[label] = (int(R), int(G), int(B))  # Store colors as RGB tuple

        # Order colors according to the predefined class order
        cls_colors_ordered = {}
        for k in self.CLASSES:
            if k in cls_colors:
                cls_colors_ordered[k] = cls_colors[k]
            elif k == "background":
                cls_colors_ordered[k] = (0, 0, 0)  # Black background if not specified
            else:
                raise ValueError(
                    f"No color defined for label {k!r} in {label_colors_dir}, cls colors: {cls_colors}"
                )

        return cls_colors_ordered

    def __getitem__(self, i):
        """
        Retrieves an image and its corresponding masks from the dataset.

        Args:
            i (int): Index of the image and mask pair to retrieve.

        Returns:
            tuple: ``(image, masks)``. Before augmentation/preprocessing the
                image is an RGB array and ``masks`` is a float32 array of shape
                (height, width, n_classes) with one 0/1 channel per class; the
                final form depends on the supplied callables.
        """
        # Context managers release the file handles as soon as pixels are read
        with Image.open(self.images_paths[i]) as image_file:
            image = np.array(image_file.convert('RGB'))
        with Image.open(self.masks_paths[i]) as mask_file:
            mask = np.array(mask_file.convert('RGB'))

        # One channel per class, filled by exact color match
        masks = np.zeros((mask.shape[0], mask.shape[1], len(self.cls_colors)), dtype=np.float32)
        for idx, color in enumerate(self.cls_colors.values()):
            target = np.array(color, dtype=np.uint8)
            # A pixel belongs to the class only if all three channels match
            masks[:, :, idx] = np.all(mask == target, axis=-1).astype(np.float32)

        # Apply augmentations if provided
        if self.augmentation:
            sample = self.augmentation(image=image, mask=masks)
            image, masks = sample["image"], sample["mask"]

        # Apply preprocessing if provided
        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=masks)
            image, masks = sample["image"], sample["mask"]

        return image, masks

    def __len__(self):
        """
        Returns the number of images in the dataset.

        Returns:
            int: Number of images in the dataset.
        """
        return len(self.images_paths)

In [4]:
# Dataset locations. The empty last component passed to os.path.join gives
# every directory path a trailing separator.
training_data_directory = os.path.join("excavator_dataset_w_masks2", "dataset", "Train", "")
training_masks_directory = os.path.join("excavator_dataset_w_masks2", "dataset", "Trainannot", "")
labels_directory = os.path.join("excavator_dataset_w_masks2", "dataset", "label_colors.txt")
val_data_directory = os.path.join("excavator_dataset_w_masks2", "dataset", "Validation", "")
val_masks_directory = os.path.join("excavator_dataset_w_masks2", "dataset", "Validationannot", "")
test_data_directory = os.path.join("excavator_dataset_w_masks2", "dataset", "Test", "")
test_masks_directory = os.path.join("excavator_dataset_w_masks2", "dataset", "Testannot", "")

# Training and validation datasets without augmentation or preprocessing
training_dataset = Excavators(
    training_data_directory,
    training_masks_directory,
    labels_directory,
)
validation_dataset = Excavators(
    val_data_directory,
    val_masks_directory,
    labels_directory,
)

Loaded 616 images from excavator_dataset_w_masks2\dataset\Train\
Loaded 616 masks from excavator_dataset_w_masks2\dataset\Trainannot\
Loaded 116 images from excavator_dataset_w_masks2\dataset\Validation\
Loaded 116 masks from excavator_dataset_w_masks2\dataset\Validationannot\


Augmentation of Datasets

In [5]:
def get_training_augmentation():
    """
    Build the augmentation pipeline used for training samples.

    The pipeline applies, in order: a random horizontal flip, a resize/pad/
    random-crop sequence that yields INFER_HEIGHT x INFER_WIDTH samples, one
    optional sharpness/blur/noise transform and one optional color transform.

    Returns:
        albumentations.Compose: The composed training transform.
    """
    # Geometry: flip, fit the longest side, pad 10% beyond the target size,
    # then take a random crop of the target size.
    geometric_steps = [
        albu.HorizontalFlip(p=0.5),
        albu.LongestMaxSize(max_size=INFER_HEIGHT, always_apply=True),
        albu.PadIfNeeded(
            min_height=int(INFER_HEIGHT*1.1),
            min_width=int(INFER_WIDTH*1.1),
            border_mode=2,  # 2 == cv2.BORDER_REFLECT
            always_apply=True,
        ),
        albu.RandomCrop(height=INFER_HEIGHT, width=INFER_WIDTH, always_apply=True),
    ]

    # At most one of sharpen / blur / noise, chosen with overall probability 0.7
    detail_step = albu.OneOf(
        [
            albu.Sharpen(alpha=(0.1, 0.2), lightness=(0.1, 0.2), p=0.5),
            albu.Blur(blur_limit=[1, 3], p=0.5),
            albu.GaussNoise(var_limit=(1, 5), p=0.5),
        ],
        p=0.7,
    )

    # At most one color perturbation, chosen with overall probability 0.7
    color_step = albu.OneOf(
        [
            albu.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.5),
            albu.HueSaturationValue(hue_shift_limit=5, sat_shift_limit=10, val_shift_limit=5, p=0.5),
            albu.RGBShift(r_shift_limit=10, g_shift_limit=10, b_shift_limit=10, p=0.5),
        ],
        p=0.7,
    )

    return albu.Compose(geometric_steps + [detail_step, color_step])


def get_validation_augmentation():
    """
    Build the resizing pipeline used for validation and test samples.

    Returns:
        albumentations.Compose: Fits the longest side to INFER_HEIGHT, pads to
            at least INFER_HEIGHT x INFER_WIDTH, then center-crops to exactly
            that size.
    """
    fit_longest_side = albu.LongestMaxSize(max_size=INFER_HEIGHT, always_apply=True)
    pad_to_target = albu.PadIfNeeded(
        min_height=INFER_HEIGHT,
        min_width=INFER_WIDTH,
        border_mode=2,  # 2 == cv2.BORDER_REFLECT
        always_apply=True,
    )
    crop_to_target = albu.CenterCrop(height=INFER_HEIGHT, width=INFER_WIDTH, always_apply=True)

    return albu.Compose([fit_longest_side, pad_to_target, crop_to_target])


def to_tensor(x, **kwargs):
    """
    Move the channel axis to the front and cast to float32.

    Args:
        x (numpy.ndarray): Channels-last image or mask (height, width, channels).
        kwargs: Accepted and ignored.

    Returns:
        numpy.ndarray: float32 array in (channels, height, width) layout.
    """
    channels_first = np.transpose(x, (2, 0, 1))
    return channels_first.astype(np.float32)


def get_preprocessing(preprocessing_fn):
    """
    Build the preprocessing transform applied after augmentation.

    Args:
        preprocessing_fn (callable): Normalization function applied to the
            image only.

    Returns:
        albumentations.Compose: Runs ``preprocessing_fn`` on the image, then
            ``to_tensor`` on both image and mask.
    """
    normalize_image = albu.Lambda(image=preprocessing_fn)
    to_channels_first = albu.Lambda(image=to_tensor, mask=to_tensor)
    return albu.Compose([normalize_image, to_channels_first])

In [6]:
# Input normalization function matching the chosen encoder and its pretrained weights
preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)
# Alternative (disabled): plain scaling of pixel values to [0, 1]
# preprocessing_fn = lambda img, **kwargs: img.astype("float32") / 255
# Bare expression so the notebook displays the function and its parameters
preprocessing_fn

functools.partial(<function preprocess_input at 0x000002144FAE2D40>, input_space='RGB', input_range=[0, 1], mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

Training

In [7]:
# training loop for a single model
# define training loop function
def training_loop(model, optimizer, train_loader,val_loader, train_epoch, valid_epoch, save_path, jit_save_path):
    """Train a model for EPOCHS epochs, keeping the checkpoint with the best validation IoU.

    Args:
        model: Model being trained; saved with ``torch.save`` and as a traced
            TorchScript module whenever validation IoU improves.
        optimizer: Optimizer whose learning rate is decayed during training.
        train_loader: Loader passed to ``train_epoch.run``.
        val_loader: Loader passed to ``valid_epoch.run``.
        train_epoch: Object whose ``run(loader)`` trains one epoch and returns
            a dict of exactly two entries, loss first and IoU second.
        valid_epoch: Same contract as ``train_epoch`` for validation; its
            result must contain the key 'iou_score'.
        save_path (str): Destination of the pickled best model.
        jit_save_path (str): Destination of the traced best model.

    Returns:
        tuple: ``(loss_logs, metric_logs)``, each a dict with "train" and
            "val" lists holding one value per epoch.
    """
    max_score = 0

    loss_logs = {"train": [], "val": []}
    metric_logs = {"train": [], "val": []}
    for i in range(0, EPOCHS):

        print('\nEpoch: {}'.format(i))
        train_logs = train_epoch.run(train_loader)
        train_loss, train_metric_IOU = list(train_logs.values())
        loss_logs["train"].append(train_loss)
        metric_logs["train"].append(train_metric_IOU)

        valid_logs = valid_epoch.run(val_loader)
        val_loss, val_metric_IOU = list(valid_logs.values())
        loss_logs["val"].append(val_loss)
        metric_logs["val"].append(val_metric_IOU)

        # Save a checkpoint whenever validation IoU reaches a new best
        if max_score < valid_logs['iou_score']:
            max_score = valid_logs['iou_score']
            torch.save(model, save_path)
            # Example input for tracing, shaped like a training batch
            trace_image = torch.randn(batch_size, 3, INFER_HEIGHT, INFER_WIDTH)
            # Trace in inference mode explicitly rather than relying on the
            # validation epoch having left the model in eval mode, and
            # without recording autograd state.
            model.eval()
            with torch.no_grad():
                traced_model = torch.jit.trace(model, trace_image.to(DEVICE))
            torch.jit.save(traced_model, jit_save_path)
            print('Model saved!')

        print("LR:", optimizer.param_groups[0]['lr'])
        if i > 0 and i % LR_DECREASE_STEP == 0:
            print('Decrease decoder learning rate')
            # Decay every parameter group so optimizers with several groups
            # keep their learning rates in step.
            for param_group in optimizer.param_groups:
                param_group['lr'] /= LR_DECREASE_COEF
    return loss_logs, metric_logs

In [8]:
def DLV3_training_loop():
    """Train and save one DeepLabV3 model per quality level.

    For every ``q`` in ``range(0, 101, step_size)`` a fresh model is trained on
    the images under ``excavator_dataset_w_masks2/dataset/<q>/Train`` and
    validated on ``.../<q>/Validation``; the mask directories are shared by
    all quality levels. The best checkpoint of each run is written to
    ``models/best_models_DLV3/<q>.pth`` (pickled model) and ``<q>.pt``
    (traced model), and the loss / IoU curves of the run are plotted.
    """
    DLV3_directory = os.path.join("models", "best_models_DLV3")
    os.makedirs(DLV3_directory, exist_ok=True)

    # Masks and class colors do not depend on the quality level
    training_masks_directory = os.path.join("excavator_dataset_w_masks2", "dataset", "Trainannot", "")
    val_masks_directory = os.path.join("excavator_dataset_w_masks2", "dataset", "Validationannot", "")
    labels_directory = os.path.join("excavator_dataset_w_masks2", "dataset", "label_colors.txt")

    for q in range(0, 101, step_size):
        model_DLV3 = smp.DeepLabV3(encoder_name=ENCODER, encoder_weights=ENCODER_WEIGHTS, classes=len(CLASSES), activation=ACTIVATION)

        # Image directories of this quality level
        training_data_directory = os.path.join("excavator_dataset_w_masks2", "dataset", str(q), "Train", "")
        val_data_directory = os.path.join("excavator_dataset_w_masks2", "dataset", str(q), "Validation", "")

        # Training dataset: random training augmentations followed by the
        # encoder-specific preprocessing.
        training_dataset = Excavators(
            training_data_directory,
            training_masks_directory,
            labels_directory,
            augmentation=get_training_augmentation(),
            preprocessing=get_preprocessing(preprocessing_fn)
        )

        # Validation dataset: resize/pad/center-crop only, same preprocessing.
        valid_dataset = Excavators(
            val_data_directory,
            val_masks_directory,
            labels_directory,
            augmentation=get_validation_augmentation(),
            preprocessing=get_preprocessing(preprocessing_fn)
        )

        # create training and validation dataloaders
        train_loader = DataLoader(training_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False)

        # Define the criterion, metric and optimizer for training the model.
        criterion = utils.losses.DiceLoss()
        metric = [utils.metrics.IoU()]
        optimizer = torch.optim.Adam([
            dict(params=model_DLV3.parameters(), lr=lr),  # single parameter group, initial learning rate lr
        ])

        # Epoch runner for training
        train_epoch = utils.train.TrainEpoch(
            model_DLV3,               # Model to be trained
            loss=criterion,           # Loss function to compute during training
            metrics=metric,           # Metrics to compute during training
            optimizer=optimizer,      # Optimizer used to update model parameters
            device=DEVICE,            # Device (CPU or GPU) where the training takes place
            verbose=True,             # Print training progress (verbose mode)
        )

        # Epoch runner for validation
        valid_epoch = utils.train.ValidEpoch(
            model_DLV3,               # Model to be evaluated
            loss=criterion,           # Loss function to compute during validation
            metrics=metric,           # Metrics to compute during validation
            device=DEVICE,            # Device (CPU or GPU) where the validation takes place
            verbose=True,             # Print validation progress (verbose mode)
        )

        # Checkpoint files of this quality level live in DLV3_directory
        DLV3_save_dir = os.path.join(DLV3_directory, f"{q}.pth")
        DLV3_jit_save_dir = os.path.join(DLV3_directory, f"{q}.pt")
        loss_logs_DLV3, metric_logs_DLV3 = training_loop(model_DLV3, optimizer, train_loader,val_loader, train_epoch, valid_epoch, DLV3_save_dir, DLV3_jit_save_dir)

        # Plot training and validation losses and metrics
        fig, axes = plt.subplots(1, 2, figsize=(10,4))
        axes[0].plot(loss_logs_DLV3["train"], label="train")
        axes[0].plot(loss_logs_DLV3["val"], label="val")
        axes[0].set_title("losses - Dice")

        axes[1].plot(metric_logs_DLV3["train"], label="train")
        axes[1].plot(metric_logs_DLV3["val"], label="val")
        axes[1].set_title("IOU")

        # The curves carry labels, so show the legend that distinguishes them
        for ax in axes:
            ax.legend()

        # Adding a main title for both plots
        plt.suptitle(f"DLV3 model trained on quality = {q} ", fontsize=16)

        # Adjusting layout to make room for the main title
        plt.tight_layout(rect=[0, 0, 1, 0.95])

        # Render this run's figure now and release it, so figures do not
        # accumulate while the remaining quality levels are trained.
        plt.show()
        plt.close(fig)

In [9]:
# Run the training loop: trains and saves one DLV3 model per quality level
# (q in range(0, 101, step_size)), so a complete run is long.
DLV3_training_loop()

Loaded 616 images from excavator_dataset_w_masks2\dataset\0\Train\
Loaded 616 masks from excavator_dataset_w_masks2\dataset\Trainannot\
Loaded 116 images from excavator_dataset_w_masks2\dataset\0\Validation\
Loaded 116 masks from excavator_dataset_w_masks2\dataset\Validationannot\

Epoch: 0
train: 100%|██████████| 308/308 [03:45<00:00,  1.37it/s, dice_loss - 0.4391, iou_score - 0.5154] 
valid: 100%|██████████| 116/116 [00:33<00:00,  3.42it/s, dice_loss - 0.2984, iou_score - 0.6039]


  if h % output_stride != 0 or w % output_stride != 0:


Model saved!
LR: 0.0001

Epoch: 1
train:  28%|██▊       | 86/308 [01:04<02:46,  1.33it/s, dice_loss - 0.2726, iou_score - 0.6375]


KeyboardInterrupt: 

DLV3 Evaluation

In [13]:
# Reference test set: the images under dataset/Test with their masks.
# test_DLV3 compares the TestCompressed images against this dataset.
test_data_directory = "{}{}{}{}{}{}".format("excavator_dataset_w_masks2",os.sep,"dataset",os.sep,"Test",os.sep)
test_masks_directory = "{}{}{}{}{}{}".format("excavator_dataset_w_masks2",os.sep,"dataset",os.sep,"Testannot",os.sep)
# create dataset object (labels_directory comes from an earlier cell)
test_dataset = Excavators(
    test_data_directory,  # Directory containing test images
    test_masks_directory,  # Directory containing test masks
    labels_directory,
    augmentation=get_validation_augmentation(),  # Apply validation-specific augmentations
    preprocessing=get_preprocessing(preprocessing_fn)  # Preprocessing function for normalization
)
# create dataloader with the default DataLoader settings
test_dataloader = DataLoader(test_dataset)
# define MSE function to compare compressed and non compressed images
def calculate_mse(original, compressed):
    """Mean squared error between two equally-shaped images.

    Both inputs are converted to float64 before subtracting, so integer images
    (e.g. uint8) cannot wrap around, and plain sequences are accepted as well
    as arrays.

    Args:
        original: Reference image (array-like).
        compressed: Image to compare against the reference (array-like).

    Returns:
        numpy.float64: Mean of the squared element-wise differences.

    Raises:
        ValueError: If the two inputs do not have the same shape.
    """
    original_np = np.asarray(original, dtype=np.float64)
    compressed_np = np.asarray(compressed, dtype=np.float64)

    # Compare shapes after conversion so list inputs are supported too
    if original_np.shape != compressed_np.shape:
        raise ValueError("Original and compressed images must have the same dimensions.")

    return np.mean((original_np - compressed_np) ** 2)


def calc_avg_mse(mse_values):
    """Return the arithmetic mean of a collection of MSE values.

    Args:
        mse_values: List or array of per-image MSE values.

    Returns:
        The mean of ``mse_values`` as computed by NumPy.
    """
    return np.asarray(mse_values).mean()

Loaded 51 images from excavator_dataset_w_masks2\dataset\Test\
Loaded 51 masks from excavator_dataset_w_masks2\dataset\Testannot\


In [14]:
def test_DLV3():
    """Evaluate every saved DLV3 model on compressed and uncompressed test images.

    For each quality level ``q`` in ``range(0, 101, step_size)`` the model at
    ``models/best_models_DLV3/<q>.pth`` is loaded and evaluated on the test
    images under ``.../dataset/TestCompressed/<i>`` for ``i`` = 0..100, then on
    the images under ``.../dataset/Test``. Two graphs are drawn per model:
    IoU against the average MSE between compressed and reference images, and
    IoU against the compression level ``i``.

    Relies on the module-level ``test_dataset``, ``test_masks_directory``,
    ``labels_directory`` and ``preprocessing_fn``.
    """
    base_test_dir = "{}{}{}{}{}".format("excavator_dataset_w_masks2",os.sep,"dataset",os.sep,"TestCompressed")

    # Average MSE per compression level. It depends only on the images, not on
    # the model, so it is computed once and reused for every model.
    cor_mse = {}

    for q in range(0, 101, step_size):
        # load best instance of model
        model_path = "{}{}{}{}{}{}".format("models",os.sep,"best_models_DLV3",os.sep,q,".pth")
        # NOTE(review): torch.load unpickles the whole model object; only load
        # checkpoints you created yourself. Recent PyTorch releases may also
        # need weights_only=False to load a full pickled model - confirm
        # against the installed version.
        # map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
        model_DLV3 = torch.load(model_path, map_location=DEVICE)
        model_DLV3.eval()

        # Define the criterion and metric for evaluating the model.
        criterion = utils.losses.DiceLoss()
        metric = [utils.metrics.IoU()]

        # define evaluation epochs
        eval_epoch = utils.train.ValidEpoch(
            model=model_DLV3,       # Model to be evaluated
            loss=criterion,         # Loss function
            metrics=metric,         # List of metrics
            device=DEVICE,          # Device (CPU or GPU)
            verbose=True            # Print evaluation progress
        )

        IOU_scores = {}  # compression level -> IoU of this model
        for i in range(0, 101, 1):
            comp_test_dir = os.path.join(base_test_dir,str(i))
            print(comp_test_dir)
            # create test dataset
            comp_test_dataset = Excavators(
                comp_test_dir,  # Directory containing test images
                test_masks_directory,  # Directory containing test masks
                labels_directory,
                augmentation=get_validation_augmentation(),  # Apply validation-specific augmentations
                preprocessing=get_preprocessing(preprocessing_fn)  # Preprocessing function for normalization
                )

            if i not in cor_mse:
                mse_values = []
                for j in range(len(comp_test_dataset)):
                    comp_image, _ = comp_test_dataset[j]
                    original_image, _ = test_dataset[j]
                    mse_values.append(calculate_mse(original_image, comp_image))
                cor_mse[i] = calc_avg_mse(mse_values)
            print('MSE: ', cor_mse[i])

            test_loader = DataLoader(comp_test_dataset)
            eval_logs = eval_epoch.run(test_loader)
            eval_loss, eval_metric_IOU = list(eval_logs.values())
            IOU_scores[i] = eval_metric_IOU

            # Print evaluation results
            print(f"Evaluation Loss: {eval_loss}")
            print(f"Evaluation IoU: {eval_metric_IOU}")

        # (MSE, IoU) pairs sorted by MSE. Pairs rather than a dict keyed by
        # MSE, so two levels with the same MSE do not overwrite each other.
        mse_iou_pairs = sorted((cor_mse[level], IOU_scores[level]) for level in IOU_scores)

        # create a graph
        x1 = [mse for mse, _ in mse_iou_pairs]  # MSE VALUES
        y1 = [iou for _, iou in mse_iou_pairs]  # IOU VALUES
        plt.figure(figsize=(7, 5)) # Set the figure size
        plt.ylim(0, 1)
        plt.plot(x1, y1)
        plt.xlabel('Mean-Squared-Error')  # Label for X-axis
        plt.ylabel('IOU')  # Label for Y-axis
        plt.title(f'MSE-IOU Graph using a DLV3 model trained on a dataset of quality ={q}')  # Title of the graph
        plt.grid(True)  # Show grid
        plt.show()

        # plot IOU over quality
        x2 = list(IOU_scores.keys()) # quality VALUES
        y2 = list(IOU_scores.values()) # IOU VALUES
        plt.figure(figsize=(7, 5)) # Set the figure size
        plt.plot(x2, y2)
        plt.xlabel('Quality Factor')
        plt.ylabel('IOU')
        plt.title(f'IOU-Quality using a DLV3 model trained on a dataset of quality ={q}')
        plt.grid(True)
        plt.show()

        # Test on normal dataset
        test_data_directory = "{}{}{}{}{}{}".format("excavator_dataset_w_masks2",os.sep,"dataset",os.sep,"Test",os.sep)
        normal_test_dataset = Excavators(
            test_data_directory,  # Directory containing test images
            test_masks_directory,  # Directory containing test masks
            labels_directory,
            augmentation=get_validation_augmentation(),  # Apply validation-specific augmentations
            preprocessing=get_preprocessing(preprocessing_fn)  # Preprocessing function for normalization
            )
        normal_test_loader = DataLoader(normal_test_dataset)
        eval_logs = eval_epoch.run(normal_test_loader)
        eval_loss, eval_metric_IOU = list(eval_logs.values())

        # Print evaluation results
        print(f"DLV3 model trained on dataset of quality= {q} \ntest on uncompressed dataset")
        print(f"Evaluation Loss: {eval_loss}")
        print(f"Evaluation IoU: {eval_metric_IOU}")

In [15]:
# Evaluate every saved DLV3 checkpoint; requires the models written by
# DLV3_training_loop and the test_dataset created in the cell above.
test_DLV3()

excavator_dataset_w_masks2\dataset\TestCompressed\0
Loaded 51 images from excavator_dataset_w_masks2\dataset\TestCompressed\0
Loaded 51 masks from excavator_dataset_w_masks2\dataset\Testannot\


KeyboardInterrupt: 