In [1]:
# Clone the GitHub repository
!git clone https://github.com/loressa/DataScience_MPhill_practicals.git
# Install the pydicom package
!pip install pydicom
!pip install torchmetrics

fatal: destination path 'DataScience_MPhill_practicals' already exists and is not an empty directory.


In [2]:
import pydicom
from pydicom import dcmread
import os
from os import listdir
from os.path import isfile, join
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
import torchmetrics
from torchmetrics.classification import BinaryAccuracy
import numpy as np
from torchsummary import summary
from torch.optim import Adam
from tqdm import tqdm

In [3]:
class UNet(nn.Module):
  def __init__(self, in_channels, out_channels):
    super().__init__()
    self.conv1 = self.conv_block(in_channels, 16, 3, 1, 1)
    self.maxpool1 = self.maxpool_block(2, 2, 0)
    self.conv2 = self.conv_block(16, 32, 3, 1, 1)
    self.maxpool2 = self.maxpool_block(2, 2, 0)
    self.conv3 = self.conv_block(32, 64, 3, 1, 1)
    self.maxpool3 = self.maxpool_block(2, 2, 0)
    self.middle = self.conv_block(64, 128, 3, 1, 1)
    self.upsample3 = self.transposed_block(128, 64, 3, 2, 1, 1)
    self.upconv3 = self.conv_block(128, 64, 3, 1, 1)
    self.upsample2 = self.transposed_block(64, 32, 3, 2, 1, 1)
    self.upconv2 = self.conv_block(64, 32, 3, 1, 1)
    self.upsample1 = self.transposed_block(32, 16, 3, 2, 1, 1)
    self.upconv1 = self.conv_block(32, 16, 3, 1, 1)
    self.final = self.final_layer(16, 1, 1, 1, 0)

  def conv_block(self, in_channels, out_channels, kernel_size, stride, padding):
    convolution = nn.Sequential(
                  nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
                  nn.BatchNorm2d(out_channels),
                  nn.ReLU(inplace=True),
                  nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
                  nn.BatchNorm2d(out_channels),
                  nn.ReLU(inplace=True))
    return convolution

  def maxpool_block(self, kernel_size, stride, padding):
      maxpool = nn.Sequential(nn.MaxPool2d(kernel_size=kernel_size, stride=stride, padding=padding),
      nn.Dropout2d(0.5))
      return maxpool

  def transposed_block(self, in_channels, out_channels, kernel_size, stride, padding, output_padding):
      transposed = torch.nn.ConvTranspose2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride,
      padding=padding, output_padding=output_padding)
      return transposed

  def final_layer(self, in_channels, out_channels, kernel_size, stride, padding):
      final = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding)
      return final

  def forward(self, x):
      # downsampling part
      conv1 = self.conv1(x)
      maxpool1 = self.maxpool1(conv1)
      conv2 = self.conv2(maxpool1)
      maxpool2 = self.maxpool2(conv2)
      conv3 = self.conv3(maxpool2)
      maxpool3 = self.maxpool3(conv3)
      # middle part
      middle = self.middle(maxpool3)
      # upsampling part
      upsample3 = self.upsample3(middle)
      upconv3 = self.upconv3(torch.cat([upsample3, conv3], 1))
      upsample2 = self.upsample2(upconv3)
      upconv2 = self.upconv2(torch.cat([upsample2, conv2], 1))
      upsample1 = self.upsample1(upconv2)
      upconv1 = self.upconv1(torch.cat([upsample1, conv1], 1))
      final_layer = self.final(upconv1)
      return final_layer

In [4]:
# Function to convert DICOM files to numpy array
def dicom_to_numpy(dicom_dir):
    dicom_slices = [dcmread(os.path.join(dicom_dir, filename)) for filename in os.listdir(dicom_dir)]
    dicom_slices.sort(key=lambda x: float(x.ImagePositionPatient[2]))  # Sort slices by z-coordinate
    pixel_arrays = [slice.pixel_array for slice in dicom_slices]
    volume = np.stack(pixel_arrays, axis=-1)
    return volume

In [5]:
def prepare_images_tensor(dataset_dir):
  # List of case directories
  case_dirs = [os.path.join(dataset_dir, f"Case_{str(i).zfill(3)}") for i in range(0, 12)]
  # Convert DICOM datasets to numpy arrays
  case_volumes = []
  for case_dir in case_dirs:
      case_volume = dicom_to_numpy(case_dir)
      case_volumes.append(case_volume)
  # Reshape each case volume to (512, 512) format
  reshaped_slices = []

  for case_volume in case_volumes:
      num_slices = case_volume.shape[-1]  # Get the number of slices
      for i in range(num_slices):
          reshaped_slices.append(case_volume[:, :, i])

  # Combine all reshaped slices into a single array
  combined_slices = np.stack(reshaped_slices, axis=0)
  # Convert combined_slices to a PyTorch tensor with data type torch.float32
  images_tensor = torch.from_numpy(combined_slices.astype(np.float32))
  return images_tensor


In [6]:
def prepare_masks_tensor(dataset_dir):
  # List to store all the loaded 3D arrays
  masks_all = []
  # Iterate over each file
  for i in range(12):
      # Construct the file path for each case
      image_file = os.path.join(dataset_dir, f"Case_{str(i).zfill(3)}_seg.npz")

      # Load segmentation masks from the NPZ file
      segmentation_data = np.load(image_file)
      image_array = segmentation_data['masks']
      # print(image_array.shape)
      for j in range(len(image_array)):
        image_array_j = image_array[j,:,:]
        # Append the 3D array to the list
        masks_all.append(image_array_j)
    # Combine all reshaped slices into a single array
  masks_all = np.stack(masks_all, axis=0)
  # Convert combined_slices to a PyTorch tensor with data type torch.float32
  masks_tensor = torch.from_numpy(masks_all.astype(np.float32))
  return masks_tensor


In [7]:
def split_dataset(images_tensor, masks_tensor):
  train_samples = 1151
  # Split the tensors into train and test sets
  train_images = images_tensor[:train_samples]
  train_masks = masks_tensor[:train_samples]
  test_images = images_tensor[train_samples:]
  test_masks = masks_tensor[train_samples:]
  # Add a new channel dimension
  train_images = train_images.unsqueeze(1)
  train_masks = train_masks.unsqueeze(1)
  test_images = test_images.unsqueeze(1)
  test_masks = test_masks.unsqueeze(1)
  return train_images, train_masks, test_images, test_masks


In [8]:
def soft_dice_loss(pred, target, smooth=1e-5):
    intersection = torch.sum(pred * target)
    dice_coeff = (2. * intersection + smooth) / (torch.sum(pred) + torch.sum(target) + smooth)
    dice_loss = 1. - dice_coeff
    return dice_loss

def custom_loss(pred, target, alpha=0.5):
    bce_loss = torch.nn.BCEWithLogitsLoss()(pred, target)
    dice_loss = soft_dice_loss(torch.sigmoid(pred), target)
    total_loss = alpha * bce_loss + (1 - alpha) * dice_loss
    return total_loss

In [17]:
def training(model, train_images, train_masks, device, epochs=10, learning_rate=0.1):
    model.to(device)
    train_loader = DataLoader(TensorDataset(train_images, train_masks), batch_size=3, shuffle=True)
    optimizer = Adam(model.parameters(), lr=learning_rate)
    accuracy_function = BinaryAccuracy(threshold=0.5).to(device)

    train_losses = []
    train_accuracies = []

    for epoch in range(epochs):
        # Set the model in training mode
        model.train()
        # Initialize the total training loss and training accuracy
        epoch_train_loss = 0.0
        epoch_train_accuracy = 0.0

        # Loop over the training set
        for (x, y) in tqdm(train_loader, desc=f'Epoch {epoch+1}'):
            # Send the input to the device
            x, y = x.to(device), y.to(device)
            # Perform a forward pass and calculate the training loss
            pred = model(x)
            loss = custom_loss(pred, y, alpha=0.5)
            # Zero out any previously accumulated gradients, perform backpropagation, and update model parameters
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Add the loss to the total training loss so far
            epoch_train_loss += loss.item()
            epoch_train_accuracy += accuracy_function(torch.sigmoid(pred), y.int())

        # Calculate average loss and accuracy for the epoch
        avg_train_loss = epoch_train_loss / len(train_loader)
        avg_train_accuracy = epoch_train_accuracy / len(train_loader)

        # Append to lists
        train_losses.append(avg_train_loss)
        train_accuracies.append(avg_train_accuracy.item())

        # Print epoch metrics
        print(f'Epoch {epoch+1}, loss: {avg_train_loss:.4f}, Binary Accuracy: {avg_train_accuracy:.4f}')

    # Save trained model
    torch.save(model.state_dict(), 'trained_model.pt')

    return model, train_losses, train_accuracies


In [20]:
def plot_loss_accuracy(train_losses, train_accuracies):
    epochs_idx = range(1, len(train_losses) + 1)

    plt.figure(figsize=(10, 5))

    # Plot training loss
    plt.subplot(1, 2, 1)
    plt.plot(epochs_idx, train_losses, marker='o', color='b', linestyle='-', label='Training Loss')
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.xticks(epochs_idx)
    plt.grid(True)
    plt.legend()

    # Plot training accuracy
    plt.subplot(1, 2, 2)
    plt.plot(epochs_idx, train_accuracies, marker='o', color='r', linestyle='-', label='Training Accuracy')
    plt.title('Training Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.xticks(epochs_idx)
    plt.grid(True)
    plt.legend()

    plt.tight_layout()
    plt.savefig('Training_loss_acc.png')
    plt.show()

In [26]:
def dice_similarity_coefficient(pred, target, smooth=1e-5):
  # pred = torch.sigmoid(pred)
  pred = (pred > 0.5).float()
  intersection = torch.sum(pred * target)
  dice_coeff = (2. * intersection + smooth) / (torch.sum(pred) + torch.sum(target) + smooth)
  return dice_coeff

In [34]:
def evaluate_model(model, test_images, test_masks, device):
    model.eval()  # Set model to evaluation mode
    test_loader = DataLoader(TensorDataset(test_images, test_masks), batch_size=1, shuffle=False)  # Batch size 1 for inference
    accuracy_function = BinaryAccuracy(threshold=0.5).to(device)
    real_list, pred_list, ground_list = [], [], []
    dsc_list, accuracy_list = [], []
    num_samples = 0

    for images, masks in tqdm(test_loader, desc='Evaluating'):
        with torch.no_grad():
            # Move data to device
            images, masks = images.to(device), masks.to(device)

            # Make predictions
            preds = model(images)
            preds = torch.sigmoid(preds)

            # Compute DSC for each sample
            dsc = dice_similarity_coefficient(preds, masks, smooth=1e-5).cpu()
            accuracy = accuracy_function(preds, masks.int())
            dsc_list.append(dsc)
            accuracy_list.append(accuracy.item())

            # remove the batch dimension
            real_list.append(images.squeeze(0))
            pred_list.append(preds.squeeze(0))
            ground_list.append(masks.squeeze(0))

    return dsc_list, accuracy_list, real_list, pred_list, ground_list

In [None]:
def plot_dsc_accuracy_histogram(dsc_list, accuracy_list):
    plt.figure(figsize=(10, 5))

    # Plot histogram for Dice Similarity Coefficient
    plt.subplot(1, 2, 1)
    plt.hist(dsc_list, bins=20, color='skyblue', edgecolor='black', alpha=0.7)
    plt.title('Dice Similarity Coefficient Histogram')
    plt.xlabel('Dice Similarity Coefficient')
    plt.ylabel('Frequency')
    plt.grid(True)

    # Plot histogram for Test Accuracy
    plt.subplot(1, 2, 2)
    plt.hist(accuracy_list, bins=20, color='salmon', edgecolor='black', alpha=0.7)
    plt.title('Accuracy Histogram')
    plt.xlabel('Accuracy')
    plt.ylabel('Frequency')
    plt.grid(True)

    plt.tight_layout()
    plt.savefig('dsc_accuracy_histogram.png')
    plt.show()

In [45]:
def find_examples(images, masks, predictions, dsc_values, num_examples=3):
    dsc_values = np.array(dsc_values)  # Convert to NumPy array
    best_indices = np.argsort(dsc_values)[-num_examples:][::-1]
    worst_indices = np.argsort(dsc_values)[:num_examples]
    median_indices = np.argsort(np.abs(dsc_values - 0.5))[:num_examples]

    examples = {
        'best': [],
        'worst': [],
        'median': []
    }

    for index in best_indices:
        examples['best'].append((images[index], masks[index], predictions[index]))
    for index in worst_indices:
        examples['worst'].append((images[index], masks[index], predictions[index]))
    for index in median_indices:
        examples['median'].append((images[index], masks[index], predictions[index]))

    return examples

In [62]:
def plot_examples(examples, save_path='example_plots/'):
    import os

    categories = ['best', 'worst', 'median']

    # Create a directory to save the plots if it doesn't exist
    os.makedirs(save_path, exist_ok=True)

    for category in categories:
        fig, axes = plt.subplots(nrows=3, ncols=3, figsize=(15, 15))
        fig.suptitle(category.capitalize(), fontsize=16)  # Add category name to the main title

        for i, (image, mask, prediction) in enumerate(examples[category]):
            # Move tensors to CPU before converting to NumPy arrays
            image_np = image.squeeze().cpu().numpy()  # Squeeze singleton dimension
            mask_np = mask.squeeze().cpu().numpy()  # Squeeze singleton dimension
            prediction_np = prediction.squeeze().cpu().numpy()  # Squeeze singleton dimension

            axes[i, 0].imshow(image_np, cmap='gray')
            axes[i, 0].set_title('Image')
            axes[i, 0].axis('off')

            axes[i, 1].imshow(mask_np, cmap='gray')
            axes[i, 1].set_title('Ground Truth')
            axes[i, 1].axis('off')

            axes[i, 2].imshow(prediction_np, cmap='gray')
            axes[i, 2].set_title('Prediction')
            axes[i, 2].axis('off')

        plt.tight_layout()
        plt.savefig(os.path.join(save_path, f'{category}_examples.png'))  # Save the plot
        plt.close(fig)  # Close the figure to release memory



In [None]:
if __name__ == '__main__':
  # Prepare traing dataset and testing dataset
  images_dataset_dir = "DataScience_MPhill_practicals/Dataset/Images"
  images_tensor = prepare_images_tensor(images_dataset_dir)
  masks_dataset_dir = "DataScience_MPhill_practicals/Dataset/Segmentations"
  masks_tensor = prepare_masks_tensor(masks_dataset_dir)
  train_images, train_masks, test_images, test_masks = split_dataset(images_tensor, masks_tensor)
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model = UNet(in_channels=1, out_channels=1).to(device)
  model, train_losses, train_accuracies = training(model, train_images, train_masks, device, epochs=10, learning_rate=0.1)
  plot_loss_accuracy(train_losses, train_accuracies)
  # If predicting or evaluating from a saved model:
  model.load_state_dict(torch.load("trained_model.pt"))
  print('Reading completed.\nPredicting testing images using pre-trained model trained_model.pt')
  dsc_test, accuracy_test, real_test, pred_test, ground_test = evaluate_model(model, test_images, test_masks, device)
  plot_dsc_accuracy_histogram(dsc_test, accuracy_test)
  examples = find_examples(real_test, ground_test, pred_test, dsc_test, num_examples=3)
  plot_examples(examples, save_path='example_plots_test/')
  dsc_train, accuracy_train, real_train, pred_train, ground_train = evaluate_model(model, train_images, train_masks, device)
  plot_dsc_accuracy_histogram(dsc_train, accuracy_train)
  examples = find_examples(real_train, ground_train, pred_train, dsc_train, num_examples=3)
  plot_examples(examples, save_path='example_plots_train/')