# 3.- Modeling 

In this notebook, we focus on the image segmentation model. We will use the U-net model (https://github.com/zhixuhao/unet.git).

The U-net model is particularly suitable for this task due to several reasons:

1. **Encoder-Decoder Architecture**: U-net has a symmetric architecture with an encoder to capture context and a decoder for precise localization, which makes it highly effective for segmentation tasks.
2. **Skip Connections**: These connections between the encoder and decoder help preserve spatial information, which is crucial for accurate segmentation.
3. **Data Efficiency**: U-net is designed to work well even with relatively small training datasets, making it ideal for cases where annotated data is limited.
4. **Versatility**: It has been successfully applied to various medical and general image segmentation problems, demonstrating its robustness and adaptability.
5. **State-of-the-Art Performance**: U-net consistently achieves high performance on benchmark segmentation datasets, often outperforming other models in terms of both accuracy and speed.


### Import Libraries

In [4]:
# Image processing and transformation libraries
import cv2
from PIL import Image
import albumentations as A
from albumentations.pytorch import ToTensorV2
import torchvision
from torchvision import models
from torchvision.utils import save_image
import torchvision.transforms.functional as TF

# Scientific computing and data manipulation libraries
import numpy as np
import pandas as pd
import os
import time
import json
import math

# Machine learning and metrics libraries
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Image analysis libraries
from skimage import feature, measure, morphology

# PyTorch libraries for deep learning
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter
import torch.multiprocessing as mp

# Visualization libraries
import matplotlib.pyplot as plt
from tqdm import tqdm


### Data Paths 

In [1]:
#Training set path
TRAINDIR_IMGS = '../dataset/train/imgs'
TRAINDIR_MASKS = '../dataset/train/masks'

#Validation set path
VALDIR_IMGS  = '../dataset/val/imgs'
VALDIR_MASKS  = '../dataset/val/masks'

#Test set path
TESTDIR_IMGS  = '../dataset/test_2/imgs'
TESTDIR_MASKS  = '../dataset/test_2/masks'

### Dataset Initialization

In [6]:
class RobotDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None):
        """
        Initializes the dataset with directory paths and an optional transform.
        """
        self.image_dir = image_dir  # Directory containing images
        self.mask_dir = mask_dir    # Directory containing corresponding masks
        self.transform = transform  # Optional transform to be applied on a sample
        self.images = os.listdir(image_dir)  # List of all image filenames in the directory

    def __len__(self):
        return len(self.images)  # Returns the number of images in the dataset

    def __getitem__(self, index):
        try:
            # Constructs the full path for the image and mask
            img_path = os.path.join(self.image_dir, self.images[index])
            mask_path = os.path.join(self.mask_dir, self.images[index].replace(".jpg", ".png"))

            # Reads the image using OpenCV
            image = cv2.imread(img_path)
            if image is None:
                raise ValueError(f"Image not found at {img_path}")
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Converts image from BGR to RGB

            # Reads the mask in grayscale mode
            mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
            if mask is None:
                raise ValueError(f"Mask not found at {mask_path}")
            mask = mask.astype(np.float32)  # Converts mask to float32
            mask[mask == 255.0] = 1.0  # Converts all mask values of 255 to 1

            # Applies transformations if any
            if self.transform:
                augmentations = self.transform(image=image, mask=mask)
                image = augmentations['image']
                mask = augmentations['mask']

            return image, mask  # Returns the image and mask as a tuple
        except Exception as e:
            print(f"An error occurred for index {index}: {e}")
            # Raise the exception to notify the caller about the error
            raise

### Define Utility Functions

In [7]:
def save_checkpoint(state, filename="../models/unet_checkpoint.pth.tar"):
    """
    Saves the current state of the model to a file.

    Parameters:
        state (dict): The state of the model, typically containing model parameters.
        filename (str, optional): The filename where the state will be saved. Defaults to "my_checkpoint.pth.tar".
    """
    print("=> Saving checkpoint")
    torch.save(state, filename)

def load_checkpoint(checkpoint, model):
    """
    Loads the model state from a checkpoint file.

    Parameters:
        checkpoint (dict): The checkpoint containing model state as saved previously.
        model (torch.nn.Module): The model instance where the state will be loaded.
    """
    print("=> Loading checkpoint")
    model.load_state_dict(checkpoint["state_dict"])
    
def worker_init_fn(worker_id):
    """
    Initializes the random seed for each worker to ensure reproducibility.
    The seed is based on the worker ID to ensure different seeds for each worker.
    """
    np.random.seed(np.random.get_state()[1][0] + worker_id)

def get_loaders(train_dir, train_maskdir, val_dir, val_maskdir, test_dir, test_maskdir, batch_size, train_transform, val_transform, test_transform, num_workers=4, pin_memory=True):
    """
    Creates DataLoader objects for the training, validation, and test datasets.
    """
    
    # Initialize the training dataset and DataLoader
    train_ds = RobotDataset(image_dir=train_dir, mask_dir=train_maskdir, transform=train_transform)
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=pin_memory, worker_init_fn=worker_init_fn)

    # Initialize the validation dataset and DataLoader
    val_ds = RobotDataset(image_dir=val_dir, mask_dir=val_maskdir, transform=val_transform)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=pin_memory, worker_init_fn=worker_init_fn)
    
    # Initialize


def check_accuracy(loader, model, device="cuda"):
    """
    Computes the accuracy and Dice score of the model using a given loader.

    Parameters:
        loader (DataLoader): The DataLoader for the dataset to evaluate.
        model (torch.nn.Module): The model to evaluate.
        device (str, optional): The device to use for computation. Defaults to "cuda".

    Returns:
        tuple: A tuple containing the accuracy and Dice score, both multiplied by 100.
    """
    # Initialize counts for true positives, false positives, true negatives, and false negatives
    num_tp = 0
    num_fp = 0
    num_tn = 0
    num_fn = 0
    
    # Set the model to evaluation mode
    model.eval()

    # Disable gradient computation for efficiency
    with torch.no_grad():
        # Iterate over batches in the DataLoader
        for x, y in loader:
            x = x.to(device)  # Move inputs to the specified device
            y = y.to(device).unsqueeze(1)  # Move targets to the device and add channel dimension
            
            # Apply sigmoid to the target to get probabilities
            y = torch.sigmoid(y)
            # Convert probabilities to binary predictions
            y = (y > 0.5).float()
            
            # Get model predictions and convert to binary
            preds = torch.sigmoid(model(x))
            preds = (preds > 0.5).float()

            # Update counts
            num_tp += (preds * y).sum()
            num_tn += ((1 - preds) * (1 - y)).sum()
            num_fp += (preds * (1 - y)).sum()
            num_fn += ((1 - preds) * y).sum()

    # Print the counts for debugging purposes
    print(f"FP: {num_fp}, FN: {num_fn}, TP: {num_tp}, TN: {num_tn}")
    
    # Calculate accuracy
    accuracy = (num_tp + num_tn) / (num_tp + num_tn + num_fp + num_fn)
    # Calculate Dice score
    dice_score = (2 * num_tp) / ((2 * num_tp) + num_fp + num_fn + 1e-8)

    # Set the model back to training mode
    model.train()
    
    # Return accuracy and Dice score, both scaled by 100
    return accuracy * 100, dice_score * 100


def save_predictions_as_imgs(loader, model, folder="saved_images/", device="cuda"):
    """
    Saves the model's predictions as images for each batch in the loader.

    Parameters:
        loader (torch.utils.data.DataLoader): The DataLoader to get batches from.
        model (torch.nn.Module): The model to generate predictions.
        folder (str, optional): The directory where the images will be saved. Defaults to "saved_images/".
        device (str, optional): The device to use for computation. Defaults to "cuda".
    """
    # Ensure the folder exists
    os.makedirs(folder, exist_ok=True)

    # Set the model to evaluation mode
    model.eval()
    
    # Iterate over batches in the DataLoader
    for idx, (x, y) in enumerate(loader):
        x = x.to(device=device)  # Move inputs to the specified device
        
        # Disable gradient computation for efficiency
        with torch.no_grad():
            # Get model predictions and convert to binary
            preds = torch.sigmoid(model(x))
            preds = (preds > 0.5).float()
        
        # Save predictions and ground truth images
        torchvision.utils.save_image(preds, os.path.join(folder, f"pred_{idx}.png"))
        torchvision.utils.save_image(y.unsqueeze(1), os.path.join(folder, f"{idx}.png"))

    # Set the model back to training mode
    model.train()
    
def stats(data, savedir):
    """
    Calculate the mean and standard deviation for features in the provided data and save the results to a JSON file.

    Parameters:
        data (list of list of dict): The input data containing lists of dictionaries with feature values.
        savedir (str): The directory where the resulting statistics JSON file will be saved.

    Returns:
        dict: A dictionary with the mean and standard deviation for each feature.
    """
    # Extract all possible keys (assuming all features are in every dictionary)
    keys = list(data[0][0].keys())
    keys.remove('centroid')  # Remove 'centroid' since it's a tuple and not trivial to calculate stats

    # Create a dictionary to accumulate values for each feature
    features = {key: [] for key in keys}

    # Iterate over each sublist of dictionaries and each dictionary to accumulate feature values
    for sublist in data:
        for dic in sublist:
            for key, value in dic.items():
                # Omit centroids because they are tuples and it's not trivial to calculate mean and std for tuples
                if key != 'centroid':
                    features[key].append(value)

    # Calculate the mean and standard deviation for each feature
    stats_dict = {key: (np.mean(values), np.std(values)) for key, values in features.items()}

    # Create the directory if it does not exist
    if not os.path.exists(savedir):
        os.makedirs(savedir)

    # Save the dictionary to a JSON file
    with open(os.path.join(savedir, 'feature_stats.json'), 'w') as f:
        json.dump(stats_dict, f, indent=4)

    return stats_dict


def get_pieces_features(mask):
    """
    Given a mask path, returns a dictionary with the features of each piece in the image, including the radius of the circumcircle.

    Parameters:
        mask (torch.Tensor): The input mask as a PyTorch tensor.

    Returns:
        list: A list of dictionaries, each containing features of a piece in the mask.
    """
    # Convert the mask from tensor to numpy array and scale to 0-255
    mask = mask.numpy().squeeze()
    mask = (mask * 255).astype(np.uint8)

    # Threshold the image to ensure only the pieces are in white
    _, thresh = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)

    # Find all contours on the thresholded image
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    
    # Filter out very small contours that are likely noise
    contours = [cnt for cnt in contours if cv2.contourArea(cnt) > 100]

    # Initialize list to hold features of each piece
    pieces_features = []

    # Process each contour to extract features
    for piece_contour in contours:
        # Create a mask of the piece
        piece_mask = np.zeros_like(mask)
        cv2.drawContours(piece_mask, [piece_contour], -1, 255, thickness=cv2.FILLED)

        # Calculate eccentricity using fitEllipse if the contour has enough points
        if piece_contour.shape[0] >= 5:
            (x, y), (MA, ma), angle = cv2.fitEllipse(piece_contour)
            eccentricity = np.sqrt(1 - (MA / ma) ** 2)
        else:
            eccentricity = None

        # Calculate centroid using moments
        M = cv2.moments(piece_contour)
        cx = int(M['m10'] / M['m00'])
        cy = int(M['m01'] / M['m00'])
        centroid = (cx, cy)

        # Calculate the radius of the minimum enclosing circle
        (x, y), radius = cv2.minEnclosingCircle(piece_contour)

        # Compile features into a dictionary
        features = {
            'eccentricity': eccentricity,
            'centroid': centroid,
            'radius': radius
        }

        # Add features of the current piece to the list
        pieces_features.append(features)

    return pieces_features


def save_annotated_image(image, save_path, pieces_features):
    """
    Saves the image to the given path, annotated with the circumcircle and centroid mark for each piece in red for visibility.
    Converts grayscale images to RGB before annotation.
    
    Parameters:
        image (numpy array): The image array.
        save_path (str): Path to save the annotated image.
        pieces_features (list): List of dictionaries containing features of each piece including the radius and centroid.
    """
    # Convert the image from tensor to numpy array and scale to 0-255
    image = image.numpy().transpose(1, 2, 0)
    image = (image * 255).astype(np.uint8)
    
    # Convert grayscale image to RGB if necessary
    if image.ndim == 2 or (image.ndim == 3 and image.shape[2] == 1):
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
    
    # Annotate each piece
    for features in pieces_features:
        # Draw the circumcircle in red
        cv2.circle(image, (int(features['centroid'][0]), int(features['centroid'][1])), int(features['radius']), (0, 0, 255), 2)
        # Draw the centroid as a red 'X'
        cv2.drawMarker(image, (int(features['centroid'][0]), int(features['centroid'][1])), (0, 0, 255), markerType=cv2.MARKER_CROSS, markerSize=10, thickness=2)
    
    # Save the annotated image
    cv2.imwrite(save_path, image)

def load_stats(filepath= '../data/calibration_matrixes/feature_stats.json'):
    """
    Load the statistics from a JSON file.
    """
    with open(filepath, 'r') as file:
        stats = json.load(file)
    return stats

def filter_pieces(stats_data, pieces_list, std=9):
    """
    Filters pieces based on the statistical data provided.

    Parameters:
        stats_data (dict): A dictionary with keys as properties and values as (mean, sigma).
        pieces_list (list): A list of dictionaries, where each dictionary contains properties of a piece.
        std (int, optional): The number of standard deviations to use for filtering. Defaults to 9.

    Returns:
        list: A list of dictionaries, each containing 'centroid' and 'radius' of valid pieces.
    """
    valid_pieces = []
    
    # Load statistical data
    stats_data = load_stats(stats_data)
    
    # Iterate over each piece
    for piece in pieces_list:
        valid = True
        
        # Check each statistical property
        for key, (mean, sigma) in stats_data.items():
            if key in piece:  # Only check if the key exists in the piece's data
                value = piece[key]
                # Validate if the value is within the specified number of standard deviations
                if not (mean - std * sigma <= value <= mean + std * sigma):
                    valid = False
                    break
        
        if valid:
            # If all properties are valid, add the centroid and radius to the valid list
            valid_pieces.append({'centroid': piece['centroid'], 'radius': piece['radius']})
    
    return valid_pieces

def postprocess(tensor_prediction, area_threshold):
    """
    Post-processes the tensor prediction to apply morphological operations, contour detection, and watershed segmentation.

    Parameters:
        tensor_prediction (torch.Tensor): The input tensor prediction.
        area_threshold (float): The minimum area threshold to filter contours.

    Returns:
        torch.Tensor: The final processed mask as a tensor.
    """
    # Convert tensor to numpy array
    image = tensor_prediction.squeeze().cpu().numpy()
    
    # Ensure the image is in 8-bit format
    if image.dtype != np.uint8:
        image = np.clip(image * 255, 0, 255).astype(np.uint8)

    # Handle color conversion if necessary
    if len(image.shape) == 2:  # It's a grayscale image
        image_color = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    else:
        image_color = image  # It's already a BGR image

    # Define the kernel for morphological operations
    kernel = np.ones((5, 5), np.uint8)

    # Apply morphological opening and closing to reduce noise
    opening = cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel, iterations=2)
    closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel)

    # Convert to binary image for contour detection
    _, binary = cv2.threshold(closing, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    
    # Find contours in the binary image
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    mask = np.zeros_like(image)

    # Filter and draw contours by area
    for contour in contours:
        if cv2.contourArea(contour) > area_threshold:
            cv2.drawContours(mask, [contour], -1, (255), thickness=cv2.FILLED)

    # Further morphological cleaning
    sure_bg = cv2.dilate(mask, kernel, iterations=3)

    # Distance transformation for foreground segmentation
    dist_transform = cv2.distanceTransform(sure_bg, cv2.DIST_L2, 5)
    _, sure_fg = cv2.threshold(dist_transform, 0.7 * dist_transform.max(), 255, 0)
    sure_fg = np.uint8(sure_fg)

    # Determine unknown region
    unknown = cv2.subtract(sure_bg, sure_fg)

    # Connected components to separate different objects
    _, markers = cv2.connectedComponents(sure_fg)
    markers = markers + 1
    markers[unknown == 255] = 0

    # Apply the watershed algorithm to segment connected parts
    cv2.watershed(image_color, markers)
    image_color[markers == -1] = [255, 0, 0]  # Mark boundaries in red

    # Prepare the final mask in the same format as the input
    final_mask = np.zeros_like(image, dtype=np.uint8)
    final_mask[markers > 1] = 1
    
    # Convert final mask back to tensor format
    final_tensor = torch.from_numpy(final_mask).unsqueeze(0)  # Add batch dimension if necessary

    return final_tensor

def read_labels(labels_path):
    """
    Reads a single line of integer labels from a file and converts them into a numpy array.

    Parameters:
        labels_path (str): The path to the file containing the labels.

    Returns:
        numpy.ndarray: An array of integer labels.
    """
    # Open the file and read the first line
    with open(labels_path, 'r') as file:
        line = file.readline().strip()  # Read and strip any surrounding whitespace
        values = np.array(list(map(int, line.split())))  # Convert the line to a list of integers and then to a numpy array
    return values

def calculate_corrections(differences):
    """
    Calculates the average corrections in the x and y directions based on the provided differences.

    Parameters:
        differences (list of tuples): A list of (dx, dy) tuples representing differences.

    Returns:
        tuple: A tuple (C_x, C_y) representing the average correction in x and y directions.
    """
    # Calculate the total differences in the x and y directions
    total_diff_x = sum(diff[0] for diff in differences)
    total_diff_y = sum(diff[1] for diff in differences)
    
    # Count the number of differences
    count = len(differences)
    
    # Calculate the average correction in x and y directions
    C_x = total_diff_x / count
    C_y = total_diff_y / count
    
    return C_x, C_y

## 3.1  Definition of the U-Net Model Architecture

In [8]:
class DoubleConv(nn.Module):
    """
    A module to perform two consecutive convolution operations followed by batch normalization and ReLU activation.

    Attributes:
        conv (nn.Sequential): A sequential container of two convolutional blocks.

    Parameters:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
    """
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """
        Defines the computation performed at every call of the DoubleConv module.

        Parameters:
            x (torch.Tensor): The input data.

        Returns:
            torch.Tensor: The output data after passing through the convolution blocks.
        """
        return self.conv(x)
class UNET(nn.Module):
    """
    U-Net architecture for image segmentation tasks.

    Attributes:
        ups (nn.ModuleList): List of modules used in the decoder path of U-Net.
        downs (nn.ModuleList): List of modules used in the encoder path of U-Net.
        pool (nn.MaxPool2d): Max pooling layer.
        bottleneck (DoubleConv): The bottleneck layer of U-Net.
        final_conv (nn.Conv2d): Final convolutional layer to produce the output segmentation map.

    Parameters:
        in_channels (int): Number of channels in the input image.
        out_channels (int): Number of channels in the output image.
        features (List[int]): Number of features in each layer of the network.
    """
    def __init__(self, in_channels=3, out_channels=1, features=[64, 128, 256, 512]):
        super(UNET, self).__init__()
        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(feature*2, feature, kernel_size=2, stride=2)
            )
            self.ups.append(DoubleConv(feature*2, feature))

        self.bottleneck = DoubleConv(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        """
        Defines the forward pass of the U-Net using skip connections and up-sampling.

        Parameters:
            x (torch.Tensor): The input tensor for the U-Net model.

        Returns:
            torch.Tensor: The output tensor after processing through U-Net.
        """
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1]

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx//2]

            if x.shape != skip_connection.shape:
                x = TF.resize(x, size=skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx+1](concat_skip)

        return self.final_conv(x)

## 3.2 Training the U-Net model

### 3.2.1 Hyperparameter Definition

In [9]:
#HYPERPARAMETERS
LEARNING_RATE = 1e-4
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 8
NUM_EPOCHS = 50
NUM_WORKERS = 0
IMAGE_HEIGHT = 270 #135,270,540,1080
IMAGE_WIDTH = 480  #240,480,960,1920
PIN_MEMORY = True
LOAD_MODEL = False
RUN_NAME = "u_net"

#EARLY STOPPING
BEST_ACCURACY = 0.0
BEST_DICE_SCORE = 0.0
PATIENCE=10
PATIENCE_COUNTER=0

### 3.2.2 Definition of the Training Functions

In [49]:
writer = SummaryWriter(f"runs/{RUN_NAME}")

def train_fn(loader, model, optimizer, loss_fn, scaler, epoch, writer):
    loop = tqdm(loader)  # Initialize a tqdm progress bar for the data loader
    model.train()  # Set the model to training mode

    for batch_idx, (data, targets) in enumerate(loop):
        data = data.to(device=DEVICE)  # Move input data to the specified device (e.g., GPU)
        targets = targets.float().unsqueeze(1).to(device=DEVICE)  # Convert targets to float and adjust dimensions
        targets = torch.sigmoid(targets)  # Apply sigmoid activation to targets

        # Forward pass with automatic mixed precision for efficiency
        with torch.cuda.amp.autocast():
            predictions = model(data)  # Generate model predictions
            predictions = torch.sigmoid(predictions)  # Apply sigmoid activation to predictions
            loss = loss_fn(predictions, targets)  # Compute the loss

        # Log the training loss to TensorBoard
        writer.add_scalar("Training loss", loss.item(), epoch * len(loader) + batch_idx)

        optimizer.zero_grad()  # Clear previous gradients
        scaler.scale(loss).backward()  # Backpropagate the scaled loss
        scaler.step(optimizer)  # Update the model parameters
        scaler.update()  # Update the scaler for the next iteration

        # Update the tqdm progress bar with the current loss
        loop.set_postfix(loss=loss.item())

    # Compute loss for the epoch and log it (Note: Recompute the loss on the last batch)
    loss_epoch = loss_fn(predictions, targets)
    writer = SummaryWriter(f"runs/{RUN_NAME}")  # Initialize a new SummaryWriter for logging


### 3.2.3 Definition of the Transformation Functions

In [13]:
train_transform = A.Compose(
    [
        A.SmallestMaxSize(max_size=max(IMAGE_HEIGHT, IMAGE_WIDTH), always_apply=True),  # Resize the image so its smaller side is equal to the max size
        A.CenterCrop(height=IMAGE_HEIGHT, width=IMAGE_WIDTH, always_apply=True),       # Crop the center of the image to the given dimensions
        A.PadIfNeeded(min_height=IMAGE_HEIGHT, min_width=IMAGE_WIDTH, border_mode=cv2.BORDER_CONSTANT, value=0, always_apply=True),  # Pad the image if needed with a constant border of value 0
        A.Rotate(limit=35, p=1.0),  # Rotate the image randomly within the range of -35 to 35 degrees (always applied)
        A.HorizontalFlip(p=0.5),    # Flip the image horizontally with 50% probability
        A.VerticalFlip(p=0.1),      # Flip the image vertically with 10% probability
        A.Normalize(                # Normalize the image to zero mean and unit variance
            mean=[0.0, 0.0, 0.0],
            std=[1.0, 1.0, 1.0],
            max_pixel_value=255.0,
        ),
        ToTensorV2(),  # Convert the image to a PyTorch tensor
    ],
)

val_transforms = A.Compose(
    [
        A.SmallestMaxSize(max_size=max(IMAGE_HEIGHT, IMAGE_WIDTH), always_apply=True),  # Resize the image so its smaller side is equal to the max size
        A.CenterCrop(height=IMAGE_HEIGHT, width=IMAGE_WIDTH, always_apply=True),       # Crop the center of the image to the given dimensions
        A.PadIfNeeded(min_height=IMAGE_HEIGHT, min_width=IMAGE_WIDTH, border_mode=cv2.BORDER_CONSTANT, value=0, always_apply=True),  # Pad the image if needed with a constant border of value 0
        A.Normalize(                # Normalize the image to zero mean and unit variance
            mean=[0.0, 0.0, 0.0],
            std=[1.0, 1.0, 1.0],
            max_pixel_value=255.0,
        ),
        ToTensorV2(),  # Convert the image to a PyTorch tensor
    ],
)

test_transforms = A.Compose([
    A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH, always_apply=True),  # Resize the image to the given dimensions
    A.Normalize(                # Normalize the image to zero mean and unit variance
            mean=[0.0, 0.0, 0.0],
            std=[1.0, 1.0, 1.0],
            max_pixel_value=255.0,
    ),
    ToTensorV2(),  # Convert the image to a PyTorch tensor
])


### 3.2.4 Model Instantiation

In [14]:
# Model initialization with specified input and output channels
model = UNET(in_channels=3, out_channels=1).to(DEVICE)
# Loss function using Binary Cross Entropy with logits
loss_fn = nn.BCEWithLogitsLoss()
# Optimizer using Adam algorithm
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Loading datasets for training and validation
train_loader, val_loader,test_loader = get_loaders(TRAINDIR_IMGS,TRAINDIR_MASKS,VALDIR_IMGS,VALDIR_MASKS,TESTDIR_IMGS,TESTDIR_MASKS,BATCH_SIZE,train_transform,val_transforms,test_transforms,NUM_WORKERS,PIN_MEMORY)

### 3.2.5 Training

In [71]:
# Training loop over the number of epochs
if False: 
    # Utility for calculating accuracy
    check_accuracy(val_loader, model, device=DEVICE)
    # GradScaler for mixed precision training
    scaler = torch.cuda.amp.GradScaler()
    for epoch in range(NUM_EPOCHS):
        # Function to handle the training process
        train_fn(train_loader, model, optimizer, loss_fn, scaler, epoch, writer)
        # Checking model accuracy and dice score after each epoch
        accuracy, dice_score = check_accuracy(val_loader, model, device=DEVICE)
        # Log accuracy and dice score for each epoch
        writer.add_scalar("Accuracy", accuracy, epoch)
        writer.add_scalar("Dice Score", dice_score, epoch)
        # Check for model improvement, save model if improved
        if accuracy > BEST_ACCURACY * 0.8 and dice_score > BEST_DICE_SCORE:
            BEST_ACCURACY = accuracy
            BEST_DICE_SCORE = dice_score
            print(f"Epoch {epoch + 1}, New best model with accuracy: {accuracy:.4f}, Dice Score: {dice_score:.4f}")
            # Save model checkpoint
            checkpoint = {
            "state_dict": model.state_dict(),
            "optimizer": optimizer.state_dict(),
            }
            save_checkpoint(checkpoint)
            PATIENCE_COUNTER = 0  # Reset patience counter
        else:
            PATIENCE_COUNTER += 1  # Increment patience counter
            print(f"Epoch {epoch + 1}, No improvement. Patience: {PATIENCE_COUNTER}/{PATIENCE}")
        # Early stopping if no improvement for a defined number of epochs
        if PATIENCE_COUNTER >= PATIENCE:
            print(f"Early stopping triggered. No improvement in {PATIENCE} epochs.")
            print(f"Best model: Accuracy= {BEST_ACCURACY:.4f}, Dice_score= {BEST_DICE_SCORE:.4f}.")
            break
        # Save sample predictions as images
        save_predictions_as_imgs(
            val_loader, model, folder="results/saved_unet", device=DEVICE
        )

100%|██████████| 96/96 [01:03<00:00,  1.51it/s, loss=0.702]


FP: 133586.0, FN: 5362.0, TP: 263088.0, TN: 26036360.0
Epoch 1, New best model with accuracy: 99.4744, Dice Score: 79.1095
=> Saving checkpoint


100%|██████████| 96/96 [01:01<00:00,  1.57it/s, loss=0.699]


FP: 9866.0, FN: 7424.0, TP: 261026.0, TN: 26160086.0
Epoch 2, New best model with accuracy: 99.9346, Dice Score: 96.7942
=> Saving checkpoint


100%|██████████| 96/96 [01:01<00:00,  1.56it/s, loss=0.698]


FP: 3550.0, FN: 8900.0, TP: 259550.0, TN: 26166402.0
Epoch 3, New best model with accuracy: 99.9529, Dice Score: 97.6578
=> Saving checkpoint


100%|██████████| 96/96 [01:04<00:00,  1.48it/s, loss=0.696]


FP: 6387.0, FN: 6519.0, TP: 261931.0, TN: 26163564.0
Epoch 4, No improvement. Patience: 1/10


100%|██████████| 96/96 [01:04<00:00,  1.48it/s, loss=0.695]


FP: 5864.0, FN: 7074.0, TP: 261376.0, TN: 26164086.0
Epoch 5, No improvement. Patience: 2/10


100%|██████████| 96/96 [01:03<00:00,  1.50it/s, loss=0.694]


FP: 15964.0, FN: 10483.0, TP: 257967.0, TN: 26153984.0
Epoch 6, No improvement. Patience: 3/10


100%|██████████| 96/96 [01:03<00:00,  1.51it/s, loss=0.693]


FP: 6184.0, FN: 6555.0, TP: 261895.0, TN: 26163768.0
Epoch 7, No improvement. Patience: 4/10


100%|██████████| 96/96 [01:03<00:00,  1.52it/s, loss=0.694]


FP: 5348.0, FN: 6550.0, TP: 261900.0, TN: 26164602.0
Epoch 8, New best model with accuracy: 99.9550, Dice Score: 97.7790
=> Saving checkpoint


100%|██████████| 96/96 [01:06<00:00,  1.44it/s, loss=0.694]


FP: 5780.0, FN: 5825.0, TP: 262625.0, TN: 26164168.0
Epoch 9, New best model with accuracy: 99.9561, Dice Score: 97.8383
=> Saving checkpoint


100%|██████████| 96/96 [01:03<00:00,  1.51it/s, loss=0.692]


FP: 6803.0, FN: 6833.0, TP: 261617.0, TN: 26163152.0
Epoch 10, No improvement. Patience: 1/10


100%|██████████| 96/96 [01:05<00:00,  1.47it/s, loss=0.693]


FP: 6394.0, FN: 4832.0, TP: 263618.0, TN: 26163560.0
Epoch 11, New best model with accuracy: 99.9575, Dice Score: 97.9152
=> Saving checkpoint


100%|██████████| 96/96 [01:07<00:00,  1.42it/s, loss=0.693]


FP: 7207.0, FN: 4261.0, TP: 264189.0, TN: 26162748.0
Epoch 12, No improvement. Patience: 1/10


100%|██████████| 96/96 [01:06<00:00,  1.43it/s, loss=0.693]


FP: 5293.0, FN: 6129.0, TP: 262321.0, TN: 26164656.0
Epoch 13, No improvement. Patience: 2/10


100%|██████████| 96/96 [01:06<00:00,  1.44it/s, loss=0.693]


FP: 5815.0, FN: 4437.0, TP: 264013.0, TN: 26164136.0
Epoch 14, New best model with accuracy: 99.9612, Dice Score: 98.0954
=> Saving checkpoint


100%|██████████| 96/96 [01:04<00:00,  1.48it/s, loss=0.693]


FP: 4783.0, FN: 8061.0, TP: 260389.0, TN: 26165164.0
Epoch 15, No improvement. Patience: 1/10


100%|██████████| 96/96 [01:04<00:00,  1.49it/s, loss=0.693]


FP: 4371.0, FN: 5706.0, TP: 262744.0, TN: 26165584.0
Epoch 16, New best model with accuracy: 99.9619, Dice Score: 98.1184
=> Saving checkpoint


100%|██████████| 96/96 [01:04<00:00,  1.48it/s, loss=0.692]


FP: 4307.0, FN: 6473.0, TP: 261977.0, TN: 26165644.0
Epoch 17, No improvement. Patience: 1/10


100%|██████████| 96/96 [01:08<00:00,  1.40it/s, loss=0.693]


FP: 3685.0, FN: 5684.0, TP: 262766.0, TN: 26166266.0
Epoch 18, New best model with accuracy: 99.9646, Dice Score: 98.2485
=> Saving checkpoint


100%|██████████| 96/96 [01:07<00:00,  1.41it/s, loss=0.692]


FP: 4993.0, FN: 5833.0, TP: 262617.0, TN: 26164956.0
Epoch 19, No improvement. Patience: 1/10


100%|██████████| 96/96 [01:05<00:00,  1.46it/s, loss=0.692]


FP: 4115.0, FN: 5966.0, TP: 262484.0, TN: 26165836.0
Epoch 20, No improvement. Patience: 2/10


100%|██████████| 96/96 [01:04<00:00,  1.49it/s, loss=0.692]


FP: 5768.0, FN: 3434.0, TP: 265016.0, TN: 26164182.0
Epoch 21, New best model with accuracy: 99.9652, Dice Score: 98.2935
=> Saving checkpoint


100%|██████████| 96/96 [01:05<00:00,  1.46it/s, loss=0.692]


FP: 4067.0, FN: 5628.0, TP: 262822.0, TN: 26165880.0
Epoch 22, No improvement. Patience: 1/10


100%|██████████| 96/96 [01:05<00:00,  1.48it/s, loss=0.692]


FP: 5644.0, FN: 7492.0, TP: 260958.0, TN: 26164304.0
Epoch 23, No improvement. Patience: 2/10


100%|██████████| 96/96 [01:08<00:00,  1.41it/s, loss=0.692]


FP: 3719.0, FN: 8590.0, TP: 259860.0, TN: 26166232.0
Epoch 24, No improvement. Patience: 3/10


100%|██████████| 96/96 [01:06<00:00,  1.44it/s, loss=0.692]


FP: 507.0, FN: 32163.0, TP: 236287.0, TN: 26169442.0
Epoch 25, No improvement. Patience: 4/10


100%|██████████| 96/96 [01:06<00:00,  1.44it/s, loss=0.692]


FP: 4110.0, FN: 7178.0, TP: 261272.0, TN: 26165836.0
Epoch 26, No improvement. Patience: 5/10


100%|██████████| 96/96 [01:05<00:00,  1.47it/s, loss=0.692]


FP: 4206.0, FN: 6851.0, TP: 261599.0, TN: 26165742.0
Epoch 27, No improvement. Patience: 6/10


100%|██████████| 96/96 [01:06<00:00,  1.44it/s, loss=0.692]


FP: 4156.0, FN: 6452.0, TP: 261998.0, TN: 26165794.0
Epoch 28, No improvement. Patience: 7/10


100%|██████████| 96/96 [01:07<00:00,  1.42it/s, loss=0.692]


FP: 4040.0, FN: 7780.0, TP: 260670.0, TN: 26165908.0
Epoch 29, No improvement. Patience: 8/10


100%|██████████| 96/96 [01:06<00:00,  1.44it/s, loss=0.692]


FP: 3140.0, FN: 8300.0, TP: 260150.0, TN: 26166812.0
Epoch 30, No improvement. Patience: 9/10


100%|██████████| 96/96 [01:06<00:00,  1.44it/s, loss=0.692]


FP: 3843.0, FN: 8077.0, TP: 260373.0, TN: 26166108.0
Epoch 31, No improvement. Patience: 10/10
Early stopping triggered. No improvement in 10 epochs.
Best model: Accuracy= 99.9652, Dice_score= 98.2935.


# 4.- Evaluation


## 4.1 Performance of the segmentation block with different Image Sizes

In this study, we will evaluate the model's performance in terms of dice score, accuracy, and average inference time per image based on the input image size. We will begin by studying the performance with a downscaling factor of 4 (Image Height = 270 and Image Width = 480) and compare it with the model's performance without downscaling (Image Height = 1080, Image Width = 1920).

### Image Height = 270 & Image Width=480

In [58]:
# Define image dimensions
IMAGE_HEIGHT = 270
IMAGE_WIDTH = 480

# Path to the saved model checkpoint
CHECKPOINT_PATH = '../models/unet_checkpoint.pth.tar'

# Get data loaders for training, validation, and testing datasets with specified parameters
train_loader, val_loader, test_loader = get_loaders(
    TRAINDIR_IMGS, TRAINDIR_MASKS, VALDIR_IMGS, VALDIR_MASKS,
    TESTDIR_IMGS, TESTDIR_MASKS, BATCH_SIZE, train_transform,
    val_transforms, test_transforms, NUM_WORKERS, PIN_MEMORY
)

# Initialize the UNET model with specified input and output channels
model = UNET(in_channels=3, out_channels=1).to(DEVICE)

# Load the model checkpoint
load_checkpoint(torch.load(CHECKPOINT_PATH), model)

# Set the model to evaluation mode
model.eval()

# Record the start time of processing
start_time = time.time()

# Evaluate the model accuracy and dice score on the test dataset
accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)

# Record the end time of processing
end_time = time.time()

# Calculate the total processing time
total_time = end_time - start_time

# Calculate the average processing time per image
average_time_per_image = total_time / len(test_loader.dataset)

# Print the model accuracy
print(f'Model accuracy: {accuracy}')

# Print the model dice score
print(f'Model dice score: {dice_score}')

# Print the average time per image
print(f"Average time per image: {average_time_per_image:.2f} seconds")


=> Loading checkpoint
FP: 1581.0, FN: 3.0, TP: 15827.0, TN: 3092989.0
Model accuracy: 99.9490737915039
Model dice score: 95.23436737060547
Average time per image: 0.05 seconds


### Image Height =1080 & Image Width=1920

In [None]:
# Define image dimensions
IMAGE_HEIGHT = 1080
IMAGE_WIDTH = 1920

# Path to the saved model checkpoint
CHECKPOINT_PATH = '../models/unet_checkpoint.pth.tar'

# Get data loaders for training, validation, and testing datasets with specified parameters
train_loader, val_loader, test_loader = get_loaders(
    TRAINDIR_IMGS, TRAINDIR_MASKS, VALDIR_IMGS, VALDIR_MASKS,
    TESTDIR_IMGS, TESTDIR_MASKS, BATCH_SIZE, train_transform,
    val_transforms, test_transforms, NUM_WORKERS, PIN_MEMORY
)

# Initialize the UNET model with specified input and output channels
model = UNET(in_channels=3, out_channels=1).to(DEVICE)

# Load the model checkpoint
load_checkpoint(torch.load(CHECKPOINT_PATH), model)

# Set the model to evaluation mode
model.eval()

# Record the start time of processing
start_time = time.time()

# Evaluate the model accuracy and dice score on the test dataset
accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)

# Record the end time of processing
end_time = time.time()

# Calculate the total processing time
total_time = end_time - start_time

# Calculate the average processing time per image
average_time_per_image = total_time / len(test_loader.dataset)

# Print the model accuracy
print(f'Model accuracy: {accuracy}')

# Print the model dice score
print(f'Model dice score: {dice_score}')

# Print the average time per image
print(f"Average time per image: {average_time_per_image:.2f} seconds")

It is observed that the performance is slightly worse when applying the reduced scale. However, the inference time is much lower when using the downscaled images, leading us to conclude that it is advantageous to use this for inference.

## 4.2 Performance of the segmentation block with different type of corruptions

In this second study, we will evaluate the model's performance on the test set with different types of corruptions and severities. The same metrics of accuracy and dice score have been used. The results are presented in the form of a spider plot, as this allows for a quick evaluation of the performance against the different types of corruptions.

In [13]:
# Image dimensions and directory paths
IMAGE_HEIGHT = 270  # Height of the images
IMAGE_WIDTH = 480  # Width of the images
FOLDER = '../dataset/test-C'  # Path to the main dataset folder
SEVERITIES = ['s1', 's2', 's3', 's4', 's5']  # List of severity levels for corruptions
CORRUPTION = 'c0'  # Corruption type identifier
SUBFOLDERS = ['imgs', 'masks']  # Subfolders for images and masks
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'  # Path to save graph results
CHECKPOINT_PATH = '../models/unet_checkpoint.pth.tar'  # Path to the model checkpoint file

# Define transformations
corr_transforms = A.Compose([
    A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH, always_apply=True),  # Resize images to specified dimensions
    A.Normalize(  # Normalize images to zero mean and unit variance
        mean=[0.0, 0.0, 0.0],
        std=[1.0, 1.0, 1.0],
        max_pixel_value=255.0,
    ),
    ToTensorV2(),  # Convert images to PyTorch tensors
])

def get_corruption_dataset_loaders(img_dir, mask_dir, batch_size, transform, num_workers=4, pin_memory=True):
    """Create DataLoader for corrupted dataset."""
    # Initialize dataset with specified directories and transformations
    corr_ds = RobotDataset(image_dir=img_dir, mask_dir=mask_dir, transform=transform)
    # Create DataLoader for the dataset with given parameters
    corr_loader = DataLoader(corr_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=pin_memory, worker_init_fn=worker_init_fn)
    return corr_loader

# Initialize model and load checkpoint
model = UNET(in_channels=3, out_channels=1).to(DEVICE)  # Create UNET model and move it to the specified device
load_checkpoint(torch.load(CHECKPOINT_PATH), model)  # Load the model checkpoint
model.eval()  # Set the model to evaluation mode

=> Loading checkpoint


UNET(
  (ups): ModuleList(
    (0): ConvTranspose2d(1024, 512, kernel_size=(2, 2), stride=(2, 2))
    (1): DoubleConv(
      (conv): Sequential(
        (0): Conv2d(1024, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (4): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (5): ReLU(inplace=True)
      )
    )
    (2): ConvTranspose2d(512, 256, kernel_size=(2, 2), stride=(2, 2))
    (3): DoubleConv(
      (conv): Sequential(
        (0): Conv2d(512, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), p

In [19]:
def spider_plot(dice_scores, graph_savefile, corr_name=None):
    os.makedirs(os.path.dirname(graph_savefile), exist_ok=True)  # Create directories if they don't exist
    
    labels = np.array(SEVERITIES)  # Convert severity levels to a NumPy array
    num_vars = len(labels)  # Number of variables (severity levels)
    
    # Compute angle for each bar
    angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist()  # Equally spaced angles around the circle
    
    # Close the loop for plotting by appending the start value to the end
    dice_scores = np.concatenate((dice_scores, [dice_scores[0]]))  # Append the first dice score to the end
    angles += angles[:1]  # Append the first angle to the end
    
    fig, ax = plt.subplots(figsize=(8, 8), subplot_kw=dict(polar=True))  # Create a polar subplot
    ax.fill(angles, dice_scores, color='cyan', alpha=0.6)  # Fill the area under the plot
    ax.plot(angles, dice_scores, color='blue', linewidth=2)  # Plot the dice scores
    
    # Set labels for each slice
    ax.set_xticks(angles[:-1])  # Set the ticks at each angle (except the last one to avoid duplication)
    ax.set_xticklabels(labels, fontsize=14, fontweight='bold', color='navy')  # Set the labels for each tick
    
    # Set the range for each slice
    ax.set_ylim(0, 100)  # Set the y-axis range from 0 to 100
    
    # Aesthetic improvements
    ax.set_facecolor('floralwhite')  # Set background color
    ax.grid(color='grey', linestyle='--', linewidth=0.5)  # Customize the grid
    
    # Set the plot title
    title = f'{corr_name} Dice Score Spider Plot' if corr_name else 'Dice Score Spider Plot'
    plt.title(title, fontsize=16, fontweight='bold')  # Set the plot title
    
    plt.savefig(graph_savefile)  # Save the plot to the specified file
    plt.close()  # Close the plot to free memory



### 4.2.1 c=0 Gaussian Noise

In [72]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c0'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Gaussian Noise')

FP: 3721.0, FN: 32.0, TP: 37257.0, TN: 7086990.0
The accuracy of s=0 is: 99.94734954833984%
The dice_score of s=0 is: 95.20487976074219%
FP: 3895.0, FN: 26.0, TP: 37263.0, TN: 7086816.0
The accuracy of s=1 is: 99.94499206542969%
The dice_score of s=1 is: 95.00172424316406%
FP: 12129.0, FN: 31.0, TP: 37258.0, TN: 7078582.0
The accuracy of s=2 is: 99.82940673828125%
The dice_score of s=2 is: 85.97074127197266%
FP: 53162.0, FN: 45.0, TP: 37244.0, TN: 7037549.0
The accuracy of s=3 is: 99.25354766845703%
The dice_score of s=3 is: 58.33274841308594%
FP: 142445.0, FN: 117.0, TP: 37172.0, TN: 6948266.0
The accuracy of s=4 is: 97.99996948242188%
The dice_score of s=4 is: 34.27475357055664%
[95.20487976 95.00172424 85.97074127 58.33274841 34.27475357]


### 4.2.2 c=1 Shot Noise

In [73]:
CORRUPTION = 'c1'
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Shot Noise')

FP: 3932.0, FN: 27.0, TP: 37262.0, TN: 7086779.0
The accuracy of s=0 is: 99.9444580078125%
The dice_score of s=0 is: 94.95559692382812%
FP: 5586.0, FN: 26.0, TP: 37263.0, TN: 7085125.0
The accuracy of s=1 is: 99.9212646484375%
The dice_score of s=1 is: 92.99707794189453%
FP: 24288.0, FN: 29.0, TP: 37260.0, TN: 7066423.0
The accuracy of s=2 is: 99.65885162353516%
The dice_score of s=2 is: 75.39686584472656%
FP: 113246.0, FN: 47.0, TP: 37242.0, TN: 6977465.0
The accuracy of s=3 is: 98.41059112548828%
The dice_score of s=3 is: 39.66619873046875%
FP: 230444.0, FN: 86.0, TP: 37203.0, TN: 6860267.0
The accuracy of s=4 is: 96.76585388183594%
The dice_score of s=4 is: 24.400529861450195%
[94.95559692 92.99707794 75.39686584 39.66619873 24.40052986]


### 4.2.3  c=2 Impulse Noise

In [74]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c2'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Impulse Noise')

FP: 3727.0, FN: 24.0, TP: 37265.0, TN: 7086984.0
The accuracy of s=0 is: 99.94737243652344%
The dice_score of s=0 is: 95.20829010009766%
FP: 4145.0, FN: 32.0, TP: 37257.0, TN: 7086566.0
The accuracy of s=1 is: 99.94140625%
The dice_score of s=1 is: 94.69189453125%
FP: 6649.0, FN: 27.0, TP: 37262.0, TN: 7084062.0
The accuracy of s=2 is: 99.90634155273438%
The dice_score of s=2 is: 91.77832794189453%
FP: 28996.0, FN: 40.0, TP: 37249.0, TN: 7061715.0
The accuracy of s=3 is: 99.5926513671875%
The dice_score of s=3 is: 71.95510864257812%
FP: 85043.0, FN: 115.0, TP: 37174.0, TN: 7005668.0
The accuracy of s=4 is: 98.80530548095703%
The dice_score of s=4 is: 46.611412048339844%
[95.2082901  94.69189453 91.77832794 71.95510864 46.61141205]


### 4.2.4  c=4 Glass Blur

In [75]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c4'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Glass Blur')

FP: 2347.0, FN: 307.0, TP: 36982.0, TN: 7088364.0
The accuracy of s=0 is: 99.9627685546875%
The dice_score of s=0 is: 96.53606414794922%
FP: 1307.0, FN: 1016.0, TP: 36273.0, TN: 7089404.0
The accuracy of s=1 is: 99.9674072265625%
The dice_score of s=1 is: 96.89724731445312%
FP: 727.0, FN: 2160.0, TP: 35129.0, TN: 7089984.0
The accuracy of s=2 is: 99.9594955444336%
The dice_score of s=2 is: 96.05304718017578%
FP: 314.0, FN: 3982.0, TP: 33307.0, TN: 7090397.0
The accuracy of s=3 is: 99.93972778320312%
The dice_score of s=3 is: 93.94161987304688%
FP: 4.0, FN: 36616.0, TP: 673.0, TN: 7090707.0
The accuracy of s=4 is: 99.48625183105469%
The dice_score of s=4 is: 3.5452771186828613%
[96.53606415 96.89724731 96.05304718 93.94161987  3.54527712]


### 4.2.5 c=5 Motion Blur

In [76]:
CORRUPTION = 'c5'
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Motion Blur')

FP: 6418.0, FN: 74.0, TP: 37215.0, TN: 7084293.0
The accuracy of s=0 is: 99.90892028808594%
The dice_score of s=0 is: 91.97746276855469%
FP: 10044.0, FN: 54.0, TP: 37235.0, TN: 7080667.0
The accuracy of s=1 is: 99.85832977294922%
The dice_score of s=1 is: 88.05931091308594%
FP: 10044.0, FN: 54.0, TP: 37235.0, TN: 7080667.0
The accuracy of s=2 is: 99.85832977294922%
The dice_score of s=2 is: 88.05931091308594%
FP: 10044.0, FN: 54.0, TP: 37235.0, TN: 7080667.0
The accuracy of s=3 is: 99.85832977294922%
The dice_score of s=3 is: 88.05931091308594%
FP: 13791.0, FN: 84.0, TP: 37205.0, TN: 7076920.0
The accuracy of s=4 is: 99.80534362792969%
The dice_score of s=4 is: 84.28385162353516%
[91.97746277 88.05931091 88.05931091 88.05931091 84.28385162]


### 4.2.6 c=6 Fog

In [77]:
CORRUPTION = 'c6'
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Fog')

FP: 991.0, FN: 1434.0, TP: 35855.0, TN: 7089720.0
The accuracy of s=0 is: 99.96598052978516%
The dice_score of s=0 is: 96.72893524169922%
FP: 723.0, FN: 2758.0, TP: 34531.0, TN: 7089988.0
The accuracy of s=1 is: 99.95116424560547%
The dice_score of s=1 is: 95.20146942138672%
FP: 671.0, FN: 4369.0, TP: 32920.0, TN: 7090040.0
The accuracy of s=2 is: 99.92929077148438%
The dice_score of s=2 is: 92.88938903808594%
FP: 710.0, FN: 3937.0, TP: 33352.0, TN: 7090001.0
The accuracy of s=3 is: 99.93480682373047%
The dice_score of s=3 is: 93.48712921142578%
FP: 1172.0, FN: 5504.0, TP: 31785.0, TN: 7089539.0
The accuracy of s=4 is: 99.90634155273438%
The dice_score of s=4 is: 90.49625396728516%
[96.72893524 95.20146942 92.88938904 93.48712921 90.49625397]


### 4.2.7 c=7 Brightness

In [78]:
CORRUPTION = 'c7'
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Brightness')

FP: 3698.0, FN: 32.0, TP: 37257.0, TN: 7087013.0
The accuracy of s=0 is: 99.94766998291016%
The dice_score of s=0 is: 95.23286437988281%
FP: 3823.0, FN: 55.0, TP: 37234.0, TN: 7086888.0
The accuracy of s=1 is: 99.94559478759766%
The dice_score of s=1 is: 95.05016326904297%
FP: 3815.0, FN: 77.0, TP: 37212.0, TN: 7086896.0
The accuracy of s=2 is: 99.94539642333984%
The dice_score of s=2 is: 95.03038787841797%
FP: 3745.0, FN: 108.0, TP: 37181.0, TN: 7086966.0
The accuracy of s=3 is: 99.9459457397461%
The dice_score of s=3 is: 95.07383728027344%
FP: 3674.0, FN: 189.0, TP: 37100.0, TN: 7087037.0
The accuracy of s=4 is: 99.94580841064453%
The dice_score of s=4 is: 95.05142974853516%
[95.23286438 95.05016327 95.03038788 95.07383728 95.05142975]


### 4.2.8 c=8 Contrast

In [79]:
CORRUPTION = 'c8'
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Contrast')

FP: 182.0, FN: 3030.0, TP: 34259.0, TN: 7090529.0
The accuracy of s=0 is: 99.95494079589844%
The dice_score of s=0 is: 95.5220947265625%
FP: 0.0, FN: 13010.0, TP: 24279.0, TN: 7090711.0
The accuracy of s=1 is: 99.8174819946289%
The dice_score of s=1 is: 78.868896484375%
FP: 0.0, FN: 37289.0, TP: 0.0, TN: 7090711.0
The accuracy of s=2 is: 99.47686767578125%
The dice_score of s=2 is: 0.0%
FP: 0.0, FN: 37289.0, TP: 0.0, TN: 7090711.0
The accuracy of s=3 is: 99.47686767578125%
The dice_score of s=3 is: 0.0%
FP: 0.0, FN: 37289.0, TP: 0.0, TN: 7090711.0
The accuracy of s=4 is: 99.47686767578125%
The dice_score of s=4 is: 0.0%
[95.52209473 78.86889648  0.          0.          0.        ]


### 4.2.9 c=9 Elastic Transform

In [22]:
CORRUPTION = 'c9'
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Elastic Transform')

FP: 14130.0, FN: 10046.0, TP: 27243.0, TN: 7076581.0
The accuracy of s=0 is: 99.66083526611328%
The dice_score of s=0 is: 69.26597595214844%
FP: 22334.0, FN: 18106.0, TP: 19183.0, TN: 7068377.0
The accuracy of s=1 is: 99.43266296386719%
The dice_score of s=1 is: 48.684112548828125%
FP: 4986.0, FN: 891.0, TP: 36398.0, TN: 7085725.0
The accuracy of s=2 is: 99.91754913330078%
The dice_score of s=2 is: 92.52983856201172%
FP: 4970.0, FN: 1073.0, TP: 36216.0, TN: 7085741.0
The accuracy of s=3 is: 99.91522216796875%
The dice_score of s=3 is: 92.2994613647461%
FP: 5334.0, FN: 1242.0, TP: 36047.0, TN: 7085377.0
The accuracy of s=4 is: 99.90774536132812%
The dice_score of s=4 is: 91.64102935791016%
[69.26597595 48.68411255 92.52983856 92.29946136 91.64102936]


### 4.2.10 c=11 Speckle Noise

In [23]:
CORRUPTION = 'c11'
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Speckle Noise')

FP: 3918.0, FN: 22.0, TP: 37267.0, TN: 7086793.0
The accuracy of s=0 is: 99.9447250366211%
The dice_score of s=0 is: 94.9792251586914%
FP: 4104.0, FN: 23.0, TP: 37266.0, TN: 7086607.0
The accuracy of s=1 is: 99.94210052490234%
The dice_score of s=1 is: 94.75330352783203%
FP: 20353.0, FN: 22.0, TP: 37267.0, TN: 7070358.0
The accuracy of s=2 is: 99.71415710449219%
The dice_score of s=2 is: 78.53206634521484%
FP: 50358.0, FN: 25.0, TP: 37264.0, TN: 7040353.0
The accuracy of s=3 is: 99.29316711425781%
The dice_score of s=3 is: 59.66488265991211%
FP: 119343.0, FN: 30.0, TP: 37259.0, TN: 6971368.0
The accuracy of s=4 is: 98.3252944946289%
The dice_score of s=4 is: 38.43293380737305%
[94.97922516 94.75330353 78.53206635 59.66488266 38.43293381]


### 4.2.11 c=12 Gaussian Blur

In [27]:
CORRUPTION = 'c12'
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Gaussian Blur')

FP: 762.0, FN: 1723.0, TP: 35566.0, TN: 7089949.0
The accuracy of s=0 is: 99.96513366699219%
The dice_score of s=0 is: 96.62442016601562%
FP: 189.0, FN: 36842.0, TP: 447.0, TN: 7090522.0
The accuracy of s=1 is: 99.48048400878906%
The dice_score of s=1 is: 2.3572840690612793%
FP: 102.0, FN: 37191.0, TP: 98.0, TN: 7090609.0
The accuracy of s=2 is: 99.476806640625%
The dice_score of s=2 is: 0.5228200554847717%
FP: 24.0, FN: 37281.0, TP: 8.0, TN: 7090687.0
The accuracy of s=3 is: 99.47663879394531%
The dice_score of s=3 is: 0.04287130385637283%
FP: 0.0, FN: 37289.0, TP: 0.0, TN: 7090711.0
The accuracy of s=4 is: 99.47686767578125%
The dice_score of s=4 is: 0.0%
[9.66244202e+01 2.35728407e+00 5.22820055e-01 4.28713039e-02
 0.00000000e+00]


### 4.2.12 c=13 spatter

In [25]:
CORRUPTION = 'c13'
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Spatter')

FP: 3651.0, FN: 27.0, TP: 37262.0, TN: 7087060.0
The accuracy of s=0 is: 99.94840240478516%
The dice_score of s=0 is: 95.29679107666016%
FP: 3284.0, FN: 59.0, TP: 37230.0, TN: 7087427.0
The accuracy of s=1 is: 99.9531021118164%
The dice_score of s=1 is: 95.7032470703125%
FP: 3144.0, FN: 76.0, TP: 37213.0, TN: 7087567.0
The accuracy of s=2 is: 99.95482635498047%
The dice_score of s=2 is: 95.85297393798828%
FP: 221586.0, FN: 18.0, TP: 37271.0, TN: 6869125.0
The accuracy of s=3 is: 96.89107513427734%
The dice_score of s=3 is: 25.170692443847656%
FP: 901342.0, FN: 12.0, TP: 37277.0, TN: 6189369.0
The accuracy of s=4 is: 87.35474395751953%
The dice_score of s=4 is: 7.6394500732421875%
[95.29679108 95.70324707 95.85297394 25.17069244  7.63945007]


### 4.2.13 c=14 Saturate

In [28]:
CORRUPTION = 'c14'
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'

# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Assuming get_corruption_dataset_loaders returns a DataLoader
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Assuming check_accuracy returns accuracy and dice_score
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Output the results
    print(f'The accuracy of s={i} is: {accuracy}%')
    print(f'The dice_score of s={i} is: {dice_score}%')
    
    # Store the dice score
    dice_scores[i] = dice_score

print(dice_scores)
# Generating the spider plot after processing all severities
spider_plot(dice_scores, GRAPH_SAVEFILE,'Saturate')

FP: 843.0, FN: 1365.0, TP: 35924.0, TN: 7089868.0
The accuracy of s=0 is: 99.96902465820312%
The dice_score of s=0 is: 97.01847076416016%
FP: 281.0, FN: 36277.0, TP: 1012.0, TN: 7090430.0
The accuracy of s=1 is: 99.48712158203125%
The dice_score of s=1 is: 5.245969295501709%
FP: 5105.0, FN: 5.0, TP: 37284.0, TN: 7085606.0
The accuracy of s=2 is: 99.92831420898438%
The dice_score of s=2 is: 93.5866928100586%
FP: 464228.0, FN: 1.0, TP: 37288.0, TN: 6626483.0
The accuracy of s=3 is: 93.48724365234375%
The dice_score of s=3 is: 13.8410005569458%
FP: 4944049.0, FN: 1.0, TP: 37288.0, TN: 2146662.0
The accuracy of s=4 is: 30.639028549194336%
The dice_score of s=4 is: 1.485984444618225%
[97.01847076  5.2459693  93.58669281 13.84100056  1.48598444]


## First Iteration (Modelling & Evaluation)

We notice that the model's performance significantly worsens under severe conditions for some corruptions. At this point, we developed a post-processing block, which will be applied to the segmented images from the U-Net model and will act as a filter to ensure that the minimum number of pixels that do not correspond to a real piece pass to the next block.

## 3.3 Postprocessing Block

After evaluating the U-Net model against various levels of perturbations, we consider it beneficial to add a post-processing component to the modeling pipeline. This component will take the output image from the U-Net model and act as a filter to prevent as many pixels that do not correspond to the actual piece from passing to the next block.

### 3.3.1 Definition of the function to process and plot the image

In [None]:
def process_and_plot_image(file_path, area_thresholds):
    
    """
    The function process_and_plot_image takes an image file path and a list of area thresholds as input. 
    It performs image segmentation using morphological operations, contour detection, and the Watershed algorithm. 
    The function visualizes the segmentation results for different area thresholds, showing how the segmentation 
    changes as the threshold varies. Each threshold value is used to filter contours based on their area, and the 
    resulting segmented regions are displayed in a subplot.
    """
    
    # Load the image in grayscale
    image = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        print(f"Error: Unable to load image {file_path}")
        return None

    # Convert to color for visualization
    image_color = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)

    # Define the kernel for morphological operations
    kernel = np.ones((5, 5), np.uint8)

    fig, axs = plt.subplots(2, 3, figsize=(15, 10))
    fig.suptitle('Study of Area Threshold for Image Segmentation')

    for idx, area_threshold in enumerate(area_thresholds):
        # Perform morphological operations: opening followed by closing
        opening = cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel, iterations=2)
        closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel)

        # Find contours in the processed image
        contours, _ = cv2.findContours(closing, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        mask = np.zeros_like(image)

        # Filter and draw contours based on area threshold
        for contour in contours:
            if cv2.contourArea(contour) > area_threshold:
                cv2.drawContours(mask, [contour], -1, (255), thickness=cv2.FILLED)

        # Further morphological operations for background extraction
        sure_bg = cv2.dilate(mask, kernel, iterations=3)

        # Distance transform and thresholding to find the sure foreground
        dist_transform = cv2.distanceTransform(opening, cv2.DIST_L2, 5)
        _, sure_fg = cv2.threshold(dist_transform, 0.7 * dist_transform.max(), 255, 0)
        sure_fg = np.uint8(sure_fg)

        # Identifying unknown region
        unknown = cv2.subtract(sure_bg, sure_fg)

        # Connected components labeling
        _, markers = cv2.connectedComponents(sure_fg)
        markers = markers + 1
        markers[unknown == 255] = 0

        # Apply the Watershed algorithm for segmentation
        cv2.watershed(image_color, markers)
        image_color[markers == -1] = [255, 0, 0]

        # Prepare the final mask
        final_mask = np.zeros_like(image, dtype=np.uint8)
        final_mask[markers > 1] = 255

        # Display the result in a subplot
        ax = axs[idx // 3, idx % 3]
        ax.imshow(cv2.cvtColor(final_mask, cv2.COLOR_GRAY2RGB))
        ax.set_title(f'Area Threshold: {area_threshold}')
        ax.axis('off')

    # Adjust the layout and show the figure
    plt.subplots_adjust(hspace=0.3)
    plt.show()

In [None]:
IMAGEDIR = '../data/predictions/Not identified/image_15.png'
area_thresholds = [2000, 3000, 4000, 7000, 8000, 9000]
process_and_plot_image(IMAGEDIR, area_thresholds)

## 4.3 Evaluation of the segmentation + Postprocessing blocks


### Definition of new Utility Funcions 

In [None]:
def check_accuracy(loader, model, device="cuda"):
    """
    Computes the accuracy and Dice score of the model using a given loader.

    Parameters:
        loader (DataLoader): The DataLoader for the dataset to evaluate.
        model (torch.nn.Module): The model to evaluate.
        device (str, optional): The device to use for computation. Defaults to "cuda".

    Returns:
        tuple: A tuple containing the accuracy and Dice score, both multiplied by 100.
    """
    # Initialize counters for true positives, false positives, true negatives, and false negatives
    num_tp = 0
    num_fp = 0
    num_tn = 0
    num_fn = 0
    model.eval()  # Set the model to evaluation mode

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for x, y in loader:
            x = x.to(device)  # Move input data to the specified device
            y = y.to(device).unsqueeze(1)  # Move targets to the device and adjust dimensions
            y = torch.sigmoid(y)  # Apply sigmoid activation to targets
            y = (y > 0.5).float()  # Binarize targets

            preds = torch.sigmoid(model(x))  # Get model predictions and apply sigmoid activation
            preds = (preds > 0.5).float()  # Binarize predictions

            # Update counters
            num_tp += (preds * y).sum()  # True positives
            num_tn += ((1 - preds) * (1 - y)).sum()  # True negatives
            num_fp += (preds * (1 - y)).sum()  # False positives
            num_fn += ((1 - preds) * y).sum()  # False negatives

    # Print the counts of false positives, false negatives, true positives, and true negatives
    print(f"FP: {num_fp}, FN: {num_fn}, TP: {num_tp}, TN: {num_tn}")
    
    # Calculate accuracy and Dice score
    accuracy = (num_tp + num_tn) / (num_tp + num_tn + num_fp + num_fn)
    dice_score = (2 * num_tp) / ((2 * num_tp) + num_fp + num_fn + 1e-8)  # Add small epsilon to avoid division by zero

    model.train()  # Set the model back to training mode
    return accuracy * 100, dice_score * 100  # Return accuracy and Dice score as percentages


In [None]:
def postprocess(tensor_prediction, area_threshold):
    """
    Postprocesses the tensor prediction to refine segmentation masks.

    Parameters:
        tensor_prediction (torch.Tensor): The tensor containing the prediction.
        area_threshold (float): The minimum area for contours to be kept.

    Returns:
        torch.Tensor: The final postprocessed mask tensor.
    """
    # Convert tensor to numpy array and remove any singleton dimensions
    image = tensor_prediction.squeeze().cpu().numpy()
    
    # Ensure the image is in 8-bit format
    if image.dtype != np.uint8:
        # Scale the image to the range [0, 255] and convert to uint8
        image = np.clip(image * 255, 0, 255).astype(np.uint8)

    # Handle color conversion if necessary
    if len(image.shape) == 2:  # Check if the image is grayscale
        # Convert grayscale image to BGR color format
        image_color = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    else:
        # Image is already in BGR format
        image_color = image

    # Define the kernel for morphological operations
    kernel = np.ones((5, 5), np.uint8)

    # Apply morphological opening to remove small noise
    opening = cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel, iterations=2)
    # Apply morphological closing to close small holes inside the foreground
    closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel)

    # Convert the image to a binary image using Otsu's thresholding
    _, binary = cv2.threshold(closing, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    
    # Find contours in the binary image
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    # Create a mask of the same size as the image, initialized to zeros
    mask = np.zeros_like(image)

    # Filter and draw contours based on area
    for contour in contours:
        if cv2.contourArea(contour) > area_threshold:
            # Draw the contour on the mask if it meets the area threshold
            cv2.drawContours(mask, [contour], -1, (255), thickness=cv2.FILLED)

    # Further morphological cleaning by dilating the mask
    sure_bg = cv2.dilate(mask, kernel, iterations=3)

    # Apply distance transformation for segmentation
    dist_transform = cv2.distanceTransform(sure_bg, cv2.DIST_L2, 5)
    # Threshold the distance transform to get the foreground
    _, sure_fg = cv2.threshold(dist_transform, 0.7 * dist_transform.max(), 255, 0)
    sure_fg = np.uint8(sure_fg)

    # Find the unknown region by subtracting the foreground from the background
    unknown = cv2.subtract(sure_bg, sure_fg)

    # Label the connected components in the foreground
    _, markers = cv2.connectedComponents(sure_fg)
    markers = markers + 1  # Increment all labels by 1 to distinguish from the background
    markers[unknown == 255] = 0  # Mark the unknown region with zero

    # Apply the watershed algorithm to segment the connected parts
    cv2.watershed(image_color, markers)
    # Mark the boundaries in red in the original image
    image_color[markers == -1] = [255, 0, 0]

    # Prepare the final mask in the same format as input
    final_mask = np.zeros_like(image, dtype=np.uint8)
    final_mask[markers > 1] = 1  # Mark the segmented regions

    # Apply the necessary changes to the image
    image[image > 0] = 1
    
    # Convert the final mask back to tensor format and add a batch dimension if necessary
    final_tensor = torch.from_numpy(final_mask).unsqueeze(0)

    return final_tensor


In [136]:
def check_accuracy_postprocessing(loader, model, device="cuda", area_threshold=AREA_TRESHOLD):
    """
    Computes the accuracy and Dice score of the model using a given loader.

    Parameters:
        loader (DataLoader): The DataLoader for the dataset to evaluate.
        model (torch.nn.Module): The model to evaluate.
        device (str, optional): The device to use for computation. Defaults to "cuda".

    Returns:
        tuple: A tuple containing the accuracy and Dice score, both multiplied by 100.
    """
    num_tp = 0  # Initialize true positives counter
    num_fp = 0  # Initialize false positives counter
    num_tn = 0  # Initialize true negatives counter
    num_fn = 0  # Initialize false negatives counter
    model.eval()  # Set the model to evaluation mode

    with torch.no_grad():  # Disable gradient computation for evaluation
        for x, y in loader:  # Iterate over batches from the loader
            x = x.to(device)  # Move input batch to the specified device
            y = y.to(device).unsqueeze(1)  # Move target batch to the device and add channel dimension
            y = torch.sigmoid(y)  # Apply sigmoid activation to targets
            y = (y > 0.5).float()  # Binarize targets
            preds = torch.sigmoid(model(x))  # Get model predictions and apply sigmoid activation
            
            # The changes go here
            preds = preds.to('cpu')  # Move predictions to CPU
            preds = (preds > 0.5).float()  # Binarize predictions
            processed_tensors = []  # Initialize list to store processed tensors
            
            # Iterate over each tensor in the batch
            for i in range(preds.size(0)):
                single_tensor = preds[i]  # Extract individual tensor from batch

                # Call postprocess function on the tensor
                processed_tensor = postprocess(single_tensor, area_threshold)
            
                # Store the processed tensor
                processed_tensors.append(processed_tensor)

            # Concatenate all processed tensors back into a single tensor
            final_tensor = torch.cat(processed_tensors, dim=0)
            
            preds = preds.to(device)  # Move predictions back to the original device
            
            # Move the final processed tensor to the original device
            final_tensor = final_tensor.to(device)
            final_tensor = final_tensor.unsqueeze(1)  # Add channel dimension
            final_tensor = (final_tensor > 0.5).float()  # Binarize final tensor
            
            # Convert tensors to binary
            preds_binary = (preds > 0.5).float()
            final_tensor_binary = (final_tensor > 0.5).float()
            
            # Ensure both tensors are on the correct device
            preds_binary = preds_binary.to(device)
            final_tensor_binary = final_tensor_binary.to(device)

            # Initialize counters
            num_tp = torch.tensor(0.0, device=device)
            num_tn = torch.tensor(0.0, device=device)
            num_fp = torch.tensor(0.0, device=device)
            num_fn = torch.tensor(0.0, device=device)

            # Calculate TP, TN, FP, FN
            #num_tp += (final_tensor_binary * preds_binary).sum()
            #num_tn += ((1 - final_tensor_binary) * (1 - preds_binary)).sum()
            #num_fp += (final_tensor_binary * (1 - preds_binary)).sum()
            #num_fn += ((1 - final_tensor_binary) * preds_binary).sum()

            # Optional: print results
            #print(f"True Positives: {num_tp.item()}")
            #print(f"True Negatives: {num_tn.item()}")
            #print(f"False Positives: {num_fp.item()}")
            #print(f"False Negatives: {num_fn.item()}")

            # Calculate TP, TN, FP, FN based on final tensor and targets
            num_tp += (final_tensor_binary * y).sum()
            num_tn += ((1 - final_tensor_binary) * (1 - y)).sum()
            num_fp += (final_tensor_binary * (1 - y)).sum()
            num_fn += ((1 - final_tensor_binary) * y).sum()
            
    # Calculate accuracy and Dice score
    accuracy = (num_tp + num_tn) / (num_tp + num_tn + num_fp + num_fn)
    dice_score = (2 * num_tp) / ((2 * num_tp) + num_fp + num_fn + 1e-8)

    model.train()  # Set the model back to training mode
    return accuracy * 100, dice_score * 100  # Return accuracy and Dice score in percentage

In [None]:
def spider_plot(dice_scores, dice_scores_postprocess, graph_savefile, corr_name=None):
    """
    Plots a spider plot comparing original and postprocessed Dice scores.

    Parameters:
        dice_scores (numpy.ndarray): Array of original Dice scores.
        dice_scores_postprocess (numpy.ndarray): Array of postprocessed Dice scores.
        graph_savefile (str): Path to save the generated plot.
        corr_name (str, optional): Name of the corruption type for the plot title. Defaults to None.
    """
    os.makedirs(os.path.dirname(graph_savefile), exist_ok=True)  # Create directories if they don't exist
    
    labels = np.array(SEVERITIES)  # Convert severity levels to a NumPy array
    num_vars = len(labels)  # Number of variables (severity levels)
    
    # Compute angles for each bar in the plot
    angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist()
    
    # Close the loop for plotting by appending the first value to the end
    dice_scores = np.concatenate((dice_scores, [dice_scores[0]]))  # Append the first dice score to the end
    dice_scores_postprocess = np.concatenate((dice_scores_postprocess, [dice_scores_postprocess[0]]))  # Append the first postprocessed dice score to the end
    angles += angles[:1]  # Append the first angle to the end
    
    fig, ax = plt.subplots(figsize=(8, 8), subplot_kw=dict(polar=True))  # Create a polar subplot
    
    # Plot the original Dice scores
    ax.fill(angles, dice_scores, color='blue', alpha=0.6)
    ax.plot(angles, dice_scores, color='blue', linewidth=2, label='Original')
    
    # Plot the postprocessed Dice scores
    ax.fill(angles, dice_scores_postprocess, color='red', alpha=0.4)
    ax.plot(angles, dice_scores_postprocess, color='red', linewidth=2, label='Postprocessed')
    
    # Set labels for each slice
    ax.set_xticks(angles[:-1])  # Set the ticks at each angle (except the last one to avoid duplication)
    ax.set_xticklabels(labels, fontsize=14, fontweight='bold', color='navy')  # Set the labels for each tick
    
    # Set the range for each slice
    ax.set_ylim(0, 100)  # Set the y-axis range from 0 to 100
    
    # Aesthetic improvements
    ax.set_facecolor('floralwhite')  # Set background color
    ax.grid(color='grey', linestyle='--', linewidth=0.5)  # Customize the grid
    
    # Set the plot title
    title = f'{corr_name} Dice Score Spider Plot' if corr_name else 'Dice Score Spider Plot'
    plt.title(title, fontsize=16, fontweight='bold')  # Set the plot title
    
    plt.legend(loc='upper right', fontsize=12)  # Add legend to the plot
    plt.savefig(graph_savefile)  # Save the plot to the specified file
    plt.close()  # Close the plot to free memory


### Definition of the Parameters


In [44]:
AREA_THRESHOLD = 500  # Adjusted because we are applying postprocess on smaller images
IMAGE_HEIGHT = 270  # Height of the images
IMAGE_WIDTH = 480  # Width of the images
FOLDER = '../dataset/test-C'  # Path to the main dataset folder
SEVERITIES = ['s1', 's2', 's3', 's4', 's5']  # List of severity levels for corruptions
SUBFOLDERS = ['imgs', 'masks']  # Subfolders for images and masks
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'  # Path to save graph results
CHECKPOINT_PATH = './models/unet_checkpoint.pth.tar'  # Path to the model checkpoint file

# Define transformations for corrupted images
corr_transforms = A.Compose([
    A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH, always_apply=True),  # Resize images to specified dimensions
    A.Normalize(  # Normalize images to zero mean and unit variance
        mean=[0.0, 0.0, 0.0],
        std=[1.0, 1.0, 1.0],
        max_pixel_value=255.0,
    ),
    ToTensorV2(),  # Convert images to PyTorch tensors
])

def get_corruption_dataset_loaders(img_dir, mask_dir, batch_size, transform, num_workers=4, pin_memory=True):
    """Create DataLoader for corrupted dataset."""
    # Initialize dataset with specified directories and transformations
    corr_ds = RobotDataset(image_dir=img_dir, mask_dir=mask_dir, transform=transform)
    # Create DataLoader for the dataset with given parameters
    corr_loader = DataLoader(corr_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=pin_memory, worker_init_fn=worker_init_fn)
    return corr_loader

# Initialize model and load checkpoint
model = UNET(in_channels=3, out_channels=1).to(DEVICE)  # Create UNET model and move it to the specified device
load_checkpoint(torch.load(CHECKPOINT_PATH), model)  # Load the model checkpoint
model.eval()  # Set the model to evaluation mode


=> Loading checkpoint


UNET(
  (ups): ModuleList(
    (0): ConvTranspose2d(1024, 512, kernel_size=(2, 2), stride=(2, 2))
    (1): DoubleConv(
      (conv): Sequential(
        (0): Conv2d(1024, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (4): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (5): ReLU(inplace=True)
      )
    )
    (2): ConvTranspose2d(512, 256, kernel_size=(2, 2), stride=(2, 2))
    (3): DoubleConv(
      (conv): Sequential(
        (0): Conv2d(512, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), p

### 4.3.1 c=0 Gaussian Noise

In [141]:
FOLDER = '../dataset/test-C'  # Path to the main folder containing the datasets
CORRUPTION = 'c0'  # Type of corruption applied to the dataset
SEVERITIES = ['s1', 's2', 's3', 's4', 's5']  # Different levels of severity of the corruption
SUBFOLDERS = ['imgs', 'masks']  # Subfolders for images and masks within each severity level
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'  # Path to save the generated graph
AREA_THRESHOLD = 8000
name = 'Gaussian noise'  # Name of the corruption type

# Array to hold Dice scores for each severity level
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))

# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    # Construct paths to the image and mask directories for the current severity level
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    # Load the corrupted datasets
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    # Evaluate accuracy and Dice score before postprocessing
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    # Evaluate accuracy and Dice score after postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the Dice scores
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2

# Print the Dice scores for all severity levels
print(dice_scores)
print(dice_scores_postprocess)

# Generate the spider plot to visualize the Dice scores
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)


FP: 3736.0, FN: 29.0, TP: 37260.0, TN: 7086975.0
FP: 3841.0, FN: 23.0, TP: 36593.0, TN: 6957943.0
FP: 12129.0, FN: 31.0, TP: 37258.0, TN: 7078582.0
FP: 53162.0, FN: 45.0, TP: 37244.0, TN: 7037549.0
FP: 142446.0, FN: 117.0, TP: 37172.0, TN: 6948265.0
[95.19065094 94.9850769  85.97074127 58.33274841 34.27459717]
[96.46716309 92.09622192 95.93479156 95.78102112 95.24790955]


### 4.3.2 c=1 Shot Noise

In [142]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c1'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Shot Noise'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 3932.0, FN: 27.0, TP: 37262.0, TN: 7086779.0
FP: 5586.0, FN: 26.0, TP: 37263.0, TN: 7085125.0
FP: 24287.0, FN: 29.0, TP: 37260.0, TN: 7066424.0
FP: 113242.0, FN: 47.0, TP: 37242.0, TN: 6977469.0
FP: 230437.0, FN: 86.0, TP: 37203.0, TN: 6860274.0
[94.95559692 92.99707794 75.39762878 39.66704559 24.40108871]
[96.19619751 96.43956757 96.07313538 94.90003967 95.3412323 ]


### 4.3.3 c=2 Impulse Noise

In [143]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c2'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Impulse Noise'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 3726.0, FN: 24.0, TP: 37265.0, TN: 7086985.0
FP: 4145.0, FN: 32.0, TP: 37257.0, TN: 7086566.0
FP: 6649.0, FN: 27.0, TP: 37262.0, TN: 7084062.0
FP: 28995.0, FN: 40.0, TP: 37249.0, TN: 7061716.0
FP: 85042.0, FN: 115.0, TP: 37174.0, TN: 7005669.0
[95.20950317 94.69189453 91.77832794 71.95580292 46.61170578]
[96.72683716 92.44219971 91.5531311  95.24590302 95.31038666]


### 4.3.4 c=4 Glass Blur

In [144]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c4'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Glass Blur'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 2347.0, FN: 307.0, TP: 36982.0, TN: 7088364.0
FP: 1307.0, FN: 1016.0, TP: 36273.0, TN: 7089404.0
FP: 727.0, FN: 2160.0, TP: 35129.0, TN: 7089984.0
FP: 314.0, FN: 3982.0, TP: 33307.0, TN: 7090397.0
FP: 4.0, FN: 36616.0, TP: 673.0, TN: 7090707.0
[96.53606415 96.89724731 96.05304718 93.94161987  3.54527712]
[97.2215271  95.42256927 95.28530121 92.15415955  0.        ]


### 4.3.5 c=5 Motion Blur

In [145]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c5'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Motion Blur'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 6418.0, FN: 74.0, TP: 37215.0, TN: 7084293.0
FP: 10044.0, FN: 54.0, TP: 37235.0, TN: 7080667.0
FP: 10044.0, FN: 54.0, TP: 37235.0, TN: 7080667.0
FP: 10044.0, FN: 54.0, TP: 37235.0, TN: 7080667.0
FP: 13792.0, FN: 84.0, TP: 37205.0, TN: 7076919.0
[91.97746277 88.05931091 88.05931091 88.05931091 84.28289795]
[93.99585724 90.27441406 91.07578278 90.84861755 87.42728424]


### 4.3.6 c=6 Fog

In [146]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c6'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Fog'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 991.0, FN: 1434.0, TP: 35855.0, TN: 7089720.0
FP: 723.0, FN: 2757.0, TP: 34532.0, TN: 7089988.0
FP: 671.0, FN: 4369.0, TP: 32920.0, TN: 7090040.0
FP: 710.0, FN: 3937.0, TP: 33352.0, TN: 7090001.0
FP: 1172.0, FN: 5504.0, TP: 31785.0, TN: 7089539.0
[96.72893524 95.20291138 92.88938904 93.48712921 90.49625397]
[96.13828278 93.83876801 93.09766388 77.81071472 53.93540192]


### 4.3.7 c=7 Brightness

In [147]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c7'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Brightness'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 3698.0, FN: 32.0, TP: 37257.0, TN: 7087013.0
FP: 3823.0, FN: 55.0, TP: 37234.0, TN: 7086888.0
FP: 3815.0, FN: 77.0, TP: 37212.0, TN: 7086896.0
FP: 3744.0, FN: 108.0, TP: 37181.0, TN: 7086967.0
FP: 3674.0, FN: 189.0, TP: 37100.0, TN: 7087037.0
[95.23286438 95.05016327 95.03038788 95.07505035 95.05142975]
[96.24013519 92.38829803 95.95375824 92.39871216 91.96096039]


### 4.3.8 c=8 Contrast

In [148]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c8'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Contrast'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 182.0, FN: 3030.0, TP: 34259.0, TN: 7090529.0
FP: 0.0, FN: 13010.0, TP: 24279.0, TN: 7090711.0
FP: 0.0, FN: 37289.0, TP: 0.0, TN: 7090711.0
FP: 0.0, FN: 37289.0, TP: 0.0, TN: 7090711.0
FP: 0.0, FN: 37289.0, TP: 0.0, TN: 7090711.0
[95.52209473 78.86889648  0.          0.          0.        ]
[94.10299683  0.          0.          0.          0.        ]


### 4.3.9 c=9 Elastic Transform

In [149]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c9'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Elastic Transform'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 14130.0, FN: 10046.0, TP: 27243.0, TN: 7076581.0
FP: 22334.0, FN: 18106.0, TP: 19183.0, TN: 7068377.0
FP: 4986.0, FN: 891.0, TP: 36398.0, TN: 7085725.0
FP: 4970.0, FN: 1073.0, TP: 36216.0, TN: 7085741.0
FP: 5335.0, FN: 1242.0, TP: 36047.0, TN: 7085376.0
[69.26597595 48.68411255 92.52983856 92.29946136 91.63986206]
[64.01673889 34.24013138 91.26007843 94.87046051 91.56062317]


### 4.3.10 c=11 Speckle noise

In [150]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c11'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Speckle Noise'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 3918.0, FN: 22.0, TP: 37267.0, TN: 7086793.0
FP: 4104.0, FN: 23.0, TP: 37266.0, TN: 7086607.0
FP: 20352.0, FN: 22.0, TP: 37267.0, TN: 7070359.0
FP: 50357.0, FN: 25.0, TP: 37264.0, TN: 7040354.0
FP: 119343.0, FN: 30.0, TP: 37259.0, TN: 6971368.0
[94.97922516 94.75330353 78.53289032 59.6653595  38.43293381]
[96.38528442 96.46395874 95.72859192 92.40705109 91.6518631 ]


### 4.3.11 c=12 Gaussian Blur

In [151]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c12'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Gaussian Blur'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 762.0, FN: 1723.0, TP: 35566.0, TN: 7089949.0
FP: 189.0, FN: 36842.0, TP: 447.0, TN: 7090522.0
FP: 102.0, FN: 37191.0, TP: 98.0, TN: 7090609.0
FP: 24.0, FN: 37281.0, TP: 8.0, TN: 7090687.0
FP: 0.0, FN: 37289.0, TP: 0.0, TN: 7090711.0
[9.66244202e+01 2.35728407e+00 5.22820055e-01 4.28713039e-02
 0.00000000e+00]
[96.36540985  0.          0.          0.          0.        ]


### 4.3.12 c=13 Spatter

In [152]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c13'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Spatter'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 3651.0, FN: 27.0, TP: 37262.0, TN: 7087060.0
FP: 3284.0, FN: 59.0, TP: 37230.0, TN: 7087427.0
FP: 3144.0, FN: 76.0, TP: 37213.0, TN: 7087567.0
FP: 221592.0, FN: 18.0, TP: 37271.0, TN: 6869119.0
FP: 901334.0, FN: 12.0, TP: 37277.0, TN: 6189377.0
[95.29679108 95.70324707 95.85297394 25.17018318  7.63951254]
[96.18859863 96.25656891 96.32398224 94.49280548 92.29705811]


### 4.3.13 c=14 Saturate

In [153]:
FOLDER = '../dataset/test-C'
CORRUPTION = 'c14'
SEVERITIES= ['s1','s2','s3','s4','s5']
SUBFOLDERS = ['imgs','masks']
GRAPH_SAVEFILE = f'../results/saved_graphs/{CORRUPTION}'
name = 'Saturate'
# Array to hold dice scores
dice_scores = np.zeros(len(SEVERITIES))
dice_scores_postprocess = np.zeros(len(SEVERITIES))
# Main loop to process each severity level
for i, severity in enumerate(SEVERITIES):
    img_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[0]}'
    mask_dir = f'{FOLDER}/{CORRUPTION}/{severity}/{SUBFOLDERS[1]}'
    
    test_loader = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    test_loader_corr = get_corruption_dataset_loaders(img_dir, mask_dir, BATCH_SIZE, corr_transforms, NUM_WORKERS, PIN_MEMORY)
    
    accuracy, dice_score = check_accuracy(test_loader, model, device=DEVICE)
    
    #After postprocessing
    accuracy2, dice_score2 = check_accuracy_postprocessing(test_loader_corr, model, device=DEVICE, area_threshold=AREA_TRESHOLD)
    
    # Store the dice score
    dice_scores[i] = dice_score
    dice_scores_postprocess[i] = dice_score2
    
print(dice_scores)
print(dice_scores_postprocess)

# Generating the spider plot after processing all severities
spider_plot(dice_scores, dice_scores_postprocess, graph_savefile=GRAPH_SAVEFILE, corr_name=name)

FP: 843.0, FN: 1365.0, TP: 35924.0, TN: 7089868.0
FP: 281.0, FN: 36277.0, TP: 1012.0, TN: 7090430.0
FP: 5105.0, FN: 5.0, TP: 37284.0, TN: 7085606.0
FP: 464227.0, FN: 1.0, TP: 37288.0, TN: 6626484.0
FP: 4944052.0, FN: 1.0, TP: 37288.0, TN: 2146659.0
[97.01847076  5.2459693  93.58669281 13.84102535  1.48598361]
[96.84901428  0.         95.96188354 23.48732567  0.75491244]


## 3.5 Filtering Block

### Define Camera Class

In [None]:
class Camera:

	def __init__(self,MTXDIR='../data/calibration_matrixes/mtx.npy', DISTDIR = './data/calibration_matrixes/distortion.npy'):

		# Camera configuration properties
		self.color_resolution = (1920, 1080)
		self.depth_resolution = (1280, 720)
		self.frames_per_second = 30
		self.id = '821312060307'

		# Camera connection properties
		self.conn = None
		self.conf = None
		self.align = None

		# Camera calibration properties (calculated from 'abb_camera_calibration.py'-file)
		self.mtx = np.load(MTXDIR)
		self.dist = np.load(DISTDIR)

		# Chessboard properties
		self.h = 14
		self.b = 9
		self.size = 17.4 # mm

	# Start camera readout
	def start(self):

		# Connect
		self.conn = realsense.pipeline()

		# Config
		self.conf = realsense.config()
		self.conf.enable_device(self.id)
		self.conf.enable_stream(realsense.stream.depth, self.depth_resolution[0], self.depth_resolution[1], realsense.format.z16, self.frames_per_second)
		self.conf.enable_stream(realsense.stream.color, self.color_resolution[0], self.color_resolution[1], realsense.format.bgr8, self.frames_per_second)
		
		# Start streaming
		self.conn.start(self.conf)

		# Align images
		self.align = realsense.align(realsense.stream.color)

	# Stop camera readout
	def close(self):

		# Stop streaming
		self.conn.stop()

	# Read frame
	def read(self):

		# Wait for image
		frames = self.conn.wait_for_frames()

		# Align images
		aligned_frames = self.align.process(frames)

		# Retreive images
		color_frame = aligned_frames.get_color_frame()
		depth_frame = aligned_frames.get_depth_frame()

		# Convert to arrays
		depth = np.asanyarray(depth_frame.get_data())
		color = np.asanyarray(color_frame.get_data())

		return color, depth

	# Get depth of pixel
	def get_pixel_depth(self, image, pixel):
		depth = image[pixel[1], pixel[0]]
		return depth

	# Extrinsic calibration
	def extrinsic_calibration(self, img):

		# termination criteria
		criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)

		# objectpunten van het schaakbord voorbereiden
		objp = np.zeros((self.b * self.h, 3), numpy.float32)
		objp[:, :2] = np.mgrid[0:self.b, 0:self.h].T.reshape(-1, 2)
		objp = self.size * objp

		# Convert to grayscale
		gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

		# Get chessboard corners
		ret, corners = cv2.findChessboardCornersSB(gray, (self.b, self.h), cv2.CALIB_CB_MARKER)
	
		# If corners are found
		if ret == True:
			
			# Refine corners
			corners2 = cv2.cornerSubPix(gray, corners, (11, 11), (-1, -1), criteria)

			# Extrinsic calibration
			ret, rvecs, tvecs, _ = cv2.solvePnPRansac(objp, corners2, self.mtx, self.dist)

			# Get extrinsic matrix
			rvecs_matrix = cv2.Rodrigues(rvecs)[0]
			extrinsics = np.hstack((rvecs_matrix, tvecs))
			extrinsics = np.vstack((extrinsics, [0.0, 0.0, 0.0, 1.0]))

			return ret, corners2, rvecs, tvecs, extrinsics

		# If corners not found
		else:
			return None, None, None, None, None


	# Covert 2D to 3D cooridnates
	def intrinsic_trans(self, pixel, z, mtx):
		if (z):
			x = (pixel[0] - mtx[0, 2]) / mtx[0, 0] * z
			y = (pixel[1] - mtx[1, 2]) / mtx[1, 1] * z
			return x, y, z
		else:
			return None, None, None
		
	# Covert 3D to 2D cooridnates
	def intrinsic_trans_inv(self, x, y, z, mtx):
		if (z):
			u = x * mtx[0, 0] * z + mtx[0, 2] 
			v = y * mtx[1, 1] * z + mtx[1, 2]
			return u, v
		else:
			return None, None

#generate camera object
cam = Camera(os.path.join(TRANFMATRIX,'mtx.npy'),os.path.join(TRANFMATRIX,'distortion.npy'))

### 3.5.1 Data Paths and Parameter loading

In [None]:
# Precompute camera matrix and distortion mappings if possible
# This assumes all images have the same dimension, which should be validated beforehand
TESTDIR_IMGS = '../data/images_rgb/'  # Directory containing the test images
SAVEDIR = '../data/calibration_matrixes/'  # Directory to save the calibration matrices
AREA_THRESHOLD = 8000  # Threshold area for processing

# Get the path of a sample image from the test directory
sample_image_path = os.path.join(TESTDIR_IMGS, os.listdir(TESTDIR_IMGS)[0])
# Read the sample image using OpenCV
sample_image = cv2.imread(sample_image_path)
# Get the height and width of the sample image
h, w = sample_image.shape[:2]

# Compute the optimal new camera matrix and region of interest
newcameramtx, roi = cv2.getOptimalNewCameraMatrix(cam.mtx, cam.dist, (w, h), 1, (w, h))
# Initialize the undistort rectify map
mapx, mapy = cv2.initUndistortRectifyMap(cam.mtx, cam.dist, None, newcameramtx, (w, h), 5)

# Define the resizing transformation to be applied post prediction
post_predict_resize = A.Resize(height=1080, width=1920, interpolation=1)


First, given the healthy dataset, I calculate the normal eccentricity and radius features of the pieces and compute the mean and standard deviation.

In [None]:
normalfeat = []  # List to store features of each image

for image_file in os.listdir(TESTDIR_IMGS):
    image_path = os.path.join(TESTDIR_IMGS, image_file)  # Get the full path of the image file
    image = cv2.imread(image_path)  # Read the image using OpenCV
    image = cv2.warpPerspective(image, M, (w, h))  # Apply perspective transformation

    # Convert image to PIL, apply transforms, and predict with the model
    image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))  # Convert BGR image to RGB and then to PIL format
    transformed = test_transforms(image=np.array(image))  # Apply transformations to the image
    image = transformed["image"].unsqueeze(0).to(DEVICE)  # Convert transformed image to tensor and move to device
    
    with torch.no_grad():  # Disable gradient calculation for inference
        prediction = model(image)  # Get model prediction
    prediction = torch.sigmoid(prediction)  # Apply sigmoid activation to prediction
    prediction = (prediction > 0.5).float()  # Binarize prediction

    # Resize prediction and save
    prediction = prediction.squeeze().cpu().numpy()  # Remove batch dimension and move to CPU
    resized_prediction = post_predict_resize(image=prediction)['image']  # Resize prediction using defined transformation
    tensor_prediction = torch.from_numpy(resized_prediction).unsqueeze(0)  # Convert resized prediction back to tensor
    
    pieces_features = get_pieces_features(tensor_prediction)  # Extract features from the prediction
    normalfeat.append(pieces_features)  # Append extracted features to the list
    
result = stats(normalfeat, SAVEDIR)  # Compute and save statistics of features
print(result)  # Print the result


## 3.6 Segmentation2RobotFrame (x,y,z)_RF Block

In this part the final block of the model will be developed, from the post-processed segmented image to obtaining the position (x,y,z) in the robot dataframe. This will be done through two approaches: in the first approach, transformation matrices will be used, and in the second approach, an MLP will be trained to simulate the transformation matrices.

### Define Parameters, Data Paths and Transformation Matrixes

In [None]:
TRANFMATRIX = '../data/calibration_matrixes/'
PRED_TESTDIR = '../data/predictions/'
TESTDIR_IMGS = '../data/images_rgb_CF/'
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # Set device to CUDA if available, otherwise use CPU.
print(DEVICE)
CHECKPOINT_PATH = "./models/unet_checkpoint.pth.tar"  # Path to the model checkpoint
IMAGE_HEIGHT = 270  
IMAGE_WIDTH  = 480

#Load T_bc (Transformation matrix from robot base frame to camera frame)
T_bc = np.load(os.path.join(TRANFMATRIX,'T_bc.npy'))
#Load perspective matrix (calculated using the image_rectification_test.py file)
M = np.load(os.path.join(TRANFMATRIX,'perspective_transform.npy'))

# Define the transformations for validation images.
test_transforms = A.Compose(
    [
        A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),  # Resize images to defined dimensions.
        A.Normalize(
            mean=[0.0, 0.0, 0.0],  # Normalize images with a mean of 0.
            std=[1.0, 1.0, 1.0],    # Standard deviation for normalization.
            max_pixel_value=255.0,  # Maximum pixel value in input images.
        ),
        ToTensorV2(),  # Convert images to tensor format compatible with PyTorch.
    ],
)

### 3.6.1 First Approach (Using Transformation Matrixes)

#### Camera and Tool Correction C_x, C_y

In [None]:
# Precompute camera matrix and distortion mappings if possible
# This assumes all images have the same dimension, which should be validated beforehand
SAVEDIR = '../data/calibration_matrixes/'
STATSDIR= os.path.join(SAVEDIR, 'feature_stats.json')

#TESTDIR_IMGS = '../data/images-rgb-C/c13/s4/imgs'
TESTDIR_IMGS = '../data/images_rgb_CF' 
TESTDIR_DEPTH = '../data/imgs_depth/'
TESTDIR_LABELSGT = '../data/labels_gt_RF/'

BASE_DATASET = '../data/images-rgb-C'

SIGMA = 1
AREA_TRESHOLD = 9000

In [None]:
# Initialize an empty list to store differences
differences = []
start_time = time.time()  # Record the start time of processing
NON_DETECTED_PIECES = 0  # Initialize the counter for non-detected pieces

print(f'Images Folder: {TESTDIR_IMGS} ')
SIGMA = 3  # Set the sigma value for filtering

# Iterate through all image files in the directory
for image_file in os.listdir(TESTDIR_IMGS):
    
    # Read the image, depth, and ground truth labels
    image = cv2.imread(os.path.join(TESTDIR_IMGS, image_file))
    depth = np.load(os.path.join(TESTDIR_DEPTH, image_file.replace('.jpg', '.npy')))
    labels_gt = read_labels(os.path.join(TESTDIR_LABELSGT, image_file.replace('.jpg', '.txt')))
    
    # Warp the image using a perspective transformation
    image = cv2.warpPerspective(image, M, (w, h))
    
    # Convert the image to PIL format, apply transformations, and predict with the model
    image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    transformed = test_transforms(image=np.array(image))
    image = transformed["image"].unsqueeze(0).to(DEVICE)
    
    with torch.no_grad():
        prediction = model(image)  # Make a prediction using the model
    prediction = torch.sigmoid(prediction)  # Apply sigmoid activation
    prediction = (prediction > 0.5).float()  # Threshold the prediction

    # Resize the prediction and save
    prediction = prediction.squeeze().cpu().numpy()
    resized_prediction = post_predict_resize(image=prediction)['image']
    tensor_prediction = torch.from_numpy(resized_prediction).unsqueeze(0)
   
    # Postprocess the prediction
    tensor_prediction = postprocess(tensor_prediction, AREA_TRESHOLD)    
    
    # Get features of detected pieces
    pieces_features = get_pieces_features(tensor_prediction)
    
    # Filter objects that are not similar to images
    pieces_features = filter_pieces(STATSDIR, pieces_features, SIGMA)
    save_path = os.path.join(PRED_TESTDIR, f"{image_file.split('.')[0]}.png")
    
    if len(pieces_features) != 0:
        for piece in pieces_features: 
            # Get piece data
            center = piece['centroid']
            radius = piece['radius']
            
            # Transform pixel on warped image back to original image
            new_pixel = np.dot(np.linalg.inv(M), np.array([[center[0]], [center[1]], [1]]))
            center = [int(new_pixel[0][0]/new_pixel[2][0]), int(new_pixel[1][0]/new_pixel[2][0])]
            
            # Get pixel depth 
            pixel_depth = depth[center[1], center[0]]
            
            # Transform 2D to 3D camera coordinates
            xcam, ycam, zcam = cam.intrinsic_trans(center, pixel_depth, cam.mtx)

            # Transform camera coordinates to robot base frame using T_bc
            p_bt = np.dot(T_bc, np.array([[xcam], [ycam], [zcam], [1]]))  # Position of the object in robot base frame

            # Create pick position for robot considering grip height and tool offset
            xyz = np.array([p_bt[0][0], p_bt[1][0], p_bt[2][0]]) 
            xyz[2] = 5
            
            x_pred, y_pred = xyz[0], xyz[1]
            x_gt, y_gt = labels_gt[0], labels_gt[1]
            
            # Save the differences
            differences.append((x_gt - x_pred, y_gt - y_pred))
            actual_error = np.array((x_gt - x_pred, y_gt - y_pred)) 
            r_value = math.sqrt(actual_error[0]**2 + actual_error[1]**2)
            if r_value > 25.0:
                print(image_file)
                save_annotated_image(tensor_prediction, save_path, pieces_features)

    else:
        # Increment the counter for non-detected pieces
        NON_DETECTED_PIECES += 1
        print(image_file)

# Convert differences to numpy array and compute r values
errors_np = np.array(differences)     
r_values = np.sqrt(np.sum(errors_np**2, axis=1))

# Calculate the mean and standard deviation of r values
mean_r = np.mean(r_values)
std_r = np.std(r_values)

# Create a tuple with the mean and standard deviation
result = (mean_r, std_r)
print(f"Mean of errors: {mean_r}")
print(f"Standard deviation of errors: {std_r}")

# Calculate the total number of pieces and detection percentage
TOTAL_PIECES = len(os.listdir(TESTDIR_IMGS))
print(f'Detected percentage: {(1-(NON_DETECTED_PIECES/TOTAL_PIECES)) * 100}')

# Calculate the optimal offsets
C_x, C_y = calculate_corrections(differences)
print(f"Optimal offsets: C_x = {C_x}, C_y = {C_y}")

end_time = time.time()  # Record the end time of processing
total_time = end_time - start_time  # Calculate the total processing time
average_time_per_image = total_time / len(os.listdir(TESTDIR_IMGS))  # Calculate the average processing time per image
print(f"Average time per image: {average_time_per_image:.2f} seconds")  # Print the average time per image

## 4.4 First Approach Evaluation

### Hyperparameters and Data Loading

In [None]:
# Define directory paths for saving calibration matrices and statistics
SAVEDIR = '../data/calibration_matrixes/'  # Directory to save calibration matrices
STATSDIR = os.path.join(SAVEDIR, 'feature_stats.json')  # Path to save feature statistics in JSON format

# Define directories for test data and labels
TESTDIR_DEPTH = '../data/imgs_depth/'  # Directory containing depth images for testing
TESTDIR_LABELSGT = '../data/labels_gt_RF/'  # Directory containing ground truth labels for testing

# Define directories and files for saving results and base dataset
SAVEFILE = '../data'  # Directory to save various data files
BASE_DATASET = '../data/images-rgb-C'  # Base directory containing RGB images of dataset

# Define a threshold value for area
AREA_TRESHOLD = 8000  # Threshold value for area filtering

# Define constants for calibration (camera parameters or similar)
C_x = -5.600124724948292  # Calibration constant for x-coordinate
C_y = -10.20561848357434  # Calibration constant for y-coordinate

# Define file paths for saving results
FILE_PATH_POINTWISE = '../data/results_pointwise.csv'  # Path to save pointwise results in CSV format
FILE_PATH = '../data/results.csv'  # Path to save general results in CSV format
SAVEFOLDER = '../results/'  # Directory to save generated graphs

# Define a mapping between corruption types and their respective function names
corruption_function_map = {
    'c0': 'gaussian_noise',           # Corruption type 'c0' maps to Gaussian noise
    'c1': 'shot_noise',               # Corruption type 'c1' maps to Shot noise
    'c2': 'impulse_noise',            # Corruption type 'c2' maps to Impulse noise
    'c4': 'glass_blur',               # Corruption type 'c4' maps to Glass blur
    'c5': 'motion_blur',              # Corruption type 'c5' maps to Motion blur
    'c6': 'fog',                      # Corruption type 'c6' maps to Fog
    'c7': 'brightness',               # Corruption type 'c7' maps to Brightness adjustment
    'c8': 'contrast',                 # Corruption type 'c8' maps to Contrast adjustment
    'c9': 'elastic_transform',        # Corruption type 'c9' maps to Elastic transform
    'c10': 'speckle_noise',           # Corruption type 'c10' maps to Speckle noise
    'c11': 'gaussian_blur',           # Corruption type 'c11' maps to Gaussian blur
    'c12': 'spatter',                 # Corruption type 'c12' maps to Spatter
    'c13': 'saturate'                 # Corruption type 'c13' maps to Saturate
}

# Define colors associated with each corruption type for visualization purposes
colors = {
    'gaussian_noise': 'orange',       # Color for Gaussian noise is orange
    'shot_noise': 'blue',             # Color for Shot noise is blue
    'impulse_noise': 'green',         # Color for Impulse noise is green
    'glass_blur': 'purple',           # Color for Glass blur is purple
    'motion_blur': 'brown',           # Color for Motion blur is brown
    'fog': 'pink',                    # Color for Fog is pink
    'brightness': 'gray',             # Color for Brightness adjustment is gray
    'contrast': 'cyan',               # Color for Contrast adjustment is cyan
    'elastic_transform': 'magenta',   # Color for Elastic transform is magenta
    'speckle_noise': 'yellow',        # Color for Speckle noise is yellow
    'gaussian_blur': 'black',         # Color for Gaussian blur is black
    'spatter': 'lime',                # Color for Spatter is lime
    'saturate': 'navy'                # Color for Saturate is navy
}

### Precompute matrix for a faster Inference

In [None]:
# Precompute camera matrix and distortion mappings if possible

# Get the path of a sample image from the test directory
sample_image_path = os.path.join(TESTDIR_IMGS, os.listdir(TESTDIR_IMGS)[0])

# Read the sample image using OpenCV
sample_image = cv2.imread(sample_image_path)

# Get the height and width of the sample image
h, w = sample_image.shape[:2]

# Compute the optimal new camera matrix given the original camera matrix, 
# distortion coefficients, and image size. The free scaling parameter is set to 1.
newcameramtx, roi = cv2.getOptimalNewCameraMatrix(cam.mtx, cam.dist, (w, h), 1, (w, h))

# Compute the undistortion and rectification transformation map
# `mapx` and `mapy` are the x and y coordinates of the undistorted image
mapx, mapy = cv2.initUndistortRectifyMap(cam.mtx, cam.dist, None, newcameramtx, (w, h), 5)

# Define a resizing transformation to resize the image to 1920x1080 using linear interpolation
post_predict_resize = A.Resize(height=1080, width=1920, interpolation=1)

### Get data into CSV

In [None]:
# Initialize lists to store results
differences = []
results = []
pointwise_result = []

# Sweep through sigma values from 1 to 10 with a step of 2
for sigma in range(1, 11, 2):
    print(f'Sigma: {sigma}')
    # Iterate through different types of corruption in the dataset
    for corruption in os.listdir(BASE_DATASET):
        corruption_path = os.path.join(BASE_DATASET, corruption)
        # Iterate through severity levels of each corruption type
        for severity in os.listdir(corruption_path):
            images_path = os.path.join(corruption_path, severity, 'imgs')
            NON_DETECTED = 0 
            TOTAL_PIECES = len(os.listdir(images_path))
            differences = []
            start_time = time.time()
            # Process each image in the current path
            for image_file in os.listdir(images_path):
                image = cv2.imread(os.path.join(images_path, image_file))
                depth = np.load(os.path.join(TESTDIR_DEPTH, image_file.replace('.jpg', '.npy')))
                labels_gt = read_labels(os.path.join(TESTDIR_LABELSGT, image_file.replace('.jpg', '.txt')))
                
                # Apply perspective warp to the image
                image = cv2.warpPerspective(image, M, (w, h))
                
                # Convert image to PIL format, apply transformations, and predict with the model
                image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
                transformed = test_transforms(image=np.array(image))
                image = transformed["image"].unsqueeze(0).to(DEVICE)
                
                with torch.no_grad():
                    prediction = model(image)
                prediction = torch.sigmoid(prediction)
                prediction = (prediction > 0.5).float()

                # Resize the prediction and convert to tensor
                prediction = prediction.squeeze().cpu().numpy()
                resized_prediction = post_predict_resize(image=prediction)['image']
                tensor_prediction = torch.from_numpy(resized_prediction).unsqueeze(0)
            
                # Post-process the prediction
                tensor_prediction = postprocess(tensor_prediction, AREA_TRESHOLD)    
                
                # Get features of detected pieces
                pieces_features = get_pieces_features(tensor_prediction)
                
                # Filter objects not similar to images based on sigma
                pieces_features = filter_pieces(STATSDIR, pieces_features, sigma)
                save_path = os.path.join(PRED_TESTDIR, f"{image_file.split('.')[0]}.png")
                
                if len(pieces_features) != 0:
                    for piece in pieces_features: 
                        # Get data for each detected piece
                        center = piece['centroid']
                        radius = piece['radius']
                        # Transform pixel coordinates back to the original image
                        new_pixel = np.dot(np.linalg.inv(M), np.array([[center[0]], [center[1]], [1]]))
                        center = [int(new_pixel[0][0] / new_pixel[2][0]), int(new_pixel[1][0] / new_pixel[2][0])]
                        
                        # Get depth of the pixel
                        pixel_depth = depth[center[1], center[0]]
                        
                        # Transform 2D pixel to 3D camera coordinates
                        xcam, ycam, zcam = cam.intrinsic_trans(center, pixel_depth, cam.mtx)

                        # Transform camera coordinates to robot base frame
                        p_bt = np.dot(T_bc, np.array([[xcam], [ycam], [zcam], [1]]))  # Position of the object in robot base frame

                        # Create pick position for robot
                        xyz = np.array([p_bt[0][0], p_bt[1][0], p_bt[2][0]]) 
                        xyz[2] = 5
                        
                        # Predicted and ground truth coordinates
                        x_pred, y_pred = xyz[0] + C_x, xyz[1] + C_y
                        x_gt, y_gt = labels_gt[0], labels_gt[1]
                        
                        # Save the differences
                        differences.append((x_gt - x_pred, y_gt - y_pred))
                        r_value_pointwise = math.sqrt((x_gt - x_pred) ** 2 + (y_gt - y_pred) ** 2)
                        c = corruption_function_map[corruption]
                        s = severity
                        pointwise_result.append([sigma, c, s, r_value_pointwise])
                        
                else:
                    NON_DETECTED += 1

            # Calculate and store the errors if there are any
            errors_np = np.array(differences)
            if len(errors_np) != 0:     
                r_values = np.sqrt(np.sum(errors_np ** 2, axis=1))

                # Calculate mean and standard deviation of errors
                mean_r = np.mean(r_values)
                std_r = np.std(r_values)

                # Create a tuple with mean and standard deviation
                result = (mean_r, std_r)
                percentage = (1 - (NON_DETECTED / TOTAL_PIECES)) * 100
                c = corruption_function_map[corruption]
                s = severity
                results.append([sigma, c, s, result[0], result[1], percentage])
            else:
                s = severity
                c = corruption_function_map[corruption]
                percentage = (1 - (NON_DETECTED / TOTAL_PIECES)) * 100
                results.append([sigma, c, s, None, None, percentage])

# Save pointwise results to a CSV file
df_pointwise = pd.DataFrame(pointwise_result, columns=['sigma', 'corruption_name', 'severity', 'error'])
df_pointwise.to_csv(os.path.join(SAVEFILE, 'results_pointwise.csv'), index=False)

# Create a pandas DataFrame with the results
df = pd.DataFrame(results, columns=['sigma', 'corruption_name', 'severity', 'mean_error', 'std_error', '%detected'])

# Save the DataFrame to a CSV file
df.to_csv(os.path.join(SAVEFILE, 'results.csv'), index=False)

### CSV to plot

In [None]:
# Load the CSV file
data = pd.read_csv(FILE_PATH)

# Load another CSV file for pointwise data
data_pointwise = pd.read_csv(FILE_PATH_POINTWISE)

# Separate the data into blocks based on unique values of sigma
blocks = {sigma: data[data['sigma'] == sigma] for sigma in data['sigma'].unique()}
# Separate the pointwise data into blocks based on unique values of sigma
blocks_pointwise = {sigma: data_pointwise[data_pointwise['sigma'] == sigma] for sigma in data_pointwise['sigma'].unique()}

# Define a list of sigma values to iterate over
sigma_list = list(range(1, 10, 2))

# Loop through each block
for i in range(len(blocks)):
    sigma = sigma_list[i]

    # Create a summary to check the number of unique corruption names and the number of unique severities within each corruption name for each block
    block_example = blocks[sigma]  # Change the sigma value as needed

    # Create improved x-axis labels without the extra 's' prefix
    # Ensure all combinations are represented
    corruption_names = list(corruption_function_map.values())
    severity_levels = [f's{i}' for i in range(1, 6)]

    # Calculate values for the histogram of the percentage of detected pieces
    data_for_histogram = []

    for corruption_name in corruption_names:
        for severity in severity_levels:
            subset = block_example[(block_example['corruption_name'] == corruption_name) & (block_example['severity'] == severity)]
            if not subset.empty:
                detected_value = subset['%detected'].values[0]
                data_for_histogram.append(detected_value)
            else:
                data_for_histogram.append(0)

    # Generate colors for each bar in the histogram
    bar_colors = []
    for corruption_name in corruption_names:
        bar_colors.extend([colors[corruption_name]] * len(severity_levels))

    # Create the histogram plot
    plt.figure(figsize=(20, 6))

    # Y-axis values (% detected)
    y_values_detected = np.array(data_for_histogram)

    # Generate the histogram with colors and adjust margins
    bar_width = 0.8  # Adjust the width of the bars to leave space between blocks
    positions = np.arange(len(y_values_detected)) * 1.1  # Adjust positions to leave space between blocks

    plt.bar(positions, y_values_detected, color=bar_colors, width=bar_width)

    # Adjust x-axis labels
    severity_labels = []
    for corruption_name in corruption_names:
        for severity in severity_levels:
            severity_labels.append(f'{severity}')

    plt.xticks(ticks=positions, labels=severity_labels, rotation=90)

    # Set y-axis range
    plt.ylim(0, 100)

    # Set x-axis range to occupy the whole graph
    plt.xlim(-0.5, positions[-1] + 0.5)

    # Add legend to identify colors
    legend_labels = [f'{corruption_function_map[c]} = {c}' for c in corruption_function_map]
    handles = [plt.Line2D([0], [0], color=colors[corruption_function_map[c]], lw=4) for c in corruption_function_map]

    plt.legend(handles, legend_labels, bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)

    # Set labels and title for the plot
    plt.ylabel('Detected Percentage')
    plt.title(f'Detected Percentage vs Severity and Corruption Name for Sigma = {sigma}')
    plt.grid(True, axis='y')
    plt.tight_layout()
    # Save the plot instead of displaying it
    plt.savefig(f'{SAVEFOLDER}histogram_sigma_{sigma}.png')
    plt.close()

# Loop through each pointwise block
for i in range(len(blocks_pointwise)):   
    sigma = sigma_list[i]

    # Create a summary to check the number of unique corruption names and the number of unique severities within each corruption name for each block
    block_example = blocks_pointwise[sigma]  # Change the sigma value as needed

    # Calculate values for the boxplot using percentiles
    data_for_boxplot = []
    corruption_names = list(corruption_function_map.values())
    severity_levels = [f's{i}' for i in range(1, 6)]

    for corruption_name in corruption_names:
        for severity in severity_levels:
            subset = block_example[(block_example['corruption_name'] == corruption_name) & (block_example['severity'] == severity)]
            if not subset.empty:
                percentiles = np.percentile(subset['error'], [25, 50, 75])
                min_val = subset['error'].min()
                max_val = subset['error'].max()
                data_for_boxplot.append([min_val, percentiles[0], percentiles[1], percentiles[2], max_val])
            else:
                data_for_boxplot.append([np.nan, np.nan, np.nan, np.nan, np.nan])

    # Generate colors for each box in the boxplot
    box_colors = []
    for corruption_name in corruption_names:
        box_colors.extend([colors[corruption_name]] * len(severity_levels))

    # Create the boxplot
    plt.figure(figsize=(20, 6))
    box = plt.boxplot(data_for_boxplot, vert=True, patch_artist=True, whis=[0, 100])

    # Apply colors to each box in the boxplot
    for patch, color in zip(box['boxes'], box_colors):
        patch.set_facecolor(color)

    # Adjust x-axis labels
    severity_labels = []
    for corruption_name in corruption_names:
        for severity in severity_levels:
            severity_labels.append(f'{severity}')

    plt.xticks(ticks=np.arange(1, len(severity_labels) + 1), labels=severity_labels, rotation=90)

    # Add legend to identify colors
    legend_labels = [f'{corruption_function_map[c]} = {c}' for c in corruption_function_map]
    handles = [plt.Rectangle((0, 0), 1, 1, color=colors[corruption_function_map[c]]) for c in corruption_function_map]

    plt.legend(handles, legend_labels, bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)

    # Set y-axis label and title for the plot
    plt.ylabel('Piece picking uncertainty (mm)')
    plt.title(f'Boxplot of Picking uncertainty (mm) vs Severity of Corruption Name for Sigma = {sigma}')
    plt.grid(True)
    plt.tight_layout()

    # Set y-axis limit
    plt.ylim(0, 10)
    plt.savefig(f'{SAVEFOLDER}boxplot_sigma_{sigma}.png')
    plt.close()

### 3.6.2 Second Approach (Training an MLP)

#### Data Pths and Hyperparameters 

In [None]:
# Precompute camera matrix and distortion mappings if possible
# This assumes all images have the same dimension, which should be validated beforehand
SAVEDIR = '../data/calibration_matrixes/'
STATSDIR= os.path.join(SAVEDIR, 'feature_stats.json')

TESTDIR_IMGS = '../data/images_rgb'
TESTDIR_DEPTH = '../data/imgs_depth/'
TESTDIR_LABELSGT = '../data/labels_gt_RF/'

BASE_DATASET = '../data/images-rgb-C'

SIGMA = 5
AREA_TRESHOLD = 9000
NON_DETECTED_PIECES=0

x_wc_list = []  
y_gt_list = []

#### Get GT labels, Warped Camera Frame (Input) and Robot Frame (Output)

In [None]:
for image_file in os.listdir(TESTDIR_IMGS):
    
    image = cv2.imread(os.path.join(TESTDIR_IMGS, image_file))

    depth = np.load(os.path.join(TESTDIR_DEPTH,image_file.replace('.jpg', '.npy')))
    labels_gt = read_labels(os.path.join(TESTDIR_LABELSGT,image_file.replace('.jpg', '.txt')))
    
    #image = cv2.remap(image, mapx, mapy, cv2.INTER_LINEAR)
    image = cv2.warpPerspective(image, M, (w, h))
    
    # Convert image to PIL, apply transforms, and predict with the model
    image= Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    transformed = test_transforms(image=np.array(image))
    image = transformed["image"].unsqueeze(0).to(DEVICE)
    
    with torch.no_grad():
        prediction = model(image)
    prediction = torch.sigmoid(prediction)
    prediction = (prediction > 0.5).float()

    # Resize prediction and save
    prediction = prediction.squeeze().cpu().numpy()
    resized_prediction = post_predict_resize(image=prediction)['image']
    tensor_prediction = torch.from_numpy(resized_prediction).unsqueeze(0)
   
    #Postprocessing
    tensor_prediction = postprocess(tensor_prediction, AREA_TRESHOLD)    
    
    #Get Pieces features
    pieces_features = get_pieces_features(tensor_prediction)
    
    #Filter objects not similar to images
    pieces_features = filter_pieces(STATSDIR , pieces_features, SIGMA)
    save_path = os.path.join(PRED_TESTDIR, f"{image_file.split('.')[0]}.png")
    #save_annotated_image(tensor_prediction, save_path, pieces_features)
    
    if len(pieces_features)!=0:
        for piece in pieces_features: 
            #Get Data
            center = piece['centroid']
            radius = piece['radius']
            # Transform pixel on warped image back to original image
            new_pixel = np.dot(np.linalg.inv(M), np.array([[center[0]], [center[1]], [1]]))
            center = [int(new_pixel[0][0]/new_pixel[2][0]), int(new_pixel[1][0]/new_pixel[2][0])]
            
            ## Get pixel depth 
            pixel_depth = depth[center[1], center[0]]
            
            x_wc = center[0]
            y_wc = center[1]
            z_wc = pixel_depth 

            x_wc_list.append((x_wc,y_wc,z_wc))

            x_gt, y_gt, z_gt = labels_gt[0], labels_gt[1] , labels_gt[2]
            
            y_gt_list.append((x_gt,y_gt,z_gt))


    else:
        print(f'Nothing detected')  
        NON_DETECTED_PIECES +=1


In [None]:
# Convert the lists to numpy arrays
X = np.array(x_wc_list)
y = np.array(y_gt_list)

# Normalize the data
scaler_X = StandardScaler()  # Initialize a StandardScaler for X
scaler_y = StandardScaler()  # Initialize a StandardScaler for y

X_scaled = scaler_X.fit_transform(X)  # Fit the scaler to X and transform X
y_scaled = scaler_y.fit_transform(y)  # Fit the scaler to y and transform y

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.2, random_state=42)

# Convert the data to tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)  # Convert the training data for X to a float32 tensor
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)    # Convert the testing data for X to a float32 tensor
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)  # Convert the training data for y to a float32 tensor
y_test_tensor = torch.tensor(y_test, dtype=torch.float32) 

#### MLP Hyperparameter Sweep

In [None]:
# Hyperparameters to explore
NINPUT = 3  # Number of input features
NOUTPUT = 3  # Number of output features
NHIDDEN = [5, 10, 15, 20, 25]  # List of hidden layer sizes to explore
ACTIVATION = [nn.ReLU(), nn.Sigmoid(), nn.Tanh()]  # List of activation functions to explore
LEARNING_RATE = [0.01, 0.001, 0.0001, 0.00001]  # List of learning rates to explore
NUM_EPOCHS = 1000  # Number of epochs for training

# Function to train and evaluate the model
def train_and_evaluate(model, criterion, optimizer, num_epochs, X_train, y_train, X_test, y_test, scaler_y):
    for epoch in range(num_epochs):
        model.train()  # Set the model to training mode
        optimizer.zero_grad()  # Clear the gradients
        outputs = model(X_train)  # Forward pass
        loss = criterion(outputs, y_train)  # Compute the loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update the weights
        
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        predictions = model(X_test)  # Make predictions
        # Convert predictions and labels back to their original scale
        predictions_unscaled = scaler_y.inverse_transform(predictions.numpy())
        y_test_unscaled = scaler_y.inverse_transform(y_test.numpy())
        # Calculate Euclidean distances
        distances = np.sqrt(np.sum((y_test_unscaled - predictions_unscaled)**2, axis=1))
        average_distance = np.mean(distances)  # Calculate the average distance
    return average_distance

# Hyperparameter sweep
best_distance = float('inf')  # Initialize the best distance with infinity
best_params = {}  # Dictionary to store the best hyperparameters

for hidden in NHIDDEN:
    for activation in ACTIVATION:
        for lr in LEARNING_RATE:
            model = nn.Sequential(
                nn.Linear(NINPUT, hidden),  # Input to first hidden layer
                activation,  # Activation function
                nn.Linear(hidden, hidden),  # Hidden to hidden layer
                activation,  # Activation function
                nn.Linear(hidden, NOUTPUT)  # Hidden to output layer
            )
            criterion = nn.MSELoss()  # Mean Squared Error loss
            optimizer = optim.Adam(model.parameters(), lr=lr)  # Adam optimizer with current learning rate
            
            # Train and evaluate the model
            avg_distance = train_and_evaluate(model, criterion, optimizer, NUM_EPOCHS, 
                                              X_train_tensor, y_train_tensor, 
                                              X_test_tensor, y_test_tensor, scaler_y)
            
            # Print current hyperparameters and average distance
            print(f'Hidden: {hidden}, Activation: {activation}, Learning Rate: {lr}, Average Distance: {avg_distance:.4f}')
            
            model.eval()  # Set the model to evaluation mode
            with torch.no_grad():
                predictions = model(X_test_tensor)  # Make predictions on the test set
                loss = criterion(predictions, y_test_tensor)  # Compute the test loss
                print(f'Test Loss: {loss.item():.4f}')
    
                # Convert predictions and labels back to their original scale
                predictions_unscaled = scaler_y.inverse_transform(predictions.numpy())
                y_test_unscaled = scaler_y.inverse_transform(y_test)
    
                # Calculate Mean Squared Error (MSE) on the original scale
                mse = mean_squared_error(y_test_unscaled, predictions_unscaled)
                print(f'Test MSE (original scale): {mse:.4f}')
    
                # Calculate Euclidean distances
                distances = np.sqrt(np.sum((y_test_unscaled - predictions_unscaled)**2, axis=1))
                average_distance = np.mean(distances)  # Calculate the average distance
            
            # Print current hyperparameters and average distance
            print(f'Hidden: {hidden}, Activation: {activation}, Learning Rate: {lr}, Average Distance: {avg_distance:.4f}')
            
            # Update the best parameters if the current average distance is lower
            if avg_distance < best_distance:
                best_distance = avg_distance
                best_params = {
                    'hidden': hidden,
                    'activation': activation,
                    'learning_rate': lr
                }

# Print the best hyperparameters and the best average distance
print(f'Best Parameters: {best_params}')
print(f'Best Average Distance: {best_distance:.4f}')

#### Optimal Hyperparameter Model Training and Evaluation in the Validation Set

In [None]:
# Define the number of input features
NINPUT = 3

# Define the number of output features
NOUTPUT = 3

# Define the number of neurons in the hidden layers
NHIDDEN = 25

# Set the activation function to Tanh
ACTIVATION = nn.Tanh()

# Define the learning rate for the optimizer
LEARNING_RATE = 0.01

# Set the number of epochs for training
NUM_EPOCHS = 1000


# Define the model with the best hyperparameters
model = nn.Sequential(
    nn.Linear(NINPUT, NHIDDEN),  # Input layer to first hidden layer
    ACTIVATION,                 # Activation function
    nn.Linear(NHIDDEN, NHIDDEN), # First hidden layer to second hidden layer
    ACTIVATION,                 # Activation function
    nn.Linear(NHIDDEN, NOUTPUT) # Second hidden layer to output layer
)

# Set the loss function to Mean Squared Error
criterion = nn.MSELoss()

# Set the optimizer to Adam with the defined learning rate
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Train the model
for epoch in range(NUM_EPOCHS):
    model.train()  # Set the model to training mode
    optimizer.zero_grad()  # Zero the gradients
    outputs = model(X_train_tensor)  # Forward pass: compute predicted outputs by passing inputs to the model
    loss = criterion(outputs, y_train_tensor)  # Compute the loss
    loss.backward()  # Backward pass: compute gradient of the loss with respect to model parameters
    optimizer.step()  # Update model parameters
    
    if (epoch+1) % 100 == 0:  # Print the loss every 100 epochs
        print(f'Epoch [{epoch+1}/{NUM_EPOCHS}], Loss: {loss.item():.4f}')
        
# Evaluate the model
model.eval()  # Set the model to evaluation mode
with torch.no_grad():  # Disable gradient calculation
    predictions = model(X_test_tensor)  # Compute predicted outputs for the test set
    # Convert the predictions and the labels back to their original scale
    predictions_unscaled = scaler_y.inverse_transform(predictions.numpy())
    y_test_unscaled = scaler_y.inverse_transform(y_test_tensor.numpy())
    
    # Calculate the Euclidean distances between the true and predicted values
    distances = np.sqrt(np.sum((y_test_unscaled - predictions_unscaled)**2, axis=1))
    average_distance = np.mean(distances)  # Compute the average distance

print(f'Average Euclidean Distance: {average_distance:.4f}')

# Save the model checkpoint
torch.save(model.state_dict(), '../models/mlp_checkpoint.pth.tar')

## 4.5 Evaluation of the Second Approach

#### Model Initialization with pretrained weights

In [None]:
# Define the number of input features
NINPUT = 3

# Define the number of output features
NOUTPUT = 3

# Define the number of neurons in the hidden layers
NHIDDEN = 25

# Set the activation function to Tanh
ACTIVATION = nn.Tanh()

model_mlp = nn.Sequential(
    nn.Linear(NINPUT, NHIDDEN),
    ACTIVATION,
    nn.Linear(NHIDDEN, NHIDDEN),
    ACTIVATION,
    nn.Linear(NHIDDEN, NOUTPUT)
)
model_mlp.load_state_dict(torch.load('mlp_checkpoint.pth.tar'))

# Model initialization with specified input and output channels
model = UNET(in_channels=3, out_channels=1).to(DEVICE)
load_checkpoint(torch.load(CHECKPOINT_PATH ), model)
model.eval()  # Set the model to evaluation mode.

#### Used For Model Inference

In [None]:
for image_file in os.listdir(TESTDIR_IMGS):
    
    image = cv2.imread(os.path.join(TESTDIR_IMGS, image_file))

    depth = np.load(os.path.join(TESTDIR_DEPTH,image_file.replace('.jpg', '.npy')))
    labels_gt = read_labels(os.path.join(TESTDIR_LABELSGT,image_file.replace('.jpg', '.txt')))
    
    #image = cv2.remap(image, mapx, mapy, cv2.INTER_LINEAR)
    image = cv2.warpPerspective(image, M, (w, h))
    
    # Convert image to PIL, apply transforms, and predict with the model
    image= Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    transformed = test_transforms(image=np.array(image))
    image = transformed["image"].unsqueeze(0).to(DEVICE)
    
    with torch.no_grad():
        prediction = model(image)
    prediction = torch.sigmoid(prediction)
    prediction = (prediction > 0.5).float()

    # Resize prediction and save
    prediction = prediction.squeeze().cpu().numpy()
    resized_prediction = post_predict_resize(image=prediction)['image']
    tensor_prediction = torch.from_numpy(resized_prediction).unsqueeze(0)
   
    #Postprocessing
    tensor_prediction = postprocess(tensor_prediction, AREA_TRESHOLD)    
    
    #Get Pieces features
    pieces_features = get_pieces_features(tensor_prediction)
    
    #Filter objects not similar to images
    pieces_features = filter_pieces(STATSDIR , pieces_features, SIGMA)
    save_path = os.path.join(PRED_TESTDIR, f"{image_file.split('.')[0]}.png")
    #save_annotated_image(tensor_prediction, save_path, pieces_features)
    
    if len(pieces_features)!=0:
        for piece in pieces_features: 
            #Get Data
            center = piece['centroid']
            radius = piece['radius']
            # Transform pixel on warped image back to original image
            new_pixel = np.dot(np.linalg.inv(M), np.array([[center[0]], [center[1]], [1]]))
            center = [int(new_pixel[0][0]/new_pixel[2][0]), int(new_pixel[1][0]/new_pixel[2][0])]
            
            ## Get pixel depth 
            pixel_depth = depth[center[1], center[0]]
            
            x_wc = center[0]
            y_wc = center[1]
            z_wc = pixel_depth 
            # Preparar entrada para el modelo MLP
            input_mlp_raw = np.array([[x_wc, y_wc, z_wc]])  # Convertir a formato numpy array y agregar una dimensión
            input_mlp_scaled = scaler_X.transform(input_mlp_raw)  # Aplicar el escalado
            input_mlp = torch.tensor(input_mlp_scaled, dtype=torch.float32)  # Convertir a tensor
            with torch.no_grad():
                output_mlp = model_mlp(input_mlp)
            output_mlp_unscaled = scaler_y.inverse_transform(output_mlp.numpy().reshape(1, -1)).squeeze()
            x_pred, y_pred, z_pred = output_mlp_unscaled
            x_gt, y_gt, z_gt = labels_gt[0], labels_gt[1] , labels_gt[2]
            
            print(f'Image: {image_file}')
            print(f'Warped Camera Frame: ({x_pred},{y_pred},{z_pred})')
            print(f'Robot Frame: ({x_gt},{y_gt},{z_gt})')


    else:
        print(f'Nothing detected')  
        NON_DETECTED_PIECES +=1

#### Hyperparameter Definition

In [None]:
# Paths to directories and files
SAVEDIR = '../data/calibration_matrixes/'
STATSDIR = os.path.join(SAVEDIR, 'feature_stats.json')

TESTDIR_DEPTH = '../data/imgs_depth/'
TESTDIR_LABELSGT = '../data/labels_gt/'
SAVEFILE = '../data'
BASE_DATASET = '../data/images-rgb-C'
AREA_TRESHOLD = 8000

C_x = -5.600124724948292
C_y = -10.20561848357434
SIGMA = 9 

FILE_PATH_POINTWISE = '../data/results_pointwise_approach2.csv'
FILE_PATH = '../data/results_approach2.csv'
SAVEFOLDER = '../results/boxplot_histogram_model_mlp/'

SCALER_X_PATH = '../data/calibration_matrixes/scaler_X.pkl'
SCALER_Y_PATH= '../data/calibration_matrixes/scaler_Y.pkl'

scaler_X = joblib.load(SCALER_X_PATH)
scaler_Y = joblib.load(SCALER_Y_PATH)

# Mapping of corruption types
corruption_function_map = {
    'c0': 'gaussian_noise',
    'c1': 'shot_noise',
    'c2': 'impulse_noise',
    'c4': 'glass_blur',
    'c5': 'motion_blur',
    'c6': 'fog',
    'c7': 'brightness',
    'c8': 'contrast',
    'c9': 'elastic_transform',
    'c10': 'speckle_noise',
    'c11': 'gaussian_blur',
    'c12': 'spatter',
    'c13': 'saturate'
}

# Colors associated with each corruption type for visualization
colors = {
    'gaussian_noise': 'orange',
    'shot_noise': 'blue',
    'impulse_noise': 'green',
    'glass_blur': 'purple',
    'motion_blur': 'brown',
    'fog': 'pink',
    'brightness': 'gray',
    'contrast': 'cyan',
    'elastic_transform': 'magenta',
    'speckle_noise': 'yellow',
    'gaussian_blur': 'black',
    'spatter': 'lime',
    'saturate': 'navy'
}

# Functions and processing logic would be implemented below.


#### Prediction CSV

In [None]:
# Initialize lists to store results
differences = []
results = []
pointwise_result = []

# Iterate over a range of sigma values
for sigma in range(1, 11, 2):
    print(f'Sigma: {sigma}')
    # Iterate over each corruption type in the dataset
    for corruption in os.listdir(BASE_DATASET):
        corruption_path = os.path.join(BASE_DATASET, corruption)
        # Iterate over each severity level of the corruption
        for severity in os.listdir(corruption_path):
            images_path = os.path.join(corruption_path, severity, 'imgs')
            NON_DETECTED = 0
            TOTAL_PIECES = len(os.listdir(images_path))
            differences = []
            start_time = time.time()
            
            # Iterate over each image in the specified path
            for image_file in os.listdir(images_path):
                image = cv2.imread(os.path.join(images_path, image_file))
                depth = np.load(os.path.join(TESTDIR_DEPTH, image_file.replace('.jpg', '.npy')))
                labels_gt = read_labels(os.path.join(TESTDIR_LABELSGT, image_file.replace('.jpg', '.txt')))
                
                # Apply perspective transformation to the image
                image = cv2.warpPerspective(image, M, (w, h))
                
                # Convert image to PIL format, apply transformations, and predict with the model
                image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
                transformed = test_transforms(image=np.array(image))
                image = transformed["image"].unsqueeze(0).to(DEVICE)
                
                with torch.no_grad():
                    prediction = model(image)
                prediction = torch.sigmoid(prediction)
                prediction = (prediction > 0.5).float()

                # Resize prediction and save
                prediction = prediction.squeeze().cpu().numpy()
                resized_prediction = post_predict_resize(image=prediction)['image']
                tensor_prediction = torch.from_numpy(resized_prediction).unsqueeze(0)
            
                # Postprocessing
                tensor_prediction = postprocess(tensor_prediction, AREA_TRESHOLD)    
                
                # Get pieces features
                pieces_features = get_pieces_features(tensor_prediction)
                
                # Filter objects not similar to images
                pieces_features = filter_pieces(STATSDIR, pieces_features, sigma)
                save_path = os.path.join(PRED_TESTDIR, f"{image_file.split('.')[0]}.png")
                
                if len(pieces_features) != 0:
                    for piece in pieces_features: 
                        # Get data
                        center = piece['centroid']
                        radius = piece['radius']
                        # Transform pixel on warped image back to original image
                        new_pixel = np.dot(np.linalg.inv(M), np.array([[center[0]], [center[1]], [1]]))
                        center = [int(new_pixel[0][0]/new_pixel[2][0]), int(new_pixel[1][0]/new_pixel[2][0])]
                        
                        # Get pixel depth 
                        pixel_depth = depth[center[1], center[0]]
            
                        x_wc = center[0]
                        y_wc = center[1]
                        z_wc = pixel_depth 
                        
                        # Prepare input for the MLP model
                        input_mlp_raw = np.array([[x_wc, y_wc, z_wc]])
                        input_mlp_scaled = scaler_X.transform(input_mlp_raw)
                        input_mlp = torch.tensor(input_mlp_scaled, dtype=torch.float32)
                        
                        with torch.no_grad():
                            output_mlp = model_mlp(input_mlp)
                        output_mlp_unscaled = scaler_y.inverse_transform(output_mlp.numpy().reshape(1, -1)).squeeze()
                        x_pred, y_pred, z_pred = output_mlp_unscaled

                        x_gt, y_gt = labels_gt[0], labels_gt[1]
                        
                        # Save the differences
                        differences.append((x_gt - x_pred, y_gt - y_pred))
                        r_value_pointwise = math.sqrt((x_gt - x_pred)**2 + (y_gt - y_pred)**2)
                        c = corruption_function_map[corruption]
                        s = severity
                        pointwise_result.append([sigma, c, s, r_value_pointwise])
                        
                else:
                    NON_DETECTED += 1
            
            errors_np = np.array(differences)
            if len(errors_np) != 0:     
                r_values = np.sqrt(np.sum(errors_np**2, axis=1))

                # Calculate mean and standard deviation of errors
                mean_r = np.mean(r_values)
                std_r = np.std(r_values)

                percentage = (1 - (NON_DETECTED / TOTAL_PIECES)) * 100
                c = corruption_function_map[corruption]
                s = severity
                
                # Save results
                results.append([sigma, c, s, mean_r, std_r, percentage])
            else:
                s = severity
                c = corruption_function_map[corruption]
                percentage = (1 - (NON_DETECTED / TOTAL_PIECES)) * 100
                results.append([sigma, c, s, None, None, percentage])

# Create a DataFrame for pointwise results and save it to a CSV file
df_pointwise = pd.DataFrame(pointwise_result, columns=['sigma', 'corruption_name', 'severity', 'error'])
df_pointwise.to_csv(os.path.join(SAVEFILE, 'results_pointwise_approach2.csv'), index=False)

# Create a DataFrame for overall results and save it to a CSV file
df = pd.DataFrame(results, columns=['sigma', 'corruption_name', 'severity', 'mean_error', 'std_error', '%detected'])
df.to_csv(os.path.join(SAVEFILE, 'results_approach2.csv'), index=False)

#### CSV 2 Plot

In [None]:
# Load the CSV file
data = pd.read_csv(FILE_PATH)
data_pointwise = pd.read_csv(FILE_PATH_POINTWISE)

# Separate the data into blocks based on unique values of sigma
blocks = {sigma: data[data['sigma'] == sigma] for sigma in data['sigma'].unique()}
# Separate the pointwise data into blocks based on unique values of sigma
blocks_pointwise = {sigma: data_pointwise[data_pointwise['sigma'] == sigma] for sigma in data_pointwise['sigma'].unique()}

# Define a list of sigma values to iterate over
sigma_list = list(range(1, 10, 2))
for i in range(len(blocks)):
    sigma = sigma_list[i]

    # Create a summary to check the number of unique corruption names and the number of unique severities within each corruption name for each block
    block_example = blocks[sigma]

    # Create enhanced x-axis labels without the extra 's' prefix
    corruption_names = list(corruption_function_map.values())
    severity_levels = [f's{i}' for i in range(1, 6)]

    # Calculate values for the histogram of percentage detected
    data_for_histogram = []
    for corruption_name in corruption_names:
        for severity in severity_levels:
            subset = block_example[(block_example['corruption_name'] == corruption_name) & (block_example['severity'] == severity)]
            if not subset.empty:
                detected_value = subset['%detected'].values[0]
                data_for_histogram.append(detected_value)
            else:
                data_for_histogram.append(0)

    # Generate colors for each bar in the histogram
    bar_colors = []
    for corruption_name in corruption_names:
        bar_colors.extend([colors[corruption_name]] * len(severity_levels))

    # Create the histogram plot
    plt.figure(figsize=(20, 6))
    y_values_detected = np.array(data_for_histogram)

    # Generate the histogram with colors and adjust margins
    bar_width = 0.8  # Adjust bar width to leave space between blocks
    positions = np.arange(len(y_values_detected)) * 1.1  # Adjust positions to leave space between blocks

    plt.bar(positions, y_values_detected, color=bar_colors, width=bar_width)

    # Adjust x-axis labels
    severity_labels = []
    for corruption_name in corruption_names:
        for severity in severity_levels:
            severity_labels.append(f'{severity}')

    plt.xticks(ticks=positions, labels=severity_labels, rotation=90)

    # Adjust y-axis range
    plt.ylim(0, 100)

    # Adjust x-axis range to occupy the entire plot
    plt.xlim(-0.5, positions[-1] + 0.5)

    # Add legend to identify colors
    legend_labels = [f'{corruption_function_map[c]} = {c}' for c in corruption_function_map]
    handles = [plt.Line2D([0], [0], color=colors[corruption_function_map[c]], lw=4) for c in corruption_function_map]

    plt.legend(handles, legend_labels, bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)

    # Add y-axis label and plot title
    plt.ylabel('Detected Percentage')
    plt.title(f'Detected Percentage vs Severity and Corruption Name for Sigma = {sigma}')
    plt.grid(True, axis='y')
    plt.tight_layout()
    # Save the plot instead of displaying it
    plt.savefig(f'{SAVEFOLDER}histogram_sigma_{sigma}_approach2.png')
    plt.close()

# Repeat the process for pointwise data
for i in range(len(blocks_pointwise)):
    sigma = sigma_list[i]

    # Create a summary to check the number of unique corruption names and the number of unique severities within each corruption name for each block
    block_example = blocks_pointwise[sigma]

    # Calculate values for the boxplot using percentiles
    data_for_boxplot = []
    corruption_names = list(corruption_function_map.values())
    severity_levels = [f's{i}' for i in range(1, 6)]

    for corruption_name in corruption_names:
        for severity in severity_levels:
            subset = block_example[(block_example['corruption_name'] == corruption_name) & (block_example['severity'] == severity)]
            if not subset.empty:
                percentiles = np.percentile(subset['error'], [25, 50, 75])
                min_val = subset['error'].min()
                max_val = subset['error'].max()
                data_for_boxplot.append([min_val, percentiles[0], percentiles[1], percentiles[2], max_val])
            else:
                data_for_boxplot.append([np.nan, np.nan, np.nan, np.nan, np.nan])

    # Generate colors for each box in the boxplot
    box_colors = []
    for corruption_name in corruption_names:
        box_colors.extend([colors[corruption_name]] * len(severity_levels))

    # Create the boxplot
    plt.figure(figsize=(20, 6))
    box = plt.boxplot(data_for_boxplot, vert=True, patch_artist=True, whis=[0, 100])

    # Apply colors to each box in the boxplot
    for patch, color in zip(box['boxes'], box_colors):
        patch.set_facecolor(color)

    # Adjust x-axis labels
    severity_labels = []
    for corruption_name in corruption_names:
        for severity in severity_levels:
            severity_labels.append(f'{severity}')

    plt.xticks(ticks=np.arange(1, len(severity_labels) + 1), labels=severity_labels, rotation=90)

    # Add legend to identify colors
    legend_labels = [f'{corruption_function_map[c]} = {c}' for c in corruption_function_map]
    handles = [plt.Rectangle((0, 0), 1, 1, color=colors[corruption_function_map[c]]) for c in corruption_function_map]

    plt.legend(handles, legend_labels, bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)

    plt.ylabel('Piece picking uncertainty (mm)')
    plt.title(f'Boxplot of Picking uncertainty (mm) vs Severity of Corruption Name for Sigma = {sigma}')
    plt.grid(True)
    plt.tight_layout()

    # Adjust y-axis limit
    plt.ylim(0, 10)
    plt.savefig(f'{SAVEFOLDER}boxplot_sigma_{sigma}_approach2.png')
    plt.close()