In [None]:
import os
import cv2
import numpy as np

# Path to the folder containing the ground truth images
data_dir = r'C:\Users\Babak\Desktop\Automated_Dural_sack_measurement\05_Final_Ground_Truth_Data\Label_Images'

# Path to the folder where the binary masks for the TS region will be saved
output_dir = r'C:\Users\Babak\Desktop\Automated_Dural_sack_measurement\Output_TS'

# Define the color range to extract (in this case, the TS color)
lower_range = np.array([150, 150, 150])
upper_range = np.array([150, 150, 150])

# HEX and RGB values for the TS region
TS_HEX = '#969696' # got for example through https://imagecolorpicker.com/
TS_RGB = (150, 150, 150) # got for example through https://imagecolorpicker.com/

# Loop through each image in the ground truth folder
for filename in os.listdir(data_dir):
    # Load the image
    img = cv2.imread(os.path.join(data_dir, filename))

    # Create a binary mask with only the TS pixels
    mask = cv2.inRange(img, lower_range, upper_range)

    # Apply the mask to the original image
    ts_img = cv2.bitwise_and(img, img, mask=mask)


    # Save the binary mask in the output folder with the same filename
    output_path = os.path.join(output_dir, filename)
    cv2.imwrite(output_path, ts_img)

In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dropout, Conv2DTranspose, concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.model_selection import KFold
from tensorflow.keras.layers import multiply
import tensorflow.keras.backend as K
from tensorflow.keras.layers import add, Activation
from tensorflow.keras.layers import BatchNormalization, add, Activation, concatenate
from tensorflow.keras.layers import UpSampling2D

# Preprocessing
def preprocess(img):
    img = cv2.resize(img, (320, 320))
    img = img / 255.0
    return img

def load_data(data_dir, label_dir):
    x_train, y_train = [], []
    for filename in os.listdir(data_dir):
        if filename.endswith('.png') and filename.startswith('T1'):
            # Load the image
            img = cv2.imread(os.path.join(data_dir, filename), cv2.IMREAD_GRAYSCALE)

            # Load the corresponding label image
            label_filename = filename.replace('T1', 'L1')
            label_img = cv2.imread(os.path.join(label_dir, label_filename), cv2.IMREAD_GRAYSCALE)

            # Preprocess the input and label images
            img = preprocess(img)
            label_img = preprocess(label_img)

            # Normalize the label image
            label_img = np.where(label_img == 150/255, 1.0, 0.0)

            # Append the images to the lists
            x_train.append(img)
            y_train.append(label_img)
    
    return np.array(x_train), np.array(y_train)

# Attention Gate
def attention_gate(inp, g, inter_channels):
    theta_x = Conv2D(inter_channels, [2, 2], strides=[1, 1], padding='same')(inp)
    phi_g = Conv2D(inter_channels, [1, 1], padding='same')(g)
    theta_x_upsampled = UpSampling2D(size=(2, 2))(theta_x)  # Upsample theta_x to match the dimensions of phi_g

    f = Activation('relu')(add([theta_x_upsampled, phi_g]))
    f_pooled = MaxPooling2D((2, 2))(f)  # Adjust the resolution of f to match inp
    psi_f = Conv2D(1, [1, 1], activation='sigmoid')(f_pooled)
    rate = multiply([inp, psi_f])

    return rate



def weighted_binary_crossentropy(pos_weight, neg_weight):
    def _weighted_binary_crossentropy(y_true, y_pred):
        bce = K.binary_crossentropy(y_true, y_pred)
        weight_vector = y_true * pos_weight + (1. - y_true) * neg_weight
        weighted_bce = weight_vector * bce
        return K.mean(weighted_bce)
    return _weighted_binary_crossentropy

# Set the weights for positive and negative classes
pos_weight = 20.0
neg_weight = 1.0

# U-Net model
def unet_model(input_size=(320, 320, 1)):
    inputs = Input(input_size)
    
    c1 = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    c1 = Conv2D(32, (3, 3), activation='relu', padding='same')(c1)
    p1 = MaxPooling2D((2, 2))(c1)

    c2 = Conv2D(64, (3, 3), activation='relu', padding='same')(p1)
    c2 = Conv2D(64, (3, 3), activation='relu', padding='same')(c2)
    p2 = MaxPooling2D((2, 2))(c2)

    c3 = Conv2D(128, (3, 3), activation='relu', padding='same')(p2)
    c3 = Conv2D(128, (3, 3), activation='relu', padding='same')(c3)
    p3 = MaxPooling2D((2, 2))(c3)

    c4 = Conv2D(256, (3, 3), activation='relu', padding='same')(p3)
    c4 = Conv2D(256, (3, 3), activation='relu', padding='same')(c4)

    u5 = Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c4)
    u5 = concatenate([u5, c3])
    c5 = Conv2D(128, (3, 3), activation='relu', padding='same')(u5)
    c5 = Conv2D(128, (3, 3), activation='relu', padding='same')(c5)

    u6 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c5)
    u6 = concatenate([u6, c2])
    c6 = Conv2D(64, (3, 3), activation='relu', padding='same')(u6)
    c6 = Conv2D(64, (3, 3), activation='relu', padding='same')(c6)

    u7 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(c6)
    u7 = concatenate([u7, c1])
    c7 = Conv2D(32, (3, 3), activation='relu', padding='same')(u7)
    c7 = Conv2D(32, (3, 3), activation='relu', padding='same')(c7)

    outputs = Conv2D(1, (1, 1), activation='sigmoid')(c7)

    model = Model(inputs=[inputs], outputs=[outputs])
    # Compile the model with the custom loss function
    model.compile(optimizer='adam', loss=weighted_binary_crossentropy(pos_weight, neg_weight), metrics=['accuracy'])

    return model

def attention_unet_model(input_size=(320, 320, 1)):
    inputs = Input(input_size)
    
    c1 = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    c1 = Conv2D(32, (3, 3), activation='relu', padding='same')(c1)
    p1 = MaxPooling2D((2, 2))(c1)

    c2 = Conv2D(64, (3, 3), activation='relu', padding='same')(p1)
    c2 = Conv2D(64, (3, 3), activation='relu', padding='same')(c2)
    p2 = MaxPooling2D((2, 2))(c2)

    c3 = Conv2D(128, (3, 3), activation='relu', padding='same')(p2)
    c3 = Conv2D(128, (3, 3), activation='relu', padding='same')(c3)
    p3 = MaxPooling2D((2, 2))(c3)

    c4 = Conv2D(256, (3, 3), activation='relu', padding='same')(p3)
    c4 = Conv2D(256, (3, 3), activation='relu', padding='same')(c4)

    att5 = attention_gate(c4, c3, 128)  # Adding attention gate
    u5 = Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(att5)
    c5 = Conv2D(128, (3, 3), activation='relu', padding='same')(u5)
    c5 = Conv2D(128, (3, 3), activation='relu', padding='same')(c5)

    att6 = attention_gate(c5, c2, 64)  # Adding attention gate
    u6 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(att6)
    u6 = concatenate([u6, c2])
    c6 = Conv2D(64, (3, 3), activation='relu', padding='same')(u6)
    c6 = Conv2D(64, (3, 3), activation='relu', padding='same')(c6)

    att7 = attention_gate(c6, c1, 32)  # Adding attention gate
    u7 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(att7)
    c7 = Conv2D(32, (3, 3), activation='relu', padding='same')(u7)
    c7 = Conv2D(32, (3, 3), activation='relu', padding='same')(c7)

    outputs = Conv2D(1, (1, 1), activation='sigmoid')(c7)

    model = Model(inputs=[inputs], outputs=[outputs])
    # Compile the model with the custom loss function
    model.compile(optimizer='adam', loss=weighted_binary_crossentropy(pos_weight, neg_weight), metrics=['accuracy'])

    return model

def conv2d_bn(x, filters, num_row, num_col, padding='same', strides=(1, 1), activation='relu', name=None):
    '''
    2D Convolutional layers
    
    Arguments:
        x {keras layer} -- input layer 
        filters {int} -- number of filters
        num_row {int} -- number of rows in filters
        num_col {int} -- number of columns in filters
    
    Keyword Arguments:
        padding {str} -- mode of padding (default: {'same'})
        strides {tuple} -- stride of convolution operation (default: {(1, 1)})
        activation {str} -- activation function (default: {'relu'})
        name {str} -- name of the layer (default: {None})
    
    Returns:
        [keras layer] -- [output layer]
    '''

    x = Conv2D(filters, (num_row, num_col), strides=strides, padding=padding, use_bias=False)(x)
    x = BatchNormalization(axis=3, scale=False)(x)

    if(activation == None):
        return x

    x = Activation(activation, name=name)(x)

    return x

def trans_conv2d_bn(x, filters, num_row, num_col, padding='same', strides=(2, 2), name=None):
    '''
    2D Transposed Convolutional layers
    
    Arguments:
        x {keras layer} -- input layer 
        filters {int} -- number of filters
        num_row {int} -- number of rows in filters
        num_col {int} -- number of columns in filters
    
    Keyword Arguments:
        padding {str} -- mode of padding (default: {'same'})
        strides {tuple} -- stride of convolution operation (default: {(2, 2)})
        name {str} -- name of the layer (default: {None})
    
    Returns:
        [keras layer] -- [output layer]
    '''

    x = Conv2DTranspose(filters, (num_row, num_col), strides=strides, padding=padding)(x)
    x = BatchNormalization(axis=3, scale=False)(x)
    
    return x

# Adding the MultiResUNet model
def MultiResBlock(U, inp, alpha = 1.67):
    '''
    MultiRes Block
    
    Arguments:
        U {int} -- Number of filters in a corrsponding UNet stage
        inp {keras layer} -- input layer 
    
    Returns:
        [keras layer] -- [output layer]
    '''

    W = alpha * U

    shortcut = inp

    shortcut = conv2d_bn(shortcut, int(W*0.167) + int(W*0.333) +
                         int(W*0.5), 1, 1, activation=None, padding='same')

    conv3x3 = conv2d_bn(inp, int(W*0.167), 3, 3,
                        activation='relu', padding='same')

    conv5x5 = conv2d_bn(conv3x3, int(W*0.333), 3, 3,
                        activation='relu', padding='same')

    conv7x7 = conv2d_bn(conv5x5, int(W*0.5), 3, 3,
                        activation='relu', padding='same')

    out = concatenate([conv3x3, conv5x5, conv7x7], axis=3)
    out = BatchNormalization(axis=3)(out)

    out = add([shortcut, out])
    out = Activation('relu')(out)
    out = BatchNormalization(axis=3)(out)

    return out

def ResPath(filters, length, inp):
    '''
    ResPath
    
    Arguments:
        filters {int} -- [description]
        length {int} -- length of ResPath
        inp {keras layer} -- input layer 
    
    Returns:
        [keras layer] -- [output layer]
    '''


    shortcut = inp
    shortcut = conv2d_bn(shortcut, filters, 1, 1,
                         activation=None, padding='same')

    out = conv2d_bn(inp, filters, 3, 3, activation='relu', padding='same')

    out = add([shortcut, out])
    out = Activation('relu')(out)
    out = BatchNormalization(axis=3)(out)

    for i in range(length-1):

        shortcut = out
        shortcut = conv2d_bn(shortcut, filters, 1, 1,
                             activation=None, padding='same')

        out = conv2d_bn(out, filters, 3, 3, activation='relu', padding='same')

        out = add([shortcut, out])
        out = Activation('relu')(out)
        out = BatchNormalization(axis=3)(out)

    return out

def MultiResUnet(input_size=(320, 320, 1)):
    inputs = Input(input_size)

    mresblock1 = MultiResBlock(32, inputs)
    pool1 = MaxPooling2D(pool_size=(2, 2))(mresblock1)
    mresblock1 = ResPath(32, 4, mresblock1)

    mresblock2 = MultiResBlock(32*2, pool1)
    pool2 = MaxPooling2D(pool_size=(2, 2))(mresblock2)
    mresblock2 = ResPath(32*2, 3, mresblock2)

    mresblock3 = MultiResBlock(32*4, pool2)
    pool3 = MaxPooling2D(pool_size=(2, 2))(mresblock3)
    mresblock3 = ResPath(32*4, 2, mresblock3)

    mresblock4 = MultiResBlock(32*8, pool3)
    pool4 = MaxPooling2D(pool_size=(2, 2))(mresblock4)
    mresblock4 = ResPath(32*8, 1, mresblock4)

    mresblock5 = MultiResBlock(32*16, pool4)

    up6 = concatenate([Conv2DTranspose(32*8, (2, 2), strides=(2, 2), padding='same')(mresblock5), mresblock4], axis=3)
    mresblock6 = MultiResBlock(32*8, up6)

    up7 = concatenate([Conv2DTranspose(32*4, (2, 2), strides=(2, 2), padding='same')(mresblock6), mresblock3], axis=3)
    mresblock7 = MultiResBlock(32*4, up7)

    up8 = concatenate([Conv2DTranspose(32*2, (2, 2), strides=(2, 2), padding='same')(mresblock7), mresblock2], axis=3)
    mresblock8 = MultiResBlock(32*2, up8)

    up9 = concatenate([Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(mresblock8), mresblock1], axis=3)
    mresblock9 = MultiResBlock(32, up9)

    conv10 = conv2d_bn(mresblock9, 1, 1, 1, activation='sigmoid')

    model = Model(inputs=[inputs], outputs=[conv10])
    model.compile(optimizer='adam', loss=weighted_binary_crossentropy(pos_weight, neg_weight), metrics=['accuracy'])

    return model

# Train the model
data_dir = r'C:\Users\Babak\Desktop\Automated_Dural_sack_measurement\04_Intermediary_Ground_Truth_Data\T1_Output'
label_dir = r'C:\Users\Babak\Desktop\Automated_Dural_sack_measurement\Output_TS'

x_train, y_train = load_data(data_dir, label_dir)


# Define 5-fold cross validation
kfold = KFold(n_splits=5, shuffle=True, random_state=42)

# Lists to store metrics
val_loss_per_fold_unet = []
val_loss_per_fold_att_unet = []
val_loss_per_fold_multi_unet = []

# To keep track of the best model
best_model_unet = None
best_model_att_unet = None
best_model_multi_unet = None
lowest_val_loss_unet = float('inf')
lowest_val_loss_att_unet = float('inf')
lowest_val_losses_multi_unet = float('inf')

# Define models
model_functions = [unet_model, attention_unet_model, MultiResUnet]
model_names = ['U-Net', 'Attention U-Net', 'MultiResUNet']
best_models = [best_model_unet, best_model_att_unet, best_model_multi_unet]
lowest_val_losses = [lowest_val_loss_unet, lowest_val_loss_att_unet, lowest_val_losses_multi_unet]
val_losses = [val_loss_per_fold_unet, val_loss_per_fold_att_unet, val_loss_per_fold_multi_unet]

# Iterate over both models
for i, model_function in enumerate(model_functions):
    print('Training model:', model_names[i])

    # Reset fold counter for each model
    fold_no = 0

    # Iterate through each fold
    for train, test in kfold.split(x_train, y_train):
        # Generate a print
        print('------------------------------------------------------------------------')
        print(f'Training for fold {fold_no + 1}')

        # Create a new model for each fold
        model = model_function()

        # Fit the model
        history = model.fit(
            x_train[train], y_train[train],
            batch_size=32,
            epochs=20,
            validation_data=(x_train[test], y_train[test])
        )

        # Evaluate and store the metrics for each fold
        scores = model.evaluate(x_train[test], y_train[test], verbose=0)
        print(f'Score for fold {fold_no + 1}: {model.metrics_names[0]} of {scores[0]}')
        val_losses[i].append(scores[0])

        # Save the model if it has a lower validation loss than the current best model
        if scores[0] < lowest_val_losses[i]:
            lowest_val_losses[i] = scores[0]
            best_models[i] = model

        # Increment fold counter
        fold_no += 1

    # Average metrics
    print('Average scores for all folds:')
    print(f'> Loss: {np.mean(val_losses[i])}')
    print('------------------------------------------------------------------------')

# Save the best models
best_models[0].save('unet_dural_sack_new.h5')
best_models[1].save('attention_unet_dural_sack_new.h5')
best_models[2].save('multiresunet_dural_sack_new.h5')

In [None]:
import tensorflow as tf
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, mean_absolute_error, mean_squared_error
from scipy.stats import pearsonr

# Set the pixel size in mm
pixel_size_mm = 0.6875  # Adjust this value according to your image resolution

# Calculate the dural sack cross-sectional area for each image
def calculate_areas(mask, pixel_size):
    areas = []
    for m in mask:
        area = np.sum(m) * pixel_size**2
        areas.append(area)
    return np.array(areas)

model_filenames = [
    'unet_dural_sack_new.h5', 
    'attention_unet_dural_sack_new.h5', 
    'multiresunet_dural_sack_new.h5'
]

model_names = ["U-Net", "Attention U-Net", "MultiResUNet"]  # Ensure this list corresponds with the model_filenames list

for i, model_filename in enumerate(model_filenames):
    # Load the trained model
    model = tf.keras.models.load_model(model_filename, custom_objects={'_weighted_binary_crossentropy': weighted_binary_crossentropy(pos_weight, neg_weight)})

    # Predict on the validation set
    y_pred = model.predict(x_val)
    y_pred_binary = np.where(y_pred > 0.5, 1, 0)

    # Calculate areas for predicted and ground truth masks
    areas_pred = calculate_areas(y_pred_binary, pixel_size_mm)
    areas_gt = calculate_areas(y_val, pixel_size_mm)

    # Calculate Mean Absolute Error (MAE) and Mean Squared Error (MSE)
    mae = mean_absolute_error(areas_gt, areas_pred)
    mse = mean_squared_error(areas_gt, areas_pred)
    print(f"Model: {model_names[i]}")  # Changed to model_names[i]
    print(f"Mean Absolute Error: {mae:.4f} mm²")
    print(f"Mean Squared Error: {mse:.4f} mm²")

    # Calculate Pearson correlation coefficient
    r, p = pearsonr(areas_gt, areas_pred)
    print(f"Pearson correlation coefficient: {r:.4f}")

    # Visualize the results with a scatter plot
    plt.figure(figsize=(10, 6))
    plt.scatter(areas_gt, areas_pred)
    plt.xlabel('Ground truth area (mm²)')
    plt.ylabel('Predicted area (mm²)')
    plt.title(f'Scatter plot of predicted vs ground truth areas for {model_names[i]}')  # Changed to model_names[i]
    plt.plot([min(areas_gt), max(areas_gt)], [min(areas_gt), max(areas_gt)], 'r--', label='Identity line')
    plt.legend()
    plt.show()

    # Bland-Altman plot
    mean_diff = np.mean(areas_pred - areas_gt)
    std_diff = np.std(areas_pred - areas_gt)
    upper_limit = mean_diff + 1.96 * std_diff
    lower_limit = mean_diff - 1.96 * std_diff

    print(f"Mean difference: {mean_diff:.4f} mm²")
    print(f"Limits of agreement: [{lower_limit:.4f}, {upper_limit:.4f}] mm²")

    plt.figure(figsize=(10, 6))
    plt.scatter((areas_gt + areas_pred) / 2, areas_pred - areas_gt)
    plt.axhline(mean_diff, color='red', linestyle='--', label='Mean difference')
    plt.axhline(upper_limit, color='blue', linestyle='--', label='Upper limit (1.96 SD)')
    plt.axhline(lower_limit, color='blue', linestyle='--', label='Lower limit (1.96 SD)')
    plt.xlabel('Mean of ground truth and predicted areas (mm²)')
    plt.ylabel('Difference between predicted and ground truth areas (mm²)')
    plt.title(f'Bland-Altman plot for {model_names[i]}')  # Changed to model_names[i]
    plt.legend()
    plt.show()


In [None]:
import numpy as np
from sklearn.metrics import mean_absolute_error

pixel_size_mm = 0.6875

def calculate_area(mask, pixel_size_mm):
    num_pixels = np.sum(mask)
    return num_pixels * (pixel_size_mm ** 2)

# Load the trained models
model_filenames = [
    'unet_dural_sack_new.h5', 
    'attention_unet_dural_sack_new.h5', 
    'multiresunet_dural_sack_new.h5'
]

model_names = ["U-Net", "Attention U-Net", "MultiResUNet"]

# Calculate Mean Absolute Error (MAE) for each slice for each model and store it in a list
mae_per_slice_all_models = []

for model_filename in model_filenames:
    model = tf.keras.models.load_model(model_filename, custom_objects={'_weighted_binary_crossentropy': weighted_binary_crossentropy(pos_weight, neg_weight)})
    y_pred = model.predict(x_val)
    y_pred_binary = np.where(y_pred > 0.5, 1, 0)
    areas_pred = calculate_areas(y_pred_binary, pixel_size_mm)
    areas_gt = calculate_areas(y_val, pixel_size_mm)

    # Calculate Mean Absolute Error (MAE) for each slice
    mae_per_slice = np.abs(areas_gt - areas_pred)
    mae_per_slice_all_models.append(mae_per_slice)

# Get the MAE values for each model
mae_unet, mae_attention_unet, mae_multiresunet = mae_per_slice_all_models

# Identify the indices where MultiResUNet performed better than Attention U-Net, and Attention U-Net performed better than U-Net
better_indices = np.where((mae_multiresunet < mae_attention_unet) & (mae_attention_unet < mae_unet))[0]

# From these indices, find the one with the smallest average MAE
average_mae_at_better_indices = np.mean(np.array(mae_per_slice_all_models)[:, better_indices], axis=0)
best_slice_index = better_indices[np.argmin(average_mae_at_better_indices)]


# Visualize the best predicted slice with cross-sectional area for all models
fig, axes = plt.subplots(len(model_filenames), 3, figsize=(15, 5 * len(model_filenames)))

for i, model_filename in enumerate(model_filenames):
    model = tf.keras.models.load_model(model_filename, custom_objects={'_weighted_binary_crossentropy': weighted_binary_crossentropy(pos_weight, neg_weight)})
    y_pred = model.predict(x_val)
    y_pred_binary = np.where(y_pred > 0.5, 1, 0)

    predicted_area = calculate_area(y_pred_binary[best_slice_index, :, :, 0], pixel_size_mm)
    gt_area = calculate_area(y_val[best_slice_index, :, :, 0], pixel_size_mm)
    axes[i, 0].imshow(x_val[best_slice_index, :, :, 0], cmap='gray')
    axes[i, 0].set_title(f"Input image\n{model_names[i]}")  # use model names here
    axes[i, 1].imshow(y_val[best_slice_index, :, :, 0], cmap='gray')
    axes[i, 1].set_title(f"Ground truth\nArea: {gt_area:.2f} mm²")
    axes[i, 2].imshow(y_pred_binary[best_slice_index, :, :, 0], cmap='gray')
    axes[i, 2].set_title(f"Predicted segmentation\nArea: {predicted_area:.2f} mm²")

plt.tight_layout()
plt.show()
