In [None]:
import csv
import os

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import SimpleITK as sitk

%matplotlib inline

In [None]:
SEED = 42
IMG_DEPTH = 128
DATA_PATH = ".\\data\\"

In [None]:
import csv


def write_data_file_paths():
    with open(f"{DATA_PATH}\\file_paths.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["image", "mask", "label"])
        for sbj in range(100):
            if os.path.exists(f"{DATA_PATH}\\0\\{sbj:02}\\CT_partition.npy"):
                label = 0
            elif os.path.exists(f"{DATA_PATH}\\1\\{sbj:02}\\CT_partition.npy"):
                label = 1
            else:
                continue

            writer.writerows(
                [
                    [
                        f"{label}\\{sbj:02}\\CT_partition.npy",
                        f"{label}\\{sbj:02}\\CT_mask.npy",
                        label,
                    ],
                    [
                        f"{label}\\{sbj:02}\\PT_partition.npy",
                        f"{label}\\{sbj:02}\\PT_mask.npy",
                        label,
                    ],
                ]
            )

write_data_file_paths()

### Classical Features

In [None]:
def load_haralick_features():
    try:
        data = pd.read_csv(f"haralick.csv", index_col=0)
        labels = data.pop("y")
        return data, labels

    except FileNotFoundError:
        print("File not found, generating...")
        data, labels = [], []

        with open(f"{DATA_PATH}\\file_paths.csv", "r") as f:
            reader = csv.reader(f)
            next(reader)
            for image_path, mask_path, label in reader:
                image = np.load(f"{DATA_PATH}\\{image_path}")
                mask = np.load(f"{DATA_PATH}\\{mask_path}")
                ...

## Scikit-Learn

In [None]:
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    roc_auc_score,
)
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
from sklearn.svm import SVC

In [None]:
def print_metrics(y_true, y_pred, model_name) -> None:
    cm = confusion_matrix(y_true, y_pred)
    tn, fp, fn, tp = cm.ravel()
    print(model_name)
    print(f"Sensitivity: {tp / (tp + fn) * 100:.1f}%")
    print(f"Specificity: {tn / (tn + fp) * 100:.1f}%")
    print(f"Accuracy: {accuracy_score(y_true, y_pred) * 100:.1f}%")
    print(f"ROC-AUC: {roc_auc_score(y_true, y_pred) * 100:.1f}%")

    plt.imshow(cm, cmap=mpl.colormaps["Blues"])
    plt.colorbar()
    plt.title("Confusion matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.xticks([0, 1], ["negative", "positive"])
    plt.yticks([0, 1], ["negative", "positive"])
    for (j, i), label in np.ndenumerate(cm):
        color = "darkblue" if label < cm.max() / 2 else "white"
        plt.text(i, j, label, color=color)
    plt.show()

In [None]:
X_train, X_test, y_train, y_test = train_test_split(
    data, labels, test_size=0.2, random_state=SEED
)

svc_pipe = Pipeline(
    [
        ("scaler", MinMaxScaler()),
        # ("pca", PCA(0.9)),
        ("svc", SVC(kernel="linear", random_state=SEED)),
    ]
)
rf_pipe = Pipeline(
    [
        ("scaler", MinMaxScaler()),
        # ("pca", PCA(0.9)),
        ("rf", RandomForestClassifier(criterion="entropy", random_state=SEED)),
    ]
)

SVC

In [None]:
svc_pipe.fit(X_train, y_train)
y_pred = svc_pipe.predict(X_test)
print_metrics(y_test, y_pred, "Linear SVC")

Random Forest

In [None]:
rf_pipe.fit(X_train, y_train)
y_pred = rf_pipe.predict(X_test)
print_metrics(y_test, y_pred, "Random Forest")

## PyTorch

In [None]:
import torch
import torch.nn as nn

from data import CTData
from unet import UNet, DiceLoss

In [None]:
IMAGE_DEPTH = 16
IMAGE_SIZE = 64
BATCH_SIZE = 8
EPOCHS = 10

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

Load data

In [None]:
def save_rois():
    for sbj in range(100):
        if sbj % 10 == 9:
            print(f"{sbj + 1} / 100 patients loaded")

        if os.path.exists(f"{DATA_PATH}/CLASS1_MALIGNANT/LCp{sbj:04}_biobank"):
            label = "CLASS1_MALIGNANT"
        elif os.path.exists(f"{DATA_PATH}/CLASS2_BENIGN/LCp{sbj:04}_biobank"):
            label = "CLASS2_BENIGN"
        else:
            continue

        ct_scan = sitk.ReadImage(
            f"{DATA_PATH}/{label}/LCp{sbj:04}_biobank/LCp{sbj:04}_CT_partition.nii"
        )
        ct_scan = sitk.GetArrayFromImage(ct_scan)

        mask = sitk.ReadImage(
            f"{DATA_PATH}/{label}/LCp{sbj:04}_biobank/LCp{sbj:04}_CT_mask.nii"
        )
        mask = sitk.GetArrayFromImage(mask)

        roi = np.unique(np.where(mask == 1), axis=1)
        roi_cx, roi_cy, roi_cz = (roi.max(axis=1) + roi.min(axis=1)) // 2
        bounding_box = mask[
            roi_cx - IMAGE_DEPTH // 2 : roi_cx + IMAGE_DEPTH // 2,
            roi_cy - IMAGE_SIZE // 2 : roi_cy + IMAGE_SIZE // 2,
            roi_cz - IMAGE_SIZE // 2 : roi_cz + IMAGE_SIZE // 2,
        ]
        image_out = ct_scan[
            roi_cx - IMAGE_DEPTH // 2 : roi_cx + IMAGE_DEPTH // 2,
            roi_cy - IMAGE_SIZE // 2 : roi_cy + IMAGE_SIZE // 2,
            roi_cz - IMAGE_SIZE // 2 : roi_cz + IMAGE_SIZE // 2,
        ]

        os.mkdir(f"{DATA_PATH}/segmentation/{sbj:04}")
        np.save(f"{DATA_PATH}/segmentation/{sbj:04}/CT_image.npy", image_out)
        np.save(f"{DATA_PATH}/segmentation/{sbj:04}/CT_mask.npy", bounding_box)

save_rois()

In [None]:
images, masks = [], []
for sbj in range(100):
    try:
        images.append(np.load(f"{DATA_PATH}/segmentation/{sbj:04}/CT_image.npy"))
        masks.append(np.load(f"{DATA_PATH}/segmentation/{sbj:04}/CT_mask.npy"))

    except FileNotFoundError:
        continue

images = np.array(images)
masks = np.array(masks)

In [None]:
train_images, test_images, train_masks, test_masks = train_test_split(
    images, masks, test_size=0.3
)

In [None]:
train_dataloader = torch.utils.data.DataLoader(
    CTData(train_images, train_masks), BATCH_SIZE, shuffle=True
)
test_dataloader = torch.utils.data.DataLoader(
    CTData(test_images, test_masks), 1, shuffle=False
)

In [None]:
model = UNet(residual=False, cat=True).to(device)
optimizer = torch.optim.Adam(model.parameters())
model.load_state_dict(torch.load(".\\models\\conv_l1_cat_500.pt"))

In [None]:
def threshold(data: torch.Tensor, level: float = 0.5) -> torch.Tensor:
    scaled = (data - data.min()) / (data.max() - data.min())
    scaled[scaled < level] = 0
    scaled[scaled >= level] = 1
    return scaled


def pixel_accuracy(pred: torch.Tensor, true: torch.Tensor) -> torch.Tensor:
    return torch.sum(pred == true) / np.prod(pred.shape)


def iou(pred: torch.Tensor, true: torch.Tensor) -> torch.Tensor:
    intersection = torch.logical_and(true, pred)
    union = torch.logical_or(true, pred)
    return torch.sum(intersection) / torch.sum(union)


def dice_coeff(pred: torch.Tensor, true: torch.Tensor) -> torch.Tensor:
    intersection = torch.logical_and(true, pred)
    return 2 * torch.sum(intersection) / (torch.sum(true) + torch.sum(pred))


In [None]:
pixel_loss, iou_loss, dice_loss = [], [], []

for epochs in range(EPOCHS):
    print(f"Epoch {epochs + 1} / {EPOCHS}")
    model.train()
    for image, mask in train_dataloader:
        image = image.to(device)
        mask = mask.to(device)
        
        optimizer.zero_grad()
        pred = threshold(model(image))
        loss = iou(pred, mask)
        loss.backward()
        optimizer.step()

    model.eval()
    epoch_pixel_loss, epoch_iou_loss, epoch_dice_loss = [], [], []
    for image, mask in test_dataloader:
        image = image.to(device)
        mask = mask.to(device)
        pred = threshold(model(image))

        pixel = pixel_accuracy(pred, mask)
        jaccard = iou(pred, mask)
        dice = dice_coeff(pred, mask)
        
        print(f"Pixel accuracy: {pixel_accuracy(pred, mask):.4f}")
        print(f"IOU (Jaccard): {iou(pred, mask):.4f}")
        print(f"Dice coefficient (F1-score): {dice_coeff(pred, mask):.4f}")

        epoch_pixel_loss.append(pixel)
        epoch_iou_loss.append(jaccard)
        epoch_dice_loss.append(dice)
    
    pixel_loss.append(np.mean(epoch_pixel_loss))
    iou_loss.append(np.mean(epoch_iou_loss))
    dice_loss.append(np.mean(epoch_dice_loss))

In [None]:
iter_data = iter(test_dataloader)

In [None]:
image, mask = next(iter_data)
pred = threshold(model(image))

print(f"Pixel accuracy: {pixel_accuracy(pred, mask):.4f}")
print(f"IOU (Jaccard): {iou(pred, mask):.4f}")
print(f"Dice coefficient (F1-score): {dice_coeff(pred, mask):.4f}")

image = image.cpu().detach().numpy()
pred = pred.cpu().detach().numpy()
mask = mask.cpu().detach().numpy()

fig, ax = plt.subplots(4, 12)
for slc in range(16):
    r, c = divmod(slc, 4)

    ax[r, c].imshow(image[0, 0, slc, :, :])
    ax[r, c + 4].imshow(pred[0, 0, slc, :, :])
    ax[r, c + 8].imshow(mask[0, 0, slc, :, :])

    ax[r, c].axis("off")
    ax[r, c + 4].axis("off")
    ax[r, c + 8].axis("off")