In [None]:
import os
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import random

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.transforms import Compose, ToTensor, Normalize, RandomHorizontalFlip, Resize
from torchvision.transforms import functional as TF

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.models as models
from torch.nn.functional import interpolate

from tqdm import tqdm

from sklearn.metrics import jaccard_score

In [None]:
def explore_images(base_path):
    # Define the categories of the folders containing the images and their labels
    categories = ['train', 'train_labels', 'val', 'val_labels', 'test', 'test_labels']
    image_counts = {}
    
    # Iterate over each category to count the image files
    for category in categories:
        # Build the full path to the category folder
        path = os.path.join(base_path, category)
        # Count and store the number of files in each category
        image_counts[category] = len([name for name in os.listdir(path) if os.path.isfile(os.path.join(path, name))])
    
    # Print a summary of the image count for each folder
    print("Image count per folder:", image_counts)
    
    # Categories containing images for display
    sample_images = ['train', 'val', 'test']
    for category in sample_images:
        # Get the list of image files and labels
        images_list = os.listdir(os.path.join(base_path, category))
        labels_list = os.listdir(os.path.join(base_path, f"{category}_labels"))
        
        # Check if the image and label lists are not empty
        if images_list and labels_list:
            # Select the first image and label from the lists for display
            image_path = os.path.join(base_path, category, images_list[0])
            label_path = os.path.join(base_path, f"{category}_labels", labels_list[0])
            
            # Open the image and label with Pillow
            image = Image.open(image_path)
            label = Image.open(label_path)
            
            # Set up the display environment with Matplotlib
            fig, ax = plt.subplots(1, 2, figsize=(10, 5))
            ax[0].imshow(image)
            ax[0].set_title(f'Satellite Image ({category})')
            ax[0].axis('off')
            
            ax[1].imshow(label, cmap='gray')
            ax[1].set_title(f'Building Mask ({category})')
            ax[1].axis('off')
            
            # Display the images and labels
            plt.show()

# Get the current directory and use it to call the function
current_directory = os.getcwd()
explore_images(os.path.join(current_directory, 'MBDS/png'))


In [None]:
class SatelliteBuildingDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform_image=None, transform_mask=None):
        """
        Initializes the SatelliteBuildingDataset class with specific directories for images and masks,
        and with optional transformations that can be applied to both images and masks.

        Args:
            image_dir (string): Directory containing all images.
            mask_dir (string): Directory containing all masks.
            transform_image (callable, optional): Optional transformation function to apply to images.
            transform_mask (callable, optional): Optional transformation function to apply to masks.
        """
        # Assign to instance variables the values of the parameters
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform_image = transform_image
        self.transform_mask = transform_mask
        # List all files in the image directory. It is assumed that each image has its corresponding mask.
        self.images = os.listdir(image_dir)

    def __len__(self):
        # Returns the total number of images in the dataset
        return len(self.images)

    def __getitem__(self, idx):
        # Build the full paths for the image and mask using the provided index
        img_name = os.path.join(self.image_dir, self.images[idx])
        mask_name = os.path.join(self.mask_dir, self.images[idx])
        
        # Open the image and convert it to RGB color
        image = Image.open(img_name).convert("RGB")
        # Open the mask and convert it to grayscale so that it has a single channel
        mask = Image.open(mask_name).convert("L")  

        # Apply the specified transformations to the image and mask, if provided
        if self.transform_image:
            image = self.transform_image(image)
        if self.transform_mask:
            mask = self.transform_mask(mask)

        # Return the processed image and mask
        return image, mask

In [None]:
def create_dataloaders(base_path, resize, batch_size=4):
    """
    Creates and returns dataloaders for training and validation datasets.
    
    Args:
        base_path (str): Base path where the data folders are located.
        resize (tuple): Dimensions to which all images and masks will be resized.
        batch_size (int, optional): Number of samples per batch. The default value is 4.
    """
    
    # Define transformations to apply to the images:
    # - Resize the images to the specified size.
    # - Convert the images to PyTorch tensors.
    # - Normalize the images using specified means and standard deviations.
    transform_image = Compose([
        Resize(resize),
        ToTensor(),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    # Define transformations for the masks, which include resizing and conversion to tensor.
    # Masks generally do not require normalization.
    transform_mask = Compose([
        Resize(resize),
        ToTensor()
    ])

    # Build full paths to the training and validation image and mask directories
    train_dir = os.path.join(base_path, 'train')
    train_mask_dir = os.path.join(base_path, 'train_labels')
    val_dir = os.path.join(base_path, 'val')
    val_mask_dir = os.path.join(base_path, 'val_labels')
    
    # Create the training and validation datasets using the SatelliteBuildingDataset class
    train_dataset = SatelliteBuildingDataset(train_dir, train_mask_dir, transform_image, transform_mask)
    val_dataset = SatelliteBuildingDataset(val_dir, val_mask_dir, transform_image, transform_mask)

    # Create dataloaders for the training and validation datasets.
    # The training dataloader shuffles the data (shuffle=True) to improve training.
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Return the training and validation dataloaders
    return train_loader, val_loader


In [None]:
# Assuming you are running this from the main directory of your project
base_path = os.getcwd()
base_path = os.path.join(base_path, 'MBDS/png')

In [None]:
# We create the model, as well as the loss function. 
# The model is of the U-Net type, with 4 encoder downsampling blocks and 4 decoder upsampling blocks.

class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        """
        Double convolution block that applies two convolution layers with a batch normalization 
        and ReLU activation between them.

        Args:
            in_channels (int): Number of input channels.
            out_channels (int): Number of output channels.
        """
        super(DoubleConv, self).__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.double_conv(x)
    
    
class DownBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        """
        DownBlock in U-Net that includes a DoubleConv followed by max pooling 
        to reduce spatial dimension.

        Args:
            in_channels (int): Number of input channels.
            out_channels (int): Number of output channels after DoubleConv.
        """
        super(DownBlock, self).__init__()
        self.double_conv = DoubleConv(in_channels, out_channels)
        self.down_sample = nn.MaxPool2d(2)

    def forward(self, x):
        skip_out = self.double_conv(x)
        down_out = self.down_sample(skip_out)
        return (down_out, skip_out)

    
class UpBlock(nn.Module):
    def __init__(self, in_channels, out_channels, up_sample_mode):
        """
        UpBlock in U-Net that performs upsampling and combines features
        from earlier layers through concatenation operation.

        Args:
            in_channels (int): Total number of input channels (sum of output channels from previous block and skip connection).
            out_channels (int): Number of output channels.
            up_sample_mode (str): Upsampling mode, 'conv_transpose' for ConvTranspose2d or 'bilinear' for bilinear interpolation.
        """
        super(UpBlock, self).__init__()
        if up_sample_mode == 'conv_transpose':
            self.up_sample = nn.ConvTranspose2d(in_channels - out_channels, in_channels - out_channels, kernel_size=2, stride=2)        
        elif up_sample_mode == 'bilinear':
            self.up_sample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        else:
            raise ValueError("Unsupported `up_sample_mode` (can take one of `conv_transpose` or `bilinear`)")
        self.double_conv = DoubleConv(in_channels, out_channels)

    def forward(self, down_input, skip_input):
        x = self.up_sample(down_input)
        x = torch.cat([x, skip_input], dim=1)
        return self.double_conv(x)

    
class UNet(nn.Module):
    def __init__(self, out_classes=1, up_sample_mode='conv_transpose'):
        """
        U-Net architecture for segmentation that includes descending and ascending paths with a bottleneck.

        Args:
            out_classes (int): Number of output classes (usually 1 for binary segmentation masks).
            up_sample_mode (str): Upsampling mode used in the ascent blocks.
        """
        super(UNet, self).__init__()
        self.up_sample_mode = up_sample_mode
        self.down_conv1 = DownBlock(3, 64)
        self.down_conv2 = DownBlock(64, 128)
        self.down_conv3 = DownBlock(128, 256)
        self.down_conv4 = DownBlock(256, 512)
        self.double_conv = DoubleConv(512, 1024)
        self.up_conv4 = UpBlock(1024 + 512, 512, self.up_sample_mode)
        self.up_conv3 = UpBlock(512 + 256, 256, self.up_sample_mode)
        self.up_conv2 = UpBlock(256 + 128, 128, self.up_sample_mode)
        self.up_conv1 = UpBlock(128 + 64, 64, self.up_sample_mode)
        self.conv_last = nn.Conv2d(64, out_classes, kernel_size=1)

    def forward(self, x):
        x, skip1_out = self.down_conv1(x)
        x, skip2_out = self.down_conv2(x)
        x, skip3_out = self.down_conv3(x)
        x, skip4_out = self.down_conv4(x)
        x = self.double_conv(x)
        x = self.up_conv4(x, skip4_out)
        x = self.up_conv3(x, skip3_out)
        x = self.up_conv2(x, skip2_out)
        x = self.up_conv1(x, skip1_out)
        x = self.conv_last(x)
        return x


class DiceLoss(nn.Module):
    def __init__(self, smooth=1.):
        """
        Implementation of the Dice Loss, useful for comparing the similarity between two samples.

        Args:
            smooth (float): Value to prevent division by zero and smooth the result.
        """
        super(DiceLoss, self).__init__()
        self.smooth = smooth

    def forward(self, inputs, targets):
        inputs = torch.sigmoid(inputs)  # Convert logits to probabilities
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()                            
        dice = (2. * intersection + self.smooth) / (inputs.sum() + targets.sum() + self.smooth)  
        
        return 1 - dice


In [None]:
# Load the model
model = UNet()
# Check if CUDA is available and move the model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Loss function and optimizer
criterion = DiceLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:
# Test to ensure everything is functioning correctly
# Create an input tensor and move it to the same device
input_tensor = torch.rand((1, 3, 256, 256)).to(device)

# Perform the prediction
output_mask = model(input_tensor)
print(output_mask.shape)  # The output shape should be [1, 1, 256, 256], corresponding to the predicted mask

In [None]:
# We add the IoU as a metric to see how the model progresses and performs. 
# The training metric is Dice, but it's good to have more than one metric if possible.

def calculate_iou(outputs, masks):
    """
    Calcula el promedio de la métrica Intersección sobre Unión (IoU) entre las salidas del modelo y las máscaras verdaderas.

    Args:
        outputs: Salidas del modelo (logits).
        masks: Máscaras verdaderas (ground truth).

    Returns:
        Un valor flotante que representa el IoU promedio.
    """
    outputs = torch.sigmoid(outputs) > 0.5  # Convertir logits a predicciones binarias
    outputs = outputs.cpu().numpy().astype(np.uint8)  # Convertir a NumPy para cálculo
    masks = masks.cpu().numpy().astype(np.uint8)
    
    # Calcular IoU para cada par de salida y máscara
    ious = [jaccard_score(m.flatten(), o.flatten()) for m, o in zip(masks, outputs)]
    return np.mean(ious)

def weight_reset(m):
    """
    Reinicia los pesos de las capas convolucionales y lineales a sus valores iniciales.

    Args:
        m: Módulo de PyTorch (capa).
    """
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        m.reset_parameters()

In [None]:
# Training Loop

# Input sizes to test
sizes = [(224, 224), ]
# Number of epochs for training
num_epochs = 2
# Get the base path of the project
base_path = os.getcwd()
# Path to images
base_path = os.path.join(base_path, 'MBDS/png')
# Initialize the best loss as infinity
best_loss = float('inf')

for resize in sizes:
    print(f"Training model with image size: {resize}")
    # Create dataloaders for training and validation
    train_loader, val_loader = create_dataloaders(base_path, resize, batch_size=4)

    # Initialize lists to store losses and IoU scores
    train_losses = []
    val_losses = []
    iou_scores = []

    for epoch in range(num_epochs):
        model.train()  # Set the model to training mode
        running_loss = 0.0
        train_tqdm = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} Training', leave=True)
        for images, masks in train_tqdm:
            images, masks = images.to(device), masks.to(device)
            
            optimizer.zero_grad()  # Reset optimizer gradients
            outputs = model(images)  # Pass images through the model
            loss = criterion(outputs, masks)  # Calculate loss
            loss.backward()  # Backpropagate the error
            optimizer.step()  # Update weights
            
            running_loss += loss.item() * images.size(0)  # Accumulate scaled loss by batch size
            train_tqdm.set_postfix(loss=(running_loss / (train_tqdm.last_print_n + 1)))  # Update tqdm info
        
        epoch_loss = running_loss / len(train_loader.dataset)  # Calculate average loss for the epoch
        train_losses.append(epoch_loss)

        # Validation process to evaluate the model
        model.eval()  # Set the model to evaluation mode
        val_running_loss = 0.0
        ious = []
        val_tqdm = tqdm(val_loader, desc=f'Epoch {epoch+1}/{num_epochs} Validation', leave=True)
        with torch.no_grad():  # Disable gradient computation for validation
            for images, masks in val_tqdm:
                images, masks = images.to(device), masks.to(device)
                outputs = model(images)
                loss = criterion(outputs, masks)
                val_running_loss += loss.item() * images.size(0)
                
                iou = calculate_iou(outputs, masks)  # Calculate IoU
                ious.append(iou)
        
        val_epoch_loss = val_running_loss / len(val_loader.dataset)  # Average validation loss
        average_iou = np.mean(ious)  # Average IoU
        val_losses.append(val_epoch_loss)
        iou_scores.append(average_iou)
        
        # Print training and validation statistics
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}, Average IoU: {iou_scores[-1]:.4f}')

        # Save the model if the validation loss is the lowest so far
        if val_epoch_loss < best_loss:
            model_save_path = os.path.join('saved_models', f'model_{resize[0]}x{resize[1]}.pth')
            os.makedirs('saved_models', exist_ok=True)
            torch.save(model.state_dict(), model_save_path)
            print(f"Model saved to {model_save_path}")
            best_loss = val_epoch_loss

    # Visualize graphs of losses and IoU
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title(f'Losses over Epochs for Resize: {resize}')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(iou_scores, label='IoU Score')
    plt.title(f'IoU Scores over Epochs for Resize: {resize}')
    plt.xlabel('Epochs')
    plt.ylabel('IoU Score')
    plt.legend()
    plt.show()

    model.apply(weight_reset)  # Reset model weights for the next size configuration


In [None]:
# Finally, we evaluate the model on the test set. 
# Besides the metrics, I think it's useful to see some examples where the original image, the true mask, and the predicted mask are displayed.

def create_test_dataloader(base_path, batch_size, resize):
    """
    Creates a DataLoader for the test dataset.

    Args:
        base_path (str): Base path where images and masks are stored.
        batch_size (int): Batch size.
        resize (tuple): Dimensions to which images and masks will be resized.

    Returns:
        DataLoader: DataLoader object configured for the test dataset.
    """
    # Transformations applied to the images
    transform_image = Compose([
        Resize(resize),
        ToTensor(),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # Transformations applied to the masks (only resizing and tensor conversion)
    transform_mask = Compose([
        Resize(resize),
        ToTensor()
    ])
    
    # Create the dataset using the specified directories
    test_dir = os.path.join(base_path, 'test')
    test_mask_dir = os.path.join(base_path, 'test_labels')
    test_dataset = SatelliteBuildingDataset(test_dir, test_mask_dir, transform_image, transform_mask)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return test_loader

def plot_examples(data_loader, model, device, num_examples=5):
    """
    Displays examples of images, their true masks, and model predictions.

    Args:
        data_loader (DataLoader): DataLoader containing the test dataset.
        model (nn.Module): Neural network model to generate predictions.
        device (torch.device): Device where the model is running (CPU or GPU).
        num_examples (int): Number of examples to visualize.
    """
    fig, axs = plt.subplots(num_examples, 3, figsize=(15, num_examples * 5))
    model.eval()

    for idx, (images, masks) in enumerate(data_loader):
        if idx >= num_examples:
            break
        images, masks = images.to(device), masks.to(device)
        with torch.no_grad():
            outputs = model(images)
            preds = torch.sigmoid(outputs) > 0.5  # Convert logits to binary predictions

        # Visualize the original image, the true mask, and the predicted mask
        axs[idx, 0].imshow(images[0].cpu().permute(1, 2, 0))
        axs[idx, 0].set_title('Original Image')
        axs[idx, 0].axis('off')

        axs[idx, 1].imshow(masks[0].cpu().squeeze(), cmap='gray')
        axs[idx, 1].set_title('True Mask')
        axs[idx, 1].axis('off')

        axs[idx, 2].imshow(preds[0].cpu().squeeze(), cmap='gray')
        axs[idx, 2].set_title('Predicted Mask')
        axs[idx, 2].axis('off')

    plt.show()


In [None]:
batch_size = 1

for resize in sizes:
    model_path = f'saved_models/model_{resize[0]}x{resize[1]}.pth'
    test_loader = create_test_dataloader(base_path, batch_size, resize)
    
    model = UNet()  # Assume the model has been previously defined
    model.load_state_dict(torch.load(model_path))
    model.to(device)
    model.eval()
    
    iou_scores = []  # List to store IoU scores

    for images, true_masks in test_loader:
        images = images.to(device)
        true_masks = true_masks.to(device)

        with torch.no_grad():
            outputs = model(images)
            preds = torch.sigmoid(outputs) > 0.5  # Threshold to obtain binary masks

        preds_np = preds.cpu().numpy().astype(np.uint8)
        true_masks_np = true_masks.cpu().numpy().astype(np.uint8)

        # Calculate and store the IoU score for each image in the batch
        for pred, true_mask in zip(preds_np, true_masks_np):
            iou_score = jaccard_score(true_mask.flatten(), pred.flatten())
            iou_scores.append(iou_score)

    mean_iou = np.mean(iou_scores)  # Calculate the mean IoU for the current size
    print(f'Mean IoU for size {resize}: {mean_iou:.4f}')
    plot_examples(test_loader, model, device, num_examples=5)  # Visualize examples of predictions

In [None]:
# Proposal 2: Crop the images into smaller pieces, \
# thus training with more images (these are actually the same images, 
# but we take small pieces from them, which in practice means we have more material to train with).

### The process is the same as before, although most functions need to be adjusted to work with 
# the new image parameters.

class RandomCropTransform:
    def __init__(self, output_size):
        """
        Initializes the random cropping transformation with a specified output size.

        Args:
            output_size (int): Side length of the output square for cropping.
        """
        self.output_size = output_size

    def __call__(self, image, mask):
        assert image.size[0] >= self.output_size and image.size[1] >= self.output_size
        
        white_threshold = 0.1  # Threshold for the maximum allowed white pixels in the crop
        max_attempts = 100  # Limit the number of attempts to find a suitable crop

        for _ in range(max_attempts):
            i = random.randint(0, image.size[0] - self.output_size)
            j = random.randint(0, image.size[1] - self.output_size)

            cropped_image = TF.crop(image, i, j, self.output_size, self.output_size)
            cropped_mask = TF.crop(mask, i, j, self.output_size, self.output_size)

            image_array = np.array(cropped_image)
            num_white_pixels = np.sum(np.all(image_array == [255, 255, 255], axis=-1))
            total_pixels = self.output_size * self.output_size
            if num_white_pixels / total_pixels < white_threshold:
                break
                
        # Apply additional random transformations
        if random.random() > 0.5:
            cropped_image, cropped_mask = TF.hflip(cropped_image), TF.hflip(cropped_mask)
        if random.random() > 0.5:
            cropped_image, cropped_mask = TF.vflip(cropped_image), TF.vflip(cropped_mask)
        if random.random() > 0.5:
            angle = 90
            cropped_image, cropped_mask = TF.rotate(cropped_image, angle), TF.rotate(cropped_mask, angle)

        return cropped_image, cropped_mask


class SatelliteDataset_onehot(Dataset):
    def __init__(self, image_dir, mask_dir, output_size, transform=None, resize=True):
        """
        Initializes the satellite dataset with capabilities to transform images and masks.

        Args:
            image_dir (str): Directory with images.
            mask_dir (str): Directory with masks.
            output_size (int): Side length of the output square for cropping.
            transform (callable, optional): Transformations to apply to the masks.
            resize (bool): Whether to apply cropping and resizing.
        """
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.images = [img for img in os.listdir(image_dir) if img.endswith('.png')]
        self.transform = transform
        self.crop_transform = RandomCropTransform(output_size)
        self.resize = resize
        self.output_size = output_size
        self.resize_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        img_path = os.path.join(self.image_dir, self.images[index])
        mask_path = os.path.join(self.mask_dir, self.images[index])
        image = Image.open(img_path).convert("RGB")
        mask = Image.open(mask_path).convert("L")
        
        if self.resize:
            image, mask = self.crop_transform(image, mask)
            image = self.resize_transform(image)
            mask = self.transform(mask)
        else:
            image = self.transform(image)
            mask = self.transform(mask)
        
        return image, mask

In [None]:
class ToOneHot:
    def __init__(self, n_classes=2):
        """
        Initializes a transformer that converts masks to one-hot representation.

        Args:
            n_classes (int): Number of distinct classes in the mask, including the background.
        """
        self.n_classes = n_classes

    def __call__(self, mask):
        """
        Applies the transformation to a mask.

        Args:
            mask (PIL Image or ndarray): Input mask.

        Returns:
            Tensor: Mask in one-hot format as a PyTorch tensor.
        """
        # Convert the mask to a numpy array and adjust the values to 0 or 1
        mask_array = np.array(mask)
        mask_array = np.where(mask_array == 255, 1, 0)  # Convert 255 to 1, keeping 0 as is
        # Convert the adjusted mask to a tensor
        mask_tensor = torch.as_tensor(mask_array, dtype=torch.int64)
        # Apply one_hot
        one_hot = torch.nn.functional.one_hot(mask_tensor, num_classes=self.n_classes).permute(2, 0, 1).float()
        return one_hot

def create_dataloaders_onehot(base_path, resize, batch_size=4):
    """
    Creates dataloaders for training and validation with specific transformations, including one-hot for the masks.

    Args:
        base_path (str): Base path where image and mask directories are located.
        resize (int): Size to which images and masks will be resized.
        batch_size (int): Number of elements in each data batch.

    Returns:
        tuple: Two DataLoaders, one for training and another for validation.
    """
    # Transformations for images
    transform_image = transforms.Compose([
        transforms.Resize(1344),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    transform_mask_train = transforms.Compose([
        transforms.Resize(resize),
        ToOneHot(),
        # transforms.ToTensor()
    ])

    transform_mask_val = transforms.Compose([
        transforms.Resize(1344),
        ToOneHot(),
        # transforms.ToTensor()
    ])
    
    # Data directories
    train_dir = os.path.join(base_path, 'train')
    train_mask_dir = os.path.join(base_path, 'train_labels')
    val_dir = os.path.join(base_path, 'val')
    val_mask_dir = os.path.join(base_path, 'val_labels')
    
    # Create datasets and dataloaders
    train_dataset = SatelliteDataset_onehot(train_dir, train_mask_dir, output_size=resize, transform=transform_mask_train, resize=True)
    val_dataset = SatelliteBuildingDataset(val_dir, val_mask_dir, transform_image, transform_mask_val)
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader

In [None]:
def show_images(images, masks, num_images=4):
    """
    Displays a set of images and masks.

    Args:
        images (Tensor): Tensor of images with dimensions [batch_size, channels, height, width].
        masks (Tensor): Tensor of masks with dimensions [batch_size, channels, height, width], 
                        where each channel represents a different class.
        num_images (int): Number of images and masks to show.
    """
    fig, axs = plt.subplots(num_images, 2, figsize=(10, 5 * num_images))
    for i in range(num_images):
        # Reorder the image channels for visualization
        img = images[i].permute(1, 2, 0)
        img = img.numpy()
        # Normalize the image to enhance visualization
        img = (img - img.min()) / (img.max() - img.min())
        
        # Select the channel corresponding to the class of interest, e.g., buildings
        mask = masks[i][0]  # Assuming the second channel represents buildings
        mask = mask.numpy()

        # Print dimensions to verify correctness
        print("Image shape:", images[i].shape, "Mask shape:", masks[i].shape)
        
        axs[i, 0].imshow(img)
        axs[i, 1].imshow(mask, cmap='gray')
        axs[i, 0].set_title('Satellite Image')
        axs[i, 1].set_title('Label Mask (Buildings)')
        axs[i, 0].axis('off')
        axs[i, 1].axis('off')
    plt.show()

# Assuming `train_loader` is already defined and loaded properly
images, masks = next(iter(train_loader))  # Load a batch of images and masks from the DataLoader
show_images(images, masks)  # Call the function to display the images


In [None]:
class UNet(nn.Module):
    def __init__(self, out_classes=2, up_sample_mode='conv_transpose'):
        """
        Initializes the U-Net with a descending and ascending path, including a final block.

        Args:
            out_classes (int): Number of output classes for segmentation.
            up_sample_mode (str): Method for upsampling; either 'conv_transpose' or 'bilinear'.
        """
        super(UNet, self).__init__()
        self.up_sample_mode = up_sample_mode
        # Initialize the descending blocks
        self.down_conv1 = DownBlock(3, 64)
        self.down_conv2 = DownBlock(64, 128)
        self.down_conv3 = DownBlock(128, 256)
        self.down_conv4 = DownBlock(256, 512)
        # Bottleneck
        self.double_conv = DoubleConv(512, 1024)
        # Initialize the ascending blocks
        self.up_conv4 = UpBlock(512 + 1024, 512, self.up_sample_mode)
        self.up_conv3 = UpBlock(256 + 512, 256, self.up_sample_mode)
        self.up_conv2 = UpBlock(128 + 256, 128, self.up_sample_mode)
        self.up_conv1 = UpBlock(128 + 64, 64, self.up_sample_mode)
        # Final convolutional layer
        self.conv_last = nn.Conv2d(64, out_classes, kernel_size=1)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """
        Defines the data flow through the network, using the defined blocks.

        Args:
            x (Tensor): Input image.

        Returns:
            Tensor: Network outputs with class probabilities.
        """
        x, skip1_out = self.down_conv1(x)
        x, skip2_out = self.down_conv2(x)
        x, skip3_out = self.down_conv3(x)
        x, skip4_out = self.down_conv4(x)
        x = self.double_conv(x)
        x = self.up_conv4(x, skip4_out)
        x = self.up_conv3(x, skip3_out)
        x = self.up_conv2(x, skip2_out)
        x = self.up_conv1(x, skip1_out)
        x = self.conv_last(x)
        x = self.softmax(x)  # Apply softmax to the last layer
        return x


In [None]:
def split_image_into_segments(image_tensor, output_size):
    """
    Splits a large image into smaller segments.

    Args:
        image_tensor (Tensor): Input image.
        output_size (int): Size of each square segment.

    Returns:
        list: List of image segments.
        list: List of coordinates corresponding to each segment.
    """
    _, h, w = image_tensor.shape
    segments = []
    coords = []
    for i in range(0, h, output_size):
        for j in range(0, w, output_size):
            right = min(j + output_size, w)
            lower = min(i + output_size, h)
            segment = image_tensor[:, i:lower, j:right]
            segments.append(segment)
            coords.append((j, i))
    return segments, coords

def predict_and_recombine_segments(segments, coords, model, output_size, image_shape, device):
    """
    Predicts and recombines image segments to form the complete mask.

    Args:
        segments (list): Image segments.
        coords (list): Coordinates of each segment in the original image.
        model (UNet): U-Net model.
        output_size (int): Size of each segment.
        image_shape (tuple): Original dimensions of the image.
        device (torch.device): Device on which the model is running.

    Returns:
        Tensor: Complete reconstructed mask.
    """
    full_mask = torch.zeros((2, image_shape[1], image_shape[2]), device=device)
    for segment, (x, y) in zip(segments, coords):
        segment = segment.unsqueeze(0).to(device)
        with torch.no_grad():
            output = model(segment)
        full_mask[:, y:y+output_size, x:x+output_size] = output.squeeze(0)
    return full_mask

def calculate_iou(outputs, masks):
    """
    Calculates the Intersection over Union (IoU) by interpolating the outputs if necessary to match the size of the masks.

    Args:
        outputs (Tensor): Model output masks.
        masks (Tensor): True masks for comparison.

    Returns:
        float: Mean IoU score.
    """
    if outputs.size()[2:] != masks.size()[2:]:
        outputs = F.interpolate(outputs, size=masks.size()[2:], mode='nearest')
    outputs = torch.sigmoid(outputs) > 0.5
    outputs = outputs.float()
    outputs = outputs.cpu().numpy().astype(np.uint8)
    masks = masks.cpu().numpy().astype(np.uint8)
    ious = [jaccard_score(m.flatten(), o.flatten(), zero_division=1) for m, o in zip(masks, outputs)]
    return np.mean(ious)

def validate_model(val_loader, model, output_size, device, criterion):
    """
    Evaluates the model on a validation dataset.

    Args:
        val_loader (DataLoader): DataLoader that provides batches of images and masks.
        model (nn.Module): Neural network model to evaluate.
        output_size (int): Size of the segments into which each image is divided.
        device (torch.device): Device where computations are performed (CPU or GPU).
        criterion (loss function): Loss function used to evaluate model accuracy.

    Returns:
        tuple: Average validation losses and average IoU scores.
    """
    model.eval()  # Sets the model to evaluation mode (important for deactivating dropout, batchnorm, etc.)
    val_losses = []  # List to store losses for each batch
    iou_scores = []  # List to store IoU scores for each batch

    for images, masks in val_loader:
        images = images.to(device)  # Send images to the appropriate device
        masks = masks.to(device)  # Send masks to the appropriate device

        # Tensor to store predictions for all images in the batch
        predictions = torch.zeros(masks.size(0), 2, masks.size(2), masks.size(3), device=device)

        # Process each image in the batch individually
        for i, image in enumerate(images):
            # Split the image into segments and obtain the coordinates of each segment
            segments, coords = split_image_into_segments(image.cpu(), output_size)
            # Predict and reconstruct the complete mask from the segments
            full_mask = predict_and_recombine_segments(segments, coords, model, output_size, image.shape, device)
            predictions[i] = full_mask.to(device)  # Save the reconstructed mask in the predictions tensor
        
        # Calculate loss for the batch using the specified loss function
        loss = criterion(predictions, masks.argmax(dim=1))  # Use argmax if masks are in one-hot format
        val_losses.append(loss.item())  # Add the batch's loss to the list of losses

        # Calculate IoU for the batch using the defined function
        iou = calculate_iou(predictions.argmax(dim=1), masks.argmax(dim=1))  # Use argmax to compare predicted classes
        iou_scores.append(iou)  # Add the IoU of the batch to the list of IoUs
        
    # Calculate and return the average of the losses and IoUs of all processed batches
    return np.mean(val_losses), np.mean(iou_scores)


In [None]:
def visualize_image_and_segments(data_loader, output_size):
    """
    Visualizes the first image from a DataLoader and its resulting segments of a specified size.

    Args:
        data_loader (DataLoader): DataLoader that provides images and masks.
        output_size (int): Size of the square segments into which the image will be divided.
    """
    # Get a batch of images and masks
    images, masks = next(iter(data_loader))
    image = images[0]  # Take the first image from the batch
    mask = masks[0]    # Take the first mask from the batch

    # Split the image into segments using the defined function
    segments, coords = split_image_into_segments(image, output_size)

    # Setup for plotting
    num_segments = len(segments)
    cols = 4  # Number of columns in the plot
    rows = (num_segments // cols) + 2  # Calculate number of rows in the plot

    plt.figure(figsize=(cols * 4, rows * 4))

    # Display the original image in the first subplot
    plt.subplot(rows, cols, 1)
    plt.imshow(image.permute(1, 2, 0))  # Reorder from [C, H, W] to [H, W, C]
    plt.title("Original Image")
    plt.axis('off')

    # Display each segment in additional subplots
    for idx, segment in enumerate(segments, start=2):
        plt.subplot(rows, cols, idx)
        plt.imshow(segment.permute(1, 2, 0))  # Reorder from [C, H, W] to [H, W, C]
        plt.title(f"Segment {idx-1}")
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Assuming that `val_loader` and `output_size` are correctly defined
visualize_image_and_segments(val_loader, output_size=224)

In [None]:
# Load the model
model = UNet()
# Check if CUDA is available and move the model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Define the loss function and the optimizer
criterion = nn.CrossEntropyLoss()  # Suitable for multi-class classification
optimizer = optim.Adam(model.parameters(), lr=1e-4)  # Adam optimizer with learning rate 0.0001



In [None]:
sizes = [(224, 224), (448, 448)]
num_epochs = 5
# Assuming you are running this from the main directory of your project
base_path = os.getcwd()  # Get the path of the current directory where the script is running
base_path = os.path.join(base_path, 'MBDS/png')  # Add 'images' subdirectory to the base path

best_loss = 0  # Initialize the best loss as zero for future comparisons

torch.cuda.empty_cache()  # Clear CUDA cache to free up GPU memory

for resize in sizes:
    print(f"Training model with image size: {resize}")  # Report the image size to be trained on
    train_loader, val_loader = create_dataloaders_onehot(base_path, resize[0])  # Create DataLoader for training and validation

    # Reset lists of losses and IoU for each size
    train_losses = []
    val_losses = []
    iou_scores = []

    for epoch in range(num_epochs):  # Loop over each epoch
        model.train()  # Set the model to training mode
        running_loss = 0.0
        train_tqdm = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} Training', leave=True)  # Use tqdm for progress bar
        for images, masks in train_tqdm:
            images, masks = images.to(device), masks.to(device)  # Move images and masks to the device (GPU or CPU)
            
            optimizer.zero_grad()  # Reset the optimizer gradients
            outputs = model(images)  # Get model outputs
            loss = criterion(outputs, masks)  # Calculate loss
            loss.backward()  # Backpropagate the loss
            optimizer.step()  # Update model weights
            
            running_loss += loss.item() * images.size(0)  # Accumulate loss adjusted for batch size
            train_tqdm.set_postfix(loss=(running_loss / (train_tqdm.last_print_n + 1)))  # Display average loss in tqdm
        
        epoch_loss = running_loss / len(train_loader.dataset)  # Calculate average loss for the epoch
        train_losses.append(epoch_loss)  # Save training loss

        val_loss, average_iou = validate_model(val_loader, model, output_size=1344, device=device, criterion=criterion)  # Validate the model
        val_losses.append(val_loss)  # Save validation loss
        iou_scores.append(average_iou)  # Save average IoU
        
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}, Average IoU: {iou_scores[-1]:.4f}')

        if average_iou > best_loss:  # Save the model if the average IoU is the best so far
            model_save_path = os.path.join('saved_models', f'model_cropped_{resize[0]}x{resize[1]}.pth')
            os.makedirs('saved_models', exist_ok=True)  # Create the directory if it does not exist
            torch.save(model.state_dict(), model_save_path)
            print(f"Model saved to {model_save_path}")
            best_loss = average_iou

    # Optionally, plot the results for each size
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title(f'Losses over Epochs for Resize: {resize}')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(iou_scores, label='IoU Score')
    plt.title(f'IoU Scores over Epochs for Resize: {resize}')
    plt.xlabel('Epochs')
    plt.ylabel('IoU Score')
    plt.legend()
    plt.show()

    model.apply(weight_reset)  # Reset model weights to avoid residual training effects in the next iteration


In [None]:
def create_test_dataloader(base_path, batch_size, resize):
    """
    Creates a DataLoader for the test dataset with specific transformations for images and masks.

    Args:
        base_path (str): Base path where image and mask directories are stored.
        batch_size (int): Number of samples per batch.
        resize (int): Size to which images and masks will be resized.

    Returns:
        DataLoader: DataLoader object configured for the test dataset.
    """
    # Transformations for images
    transform_image = transforms.Compose([
        transforms.Resize(1344),  # Resize images to a fixed size
        transforms.ToTensor(),  # Convert images to tensors
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize the images
    ])
    
    # Transformations for masks, including conversion to one-hot format
    transform_mask = transforms.Compose([
        transforms.Resize(1344),  # Resize masks to the same size as images
        ToOneHot(),  # Convert masks to one-hot format
        # Note: Tensor conversion is handled internally by ToOneHot if necessary
    ])
    
    # Directories where the test images and masks are located
    test_dir = os.path.join(base_path, 'test')
    test_mask_dir = os.path.join(base_path, 'test_labels')
    # Create a dataset using the defined transformations
    test_dataset = SatelliteBuildingDataset(test_dir, test_mask_dir, transform_image, transform_mask)
    # Create the DataLoader with the test data
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return test_loader

def test_and_plot_model(val_loader, model, output_size, device, num_examples=1):
    """
    Evaluates the model and visualizes the results for a limited set of examples from the validation DataLoader.

    Args:
        val_loader (DataLoader): DataLoader that provides test or validation data.
        model (torch.nn.Module): Neural network model to be evaluated.
        output_size (int): Desired output size for the image segments.
        device (torch.device): Device on which the model is running (CPU or GPU).
        num_examples (int): Number of examples to visualize.
    """
    model.eval()  # Set the model to evaluation mode
    for batch_idx, (images, masks) in enumerate(val_loader):
        if batch_idx >= num_examples:
            break
        images = images.to(device)
        masks = masks.to(device)
        predictions = torch.zeros_like(masks)  # Prepare tensor to store predictions

        for i, (image, mask) in enumerate(zip(images, masks)):
            segments, coords = split_image_into_segments(image.cpu(), output_size)
            full_mask = predict_and_recombine_segments(segments, coords, model, output_size, image.shape, device)
            predictions[i] = full_mask.to(device)
        
            # Convert one-hot masks to a single channel for visualization
            mask_single_channel = torch.argmax(mask, dim=0).cpu()
            full_mask_single_channel = torch.argmax(full_mask, dim=0).cpu()
        
            # Setup plots to show the original image, the original mask, and the predicted mask
            plt.figure(figsize=(15, 5))
            plt.subplot(1, 3, 1)
            plt.imshow(image.cpu().permute(1, 2, 0))
            plt.title("Original Image")
            plt.axis("off")
        
            plt.subplot(1, 3, 2)
            plt.imshow(mask_single_channel, cmap="gray")
            plt.title("Original Mask")
            plt.axis("off")
        
            plt.subplot(1, 3, 3)
            plt.imshow(full_mask_single_channel, cmap="gray")
            plt.title("Predicted Full Mask")
            plt.axis("off")
            plt.show()


In [None]:
batch_size = 1  # Batch size set to 1, useful for detailed evaluations or where individual processing is required
current_directory = os.getcwd()  # Assuming 'current_directory' is defined somewhere else in the code
base_path = os.path.join(current_directory, 'MBDS/png')

# Loop over different image sizes to test the model with each size configuration
for resize in sizes:
    model_path = f'saved_models/model_cropped_{resize[0]}x{resize[1]}.pth'  # Path to the saved model
    test_loader = create_test_dataloader(base_path, batch_size, resize)  # Create DataLoader for the test set

    model = UNet()  # Creation of the model instance, assuming it is a U-Net
    model.load_state_dict(torch.load(model_path))  # Load the model weights from the saved file
    model.to(device)  # Move the model to the appropriate device (GPU or CPU)

    # Run the function that tests the model and visualizes the results
    # Note: Ensure that 'output_size' is correctly defined here to match the logic of 'test_and_plot_model'
    test_and_plot_model(test_loader, model, resize[0], device, num_examples=5)

In [None]:
# The results with this method are much better, improving the original result by fivefold in terms of IoU. 
# To further improve the model, I think it could be modified to use some type of pre-trained model to constitute the U-Net network.