### UNet Model for Image Segmentation with Data Loading and Augmentation

In [None]:
import numpy as np
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from albumentations import Compose, RandomRotate90, Flip, Transpose
from torchvision import transforms
import cv2

train = pd.read_csv('kvasir-instrument/train.txt')
test = pd.read_csv('kvasir-instrument/test.txt')

# Printing the number of samples in the training and test datasets

print(f'Total training samples: {len(train)}')
print(f'Total test samples: {len(test)}')

# Defining Augmentation transformations
aug_transform = Compose([
    RandomRotate90(),
    Flip(),
    Transpose(),
])

class CustomDataset(Dataset):
    def __init__(self, data, transform=None, target_size=(256, 256)):
        self.data = data
        self.transform = transform
        self.target_size = target_size

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        name = self.data.values[idx][0]
        img_name = 'kvasir-instrument/images/images/' + name + '.jpg'
        img_mask = 'kvasir-instrument/masks/masks/' + name + '.png'

        img = Image.open(img_name).convert('RGB')
        mask = Image.open(img_mask).convert('L')
        
        # Resizing images and masks to the target size
        img = img.resize(self.target_size, Image.Resampling.BILINEAR)
        mask = mask.resize(self.target_size, Image.Resampling.NEAREST)

        # Applying transformations
        if self.transform:
            augmented = self.transform(image=np.array(img), mask=np.array(mask))
            img = augmented['image']
            mask = augmented['mask']
        
        # Converting PIL images to tensors
        img = transforms.ToTensor()(img)
        mask = transforms.ToTensor()(mask)
        
        return img, mask

train_dataset = CustomDataset(train, transform=aug_transform)
test_dataset = CustomDataset(test, transform=None)  # No augmentation since its not needed for test dataset

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UNet, self).__init__()
        
        # Encoder (contracting path)
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # Middle (bottleneck)
        self.middle = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # Decoder (expansive path)
        self.decoder = nn.Sequential(
            nn.Conv2d(256, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(64, out_channels, kernel_size=2, stride=2)
        )

    def forward(self, x):
        x1 = self.encoder(x)
        x2 = self.middle(x1)
        x3 = self.decoder(x2)
        return x3

# Initialising the model, loss function, and optimizer
in_channels = 3  
out_channels = 1  
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  
model = UNet(in_channels, out_channels).to(device)
criterion = nn.BCEWithLogitsLoss()   
optimizer = optim.Adam(model.parameters(), lr=0.001)

### UNet Model Training and Testing Loop

In [None]:
# Training loop
print("Training UNetModel:")

num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    for i, (images, masks) in enumerate(train_loader):
        images, masks = images.to(device), masks.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        
        # Resizing the output to match the mask size
        outputs = nn.functional.interpolate(outputs, size=masks.size()[2:], mode='bilinear', align_corners=True)
        
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
    average_loss = total_loss / len(train_loader)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Average Loss: {average_loss:.4f}')
print('Finished Training')



torch.save(model.state_dict(), 'unet_model.pth')
print('Trained model saved successfully.')

In [None]:
# Testing loop
print("Testing UNetModel:")
model.eval()
total_loss = 0.0
with torch.no_grad():
    for i, (images, masks) in enumerate(test_loader):
        images, masks = images.to(device), masks.to(device)
        outputs = model(images)
        
        # Resizing the output to match the mask size
        outputs = nn.functional.interpolate(outputs, size=masks.size()[2:], mode='bilinear', align_corners=True)
        
        loss = criterion(outputs, masks)
        total_loss += loss.item()

average_loss = total_loss / len(test_loader)
print(f'Average Test Loss: {average_loss:.4f}')
print('Finished Testing')

In [None]:
import matplotlib.pyplot as plt

# Provided training loss values
epoch_numbers = list(range(1, 11))
training_loss_values = [0.2884, 0.1682, 0.1488, 0.1507, 0.1495, 0.1450, 0.1358, 0.1334, 0.1338, 0.1332]

# Plotting the training loss over epochs
plt.figure(figsize=(10, 5))
plt.plot(epoch_numbers, training_loss_values, color='blue', marker='o')
plt.xlabel('Epochs')
plt.ylabel('Training Loss')
plt.title('Training Loss of UNet Model over Epochs')
plt.grid(True)
plt.xticks(epoch_numbers)
plt.tight_layout()
plt.show()


### Model Summary with Layer-wise Input and Output Shapes

In [None]:
def summary(model, input_size):
    def forward_hook(module, input, output):
        # Printing layer information
        print(f"{module.__class__.__name__.ljust(20)} | "
              f"Input shape: {str(input[0].shape).ljust(30)} | "
              f"Output shape: {str(output.shape).ljust(30)}")

    # Registering the hook to each layer
    hooks = []
    for layer in model.children():
        hook = layer.register_forward_hook(forward_hook)
        hooks.append(hook)

    # Generating a dummy input
    input_tensor = torch.rand(input_size).unsqueeze(0)

    # Performing a forward pass to print the summary
    print("\nModel Summary:")
    print("=" * 75)
    model(input_tensor)

    # Removing the hooks
    for hook in hooks:
        hook.remove()

input_size = (in_channels, 256, 256)
summary(model, input_size)


### Model Evaluation Metrics: 
##### Test Loss and Intersection over Union (IoU) with Visualization

In [None]:
from sklearn.metrics import confusion_matrix, classification_report, jaccard_score
import matplotlib.pyplot as plt

# Function to calculate metrics
def calculate_metrics(model, dataloader, criterion):
    model.eval()
    total_loss = 0.0
    all_predictions = []
    all_masks = []

    with torch.no_grad():
        for i, (images, masks) in enumerate(dataloader):
            images, masks = images.to(device), masks.to(device)

            # Forward pass
            outputs = model(images)
            outputs = nn.functional.interpolate(outputs, size=masks.size()[2:], mode='bilinear', align_corners=True)

            # Calculate loss
            loss = criterion(outputs, masks)
            total_loss += loss.item()

            # Converting to binary predictions
            predictions = torch.sigmoid(outputs) > 0.5
            all_predictions.extend(predictions.cpu().numpy())
            all_masks.extend(masks.cpu().numpy())

    average_loss = total_loss / len(dataloader)

    return average_loss, np.array(all_predictions), np.array(all_masks)

# Calculating metrics on the test set
test_loss, test_predictions, test_masks = calculate_metrics(model, test_loader, criterion)

# Calculating IoU
iou = jaccard_score(test_masks.flatten(), test_predictions.flatten(), average='micro')

# Print
print(f'Test Loss: {test_loss:.4f}')
print(f'Intersection over Union (IoU): {iou:.4f}')

# Visualise the results using bar plots
metrics_names = ['Test Loss', 'Intersection over Union (IoU)']
metrics_values = [test_loss, iou]

plt.figure(figsize=(8, 6))
plt.bar(metrics_names, metrics_values, color=['teal', 'salmon'])
plt.title('Evaluation Metrics')
plt.xlabel('Metrics')
plt.ylabel('Values')
plt.show()

### Evaluation Metrics for U-Net Semantic Segmentation: 
##### Confusion Matrix, Precision-Recall Curve, ROC Curve, and Dice Coefficient

In [None]:
from sklearn.metrics import precision_recall_curve, roc_curve, auc
from sklearn.metrics import f1_score, average_precision_score
from torchvision.transforms.functional import to_pil_image
from torchvision import utils

# Loading the trained model
model = UNet(in_channels, out_channels).to(device)
model.load_state_dict(torch.load('unet_model.pth'))
model.eval()

# Initialising empty lists to store predictions and true labels
all_predictions = []
all_labels = []

with torch.no_grad():
    for images, masks in test_loader:
        images, masks = images.to(device), masks.to(device)
        outputs = model(images)
        
        # Converting the model output to binary predictions based on a threshold
        predictions = (torch.sigmoid(outputs) > 0.5).cpu().numpy()
        
        # Converting true labels to numpy array
        labels = masks.cpu().numpy()
        
        # Appending predictions and true labels to the lists
        all_predictions.append(predictions)
        all_labels.append(labels)

all_predictions = np.concatenate(all_predictions)
all_labels = np.concatenate(all_labels)

# Flatten the predictions and true labels
flat_predictions = all_predictions.flatten()
flat_labels = all_labels.flatten()

# Both arrays have the same shape
min_length = min(len(flat_predictions), len(flat_labels))
flat_predictions = flat_predictions[:min_length]
flat_labels = flat_labels[:min_length]

# 1. Confusion Matrix
conf_matrix = confusion_matrix(flat_labels, flat_predictions)


# 2. Precision-Recall Curve
precision, recall, _ = precision_recall_curve(flat_labels, flat_predictions)
average_precision = average_precision_score(flat_labels, flat_predictions)

# 3. ROC Curve
fpr, tpr, _ = roc_curve(flat_labels, flat_predictions)
roc_auc = auc(fpr, tpr)

# 4. Dice Coefficient
intersection = np.sum(flat_predictions * flat_labels)
dice_coefficient = (2.0 * intersection) / (np.sum(flat_predictions) + np.sum(flat_labels))

# Displaying
print(f"Confusion Matrix:\n{conf_matrix}")
print(f"Average Precision: {average_precision:.4f}")
print(f"Area under ROC Curve: {roc_auc:.4f}")
print(f"Dice Coefficient: {dice_coefficient:.4f}")


In [None]:
# Plottin Precision-Recall Curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2, label='Precision-Recall Curve (AP={:.2f})'.format(average_precision))
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='best')
plt.show()

# Plotting ROC Curve
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC Curve (AUC={:.2f})'.format(roc_auc))
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='best')
plt.show()

### Visualisation Feature Maps in a Convolutional Layer of the Model

In [None]:
for name, layer in model.named_modules():
    if isinstance(layer, nn.Conv2d):
        # Choosing a specific layer to visualize
        if layer.in_channels == 3:
            activation = layer(images)
            num_feature_maps = activation.size(1)

            # Number of rows and columns in the grid
            rows = num_feature_maps // 8 + 1
            columns = min(8, num_feature_maps)

            plt.figure(figsize=(16, 2 * rows))
            plt.suptitle(f'Feature Maps in Layer: {name}', y=1.02)

            for i in range(num_feature_maps):
                plt.subplot(rows, columns, i + 1)
                plt.imshow(activation[0, i].cpu().detach().numpy(), cmap='viridis')
                plt.axis('off')

            plt.tight_layout()
            plt.show()
            break  # Stopping after visualisation the first convolutional layer


### Model Prediction Visualisation on Training Dataset

In [None]:
from torchvision import utils
from torchvision.transforms.functional import to_pil_image

# Defining a new dataset for visualisation to compare
visualisation_dataset = CustomDataset(train, transform=None)  # No augmentation for visualization
visualisation_loader = DataLoader(visualization_dataset, batch_size=8, shuffle=True)

# Visualising images using the trained model
with torch.no_grad():
    for i, (images, masks) in enumerate(visualization_loader):
        images, masks = images.to(device), masks.to(device)

        # Display original images
        plt.figure(figsize=(15, 6))
        plt.subplot(1, 3, 1)
        plt.title('Original Image')
        plt.imshow(utils.make_grid(images.cpu(), nrow=4).permute(1, 2, 0))

        # Applying the trained model to get predictions
        outputs = model(images)
        outputs = nn.functional.interpolate(outputs, size=masks.size()[2:], mode='bilinear', align_corners=True)

        # Converting the model output and mask to binary predictions
        predictions = torch.sigmoid(outputs) > 0.5

        # Display model predictions
        plt.subplot(1, 3, 2)
        plt.title('Model Prediction')
        plt.imshow(utils.make_grid(predictions.cpu(), nrow=4).permute(1, 2, 0)[:, :, 0], cmap='gray')

        # Display masks
        plt.subplot(1, 3, 3)
        plt.title('Mask')
        plt.imshow(utils.make_grid(masks.cpu(), nrow=4).permute(1, 2, 0)[:, :, 0], cmap='gray')

        plt.show()

        if i == 0:
            break


In [None]:
# visualisation 3 images using the trained model
with torch.no_grad():
    for i, (images, masks) in enumerate(visualization_loader):
        images, masks = images.to(device), masks.to(device)

        # Displaying original images, model predictions, and masks in separate rows
        for j in range(min(3, images.size(0))):
            plt.figure(figsize=(8, 4))

            # Original image
            plt.subplot(1, 3, 1)
            plt.title('Original Image')
            plt.imshow(to_pil_image(images[j].cpu()))

            # Applying the trained model to get predictions
            outputs = model(images[j].unsqueeze(0))
            outputs = nn.functional.interpolate(outputs, size=masks.size()[2:], mode='bilinear', align_corners=True)

            # Converting the model output and mask to numpy arrays
            predictions = torch.sigmoid(outputs).cpu().squeeze(0).numpy()
            mask_np = masks[j].cpu().numpy()

            # Displaying model predictions
            plt.subplot(1, 3, 2)
            plt.title('Model Prediction')
            plt.imshow(predictions[0], cmap='gray', vmin=0, vmax=1)  # Assuming predictions is a single-channel image

            # Displaying mask
            plt.subplot(1, 3, 3)
            plt.title('Mask')
            plt.imshow(mask_np[0], cmap='gray', vmin=0, vmax=1)  # Assuming mask_np is a single-channel image

            plt.show()

        break


### Visualisation of Model Predictions on Test Images and Augmented Versions

In [None]:
# Testing loop with augmentation visualisation
print("Testing UNetModel:")
model.eval()
with torch.no_grad():
    for i, (images, masks) in enumerate(test_loader):
        images, masks = images.to(device), masks.to(device)

        # Original image
        original_output = model(images)
        original_output = nn.functional.interpolate(original_output, size=masks.size()[2:], mode='nearest')

        # Augmented images
        augmented_outputs = []
        for transform_name, transform_func in zip(['Rotate', 'Flip', 'Transpose'],
                                                 [aug_transform.transforms[0], aug_transform.transforms[1], aug_transform.transforms[2]]):
            # Applying the transformation to each image in the batch, including the original image
            augmented_images = torch.stack([torch.from_numpy(transform_func(image=img.permute(1, 2, 0).cpu().numpy())['image']).permute(2, 0, 1) for img in images], dim=0)

            augmented_output = model(augmented_images)
            augmented_output = nn.functional.interpolate(augmented_output, size=masks.size()[2:], mode='nearest')
            augmented_outputs.append((transform_name, augmented_output))

        # Plotting
        num_plots = len(augmented_outputs) + 3  # 3 for the original image and prediction
        plt.figure(figsize=(5 * num_plots, 5))

        # Plotting the original image
        plt.subplot(1, num_plots, 1)
        plt.title('Original Image')
        plt.imshow((images[0].permute(1, 2, 0).cpu().numpy() * 255).astype('uint8'), cmap='gray_r', interpolation='nearest')
        plt.axis('off')

        # Plotting the model's prediction for the original image
        plt.subplot(1, num_plots, 2)
        plt.title('Original Prediction')
        plt.imshow((torch.sigmoid(original_output[0, 0]).cpu().numpy() * 255).astype('uint8'), cmap='gray_r', interpolation='nearest')
        plt.axis('off')

        # Plotting the augmented images and their predictions
        for j, (transform_name, augmented_output) in enumerate(augmented_outputs):
            plt.subplot(1, num_plots, j + 3)
            plt.title(f'{transform_name} Augmented Prediction')
            plt.imshow((torch.sigmoid(augmented_output[0, 0]).cpu().numpy() * 255).astype('uint8'), cmap='gray_r', interpolation='nearest')
            plt.axis('off')

        plt.show()

        if i == 2:  
            break  # Break after visualising a 3 samples
