In [None]:
pip install SimpleITK

Collecting SimpleITK
  Downloading SimpleITK-2.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (52.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m52.7/52.7 MB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: SimpleITK
Successfully installed SimpleITK-2.3.1


In [None]:
import numpy as np
import pandas as pd
import SimpleITK as sitk
from glob import glob
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import cv2
from google.colab import drive

# Mount Google Drive so the dataset files stored there are reachable under /content/drive
drive.mount('/content/drive')

# Define paths and load the dataset
file_path_0 = "/content/drive/MyDrive/Image Processing/LUNA/subsets/subset0"
file_path_1 = "/content/drive/MyDrive/Image Processing/LUNA/subset1/subset1"
annotations_path = "/content/drive/MyDrive/Image Processing/LUNA/annotations.csv"

# Getting list of image files from both subsets
file_list_0 = glob(file_path_0 + "/*.mhd")
file_list_1 = glob(file_path_1 + "/*.mhd")

# Combine file lists and remove duplicates.
# sorted() instead of list(): the iteration order of a set of strings can change
# from one interpreter session to the next, which would change the order of the
# rows built from this list and therefore the outcome of the seeded
# train/validation split further down.
file_list = sorted(set(file_list_0 + file_list_1))

# Function to make rectangular mask
def make_rectangular_mask(center, diam, z, width, height, spacing, origin):
    """Return a (height, width) uint8 mask with a filled box around a nodule.

    The box is centred on the nodule and spans ``diam`` along x and y, after
    converting world coordinates to voxel indices with ``origin`` and
    ``spacing``. Pixels inside the box are 1, everything else is 0.

    Args:
        center: array of world coordinates (x, y, z) of the nodule centre.
        diam: nodule diameter, in the same physical units as ``spacing``.
        z: unused; kept so existing callers keep working.
        width: mask width in pixels (number of columns).
        height: mask height in pixels (number of rows).
        spacing: array of per-axis voxel sizes (x, y, z).
        origin: array of world coordinates (x, y, z) of voxel (0, 0, 0).

    Returns:
        ``np.ndarray`` of shape ``(height, width)`` and dtype ``uint8``. The
        mask is all zeros when the box lies completely outside the image.
    """
    mask = np.zeros([height, width], dtype=np.uint8)
    v_center = (center - origin) / spacing
    v_xmin = int(v_center[0] - diam / spacing[0] / 2)
    v_xmax = int(v_center[0] + diam / spacing[0] / 2)
    v_ymin = int(v_center[1] - diam / spacing[1] / 2)
    v_ymax = int(v_center[1] + diam / spacing[1] / 2)

    # Clamp BOTH ends of each range into [0, size]. The bounds are used as
    # slice limits, so:
    #   * a negative upper bound must not reach the slice (it would be read as
    #     "count from the end" and paint most of the row/column);
    #   * the upper limit is exclusive, hence it is clamped to `size`, not
    #     `size - 1`, otherwise the last row/column could never be marked.
    v_xmin = min(max(v_xmin, 0), width)
    v_xmax = min(max(v_xmax, 0), width)
    v_ymin = min(max(v_ymin, 0), height)
    v_ymax = min(max(v_ymax, 0), height)

    mask[v_ymin:v_ymax, v_xmin:v_xmax] = 1

    return mask

# Function to get filename
def get_filename(file_list, case):
    """Return the first entry of ``file_list`` that contains ``case`` as a substring.

    Args:
        file_list: iterable of path strings to search, in order.
        case: substring to look for (the caller passes a series UID).

    Returns:
        The first matching path, or ``None`` when no entry contains ``case``.
    """
    matches = (path for path in file_list if case in path)
    return next(matches, None)

# Load annotations and attach to every nodule the path of the scan it belongs to.
# get_filename returns None for series that are not among the loaded files;
# dropna() then removes those rows (and any other row with a missing value).
df_node = pd.read_csv(annotations_path)
df_node["file"] = df_node["seriesuid"].map(lambda file_name: get_filename(file_list, file_name))
df_node = df_node.dropna()

# Define DataFrame columns
columns = ["seriesuid", "sliceindex", "imagedata", "maskdata", "class"]
data = []

# Define target size for downsampling, as (width, height) in the order cv2.resize expects
target_size = (256, 256)

for img_file in tqdm(file_list):
    mini_df = df_node[df_node["file"] == img_file]
    if mini_df.shape[0] == 0:
        # No annotated nodule in this scan: nothing to emit
        continue

    itk_img = sitk.ReadImage(img_file)
    img_array = sitk.GetArrayFromImage(itk_img)
    num_z, height, width = img_array.shape
    origin = np.array(itk_img.GetOrigin())
    spacing = np.array(itk_img.GetSpacing())

    # First pass: collect one full-resolution mask per annotated slice.
    # Nodules that fall on the same slice are merged into a single mask, so no
    # annotation is lost and the volume only has to be resized once below
    # (instead of once per nodule followed by de-duplication).
    slice_masks = {}
    for _, row in mini_df.iterrows():
        node_x, node_y, node_z = row["coordX"], row["coordY"], row["coordZ"]
        diam = row["diameter_mm"]
        center = np.array([node_x, node_y, node_z])
        v_center = np.rint((center - origin) / spacing)
        i_z = int(v_center[2])

        if not 0 <= i_z < num_z:
            # A negative index would silently label a slice counted from the
            # end of the volume; an index >= num_z would raise. Report and skip.
            tqdm.write(f"Skipping annotation outside the volume: {row['seriesuid']} (slice {i_z} of {num_z})")
            continue

        mask = make_rectangular_mask(center, diam, i_z * spacing[2] + origin[2], width, height, spacing, origin)
        if i_z in slice_masks:
            slice_masks[i_z] = np.maximum(slice_masks[i_z], mask)
        else:
            slice_masks[i_z] = mask

    # All rows of mini_df share the same file, hence the same series
    seriesuid = mini_df["seriesuid"].iloc[0]

    # Second pass: emit every slice of the scan exactly once
    for idx in range(num_z):
        # Downsample image and mask
        img_resized = cv2.resize(img_array[idx], target_size, interpolation=cv2.INTER_AREA)
        if idx in slice_masks:
            mask_resized = cv2.resize(slice_masks[idx], target_size, interpolation=cv2.INTER_NEAREST)
            label = np.uint8(1)
        else:
            # cv2.resize output is (height, width); build the empty mask in that shape
            mask_resized = np.zeros((target_size[1], target_size[0]), dtype=np.uint8)
            label = np.uint8(0)
        data.append([seriesuid, idx, img_resized, mask_resized, label])

df_slices = pd.DataFrame(data, columns=columns)
assert not df_slices.duplicated(subset=['seriesuid', 'sliceindex']).any(), "each slice must appear once"

# Positive slices first; a stable sort keeps the scan/slice order within each class
df_slices = df_slices.sort_values(by='class', ascending=False, kind='stable')

# Split the data into training and validation sets
train_df, val_df = train_test_split(df_slices, test_size=0.2, stratify=df_slices['class'], random_state=42)


Mounted at /content/drive


100%|██████████| 89/89 [06:08<00:00,  4.14s/it]


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision.models.detection.retinanet import retinanet_resnet50_fpn_v2, RetinaNet_ResNet50_FPN_V2_Weights
import torch.nn.functional as F
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch.cuda.amp import autocast, GradScaler
import numpy as np
from sklearn.model_selection import train_test_split
from glob import glob
import SimpleITK as sitk

# Focal Loss Definition
class FocalLoss(nn.Module):
    """Binary focal loss built on top of element-wise binary cross-entropy.

    Each element's cross-entropy is scaled by ``alpha * (1 - pt) ** gamma``,
    where ``pt = exp(-BCE)``.

    Args:
        alpha: constant multiplier applied to every element.
        gamma: exponent of the ``(1 - pt)`` modulating factor.
        logits: when True ``inputs`` are raw logits, otherwise probabilities.
        reduce: when True ``forward`` returns the mean over all elements,
            otherwise the unreduced per-element loss tensor.
    """

    def __init__(self, alpha=1, gamma=2, logits=False, reduce=True):
        super().__init__()
        self.alpha, self.gamma = alpha, gamma
        self.logits, self.reduce = logits, reduce

    def forward(self, inputs, targets):
        """Compute the focal loss of ``inputs`` against ``targets``."""
        # Pick the cross-entropy flavour matching the kind of input we were given
        bce_fn = F.binary_cross_entropy_with_logits if self.logits else F.binary_cross_entropy
        per_element_bce = bce_fn(inputs, targets, reduction='none')

        prob_correct = torch.exp(-per_element_bce)
        focal = self.alpha * (1 - prob_correct) ** self.gamma * per_element_bce

        if not self.reduce:
            return focal
        return torch.mean(focal)

# Function to calculate Intersection over Union (IoU)
def calculate_iou(box1, box2):
    """Intersection over Union of two ``[x1, y1, x2, y2]`` boxes.

    Widths and heights are computed as ``max - min + 1``, i.e. both corner
    coordinates are counted as belonging to the box.

    Args:
        box1: first box, indexable as ``[x1, y1, x2, y2]``.
        box2: second box, same layout.

    Returns:
        The IoU as a float; 0.0 when the boxes do not overlap.
    """
    # Corners of the overlap rectangle
    left, top = max(box1[0], box2[0]), max(box1[1], box2[1])
    right, bottom = min(box1[2], box2[2]), min(box1[3], box2[3])

    overlap_w = max(0, right - left + 1)
    overlap_h = max(0, bottom - top + 1)
    overlap = overlap_w * overlap_h

    area1 = (box1[2] - box1[0] + 1) * (box1[3] - box1[1] + 1)
    area2 = (box2[2] - box2[0] + 1) * (box2[3] - box2[1] + 1)

    union = float(area1 + area2 - overlap)
    return overlap / union

# Function to calculate mean Average Precision (mAP)
def calculate_map(pred_boxes, true_boxes, iou_threshold=0.5):
    """Mean precision and mean recall of one image's detections.

    Predictions are visited in order of decreasing score. Each one is matched
    to the not-yet-matched ground-truth box it overlaps most; it is a true
    positive when that IoU reaches ``iou_threshold`` (the ground-truth box is
    then consumed) and a false positive otherwise. Cumulative precision and
    recall are computed after every prediction and averaged.

    Args:
        pred_boxes: sequence of ``[x1, y1, x2, y2, score]`` entries.
        true_boxes: sequence of ``[x1, y1, x2, y2]`` ground-truth boxes. It is
            not modified.
        iou_threshold: minimum IoU for a prediction to count as a match.

    Returns:
        ``(mean_precision, mean_recall)`` as Python floats. Ratios that are
        undefined are reported as 0.0: both values when there are no
        predictions, and recall when there are no ground-truth boxes.
    """
    num_gt_boxes = len(true_boxes)

    if len(pred_boxes) == 0:
        # Averaging over zero predictions would yield NaN
        return 0.0, 0.0

    # Work on a copy so that consuming matched boxes does not alter the caller's data
    unmatched_gt = list(true_boxes)

    # Precision/recall curves are only meaningful over score-ranked predictions
    ranked = sorted(pred_boxes, key=lambda box: box[4], reverse=True)

    true_positives = []
    for scored_box in ranked:
        pred_box = scored_box[:4]

        if not unmatched_gt:
            # Nothing left to match against: necessarily a false positive
            true_positives.append(0)
            continue

        ious = np.array([calculate_iou(pred_box, gt_box) for gt_box in unmatched_gt])
        best_idx = int(np.argmax(ious))

        if ious[best_idx] >= iou_threshold:
            true_positives.append(1)
            unmatched_gt.pop(best_idx)
        else:
            true_positives.append(0)

    true_positives = np.asarray(true_positives)
    cum_true_positives = np.cumsum(true_positives)
    cum_false_positives = np.cumsum(1 - true_positives)

    # The denominator is 1, 2, ..., n, so it is never zero
    precision = cum_true_positives / (cum_true_positives + cum_false_positives)
    if num_gt_boxes > 0:
        recall = cum_true_positives / num_gt_boxes
    else:
        recall = np.zeros(len(cum_true_positives))

    return float(np.mean(precision)), float(np.mean(recall))

def calculate_metrics(model, val_loader):
    """Evaluate ``model`` on ``val_loader`` and return ``(val_iou, val_map)``.

    * ``val_iou`` is the mean, over all ground-truth boxes, of the best IoU
      achieved by any predicted box in the same image (0 when the image has
      no predictions).
    * ``val_map`` is the mean of the per-image precision score returned by
      ``calculate_map``. Images with neither predictions nor ground truth are
      left out; images with ground truth but no predictions score 0.

    Both values are 0.0 when there was nothing to average. The model is left
    in evaluation mode. The module-level ``device`` is used for the transfers.

    Args:
        model: detection model whose eval-mode output is a list of dicts with
            ``'boxes'`` and ``'scores'`` tensors.
        val_loader: iterable of ``(images, targets)`` batches, where targets
            is a list of dicts holding a ``'boxes'`` tensor.
    """
    model.eval()  # Set the model to evaluation mode
    iou_scores = []
    map_scores = []

    with torch.no_grad():  # No gradient calculation
        for x_val, y_val in val_loader:
            x_val = x_val.to(device)
            y_val = [{k: v.to(device) for k, v in t.items()} for t in y_val]

            outputs = model(x_val)

            for output, target in zip(outputs, y_val):
                pred_boxes = output['boxes'].cpu().numpy()
                pred_scores = output['scores'].cpu().numpy()
                true_boxes = target['boxes'].cpu().numpy()

                # IoU: score every ground-truth box by its best-overlapping
                # prediction. Pairing boxes by position would compare unrelated
                # boxes, and averaging an empty list would produce NaN.
                for true_box in true_boxes:
                    best_iou = max((calculate_iou(pred_box, true_box) for pred_box in pred_boxes), default=0.0)
                    iou_scores.append(best_iou)

                # mAP
                if len(pred_boxes) == 0 and len(true_boxes) == 0:
                    # Nothing predicted and nothing to find: no evidence either way
                    continue
                if len(pred_boxes) == 0:
                    # Every ground-truth box was missed
                    map_scores.append(0.0)
                    continue

                pred_boxes_with_scores = [np.append(pred_box, score) for pred_box, score in zip(pred_boxes, pred_scores)]
                map_score, _ = calculate_map(pred_boxes_with_scores, true_boxes.tolist())
                map_scores.append(map_score)

    val_iou = float(np.mean(iou_scores)) if iou_scores else 0.0
    val_map = float(np.mean(map_scores)) if map_scores else 0.0

    return val_iou, val_map


In [None]:
def train_model(model, train_loader, val_loader, optimizer, epochs=20):
    """Train a detection model with mixed precision and plot its progress.

    For every epoch the model is trained on ``train_loader``, then evaluated
    with ``calculate_metrics`` on ``val_loader``; a summary line is printed.
    After the last epoch the training loss and the validation IoU/mAP curves
    are plotted. The module-level ``device`` is used for the transfers.

    Args:
        model: detection model that, in training mode, returns a dict of
            losses when called as ``model(images, targets)``.
        train_loader: iterable of ``(images, targets)`` training batches,
            where targets is a list of dicts of tensors.
        val_loader: validation batches, passed to ``calculate_metrics``.
        optimizer: optimizer already bound to ``model``'s parameters.
        epochs: number of passes over ``train_loader``.
    """
    scaler = GradScaler()  # Initialize the gradient scaler for mixed precision

    train_losses = []
    val_iou_scores = []
    val_map_scores = []

    for epoch in range(epochs):
        model.train()  # calculate_metrics leaves the model in eval mode, so reset every epoch
        running_loss = 0.0  # Initialize the running loss.

        for x_train, y_train in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
            optimizer.zero_grad()  # Zero the gradients at the start of a new batch.
            x_train = x_train.to(device)
            y_train = [{k: v.to(device) for k, v in t.items()} for t in y_train]

            with autocast():
                # In training mode the model must be called WITH targets; it
                # then returns its own loss dict. Calling it without targets
                # here is rejected by the model and would be a wasted forward
                # pass in any case.
                loss_dict = model(x_train, y_train)
                losses = sum(loss for loss in loss_dict.values())

            scaler.scale(losses).backward()  # Backpropagation with mixed precision
            scaler.step(optimizer)  # Update weights with scaled gradients
            scaler.update()  # Update the scale for the next iteration
            running_loss += losses.item()  # Accumulate the loss

        epoch_loss = running_loss / len(train_loader)
        train_losses.append(epoch_loss)

        # Calculate validation metrics
        val_iou, val_map = calculate_metrics(model, val_loader)
        val_iou_scores.append(val_iou)
        val_map_scores.append(val_map)

        print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}, Val IOU: {val_iou:.4f}, Val mAP: {val_map:.4f}")

    # Plotting loss and metrics
    epochs_range = range(1, epochs + 1)
    fig, (loss_ax, metric_ax) = plt.subplots(1, 2, figsize=(12, 6))

    loss_ax.plot(epochs_range, train_losses, label='Training Loss')
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')
    loss_ax.set_title('Training Loss Over Epochs')
    loss_ax.legend()

    metric_ax.plot(epochs_range, val_iou_scores, label='Validation IOU')
    metric_ax.plot(epochs_range, val_map_scores, label='Validation mAP')
    metric_ax.set_xlabel('Epochs')
    metric_ax.set_ylabel('Score')
    metric_ax.set_title('Validation Metrics Over Epochs')
    metric_ax.legend()

    fig.tight_layout()
    plt.show()

# Initialize and train the model

# train_model and calculate_metrics read `device` as a global. Define it here so
# the cell works on a fresh kernel instead of relying on leftover state.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE(review): CustomDataset is not defined in any cell shown in this notebook;
# it must be defined before this cell runs. Its items are presumably
# (image, target-dict) pairs, in which case the default DataLoader collation may
# need a custom collate_fn to produce the list of dicts train_model iterates
# over — TODO confirm against the class.
train_dataset = CustomDataset(train_df)
val_dataset = CustomDataset(val_df)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=2, pin_memory=True)

model = retinanet_resnet50_fpn_v2(weights=RetinaNet_ResNet50_FPN_V2_Weights.DEFAULT)
model = model.to(device)

optimizer = optim.SGD(model.parameters(), lr=0.00001, momentum=0.9)
train_model(model, train_loader, val_loader, optimizer, epochs=20)



Downloading: "https://download.pytorch.org/models/retinanet_resnet50_fpn_v2_coco-5905b1c5.pth" to /root/.cache/torch/hub/checkpoints/retinanet_resnet50_fpn_v2_coco-5905b1c5.pth
100%|██████████| 146M/146M [00:02<00:00, 64.4MB/s]
Epoch 1/20:   0%|          | 0/921 [00:00<?, ?it/s]

## Testing

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from torch.utils.data import DataLoader, Dataset


# Define a dataset class for the test data
class TestDataset(Dataset):
    """Dataset that reads one image file per item and returns it min-max scaled.

    Each item is ``(img_tensor, file_path)``: the array read from the file,
    scaled to [0, 1], converted to ``float32`` and given a leading dimension.

    NOTE(review): for the volumetric scans used elsewhere in this notebook the
    array read here is presumably 3-D (slices, height, width), so items would
    be whole volumes of differing depth rather than the 2-D slices the model
    was trained on — TODO confirm and, if so, slice/resize before batching.
    """

    def __init__(self, file_list):
        # Paths of the image files to serve, one per item
        self.file_list = file_list

    def __len__(self):
        return len(self.file_list)

    @staticmethod
    def _normalize(img_array):
        """Min-max scale ``img_array`` to [0, 1]; a constant array maps to all zeros."""
        # Promote first so the subtraction cannot overflow a narrow integer dtype
        values = img_array.astype(np.float64)
        low = values.min()
        value_range = values.max() - low
        if value_range == 0:
            # Constant image: dividing by the range would produce NaN
            return np.zeros(values.shape, dtype=np.float64)
        return (values - low) / value_range

    def __getitem__(self, idx):
        file_path = self.file_list[idx]
        itk_img = sitk.ReadImage(file_path)
        img_array = sitk.GetArrayFromImage(itk_img)
        img_array = self._normalize(img_array)
        img_tensor = torch.tensor(img_array, dtype=torch.float32).unsqueeze(0)  # Add channel dimension
        return img_tensor, file_path

# Load the test data
# NOTE(review): the variable is named after subset9 but the path points at
# subset0, which is also one of the subsets the training data was built from —
# confirm which subset is meant to be held out.
subset9_path = "/content/drive/MyDrive/Image Processing/LUNA/subsets/subset0"
# This rebinds the module-level `file_list` created by the data-preparation cell.
file_list = glob(f"{subset9_path}/*.mhd")

test_dataset = TestDataset(file_list)
# Adjust the batch size as needed
test_loader = DataLoader(dataset=test_dataset, batch_size=5, shuffle=False)

# Function to visualize the results
def visualize_detections(images, file_paths, boxes, scores, threshold=0.5):
    """Plot each image of a batch next to a copy annotated with its detections.

    One figure row is drawn per image: the left panel shows the image, the
    right panel shows it again with a rectangle and a score label for every
    detection whose score exceeds ``threshold`` (red above 0.8, blue
    otherwise). Nothing is drawn for an empty batch.

    Args:
        images: batch of image tensors; ``images[i][0]`` is displayed.
        file_paths: one path per image, used in the left panel's title.
        boxes: per image, an iterable of ``(x1, y1, x2, y2)`` boxes.
        scores: per image, the scores matching ``boxes``.
        threshold: detections scoring at or below this value are not drawn.
    """
    batch_size = len(images)
    if batch_size == 0:
        # A subplot grid cannot have zero rows
        return

    # squeeze=False keeps `axes` two-dimensional even for a single image, so
    # the axes[i, j] indexing below works for every batch size
    fig, axes = plt.subplots(batch_size, 2, figsize=(15, 5 * batch_size), squeeze=False)

    for i in range(batch_size):
        image = images[i][0].cpu().numpy()
        file_path = file_paths[i]
        box = boxes[i]
        score = scores[i]

        # Normalize the image for visualization; a constant image has no range
        # to divide by, so it is only shifted to zero
        low = image.min()
        value_range = image.max() - low
        if value_range > 0:
            image = (image - low) / value_range
        else:
            image = image - low

        axes[i, 0].imshow(image, cmap='gray')
        axes[i, 0].set_title(f'Image: {file_path}')
        axes[i, 0].axis('off')

        axes[i, 1].imshow(image, cmap='gray')
        has_nodule = False
        for b, s in zip(box, score):
            if s > threshold:
                has_nodule = True
                x1, y1, x2, y2 = b
                color = 'red' if s > 0.8 else 'blue'
                rect = patches.Rectangle((x1, y1), x2 - x1, y2 - y1, fill=False, edgecolor=color, linewidth=2)
                axes[i, 1].add_patch(rect)
                axes[i, 1].text(x1, y1, f'{s:.2f}', bbox=dict(facecolor='yellow', alpha=0.5))
        if not has_nodule:
            axes[i, 1].set_title('No Nodule Detected', color='green')
        else:
            axes[i, 1].set_title('Detections')
        axes[i, 1].axis('off')

    fig.tight_layout()
    plt.show()
    # This function is called once per batch; release the figure so they do not pile up
    plt.close(fig)

# Function to run the model on the test data and visualize the results
def test_model(model, test_loader, device, threshold=0.5):
    """Run ``model`` over ``test_loader`` and visualize every batch's detections.

    Args:
        model: detection model whose eval-mode output is a list of dicts with
            ``'boxes'`` and ``'scores'`` tensors, one dict per image.
        test_loader: iterable of ``(images, file_paths)`` batches.
        device: device the image batches are moved to before inference.
        threshold: score threshold forwarded to ``visualize_detections``.
    """
    model.eval()
    with torch.no_grad():
        for batch, paths in tqdm(test_loader, desc="Testing"):
            batch = batch.to(device)
            predictions = model(batch)

            # Pull the predicted boxes and scores back to the host as numpy arrays
            boxes, scores = [], []
            for prediction in predictions:
                boxes.append(prediction['boxes'].cpu().numpy())
                scores.append(prediction['scores'].cpu().numpy())

            # Visualize the detections
            visualize_detections(batch.cpu(), paths, boxes, scores, threshold)

# Reuse the model trained earlier in this session, making sure it lives on the
# device the test batches will be sent to
model = model.to(device)

# Run inference over the test loader and display the detections batch by batch
test_model(model, test_loader, device=device)
