<a href="https://colab.research.google.com/github/SaraElwatany/Lung-TumorDetection-Segmentation/blob/main/comparison.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import torch
import torch.nn as nn
from PIL import Image
from google.colab import drive
import matplotlib.pyplot as plt
import torchvision.transforms as T
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, random_split

In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset and checkpoint
# paths used later in this notebook live under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

## **Data Loading & Splitting**

In [None]:
# Root folder of the dataset on the mounted Drive; the 'train' and 'val'
# subfolders are joined onto this path when the datasets are built.
dataset_path = '/content/drive/MyDrive/LungTumorDetectionAndSegmentation'

In [None]:
class LungTumorSegmentationDataset(Dataset):
    """Paired image/mask dataset for lung tumor segmentation.

    Expects images at ``root_path/images/<subject>/<file>`` and the matching
    mask under the same relative name at ``root_path/masks/<subject>/<file>``.

    Args:
        root_path: Directory that contains the ``images`` and ``masks`` folders.
        transform: Optional callable applied to the grayscale PIL image.
        mask_transform: Optional callable applied to the grayscale PIL mask.
            When given, its result is thresholded at 0.5 into a float
            binary mask.
    """

    def __init__(self, root_path, transform=None, mask_transform=None):

        images_root = os.path.join(root_path, 'images')
        masks_root = os.path.join(root_path, 'masks')

        self.images = []
        self.masks = []

        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping identical from run to run.
        for subject in sorted(os.listdir(images_root)):
            subject_path = os.path.join(images_root, subject)
            if not os.path.isdir(subject_path):
                # Stray files next to the subject folders are not subjects.
                continue
            for image_file in sorted(os.listdir(subject_path)):
                self.images.append(os.path.join(subject_path, image_file))
                # Build the mask path from its components instead of
                # str.replace('images', 'masks'), which would also rewrite
                # any other "images" substring in the root or file name.
                self.masks.append(os.path.join(masks_root, subject, image_file))

        self.transform = transform
        self.mask_transform = mask_transform


    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.images)


    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            tuple: ``(image_path, image, mask)`` where ``image`` and ``mask``
            are the (optionally transformed) single-channel image and mask.
        """
        image_path = self.images[idx]
        mask_path = self.masks[idx]

        # convert() returns a fully loaded copy, so the underlying file can be
        # closed as soon as the conversion is done.
        with Image.open(image_path) as image_file:
            image = image_file.convert("L")  # grayscale image
        with Image.open(mask_path) as mask_file:
            mask = mask_file.convert("L")    # mask as single-channel

        if self.transform:
            image = self.transform(image)
        if self.mask_transform:
            mask = self.mask_transform(mask)
            mask = (mask > 0.5).float()  # Ensure binary values

        return image_path, image, mask

In [None]:
# Shared preprocessing pipeline: resize to 256x256, then convert to a tensor.
image_transform = T.Compose([T.Resize((256, 256)), T.ToTensor()])

In [None]:
# Build the train and validation datasets from the 'train' and 'val' folders.
# The same transform is passed for images and masks; the dataset class
# re-binarizes the transformed mask with a 0.5 threshold.
train_dataset = LungTumorSegmentationDataset(os.path.join(dataset_path, 'train'), image_transform, image_transform)
val_dataset = LungTumorSegmentationDataset(os.path.join(dataset_path, 'val'), image_transform, image_transform)

In [None]:
# Sanity check: number of image/mask pairs found in each split.
print('Length of the training data:', len(train_dataset))
print('Length of the validation data:', len(val_dataset))

Length of the training data: 1832
Length of the validation data: 98


In [None]:
# NOTE(review): this cell repeats the previous one, and its saved output also
# lists a "test data" length that this code never prints -- the output looks
# stale from an earlier train/test split. Re-run or remove this cell.
print('Length of the training data:', len(train_dataset))
print('Length of the validation data:', len(val_dataset))

Length of the training data: 1500
Length of the test data: 332
Length of the validation data: 98


In [None]:
# Batch both splits in groups of 32. shuffle=False keeps the dataset order, so
# batches are produced in the same sequence on every pass.
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=False)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [None]:
# Peek at the first batch of each loader to confirm tensor shapes.
# Each batch is (paths, images, masks); index [1][0] / [2][0] selects the first
# image / mask of the batch.
# NOTE(review): the saved output also shows a "test sample" line that this code
# does not print -- it appears to be stale; re-run to refresh.
train_batches = iter(train_dataloader)
val_batches = iter(val_dataloader)

for train_sample, val_sample in zip(train_batches, val_batches):
    print('Shape of the first training sample & corresponding mask:', train_sample[1][0].shape, train_sample[2][0].shape)
    print('Shape of the first validation sample & corresponding mask:', val_sample[1][0].shape, val_sample[2][0].shape)
    break  # only the first pair of batches is needed

Shape of the first training sample & corresponding mask: torch.Size([1, 256, 256]) torch.Size([1, 256, 256])
Shape of the first test sample & corresponding mask: torch.Size([1, 256, 256]) torch.Size([1, 256, 256])
Shape of the first validation sample & corresponding mask: torch.Size([1, 256, 256]) torch.Size([1, 256, 256])


## **Validation on Whole Images**

In [None]:
def double_convolution(in_channels, out_channels):
    """
    Build the two-layer convolution block used at every U-Net stage.

    Each layer is a 3x3 convolution followed by an in-place ReLU. Unlike the
    original paper implementation, the convolutions are padded (padding=1) so
    that the output keeps the same spatial size as the input.
    """
    layers = []
    current_channels = in_channels
    for _ in range(2):
        layers.append(nn.Conv2d(current_channels, out_channels, kernel_size=3, padding=1))
        layers.append(nn.ReLU(inplace=True))
        # The second convolution maps out_channels -> out_channels.
        current_channels = out_channels
    return nn.Sequential(*layers)

In [None]:
class UNet(nn.Module):
    """U-Net with four down-sampling and four up-sampling stages.

    The contracting path applies a double convolution followed by 2x2 max
    pooling four times, then a final double convolution at the bottleneck.
    The expanding path upsamples with 2x2 stride-2 transposed convolutions and
    concatenates the matching contracting-path feature map (skip connection)
    before each double convolution. A 1x1 convolution produces the output.

    Args:
        num_classes: Number of channels produced by the final 1x1 convolution.
        in_channels: Number of channels of the input image. Defaults to 1,
            the value that used to be hard-coded.

    Note:
        ``forward`` returns the raw output of the final convolution; no
        activation is applied. The spatial size is halved four times and
        doubled four times, so height and width should be multiples of 16 for
        the skip connections to line up.
    """

    def __init__(self, num_classes, in_channels=1):

        super().__init__()

        # Attribute names and registration order are kept stable so that
        # previously saved state dicts keep loading.
        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)

        # Contracting path.

        # Each convolution is applied twice.
        self.down_convolution_1 = double_convolution(in_channels, 64)
        self.down_convolution_2 = double_convolution(64, 128)
        self.down_convolution_3 = double_convolution(128, 256)
        self.down_convolution_4 = double_convolution(256, 512)
        self.down_convolution_5 = double_convolution(512, 1024)

        # Expanding path.
        self.up_transpose_1 = nn.ConvTranspose2d(
            in_channels=1024, out_channels=512, kernel_size=2, stride=2)

        # Below, `in_channels` again becomes 1024 as we are concatenating
        # the 512-channel skip connection with the 512-channel upsampled map.
        self.up_convolution_1 = double_convolution(1024, 512)

        self.up_transpose_2 = nn.ConvTranspose2d(
            in_channels=512, out_channels=256, kernel_size=2, stride=2)

        self.up_convolution_2 = double_convolution(512, 256)

        self.up_transpose_3 = nn.ConvTranspose2d(
            in_channels=256, out_channels=128, kernel_size=2, stride=2)

        self.up_convolution_3 = double_convolution(256, 128)

        self.up_transpose_4 = nn.ConvTranspose2d(
            in_channels=128, out_channels=64, kernel_size=2, stride=2)

        self.up_convolution_4 = double_convolution(128, 64)

        # output => `out_channels` as per the number of classes.
        self.out = nn.Conv2d(in_channels=64, out_channels=num_classes, kernel_size=1)


    def forward(self, x):
        """Run the network on a batch ``x`` and return the final conv output."""

        # Contracting path: keep each pre-pooling feature map for its skip
        # connection.
        down_1 = self.down_convolution_1(x)
        down_2 = self.max_pool2d(down_1)
        down_3 = self.down_convolution_2(down_2)
        down_4 = self.max_pool2d(down_3)
        down_5 = self.down_convolution_3(down_4)
        down_6 = self.max_pool2d(down_5)
        down_7 = self.down_convolution_4(down_6)
        down_8 = self.max_pool2d(down_7)
        down_9 = self.down_convolution_5(down_8)

        # *** DO NOT APPLY MAX POOL TO down_9 ***

        # Expanding path. The concatenation order (skip features first, then
        # the upsampled features) fixes which input channels each following
        # convolution sees, so it must not be swapped.
        up_1 = self.up_transpose_1(down_9)
        x = self.up_convolution_1(torch.cat([down_7, up_1], 1))

        up_2 = self.up_transpose_2(x)
        x = self.up_convolution_2(torch.cat([down_5, up_2], 1))

        up_3 = self.up_transpose_3(x)
        x = self.up_convolution_3(torch.cat([down_3, up_3], 1))

        up_4 = self.up_transpose_4(x)
        x = self.up_convolution_4(torch.cat([down_1, up_4], 1))

        out = self.out(x)

        return out

In [None]:
def evaluate_model(test_loader, model, criterion, device):
    """Compute the mean per-batch loss of ``model`` over ``test_loader``.

    The model is switched to evaluation mode (and left in it), and gradients
    are disabled while the batches are processed.

    Args:
        test_loader: Iterable yielding ``(image_paths, images, masks)``
            batches. It does not need to support ``len()``.
        model: Module called as ``model(images)``.
        criterion: Callable ``criterion(output, masks)`` returning a scalar
            tensor.
        device: Device the image and mask tensors are moved to.

    Returns:
        float: Sum of the batch losses divided by the number of batches. Every
        batch carries the same weight, so a smaller final batch counts as much
        as a full one.

    Raises:
        ValueError: If ``test_loader`` yields no batches.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0  # counted while iterating so loaders without __len__ work

    with torch.no_grad():  # Disable gradient computation
        for _, images, masks in test_loader:
            images, masks = images.to(device), masks.to(device)

            output = model(images)
            loss = criterion(output, masks)

            total_loss += loss.item()
            num_batches += 1

    if num_batches == 0:
        # Without this guard the average below is a bare ZeroDivisionError.
        raise ValueError('evaluate_model received a loader that yielded no batches')

    test_loss = total_loss / num_batches  # Average over all batches
    print(f'Test Loss: {test_loss:.4f}')

    return test_loss

In [None]:
# Build a U-Net with a single output channel and place it on the selected device.
model = UNet(num_classes=1).to(device)

In [None]:
# Restore the trained weights from Drive, mapping tensors onto the active device.
# weights_only=True restricts unpickling to tensors and plain containers, so a
# tampered checkpoint cannot run arbitrary code while it is being loaded.
model.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/unet_lung_segmentation_0.002502174032005397.pth",
        map_location=device,
        weights_only=True,
    )
)
model.to(device)

UNet(
  (max_pool2d): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (down_convolution_1): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
  )
  (down_convolution_2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
  )
  (down_convolution_3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
  )
  (down_convolution_4): Sequential(
    (0): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(51

In [None]:
# Binary cross-entropy computed on raw logits; this matches UNet.forward, which
# returns the final convolution output without applying an activation.
criterion = nn.BCEWithLogitsLoss()

In [None]:
# Evaluate on validation data
# Note: evaluate_model labels its printed value "Test Loss"; the number shown
# here is the loss on the validation loader.
val_loss = evaluate_model(val_dataloader, model, criterion, device)

## **Validation on Detected Images (Two-Stage Modelling)**