In [None]:
# Importing all necessary libraries

import os
import cv2
import glob
import PIL
import shutil
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from skimage import data
from skimage.util import montage 
import skimage.transform as skTrans
from skimage.transform import rotate
from skimage.transform import resize
from PIL import Image, ImageOps  


# neural imaging
import nilearn as nl
import nibabel as nib
import nilearn.plotting as nlplt
# !pip install git+https://github.com/miykael/gif_your_nifti # nifti to gif 
# import gif_your_nifti.core as gif2nif


# ml libs
import keras
import keras.backend as K
from keras.callbacks import CSVLogger
from keras.layers import Flatten
import tensorflow as tf
from tensorflow.keras.utils import plot_model
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, TensorBoard
#from tensorflow.keras.layers.experimental import preprocessing

# Make numpy printouts easier to read.
np.set_printoptions(precision=3, suppress=True)

In [None]:
# DEFINE seg-areas  
SEGMENT_CLASSES = {
    0 : 'NOT tumor',
    1 : 'NECROTIC/CORE', # or NON-ENHANCING tumor CORE
    2 : 'EDEMA',
    3 : 'ENHANCING' # original 4 -> converted into 3 later
}

# there are 155 slices per volume
# to start at 5 and use 145 slices means we will skip the first 5 and last 5 
VOLUME_SLICES = 100 
VOLUME_START_AT = 22 # first slice of volume that we will include
IMG_SIZE=128

TRAIN_DATASET_PATH = os.path.join(os.getcwd(), 'BraTS2020_TrainingData/MICCAI_BraTS2020_TrainingData/')
VALIDATION_DATASET_PATH = os.path.join(os.getcwd(), 'BraTS2020_ValidationData/MICCAI_BraTS2020_ValidationData/')

# Neural Network Tumor Segmentation Classifier Starts Here 

## Base model from Kaggle see https://www.kaggle.com/code/rastislav/3d-mri-brain-tumor-segmentation-u-net made by Ratislav Kopál
## (Adjusted to required specification)

In [None]:
# dice loss as defined above for 4 classes
def dice_coef(y_true, y_pred, smooth=1.0):
    class_num = 4
    for i in range(class_num):
        y_true_f = K.flatten(y_true[:,:,:,i])
        y_pred_f = K.flatten(y_pred[:,:,:,i])
        intersection = K.sum(y_true_f * y_pred_f)
        loss = ((2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth))
   #     K.print_tensor(loss, message='loss value for class {} : '.format(SEGMENT_CLASSES[i]))
        if i == 0:
            total_loss = loss
        else:
            total_loss = total_loss + loss
    total_loss = total_loss / class_num
#    K.print_tensor(total_loss, message=' total dice coef: ')
    return total_loss


 
# define per class evaluation of dice coef
# inspired by https://github.com/keras-team/keras/issues/9395
def dice_coef_necrotic(y_true, y_pred, epsilon=1e-6):
    intersection = K.sum(K.abs(y_true[:,:,:,1] * y_pred[:,:,:,1]))
    return (2. * intersection) / (K.sum(K.square(y_true[:,:,:,1])) + K.sum(K.square(y_pred[:,:,:,1])) + epsilon)

def dice_coef_edema(y_true, y_pred, epsilon=1e-6):
    intersection = K.sum(K.abs(y_true[:,:,:,2] * y_pred[:,:,:,2]))
    return (2. * intersection) / (K.sum(K.square(y_true[:,:,:,2])) + K.sum(K.square(y_pred[:,:,:,2])) + epsilon)

def dice_coef_enhancing(y_true, y_pred, epsilon=1e-6):
    intersection = K.sum(K.abs(y_true[:,:,:,3] * y_pred[:,:,:,3]))
    return (2. * intersection) / (K.sum(K.square(y_true[:,:,:,3])) + K.sum(K.square(y_pred[:,:,:,3])) + epsilon)

# Computing Precision 
def precision(y_true, y_pred):
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        precision = true_positives / (predicted_positives + K.epsilon())
        return precision
    
# Computing Sensitivity      
def sensitivity(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    return true_positives / (possible_positives + K.epsilon())

# Computing Specificity
def specificity(y_true, y_pred):
    true_negatives = K.sum(K.round(K.clip((1-y_true) * (1-y_pred), 0, 1)))
    possible_negatives = K.sum(K.round(K.clip(1-y_true, 0, 1)))
    return true_negatives / (possible_negatives + K.epsilon())

In [None]:
# source https://naomi-fridman.medium.com/multi-class-image-segmentation-a5cc671e647a

def build_unet(inputs, ker_init, dropout):
    conv1 = Conv2D(32, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(inputs)
    conv1 = Conv2D(32, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv1)
    
    pool = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(pool)
    conv = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv)
    
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv)
    conv2 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(pool1)
    conv2 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv2)
    
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    conv3 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(pool2)
    conv3 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv3)
    
    
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv3)
    conv5 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(pool4)
    conv5 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv5)
    drop5 = Dropout(dropout)(conv5)

    up7 = Conv2D(256, 2, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(UpSampling2D(size = (2,2))(drop5))
    merge7 = concatenate([conv3,up7], axis = 3)
    conv7 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(merge7)
    conv7 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv7)

    up8 = Conv2D(128, 2, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(UpSampling2D(size = (2,2))(conv7))
    merge8 = concatenate([conv2,up8], axis = 3)
    conv8 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(merge8)
    conv8 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv8)

    up9 = Conv2D(64, 2, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(UpSampling2D(size = (2,2))(conv8))
    merge9 = concatenate([conv,up9], axis = 3)
    conv9 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(merge9)
    conv9 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv9)
    
    up = Conv2D(32, 2, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(UpSampling2D(size = (2,2))(conv9))
    merge = concatenate([conv1,up], axis = 3)
    conv = Conv2D(32, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(merge)
    conv = Conv2D(32, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv)
    
    conv10 = Conv2D(4, (1,1), activation = 'softmax')(conv)
    
    return Model(inputs = inputs, outputs = conv10)

input_layer = Input((IMG_SIZE, IMG_SIZE, 2))

model = build_unet(input_layer, 'he_normal', 0.2)
model.compile(loss="categorical_crossentropy", optimizer=keras.optimizers.Adam(learning_rate=0.001), metrics = ['accuracy',tf.keras.metrics.MeanIoU(num_classes=4), dice_coef, precision, sensitivity, specificity, dice_coef_necrotic, dice_coef_edema ,dice_coef_enhancing] )

In [None]:
# lists of directories with studies
train_and_val_directories = [f.path for f in os.scandir(TRAIN_DATASET_PATH) if f.is_dir()]

# file BraTS20_Training_355 has ill formatted name for for seg.nii file
train_and_val_directories.remove(TRAIN_DATASET_PATH+'BraTS20_Training_355')

def pathListIntoIds(dirList):
    x = []
    for i in range(0,len(dirList)):
        x.append(dirList[i][dirList[i].rfind('/')+1:])
    return x

train_and_test_ids = pathListIntoIds(train_and_val_directories); 

print(f'total number of pre-labeled MRI scans: {len(train_and_test_ids)}')
# Seperate data into 80% model build, 20% Validation
train_test_ids, val_ids = train_test_split(train_and_test_ids,test_size=0.2) 
# 50% of training data to train NN, 50% of training data to calibrate conformal framework, MUCH less is required for conformal framework to hold
model_ids, conformal_ids = train_test_split(train_test_ids,test_size=0.5)
# Split NN training date into 85% train, 15% test
train_ids, test_ids = train_test_split(model_ids,test_size=0.15) 

In [None]:
class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, dim=(IMG_SIZE,IMG_SIZE), batch_size = 1, n_channels = 2, shuffle=True):
        'Initialization'
        self.dim = dim
        self.batch_size = batch_size
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Find list of IDs
        Batch_ids = [self.list_IDs[k] for k in indexes]

        # Generate data
        X, y = self.__data_generation(Batch_ids)

        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __data_generation(self, Batch_ids):
        'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
        # Initialization
        X = np.zeros((self.batch_size*VOLUME_SLICES, *self.dim, self.n_channels))
        y = np.zeros((self.batch_size*VOLUME_SLICES, 240, 240))
        Y = np.zeros((self.batch_size*VOLUME_SLICES, *self.dim, 4))

        
        # Generate data
        for c, i in enumerate(Batch_ids):
            case_path = os.path.join(TRAIN_DATASET_PATH, i)

            data_path = os.path.join(case_path, f'{i}_flair.nii');
            flair = nib.load(data_path).get_fdata()    

            data_path = os.path.join(case_path, f'{i}_t1ce.nii');
            ce = nib.load(data_path).get_fdata()
            
            data_path = os.path.join(case_path, f'{i}_seg.nii');
            seg = nib.load(data_path).get_fdata()
        
            for j in range(VOLUME_SLICES):
                 X[j +VOLUME_SLICES*c,:,:,0] = cv2.resize(flair[:,:,j+VOLUME_START_AT], (IMG_SIZE, IMG_SIZE));
                 X[j +VOLUME_SLICES*c,:,:,1] = cv2.resize(ce[:,:,j+VOLUME_START_AT], (IMG_SIZE, IMG_SIZE));

                 y[j +VOLUME_SLICES*c] = seg[:,:,j+VOLUME_START_AT];
                    
        # Generate masks
        y[y==4] = 3;
        mask = tf.one_hot(y, 4);
        Y = tf.image.resize(mask, (IMG_SIZE, IMG_SIZE));
        return X/np.max(X), Y
        
training_generator = DataGenerator(train_ids)
valid_generator = DataGenerator(val_ids)
test_generator = DataGenerator(test_ids)

### We import pretrained weights to save time, uncomment to self train

In [None]:
csv_logger = CSVLogger('training.log', separator=',', append=False)


callbacks = [
#     keras.callbacks.EarlyStopping(monitor='loss', min_delta=0,
#                               patience=2, verbose=1, mode='auto'),
      keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                              patience=2, min_lr=0.000001, verbose=1),
#  keras.callbacks.ModelCheckpoint(filepath = 'model_.{epoch:02d}-{val_loss:.6f}.m5',
#                             verbose=1, save_best_only=True, save_weights_only = True)
        csv_logger
    ]

In [None]:
K.clear_session()

# history =  model.fit(training_generator,
#                     epochs=35,
#                     steps_per_epoch=len(train_ids),
#                     callbacks= callbacks,
#                     validation_data = valid_generator
#                     )  
# model.save("model_x1_1.h5")

In [None]:
############ load trained model ################
model = keras.models.load_model(os.path.join(os.getcwd(), 'model_per_class.h5'), 
                                   custom_objects={ 'accuracy' : tf.keras.metrics.MeanIoU(num_classes=4),
                                                   "dice_coef": dice_coef,
                                                   "precision": precision,
                                                   "sensitivity":sensitivity,
                                                   "specificity":specificity,
                                                   "dice_coef_necrotic": dice_coef_necrotic,
                                                   "dice_coef_edema": dice_coef_edema,
                                                   "dice_coef_enhancing": dice_coef_enhancing
                                                  }, compile=False)

history = pd.read_csv(os.path.join(os.getcwd(), 'training_per_class.log'), sep=',', engine='python')

hist=history

############### ########## ####### #######

# hist=history.history

acc=hist['accuracy']
val_acc=hist['val_accuracy']

epoch=range(len(acc))

loss=hist['loss']
val_loss=hist['val_loss']

train_dice=hist['dice_coef']
val_dice=hist['val_dice_coef']

f,ax=plt.subplots(1,4,figsize=(16,8))

ax[0].plot(epoch,acc,'b',label='Training Accuracy')
ax[0].plot(epoch,val_acc,'r',label='Validation Accuracy')
ax[0].legend()

ax[1].plot(epoch,loss,'b',label='Training Loss')
ax[1].plot(epoch,val_loss,'r',label='Validation Loss')
ax[1].legend()

ax[2].plot(epoch,train_dice,'b',label='Training dice coef')
ax[2].plot(epoch,val_dice,'r',label='Validation dice coef')
ax[2].legend()

ax[3].plot(epoch,hist['mean_io_u'],'b',label='Training mean IOU')
ax[3].plot(epoch,hist['val_mean_io_u'],'r',label='Validation mean IOU')
ax[3].legend()

plt.show()

In [None]:
# mri type must one of 1) flair 2) t1 3) t1ce 4) t2 ------- or even 5) seg
# returns volume of specified study at `path`
def imageLoader(path):
    image = nib.load(path).get_fdata()
    X = np.zeros((self.batch_size*VOLUME_SLICES, *self.dim, self.n_channels))
    for j in range(VOLUME_SLICES):
        X[j +VOLUME_SLICES*c,:,:,0] = cv2.resize(image[:,:,j+VOLUME_START_AT], (IMG_SIZE, IMG_SIZE));
        X[j +VOLUME_SLICES*c,:,:,1] = cv2.resize(ce[:,:,j+VOLUME_START_AT], (IMG_SIZE, IMG_SIZE));

        y[j +VOLUME_SLICES*c] = seg[:,:,j+VOLUME_START_AT];
    return np.array(image)


# load nifti file at `path`
# and load each slice with mask from volume
# choose the mri type & resize to `IMG_SIZE`
def loadDataFromDir(path, list_of_files, mriType, n_images):
    scans = []
    masks = []
    for i in list_of_files[:n_images]:
        fullPath = glob.glob( i + '/*'+ mriType +'*')[0]
        currentScanVolume = imageLoader(fullPath)
        currentMaskVolume = imageLoader( glob.glob( i + '/*seg*')[0] ) 
        # for each slice in 3D volume, find also it's mask
        for j in range(0, currentScanVolume.shape[2]):
            scan_img = cv2.resize(currentScanVolume[:,:,j], dsize=(IMG_SIZE,IMG_SIZE), interpolation=cv2.INTER_AREA).astype('uint8')
            mask_img = cv2.resize(currentMaskVolume[:,:,j], dsize=(IMG_SIZE,IMG_SIZE), interpolation=cv2.INTER_AREA).astype('uint8')
            scans.append(scan_img[..., np.newaxis])
            masks.append(mask_img[..., np.newaxis])
    return np.array(scans, dtype='float32'), np.array(masks, dtype='float32')
        
#brains_list_test, masks_list_test = loadDataFromDir(VALIDATION_DATASET_PATH, test_directories, "flair", 5)

# Visualise Base Tumor Segmentation Classifier

In [None]:
def predictByPath(case_path, case):
    files = next(os.walk(case_path))[2]
    X = np.empty((VOLUME_SLICES, IMG_SIZE, IMG_SIZE, 2))
  #  y = np.empty((VOLUME_SLICES, IMG_SIZE, IMG_SIZE))
    
    vol_path = os.path.join(case_path, f'BraTS20_Training_{case}_flair.nii');
    flair=nib.load(vol_path).get_fdata()
    
    vol_path = os.path.join(case_path, f'BraTS20_Training_{case}_t1ce.nii');
    ce=nib.load(vol_path).get_fdata() 
    
 #   vol_path = os.path.join(case_path, f'BraTS20_Training_{case}_seg.nii');
 #   seg=nib.load(vol_path).get_fdata()  

    
    for j in range(VOLUME_SLICES):
        X[j,:,:,0] = cv2.resize(flair[:,:,j+VOLUME_START_AT], (IMG_SIZE,IMG_SIZE))
        X[j,:,:,1] = cv2.resize(ce[:,:,j+VOLUME_START_AT], (IMG_SIZE,IMG_SIZE))
 #       y[j,:,:] = cv2.resize(seg[:,:,j+VOLUME_START_AT], (IMG_SIZE,IMG_SIZE))
        
  #  model.evaluate(x=X,y=y[:,:,:,0], callbacks= callbacks)
    return model.predict(X/np.max(X), verbose=1)

def showPredictsById(case, start_slice = 60):
    path = os.path.join(os.getcwd(), f'BraTS2020_TrainingData/MICCAI_BraTS2020_TrainingData/BraTS20_Training_{case[-3:]}')
    gt = nib.load(os.path.join(path, f'BraTS20_Training_{case}_seg.nii')).get_fdata()
    origImage = nib.load(os.path.join(path, f'BraTS20_Training_{case}_flair.nii')).get_fdata()
    p = predictByPath(path,case)

    core = p[:,:,:,1]
    edema= p[:,:,:,2]
    enhancing = p[:,:,:,3]

    plt.figure(figsize=(18, 50))
    f, axarr = plt.subplots(1,6, figsize = (18, 50)) 

    for i in range(6): # for each image, add brain background
        axarr[i].imshow(cv2.resize(origImage[:,:,start_slice+VOLUME_START_AT], (IMG_SIZE, IMG_SIZE)), cmap="gray", interpolation='none')
    
    axarr[0].imshow(cv2.resize(origImage[:,:,start_slice+VOLUME_START_AT], (IMG_SIZE, IMG_SIZE)), cmap="gray")
    axarr[0].title.set_text('Original image flair')
    curr_gt=cv2.resize(gt[:,:,start_slice+VOLUME_START_AT], (IMG_SIZE, IMG_SIZE), interpolation = cv2.INTER_NEAREST)
    axarr[1].imshow(curr_gt, cmap="Reds", interpolation='none', alpha=0.3) # ,alpha=0.3,cmap='Reds'
    axarr[1].title.set_text('Ground truth')
    axarr[2].imshow(p[start_slice,:,:,1:4], cmap="Reds", interpolation='none', alpha=0.3)
    axarr[2].title.set_text('all classes')
    axarr[3].imshow(edema[start_slice,:,:], cmap="OrRd", interpolation='none', alpha=0.3)
    axarr[3].title.set_text(f'{SEGMENT_CLASSES[1]} predicted')
    axarr[4].imshow(core[start_slice,:,], cmap="OrRd", interpolation='none', alpha=0.3)
    axarr[4].title.set_text(f'{SEGMENT_CLASSES[2]} predicted')
    axarr[5].imshow(enhancing[start_slice,:,], cmap="OrRd", interpolation='none', alpha=0.3)
    axarr[5].title.set_text(f'{SEGMENT_CLASSES[3]} predicted')
    plt.show()
    
    
showPredictsById(case=test_ids[0][-3:])
showPredictsById(case=test_ids[1][-3:])
showPredictsById(case=test_ids[2][-3:])
showPredictsById(case=test_ids[3][-3:])
showPredictsById(case=test_ids[4][-3:])
showPredictsById(case=test_ids[5][-3:])
showPredictsById(case=test_ids[6][-3:])


# mask = np.zeros((10,10))
# mask[3:-3, 3:-3] = 1 # white square in black background
# im = mask + np.random.randn(10,10) * 0.01 # random image
# masked = np.ma.masked_where(mask == 0, mask)

# plt.figure()
# plt.subplot(1,2,1)
# plt.imshow(im, 'gray', interpolation='none')
# plt.subplot(1,2,2)
# plt.imshow(im, 'gray', interpolation='none')
# plt.imshow(masked, 'jet', interpolation='none', alpha=0.7)
# plt.show()

# Conformal Starts Here

In [None]:
n = IMG_SIZE * IMG_SIZE * VOLUME_SLICES
target_fnr = 0.05

In [None]:
def conformal_prediction_set(smx_scores, lambda_hat):
    """
    Generates prediction sets based on softmax outputs and a quantile threshold.

    Parameters:
    - smx_scores: A 2D numpy array of softmax outputs. Each row corresponds to an instance,
                  and each column corresponds to a class.
    - lambda_hat: The quantile threshold used to determine the prediction sets.

    Returns:
    - prediction_sets: A boolean 2D numpy array indicating the classes included in the
                       prediction set for each instance.
    """
    # Sort the softmax outputs in descending order
    val_pi = smx_scores.argsort(1)[:, ::-1]
    
    # Calculate the cumulative sums of the sorted softmax outputs
    val_srt = np.take_along_axis(smx_scores, val_pi, axis=1).cumsum(axis=1)
    
    # Determine the boundary for each instance's prediction set
    exceeds_lambda = val_srt >= lambda_hat
    prediction_set_boundaries = np.argmax(exceeds_lambda, axis=1)
    
    # For instances where lambda_hat is never exceeded, include all classes
    all_included = np.all(~exceeds_lambda, axis=1)
    prediction_set_boundaries[all_included] = val_srt.shape[1] - 1  # Set to the last index for these cases
    
    # Initialize the prediction sets array
    prediction_sets = np.zeros_like(val_srt, dtype=bool)
    
    # Generate a mask for values within the prediction set boundaries
    col_indices = np.arange(val_srt.shape[1])
    within_boundary_mask = col_indices <= prediction_set_boundaries[:, None]
    
    # Apply the mask according to the sorted indices
    prediction_sets[np.arange(val_srt.shape[0])[:, None], val_pi] = within_boundary_mask
    
    return prediction_sets

def fnr_loss(prediction_sets, ground_truth):
    """
    Calculates the False Negative Rate (FNR) given prediction sets and ground truth labels.

    Parameters:
    - prediction_sets: A boolean 2D numpy array indicating the classes included in the
                       prediction set for each instance.
    - ground_truth: A 1D numpy array of ground truth class labels.

    Returns:
    - FNR: The False Negative Rate as a float.
    """
    # False Negatives: Instances where the ground truth is a tumor (1, 2, or 3),
    # but the prediction set includes only class 0 (no tumor).
    true_positive_rate = np.sum((ground_truth > 0) & np.any(prediction_sets[:, 1:] == True, axis=1)) / np.sum(ground_truth > 0)

    return 1 - true_positive_rate if np.sum(ground_truth > 0) > 0 else 0

### small example of how conformal prediction sets are formed:

In [None]:
smx_scores = np.array([
    [0.9, 0.05, 0.03, 0.02],  # Slice 1: High confidence for class 0 (No tumor)
    [0.2, 0.5, 0.2, 0.1],     # Slice 2: Moderate confidence for class 1 (Tumor type 1)
    [0.1, 0.2, 0.35, 0.35],   # Slice 3: Close probabilities for classes 2 and 3 (Tumor types 2 and 3)
    [0.15, 0.15, 0.2, 0.5]    # Slice 4: Leaning towards class 3 (Tumor type 3), but uncertain
])

# Set qhat to define our prediction confidence threshold
lambda_hat = 0.7  # This means we want at least 60% confidence to include a class in the prediction set

# Deploy
prediction_sets = conformal_prediction_set(smx_scores, lambda_hat)

# Print prediction sets for inspection
print("Prediction Sets:")
print(prediction_sets)

# Conformal Calibration Starts here

In [None]:
def load_data(case):
    path = os.path.join(os.getcwd(), f'BraTS2020_TrainingData/MICCAI_BraTS2020_TrainingData/BraTS20_Training_{case[-3:]}')
    #original_image = nib.load(os.path.join(path, f'BraTS20_Training_{case[-3:]}_flair.nii')).get_fdata()
    ground_truth = nib.load(os.path.join(path, f'BraTS20_Training_{case[-3:]}_seg.nii')).get_fdata()
    ground_truth[ground_truth == 4] = 3
    ground_truth = cv2.resize(ground_truth, (IMG_SIZE, IMG_SIZE))[:, :, VOLUME_START_AT:VOLUME_START_AT+VOLUME_SLICES]
    ground_truth = ground_truth.astype(int)
    soft_max = predictByPath(path,case[-3:])
    soft_max = np.transpose(soft_max, (1, 2, 0, 3))
    return soft_max, ground_truth

def process_and_store_data(id_list):
    """
    Process each dataset in id_list and store or return for further processing.
    """
    all_data = []
    
    for case_id in id_list:
        sftmax, gt = load_data(case_id[-3:])
        
        all_data.append((sftmax, gt))
        
    return all_data

conformal_cal_data = process_and_store_data(conformal_ids)

In [None]:
# Flatten the softmax scores and ground truth labels across all cases
cal_smx_scores = np.vstack([sftmax.reshape(-1, sftmax.shape[-1]) for sftmax, _ in conformal_cal_data])
cal_gt_labels = np.hstack([gt.flatten() for _, gt in conformal_cal_data])

# Calculate calibration scores for the aggregated data
cal_pi = cal_smx_scores.argsort(1)[:, ::-1]
cal_srt = np.take_along_axis(cal_smx_scores, cal_pi, axis=1).cumsum(axis=1)
cal_scores = np.take_along_axis(cal_srt, cal_pi.argsort(axis=1), axis=1)[np.arange(cal_gt_labels.size), cal_gt_labels]

# Determine the quantile for lambda_hat that controls the FNR across all data
target_fnr = 0.05  # for example
n = cal_gt_labels.size
true_positive_scores = cal_scores[cal_gt_labels > 0].min(axis=1)
q_hat = np.quantile(cal_srt[cal_gt_labels> 0, 1:].min(axis=1), np.ceil((n+1)*(1-target_fnr))/n, interpolation='higher')

print(f"Calibrated lambda_hat for all data: {q_hat}")

In [None]:
# Set your target FNR and range for lambda values
target_fnr = 0.05
lambda_values = np.linspace(1, 0, 200)  # Starting from 1 (smallest sets) to 0 (largest sets)

# Placeholder for the chosen lambda_hat
lambda_hat = 1  # Start with the smallest set (most conservative)

# Loop through lambda values from most conservative (small sets) to least (large sets)
for lambda_val in lambda_values:
    # Generate prediction sets using the current lambda value
    prediction_sets = conformal_prediction_set(cal_smx_scores, lambda_val)
    
    # Compute the FNR for these prediction sets
    current_fnr = fnr_loss(prediction_sets, cal_gt_labels)
    print(f'{lambda_val} : {current_fnr}')
    
    # Check if the current FNR is below the target, if so, update lambda_hat and continue
    if current_fnr <= target_fnr:
        lambda_hat = lambda_val
    else:
        # If FNR exceeds target, stop the loop; the last lambda_hat is the largest lambda meeting the FNR criterion
        break

# Print the chosen lambda_hat and the corresponding FNR
print(f"Chosen lambda_hat: {lambda_hat}")

# Assume you have validation softmax outputs (val_smx) and labels (val_labels)
# print(f"Validation FNR at lambda_hat: {fnr_loss(conformal_prediction_set(val_smx, lambda_hat), val_labels)}")

In [None]:
# Alternatively lambda hat can be found by using binary search which is more efficient
def binary_search_lambda(all_smx_scores, all_gt_labels, target_fnr, low=0, high=1, tol=0.0005):
    """
    Use binary search to find the lambda_hat value that achieves the target FNR.
    """
    while low < high - tol:
        mid = (low + high) / 2
        prediction_sets = conformal_prediction_set(all_smx_scores, mid)
        current_fnr = fnr_loss(prediction_sets, all_gt_labels)

        if current_fnr > target_fnr:
            low = mid
        else:
            high = mid

    return (low + high) / 2

# Calculate the optimal lambda_hat using binary search
lambda_hat = binary_search_lambda(all_smx_scores, all_gt_labels, target_fnr)
print(f"Optimized lambda_hat: {lambda_hat}")
print(fnr_loss(conformal_prediction_set(all_smx_scores, lambda_hat), all_gt_labels))

In [None]:
## Make up space on memory
# del conformal_cal_data 
# del cal_smx_scores
# del cal_gt_labels

conformal_val_data = process_and_store_data(val_ids)

# Flatten the softmax scores and ground truth labels across all cases
val_smx_scores = np.vstack([sftmax.reshape(-1, sftmax.shape[-1]) for sftmax, _ in conformal_val_data])
val_gt_labels = np.hstack([gt.flatten() for _, gt in conformal_val_data])

In [None]:
# Collect all the FNR for each prediction in the validation set
fnr_list = []
set_size_fraction_list = []

for sftmax, gt in conformal_val_data:
    sftmax_flat = sftmax.reshape(-1, sftmax.shape[-1])
    gt_flat = gt.flatten()
    
    prediction_sets = conformal_prediction_set(sftmax_flat, lambda_hat)
    fnr = fnr_loss(prediction_sets, gt_flat)
    fnr_list.append(fnr)
    
    # Calculate set size as a fraction of polyp size (assuming 'polyp' is equivalent to tumor)
    # This is an example calculation that you will need to adjust based on your actual data
    set_size = conformal_prediction_set(sftmax_flat, lambda_hat)[np.sum(conformal_prediction_set(sftmax_flat, lambda_hat)[:,1:],axis=1) > 0].shape[0]  # Sum over classes
    polyp_size = np.sum(gt_flat > 0)  # Assuming non-zero labels are tumors
    set_size_fraction = set_size / polyp_size
    set_size_fraction_list.extend(set_size_fraction[polyp_size > 0])  # Exclude cases without polyps

# Generate prediction sets for the validation data using the calibrated lambda_hat
val_prediction_sets = conformal_prediction_set(val_smx_scores, lambda_hat)

# Calculate the FNR loss on the validation data
validation_fnr_loss = fnr_loss(val_prediction_sets, val_gt_labels)

print(f"Validation FNR: {validation_fnr_loss}")

df = pd.DataFrame({
    'risk': fnr_list,
    'set_size_fraction': set_size_fraction_list
})

# Begin plotting
sns.set(palette='pastel', font='serif')
sns.set_style('white')

plt.rcParams['figure.dpi'] = 300
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(12,3))

# Plot risk histogram (FNR distribution)
axs[0].hist(df['risk'].to_numpy(), bins=30, alpha=0.7,  density=True)
axs[0].set_xlabel('Risk (False Negative Rate)')
axs[0].locator_params(axis='x', nbins=10)
axs[0].axvline(x=target_fnr,c='#999999',linestyle='--',alpha=0.7)
axs[0].set_ylabel('Density')

# Plot set size as fraction of polyp size histogram
axs[1].hist(df['set_size_fraction'], bins=np.logspace(np.log10(0.1), np.log10(5), 50), alpha=0.7, density=True)
axs[1].set_xlabel('Set size as a fraction of polyp size')
axs[1].locator_params(axis='x', nbins=10)
axs[1].set_yscale('log')
axs[1].set_ylabel('Density')


sns.despine(top=True, right=True, ax=axs[0])
sns.despine(top=True, right=True, ax=axs[1])
plt.tight_layout()
plt.show()

print(f"The mean and standard deviation of the risk over {len(df)} trials are {df['risk'].mean():.4f} and {df['risk'].std():.4f} respectively.")

In [None]:
# Set the style
sns.set(palette='pastel', font='serif')
sns.set_style('white')

# Begin plotting with the specified figure size and dpi
plt.figure(figsize=(8, 6), dpi=300)
plt.scatter(df['set_size_fraction'], df['risk'], alpha=0.7)

# Setting the scale of x-axis to log scale for set size fractions

# Adding labels and title
plt.xlabel('set size as fraction of polyp size')
plt.ylabel('risk (false negative rate)')
plt.title('risk and set size')

# Remove top and right borders
sns.despine()

# Show the plot
plt.show()

In [None]:
# Generate prediction sets for the validation data using the calibrated lambda_hat
val_prediction_sets = conformal_prediction_set(val_smx_scores, lambda_hat)

# Calculate the FNR loss on the validation data
validation_fnr_loss = fnr_loss(val_prediction_sets, val_gt_labels)

print(f"Validation FNR: {validation_fnr_loss}")

# 3D Visualisation tool we build for NN Initial Segmentation + Conformal Uncertainty Quantification

Note: there is an error in visual studio code (most likely also in other jupyter notebook IDE's) where files with output larger than 150MB will corrupt the notebook.

hence first clear all outputs running the following part.

Trust me we have found out the hard way ;)

In [None]:
# Download Conformal Calibration case
case = '004'
path = os.path.join(os.getcwd(), f'BraTS2020_TrainingData/MICCAI_BraTS2020_TrainingData/BraTS20_Training_{case[-3:]}')
cal_gt = nib.load(os.path.join(path, f'BraTS20_Training_{case}_seg.nii')).get_fdata()
cal_gt[cal_gt == 4] = 3
cal_gt = cv2.resize(cal_gt, (IMG_SIZE, IMG_SIZE))[:, :, VOLUME_START_AT:VOLUME_START_AT+VOLUME_SLICES]
cal_gt = cal_gt.astype(int)
origImage = nib.load(os.path.join(path, f'BraTS20_Training_{case}_flair.nii')).get_fdata(); origImage = cv2.resize(origImage, (IMG_SIZE, IMG_SIZE))[:, :, VOLUME_START_AT:VOLUME_START_AT+VOLUME_SLICES]

cal_smx = predictByPath(path,case)
cal_smx = np.transpose(cal_smx, (1, 2, 0, 3)); p = cal_smx
# Get scores.
cal_smx = cal_smx.reshape(-1, cal_smx.shape[-1])
cal_labels = cal_gt.reshape(-1); n=cal_labels.shape[0]
cal_pi = cal_smx.argsort(1)[:,::-1] # Sorted indices in descending order of probabilities
cal_srt = np.take_along_axis(cal_smx, cal_pi, axis=1).cumsum(axis=1) # Cumulative sums of sorted probabilities
cal_scores = np.take_along_axis(cal_srt,cal_pi.argsort(axis=1),axis=1)[range(n),cal_labels]

In [None]:

# Convert probabilities to category assignments
category_assignments = np.argmax(p, axis=-1)

# Ensure there are non-zero categories
if np.all(category_assignments == 0):
    raise ValueError("All category assignments are 0. No tumor categories to plot.")


# Generate grid coordinates
# np.indices will create a set of arrays (one for each dimension), each containing the indices along that dimension
x, y, z = np.indices((IMG_SIZE, IMG_SIZE, VOLUME_SLICES))

z = z * 128/244
# Create a figure
fig = go.Figure()

# Create a single volume with multiple isosurfaces
fig.add_trace(go.Volume(
    x=x.flatten(), # X coordinates
    y=y.flatten(), # Y coordinates
    z=z.flatten(), # Z coordinates
    value=origImage.flatten(), # Assignments
    isomin=1.5,
    isomax=4,  # Adjust based on the number of categories; here it's set to include 1-3
    opacity=0.1,  # Adjust opacity as needed
    surface_count=1,  # One surface for each category
    colorscale='Greys',  # Use a perceptually uniform colorscale
    name='Brain MRI'
))


# Create a single volume with multiple isosurfaces
fig.add_trace(go.Volume(
    x=x.flatten(), # X coordinates
    y=y.flatten(), # Y coordinates
    z=z.flatten(), # Z coordinates
    value=conformal_prediction_set(cal_smx, lambda_hat).reshape(IMG_SIZE, IMG_SIZE, VOLUME_SLICES, -1).sum(axis=-1).flatten(), # Assignments
    isomin=1.5,
    isomax=4,  # Adjust based on the number of categories; here it's set to include 1-3
    opacity=0.5,  # Adjust opacity as needed
    surface_count=3,  # One surface for each category
    colorscale='Reds',  # Use a perceptually uniform colorscale
    name='Conformal Uncertainty'
))

# Create a single volume with multiple isosurfaces
fig.add_trace(go.Volume(
    x=x.flatten(), # X coordinates
    y=y.flatten(), # Y coordinates
    z=z.flatten(), # Z coordinates
    value=category_assignments.flatten(), # Assignments
    isomin=0.5,
    isomax=3.5,  # Adjust based on the number of categories; here it's set to include 1-3
    opacity=0.2,  # Adjust opacity as needed
    surface_count=3,  # One surface for each category
    colorscale='Jet',  # Use a perceptually uniform colorscale
    name='Tumor Classification'
))


fig.update_layout(
    title="3D Volumetric Plot of Brain Tumors",
    scene=dict(
        xaxis_title='X Axis',
        yaxis_title='Y Axis',
        zaxis_title='Z Axis'
    ),
    width = 1400,
    height = 900
)

fig.show()
# fig.write_html("3D-Tumor-Volume.html")

Gray volume: the brain of the patient

Purple Volume: EDEMA

Green Volume: Necrotic / Core

### Conformal Volumes (varying degree of uncertainty)
White Volume: prediction set contains 2 labels

Red Volume: prediction set contains 3 labels

Dark Red Volume: prediction set contains all 4 labels

Since there is only one 'healthy' label, if the prediction set contains more then one label it contains a tumor label.