In [None]:
import os
## If you want to select specific video device
# os.environ["CUDA_VISIBLE_DEVICES"]="0"

import torch
import random
import numpy as np
import pandas as pd
import nibabel as nib

import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from scipy import stats
import scipy.ndimage
from scipy.ndimage import label, generate_binary_structure
from scipy.ndimage.measurements import sum as nd_sum
import matplotlib.pyplot as plt

# Seeds
seed_value = 42
random.seed(seed_value)
np.random.seed(seed_value)
torch.manual_seed(seed_value)
torch.cuda.manual_seed_all(seed_value)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
# Connect to Google Drive (optional)
from google.colab import drive
drive.mount('/content/drive')

In [2]:
# Enter the path to the dataset
folder = 'D:/Task07_Pancreas/'
imagesTr = folder + 'imagesTr/' 
labelsTr = folder + 'labelsTr/'

In [3]:
# Function to return the list of paths to dataset files
def get_file_paths(data):
    file_paths = []
    for root, directories, files in os.walk(data):
        for filename in files:
            file_paths.append(os.path.join(root, filename))
    return file_paths

In [4]:
# Get the paths of CT files and the masks to variables
ct_paths = sorted(get_file_paths(imagesTr))
label_paths = sorted(get_file_paths(labelsTr))
print(f'ct_paths len: {len(ct_paths)}')
print(f'label_paths len: {len(label_paths)}')
# for file_path in  label_paths:
#     print(file_path)

for i in range(len(ct_paths)):
    if ct_paths[i][-20:-7] != label_paths[i][-20:-7]:
        print(f'Error in {ct_paths[i][-20:-7]}')

ct_paths len: 232
label_paths len: 232


In [5]:
# Fix the files indexes for training and testing sets
indexes = list(range(len(ct_paths)))
random.shuffle(indexes)
split_index = int(0.8 * len(indexes))
train_indexes = indexes[:split_index]
test_indexes = indexes[split_index:]

ct_paths_train = [ct_paths[i] for i in train_indexes]
label_paths_train = [label_paths[i] for i in train_indexes]
ct_paths_test = [ct_paths[i] for i in test_indexes]
label_paths_test = [label_paths[i] for i in test_indexes]
print("Training set size:", len(ct_paths_train))
print("Testing set size:", len(ct_paths_test))
for i in range(len(ct_paths_train)):
    if ct_paths_train[i][-20:-7] != label_paths_train[i][-20:-7]:
        print(f'Error in {ct_paths_train[i][-20:-7]}')
for i in range(len(ct_paths_test)):
    if ct_paths_test[i][-20:-7] != label_paths_test[i][-20:-7]:
        print(f'Error in {ct_paths_test[i][-20:-7]}')

Training set size: 185
Testing set size: 47


In [6]:
# Pancreas data generator class
class PancreasDataset():
    def __init__(self, ct_paths, label_paths, transform):
        self.ct_paths = ct_paths
        self.label_paths = label_paths
        self.transform = transform

    def __len__(self):
        return len(self.ct_paths)

    def __getitem__(self, idx):
        ct_path = self.ct_paths[idx]
        label_path = self.label_paths[idx]

        ct_image = nib.load(ct_path)
        ct_image = ct_image.get_fdata()

        label_image = nib.load(label_path)
        label_image = label_image.get_fdata()

        label_image[label_image == 2] = 1

        zoom_factors = (128 / ct_image.shape[0], 128 / ct_image.shape[1], 64 / ct_image.shape[2])
        ct_image = scipy.ndimage.zoom(ct_image, zoom_factors, order=1)
        label_image = scipy.ndimage.zoom(label_image, zoom_factors, order=0)
        
        min_value = np.min(ct_image)
        max_value = np.max(ct_image)
        ct_image = (ct_image - min_value) / (max_value - min_value)

        ct_image = self.transform(ct_image.astype(np.float32))
        label_image = self.transform(label_image.astype(np.float32))

        return {'ct': ct_image, 'label': label_image}

# Transform
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.unsqueeze(0)),  # Add a channel dimension
])

train_dataset = PancreasDataset(ct_paths_train, label_paths_train, transform)
test_dataset = PancreasDataset(ct_paths_test, label_paths_test, transform)

# Enter the batch size
batch_size = 1
trainloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
testloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

print(f'Trainloader size with batch size = {batch_size}: {len(trainloader)}')
print(f'Testloader size with batch size = {batch_size}: {len(testloader)}')

Trainloader size with batch size = 1: 185
Testloader size with batch size = 1: 47


In [7]:
# Just to check the tensor
for batch in trainloader:
    ct_images = batch['ct']
    label_images = batch['label']

    test = label_images
    # print({test})

    print(f'Unique values: {torch.unique(test)}')
    print(f'Minimum value in test slices: {torch.min(test)}')
    print(f'Maximum value in test slices: {torch.max(test)}')

    print(f'Batch CT slices shape: {ct_images.shape}')
    print(f'Batch Label slices shape: {label_images.shape}')

    ct_slices_1 = ct_images[0].squeeze().numpy().squeeze()
    label_slices_1 = label_images[0].squeeze().numpy().squeeze()
    break

Unique values: tensor([0., 1.])
Minimum value in test slices: 0.0
Maximum value in test slices: 1.0
Batch CT slices shape: torch.Size([1, 1, 64, 128, 128])
Batch Label slices shape: torch.Size([1, 1, 64, 128, 128])


In [None]:
# Just to visualize the tensor
for i in range(ct_slices_1.shape[0]):
    ar = ct_slices_1[i,:,:]
    ar_lbl = label_slices_1[i,:,:]
    fig = plt.figure(figsize=(8,5))


    plt.subplot(1,2,1)
    plt.imshow(ar)
    plt.title('Abdominal CT')

    plt.subplot(1,2,2)
    plt.imshow(ar_lbl)
    plt.title('Pancreas Mask')

    plt.show()

In [8]:
# U-Net 3D structure
class DoubleConv3D(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConv3D, self).__init__()
        self.double_conv = nn.Sequential(
            nn.Conv3d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm3d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv3d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm3d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.double_conv(x)


class UNet3D(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UNet3D, self).__init__()
        self.encoder1 = DoubleConv3D(in_channels, 64)
        self.pool1 = nn.MaxPool3d(kernel_size=2, stride=2)
        self.encoder2 = DoubleConv3D(64, 128)
        self.pool2 = nn.MaxPool3d(kernel_size=2, stride=2)
        self.encoder3 = DoubleConv3D(128, 256)
        self.pool3 = nn.MaxPool3d(kernel_size=2, stride=2)

        self.bottleneck = DoubleConv3D(256, 512)

        self.upconv3 = nn.ConvTranspose3d(512, 256, kernel_size=2, stride=2)
        self.decoder3 = DoubleConv3D(512, 256)
        self.upconv2 = nn.ConvTranspose3d(256, 128, kernel_size=2, stride=2)
        self.decoder2 = DoubleConv3D(256, 128)
        self.upconv1 = nn.ConvTranspose3d(128, 64, kernel_size=2, stride=2)
        self.decoder1 = DoubleConv3D(128, 64)

        self.final_conv = nn.Conv3d(64, out_channels, kernel_size=1)

    def forward(self, x):
        enc1 = self.encoder1(x)
        enc1_pool = self.pool1(enc1)
        enc2 = self.encoder2(enc1_pool)
        enc2_pool = self.pool2(enc2)
        enc3 = self.encoder3(enc2_pool)
        enc3_pool = self.pool3(enc3)

        bottleneck = self.bottleneck(enc3_pool)

        dec3 = self.upconv3(bottleneck)
        dec3 = torch.cat([enc3, dec3], dim=1)
        dec3 = self.decoder3(dec3)
        dec2 = self.upconv2(dec3)
        dec2 = torch.cat([enc2, dec2], dim=1)
        dec2 = self.decoder2(dec2)
        dec1 = self.upconv1(dec2)
        dec1 = torch.cat([enc1, dec1], dim=1)
        dec1 = self.decoder1(dec1)

        output = self.final_conv(dec1)

        return output

In [9]:
# Just to check that U-Net is running correctly
in_channels = 1
out_channels = 1
model = UNet3D(in_channels, out_channels).to(device)

# Enter desired tensor shape
input_data = torch.randn(1, 1, 64, 128, 128).to(device)
output_data = model(input_data)
print(output_data.size())

torch.Size([1, 1, 64, 128, 128])


In [None]:
# Clear the memory
del input_data
del output_data
del model
torch.cuda.empty_cache()
print(torch.cuda.memory_allocated()/1024**2)
print(torch.cuda.memory_cached()/1024**2)

In [None]:
# Model initialization
model = UNet3D(1,1)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

criterion = nn.BCEWithLogitsLoss()
# criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100

# Initializing lists for storing loss data
train_avg_losses = []
train_losses = []
test_losses = []

best_test_loss = float('inf')

for epoch in range(num_epochs):

    # Training
    model.train()
    train_loss = 0.0
    for batch in trainloader:
        ct_images = batch['ct'].to(device)
        labels = batch['label'].to(device)
        labels = labels

        optimizer.zero_grad()

        outputs = model(ct_images)

        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        train_loss += loss.item()
    
    train_loss /= len(trainloader)
    train_avg_losses.append(train_loss)
    train_losses.append(loss.item())

    # Testing
    model.eval()
    test_loss = 0.0

    with torch.no_grad():
        for batch in testloader:
            ct_images_test = batch['ct'].to(device)
            labels_test = batch['label'].to(device)
            labels_test = labels_test

            outputs_test = model(ct_images_test)
            test_loss += criterion(outputs_test, labels_test).item()

    # Saving loss data
    test_loss /= len(testloader)
    test_losses.append(test_loss)

    # Saving the best model
    if test_loss < best_test_loss:
        best_test_loss = test_loss
        # Enter the path for saving the model
        torch.save(model.state_dict(), 'D:/Task07_Pancreas/best_model.pth')

    # Training progress
    print(f'Epoch [{epoch + 1}/{num_epochs}], Train Last Loss: {loss.item():.4f}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')

In [None]:
# Loss function change chart
plt.figure(figsize=(10, 5))
plt.plot(train_avg_losses, label='Train Loss', color='blue')
plt.plot(train_losses, label='Train Last Loss', linestyle='dashed', color='blue')

plt.plot(test_losses, label='Test Loss', color='red')

plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Testing Loss Over Time')
plt.legend()
plt.ylim(0, 0.2)
# plt.xlim(0, 150)

plt.show()

In [11]:
# Load saved model
model = UNet3D(1,1)
# Enter the path to the saved model
model.load_state_dict(torch.load('D:/Task07_Pancreas/best_model.pth'))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()

UNet3D(
  (encoder1): DoubleConv3D(
    (double_conv): Sequential(
      (0): Conv3d(1, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
      (1): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
      (3): Conv3d(64, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
      (4): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU(inplace=True)
    )
  )
  (pool1): MaxPool3d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (encoder2): DoubleConv3D(
    (double_conv): Sequential(
      (0): Conv3d(64, 128, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
      (1): BatchNorm3d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
      (3): Conv3d(128, 128, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
      (4): BatchNorm3d(128, eps=1e-05, momentum=0.1, affine=True, tr

In [12]:
data = []
n = 0
for batch in testloader:
    ct_images = batch['ct'].to(device)
    labels = batch['label'].to(device)
    with torch.no_grad():
        outputs = model(ct_images)
        outputs = torch.sigmoid(outputs)
        for i in range(ct_images.shape[0]):
            filename = ct_paths[n][-19:-7]
            n += 1
            ct_images_np = ct_images[i].squeeze().cpu().numpy().squeeze()
            outputs_np = outputs[i].squeeze().cpu().numpy().squeeze()
            labels_np = labels[i].squeeze().cpu().numpy().squeeze()
            mean_value = 0.492
            outputs_np = (outputs_np > mean_value)
            outputs_np_float = outputs_np.astype(float)

            ## Code block for isolating the largest 3D object in the array
            # structure = generate_binary_structure(3, 1)  # Define the structure for determining connected components
            # labeled_array, num_features = label(outputs_np, structure)
            # component_sizes = nd_sum(outputs_np, labeled_array, range(1, num_features + 1))
            # largest_component_index = np.argmax(component_sizes) + 1
            # largest_component = np.zeros_like(outputs_np)
            # largest_component[labeled_array == largest_component_index] = 1
            # outputs_np = largest_component

            # IoU (Jaccard Index)
            intersection = np.sum((labels_np == 1) & (outputs_np == 1))
            union = np.sum((labels_np == 1) | (outputs_np == 1))
            iou = intersection / union
            
            # Dice Coefficient (F1 Score)
            dice = 2 * np.sum((labels_np == 1) & (outputs_np == 1)) / (np.sum(labels_np == 1) + np.sum(outputs_np == 1))

            # # Code block for visualizing the work of the model
            # print(filename)
            # print(iou)
            # print(dice)
            # if dice >= 0.85:
            #     for i in range(ct_images_np.shape[0]):
            #         ar = ct_images_np[i,:,:]
            #         ar_lbl = labels_np[i,:,:]
            #         ar_out = outputs_np[i,:,:]
            #         if np.sum(ar_lbl) > 0: # to show only slices with pancreas
            #             fig = plt.figure(figsize=(18,15))

            #             plt.subplot(1,3,1)
            #             plt.imshow(ar)

            #             plt.subplot(1,3,2)
            #             plt.imshow(ar_lbl)

            #             plt.subplot(1,3,3)
            #             plt.imshow(ar_out)

            #             plt.show()
            # print()

            data.append([filename, iou, dice])

df = pd.DataFrame(data, columns=['Filename', 'IoU', 'Dice'])

In [None]:
df

In [13]:
# Statistics
# Calculating mean and median for the IoU and Dice
iou_mean = df['IoU'].mean()
dice_mean = df['Dice'].mean()
iou_median = df['IoU'].median()
dice_median = df['Dice'].median()

n = len(df)
confidence_level = 0.95

# Calculating the standard deviation for the IoU and Dice
iou_std = df['IoU'].std()
dice_std = df['Dice'].std()

# Calculating the standard error for the IoU and Dice
iou_stderr = iou_std / (n**0.5)
dice_stderr = dice_std / (n**0.5)

# Calculating T-value for the given confidence level
t_value = stats.t.ppf((1 + confidence_level) / 2, n - 1)

# Calculating the margin of error for the IoU and Dice
iou_margin_error = t_value * iou_stderr
dice_margin_error = t_value * dice_stderr

print(f"Mean IoU value: {iou_mean:.4f} ± {iou_margin_error:.4f}")
print(f"Mean Dice value: {dice_mean:.4f} ± {dice_margin_error:.4f}")

print(f"Median IoU value: {iou_median:.4f}")
print(f"Median Dice value: {dice_median:.4f}")

Mean IoU value: 0.6957 ± 0.0372
Mean Dice value: 0.8136 ± 0.0281
Median IoU value: 0.7491
Median Dice value: 0.8565
