In [None]:
import nibabel as nib
import numpy as np
from scipy import ndimage
from tqdm import tqdm
import os
import random
import tensorflow as tf
from tensorflow import keras
from keras import layers
from sklearn.metrics import confusion_matrix, recall_score, accuracy_score, roc_auc_score

# **Preprocessing**

## Function: Read images and a header (metadata) from a NIfTI file

In [None]:
def read_image_header(path):
    """Load a NIfTI file and return its voxel data together with its header.

    Parameters
    ----------
    path
        Location of the NIfTI file; passed unchanged to ``nib.load``.

    Returns
    -------
    tuple
        ``(image, header)`` where ``image`` is the result of ``get_fdata()``
        and ``header`` is the ``header`` attribute of the loaded object.
    """
    nifti = nib.load(path)
    return nifti.get_fdata(), nifti.header

## Functions: Preprocessing for original images

In [None]:
# scale the CT density (Hounsfield Units) of the images after extracting brain and hematoma
def density_scaling(image):
    """Keep the 0-100 Hounsfield Unit window and rescale it to the range [0, 1].

    Values below 0 or above 100 are replaced by 0; everything else is divided
    by 100.

    Parameters
    ----------
    image : numpy.ndarray
        CT densities. The array is NOT modified; the work is done on a copy so
        that the caller's data (for example an array cached by the loader)
        keeps its original values.

    Returns
    -------
    numpy.ndarray
        New array of the same shape holding the scaled values.
    """
    windowed = image.copy()  # never write into the caller's array
    windowed[windowed < 0] = 0  # use the area with Hounsfield Units between 0 and 100,
    windowed[windowed > 100] = 0  # which includes brain and hematoma
    return windowed / 100

# reslice and align the number of slices
def reslice_and_align_slice(image, header, new_spacing=2, new_slices=80):
    """Resample the volume along axis 2 and force it to exactly ``new_slices`` slices.

    Parameters
    ----------
    image : numpy.ndarray
        3-D volume; axis 2 is the slice axis.
    header
        Mapping-like object whose ``header['pixdim'][3]`` is the current slice
        spacing.
    new_spacing : float
        Target slice spacing (same unit as ``pixdim``).
    new_slices : int
        Number of slices of the returned volume.

    Returns
    -------
    numpy.ndarray
        Volume with shape ``(image.shape[0], image.shape[1], new_slices)``.
    """
    # reslice - set new spacing (new_spacing), with which reslice the images
    z_axis_spacing = header['pixdim'][3].round(2)  # get the image spacing from the header
    spacing_factor = z_axis_spacing / new_spacing
    image = ndimage.zoom(image, (1, 1, spacing_factor))

    # align the number of slices - set the new number of slices (new_slices)
    # The slice count is read from the resampled array itself instead of being
    # re-calculated from the header, so it can never disagree with what
    # ndimage.zoom actually produced (rounding, or a header that does not
    # match the data).
    slices_resliced = image.shape[2]
    if slices_resliced > new_slices:
        # drop the surplus slices from the start of axis 2
        image = image[:, :, slices_resliced - new_slices:]
    elif slices_resliced < new_slices:
        # prepend all-zero slices at the start of axis 2
        num_add = new_slices - slices_resliced
        ndarray_zero = np.zeros((image.shape[0], image.shape[1], num_add))
        image = np.concatenate([ndarray_zero, image], 2)
    return image

# unify the pixel size
def unify(image, header, target_size=512, new_pixel_size=0.5):
    """Resample in-plane to a common pixel size and centre the result on a square canvas.

    Parameters
    ----------
    image : numpy.ndarray
        3-D volume with values expected in [0, 1]; axes 0 and 1 are in-plane.
    header
        Mapping-like object whose ``header['pixdim'][1]`` is the current pixel
        size. Pixels are treated as square, so only this value is used.
    target_size : int
        Edge length of the output canvas along axes 0 and 1 (default 512).
    new_pixel_size : float
        Pixel size after resampling (default 0.5).

    Returns
    -------
    numpy.ndarray
        Volume of shape ``(target_size, target_size, image.shape[2])`` with
        values limited to [0, 1].
    """
    # calculate the magnification for resizing the images to the new pixel size
    x_pixel = header['pixdim'][1]  # Since the pixel is square, only the x-axis of the pixel is calculated.
    magnification = x_pixel / new_pixel_size
    # unify
    image = ndimage.zoom(image, (magnification, magnification, 1))

    # Pad (or centre-crop) axes 0 and 1 so the canvas is exactly target_size.
    # Padding the same amount on both sides cannot reach an even target from
    # an odd size, and an oversized image would need a negative pad, so the
    # two sides are computed separately and the oversized case is cropped.
    for axis in (0, 1):
        deficit = target_size - image.shape[axis]
        if deficit >= 0:
            pad_before = (deficit + 1) // 2
            pad_after = deficit - pad_before
            shape_before = list(image.shape)
            shape_before[axis] = pad_before
            shape_after = list(image.shape)
            shape_after[axis] = pad_after
            image = np.concatenate([np.zeros(shape_before), image, np.zeros(shape_after)], axis)
        else:
            start = (-deficit) // 2
            window = [slice(None)] * image.ndim
            window[axis] = slice(start, start + target_size)
            image = image[tuple(window)]

    image[image < 0] = 0  # During the unification process, some pixel values may become
    image[image > 1] = 1  # slightly lower than 0 or higher than 1, so the excess are aligned to 0 or 1.
    return image

# resize the images
def resize(image,
           image_size=256  # the image size to be created
           ):
    """Resample axes 0 and 1 of the volume to ``image_size`` pixels each.

    The zoom factors are derived from the actual in-plane shape of ``image``,
    so the function no longer relies on the input being exactly 512 x 512
    (for a 512 x 512 input the factors are the same as before).

    Parameters
    ----------
    image : numpy.ndarray
        3-D volume with values expected in [0, 1]; axis 2 is left untouched.
    image_size : int
        Edge length of the output along axes 0 and 1.

    Returns
    -------
    numpy.ndarray
        Resampled volume with values limited to [0, 1].
    """
    factor_0 = image_size / image.shape[0]
    factor_1 = image_size / image.shape[1]
    image = ndimage.zoom(image, (factor_0, factor_1, 1))
    image[image < 0] = 0  # During the resizing process, some pixel values may become
    image[image > 1] = 1  # slightly lower than 0 or higher than 1, so the excess are aligned to 0 or 1.
    return image

# get processed images using above defined functions
def get_processed_images(path):
    """Run the full preprocessing chain for one original CT file.

    Steps, in order: read the file, scale densities, reslice/align the slice
    count, unify the pixel size, resize, rotate by 90 degrees (shape kept),
    and round the result to 10 decimals.

    Parameters
    ----------
    path
        Location of the NIfTI file to process.

    Returns
    -------
    numpy.ndarray
        The processed volume.
    """
    volume, header = read_image_header(path)
    volume = density_scaling(volume)
    volume = reslice_and_align_slice(volume, header)
    volume = resize(unify(volume, header))
    # rotate the images by 90 degrees
    rotated = ndimage.rotate(volume, 90, reshape=False)
    return rotated.round(10)

## Functions: Preprocessing for masked images

In [None]:
# reslice and align the number of slices
def reslice_and_align_slice_mask(image, header, new_spacing=2, new_slices=80):
    """Resample a mask along axis 2, re-binarize it and force ``new_slices`` slices.

    Parameters
    ----------
    image : numpy.ndarray
        3-D mask volume; axis 2 is the slice axis.
    header
        Mapping-like object whose ``header['pixdim'][3]`` is the current slice
        spacing.
    new_spacing : float
        Target slice spacing (same unit as ``pixdim``).
    new_slices : int
        Number of slices of the returned volume.

    Returns
    -------
    numpy.ndarray
        Mask with shape ``(image.shape[0], image.shape[1], new_slices)``;
        resampled values are thresholded at 0.2 to 0 / 1.
    """
    # reslice - set new spacing (new_spacing), with which reslice the images
    z_axis_spacing = header['pixdim'][3].round(2)  # get the image spacing from the header
    spacing_factor = z_axis_spacing / new_spacing
    image = ndimage.zoom(image, (1, 1, spacing_factor))
    image[image < 0.2] = 0   # interpolation leaves non-binary values,
    image[image >= 0.2] = 1  # therefore, binarize with a threshold of 0.2

    # align the number of slices - set the new number of slices (new_slices)
    # The slice count is read from the resampled array itself instead of being
    # re-calculated from the header, so it can never disagree with what
    # ndimage.zoom actually produced (rounding, or a header that does not
    # match the data).
    slices_resliced = image.shape[2]
    if slices_resliced > new_slices:
        # drop the surplus slices from the start of axis 2
        image = image[:, :, slices_resliced - new_slices:]
    elif slices_resliced < new_slices:
        # prepend all-zero slices at the start of axis 2
        num_add = new_slices - slices_resliced
        ndarray_zero = np.zeros((image.shape[0], image.shape[1], num_add))
        image = np.concatenate([ndarray_zero, image], 2)
    return image

# unify the pixel size
def unify_mask(image, header, target_size=512, new_pixel_size=0.5):
    """Resample a mask in-plane to a common pixel size and centre it on a square canvas.

    Parameters
    ----------
    image : numpy.ndarray
        3-D mask volume; axes 0 and 1 are in-plane.
    header
        Mapping-like object whose ``header['pixdim'][1]`` is the current pixel
        size. Pixels are treated as square, so only this value is used.
    target_size : int
        Edge length of the output canvas along axes 0 and 1 (default 512).
    new_pixel_size : float
        Pixel size after resampling (default 0.5).

    Returns
    -------
    numpy.ndarray
        Mask of shape ``(target_size, target_size, image.shape[2])`` whose
        values are thresholded at 0.2 to 0 / 1.
    """
    # calculate the magnification for resizing the images to the new pixel size
    x_pixel = header['pixdim'][1]  # Since the pixel is square, only the x-axis of the pixel is calculated.
    magnification = x_pixel / new_pixel_size
    # unify
    image = ndimage.zoom(image, (magnification, magnification, 1))

    # Pad (or centre-crop) axes 0 and 1 so the canvas is exactly target_size.
    # Padding the same amount on both sides cannot reach an even target from
    # an odd size, and an oversized image would need a negative pad, so the
    # two sides are computed separately and the oversized case is cropped.
    for axis in (0, 1):
        deficit = target_size - image.shape[axis]
        if deficit >= 0:
            pad_before = (deficit + 1) // 2
            pad_after = deficit - pad_before
            shape_before = list(image.shape)
            shape_before[axis] = pad_before
            shape_after = list(image.shape)
            shape_after[axis] = pad_after
            image = np.concatenate([np.zeros(shape_before), image, np.zeros(shape_after)], axis)
        else:
            start = (-deficit) // 2
            window = [slice(None)] * image.ndim
            window[axis] = slice(start, start + target_size)
            image = image[tuple(window)]

    image[image < 0.2] = 0   # after the unification, the values of the images are no longer binary,
    image[image >= 0.2] = 1  # therefore, binarize with a threshold of 0.2
    return image

# resize the images
def resize_mask(image,
                image_size=256  # the image size to be created
               ):
    """Resample axes 0 and 1 of a mask to ``image_size`` pixels each and re-binarize it.

    The zoom factors are derived from the actual in-plane shape of ``image``,
    so the function no longer relies on the input being exactly 512 x 512
    (for a 512 x 512 input the factors are the same as before).

    Parameters
    ----------
    image : numpy.ndarray
        3-D mask volume; axis 2 is left untouched.
    image_size : int
        Edge length of the output along axes 0 and 1.

    Returns
    -------
    numpy.ndarray
        Resampled mask whose values are thresholded at 0.2 to 0 / 1.
    """
    factor_0 = image_size / image.shape[0]
    factor_1 = image_size / image.shape[1]
    image = ndimage.zoom(image, (factor_0, factor_1, 1))
    image[image < 0.2] = 0   # after resizing, the values of the images are no longer binary,
    image[image >= 0.2] = 1  # therefore, binarize with a threshold of 0.2
    return image

# get processed images using above defined functions
def get_processed_images_mask(path):
    """Run the full preprocessing chain for one mask file.

    Steps, in order: read the file, reslice/align the slice count, unify the
    pixel size, resize, rotate by 90 degrees (shape kept), and round the
    result to 10 decimals. No density scaling is applied to masks.

    Parameters
    ----------
    path
        Location of the NIfTI mask file to process.

    Returns
    -------
    numpy.ndarray
        The processed mask volume.
    """
    mask, header = read_image_header(path)
    mask = reslice_and_align_slice_mask(mask, header)
    mask = resize_mask(unify_mask(mask, header))
    # rotate the images by 90 degrees
    rotated = ndimage.rotate(mask, 90, reshape=False)
    return rotated.round(10)

## Function: Extract hematoma only

In [None]:
def extract_hematoma(path, path_mask):
    """Return the processed CT volumes with everything outside the mask set to 0.

    Files in ``path`` and ``path_mask`` are paired by position after sorting
    the directory listings by name.

    Parameters
    ----------
    path
        Directory containing the original CT NIfTI files.
    path_mask
        Directory containing the corresponding mask NIfTI files.

    Returns
    -------
    numpy.ndarray
        float32 array stacking one masked volume per file.

    Raises
    ------
    ValueError
        If the two directories do not contain the same number of files.
        Without this check a single mask would silently be broadcast over
        every image.
    """
    # `list` is deliberately not used as a variable name (it would shadow the builtin)
    image_paths = [os.path.join(path, name) for name in sorted(os.listdir(path))]
    mask_paths = [os.path.join(path_mask, name) for name in sorted(os.listdir(path_mask))]
    if len(image_paths) != len(mask_paths):
        raise ValueError(
            "Number of original images ({}) and masks ({}) differ".format(
                len(image_paths), len(mask_paths)))

    original = np.array([get_processed_images(p) for p in tqdm(image_paths)])
    original = original.astype('float32')
    mask = np.array([get_processed_images_mask(p) for p in tqdm(mask_paths)])
    mask = mask.astype('float32')

    # mask == 1 -> value stays `original`; mask == 0 -> `original - 1`, which is
    # <= 0 for values in [0, 1] and is therefore zeroed on the next line.
    hematoma = original - 1 + mask
    hematoma[hematoma <= 0] = 0
    return hematoma

## Specify the paths where NIfTI files are stored

The NIfTI files should be stored in each directory according to the following classification method.
*   training and validation, expansion, original CT images
*   training and validation, expansion, masked images
*   training and validation, no expansion, original CT images
*   training and validation, no expansion, masked images
*   test, expansion, original CT images
*   test, expansion, masked images
*   test, no expansion, original CT images
*   test, no expansion, masked images

In [None]:
# Directories that hold the NIfTI files, one directory per combination of
# data split (train+val / test), class (expansion / no expansion) and image
# type (original CT / mask). Replace every "PATH" placeholder with the real
# directory before running the cells below.
path_expansion_train_val = "PATH"
path_expansion_train_val_mask = "PATH"
path_no_expansion_train_val = "PATH"
path_no_expansion_train_val_mask = "PATH"
path_expansion_test = "PATH"
path_expansion_test_mask = "PATH"
path_no_expansion_test = "PATH"
path_no_expansion_test_mask = "PATH"

## Create data arrays

In [None]:
def _load_processed_directory(directory):
    """Return ``(paths, volumes)`` for every file in ``directory``.

    ``paths`` is the name-sorted list of file paths and ``volumes`` is the
    array obtained by stacking ``get_processed_images`` of each path.
    """
    paths = [os.path.join(directory, name) for name in sorted(os.listdir(directory))]
    volumes = np.array([get_processed_images(p) for p in tqdm(paths)])
    return paths, volumes


# One call per class / split instead of four copy-pasted stanzas.
list_expansion_train_val, expansion_train_val = _load_processed_directory(path_expansion_train_val)
list_no_expansion_train_val, no_expansion_train_val = _load_processed_directory(path_no_expansion_train_val)
list_expansion_test, expansion_test = _load_processed_directory(path_expansion_test)
list_no_expansion_test, no_expansion_test = _load_processed_directory(path_no_expansion_test)

## Convert array type to float32



In [None]:
# Down-cast each stack to single precision, one array at a time so the
# double-precision original can be released before the next one is converted.
expansion_train_val = expansion_train_val.astype(np.float32)
no_expansion_train_val = no_expansion_train_val.astype(np.float32)
expansion_test = expansion_test.astype(np.float32)
no_expansion_test = no_expansion_test.astype(np.float32)

## Shuffle the data in the arrays



In [None]:
# np.random.shuffle draws from NumPy's global random generator, which is NOT
# affected by random.seed(); NumPy has to be seeded as well, otherwise the
# train/validation split differs on every run.
random.seed(0)
np.random.seed(0)
# In-place shuffle along the first axis (one entry per case).
np.random.shuffle(expansion_train_val)
np.random.shuffle(no_expansion_train_val)

## Assign the first 70% of each class to the training set and the rest to the validation set

In [None]:
# Index of the first validation case in each (already shuffled) class.
split_expansion = int(len(expansion_train_val)*0.7)
split_no_expansion = int(len(no_expansion_train_val)*0.7)

expansion_train, expansion_val = expansion_train_val[:split_expansion], expansion_train_val[split_expansion:]
no_expansion_train, no_expansion_val = no_expansion_train_val[:split_no_expansion], no_expansion_train_val[split_no_expansion:]

## Data augmentation by rotating images

In [None]:
# tqdm and ndimage are already imported in the first cell of the notebook.
angles = [30]

# Every angle is applied to the un-augmented training volumes only, so adding
# a second angle does not rotate volumes that were already rotated. The
# rotated copies are collected first and concatenated once, instead of
# re-allocating the whole training array for every single volume.
# NOTE: the cell extends `expansion_train`; running it twice augments twice.
expansion_train_original = expansion_train
rotated_volumes = []
for angle in angles:
    for volume in tqdm(expansion_train_original):
        rotation = ndimage.rotate(volume, angle, reshape=False)
        rotation[rotation < 0] = 0  # interpolation can leave values slightly
        rotation[rotation > 1] = 1  # outside [0, 1]; clamp them back
        rotated_volumes.append(np.expand_dims(rotation, axis=0))

if rotated_volumes:
    expansion_train = np.concatenate([expansion_train_original] + rotated_volumes, axis=0)

## Data augmentation by flipping images

In [None]:
# Append a copy of every training volume mirrored along axis 2 of the stacked array.
expansion_train_flipped = expansion_train[:, :, ::-1]
expansion_train = np.concatenate([expansion_train, expansion_train_flipped], axis=0)

## Labeling

In [None]:
# One label per case: 1 for the expansion arrays, 0 for the no-expansion arrays.
label_expansion_train = np.array([1] * len(expansion_train))
label_expansion_val = np.array([1] * len(expansion_val))
label_expansion_test = np.array([1] * len(expansion_test))

label_no_expansion_train = np.array([0] * len(no_expansion_train))
label_no_expansion_val = np.array([0] * len(no_expansion_val))
label_no_expansion_test = np.array([0] * len(no_expansion_test))

## Concatenate expansion and no expansion arrays

In [None]:
# Stack the two classes along the first axis; images and labels are joined in
# the same order (expansion first, then no expansion) so they stay aligned.
x_train = np.concatenate([expansion_train, no_expansion_train])
y_train = np.concatenate([label_expansion_train, label_no_expansion_train])

x_val = np.concatenate([expansion_val, no_expansion_val])
y_val = np.concatenate([label_expansion_val, label_no_expansion_val])

x_test = np.concatenate([expansion_test, no_expansion_test])
y_test = np.concatenate([label_expansion_test, label_no_expansion_test])

## Add a dimension to arrays

In [None]:
# Insert a new axis of length 1 at position 4 so the data matches the
# `(height, width, number_of_slices, 1)` input declared in `model_image`.
# NOTE(review): assumes each array is 4-D (cases, height, width, slices) - confirm.
# tf.expand_dims is a TensorFlow op, so the three names refer to its results
# (no longer the NumPy arrays) afterwards.
x_train = tf.expand_dims(x_train, axis=4)
x_val = tf.expand_dims(x_val, axis=4)
x_test = tf.expand_dims(x_test, axis=4)

#  **Training and validation**

## Model architecture

In [None]:
def model_image(height=expansion_train.shape[1], width=expansion_train.shape[2], number_of_slices=expansion_train.shape[3]):
    """Build the 3-D CNN used for the binary classification.

    The network is four Conv3D -> BatchNormalization -> ReLU -> MaxPool3D
    stages, followed by global average pooling, a 512-unit dense layer with
    dropout 0.3, and a single sigmoid output unit.

    Parameters
    ----------
    height, width, number_of_slices : int
        Dimensions of one input volume. The defaults are read from
        ``expansion_train`` at the moment this function is defined.

    Returns
    -------
    keras.Model
        Un-compiled model named "3DCNN" taking inputs of shape
        ``(height, width, number_of_slices, 1)``.
    """
    # (number of filters, kernel size) of each convolutional stage, in order
    conv_stages = (
        (64, (19, 19, 7)),
        (64, (19, 19, 7)),
        (128, (14, 14, 5)),
        (256, (11, 11, 4)),
    )

    inputs = keras.Input((height, width, number_of_slices, 1))

    features = inputs
    for n_filters, kernel in conv_stages:
        features = layers.Conv3D(filters=n_filters, kernel_size=kernel, kernel_initializer="he_normal")(features)
        features = layers.BatchNormalization()(features)
        features = layers.Activation("relu")(features)
        features = layers.MaxPool3D(pool_size=2)(features)

    # classification head
    features = layers.GlobalAveragePooling3D()(features)
    features = layers.Dense(units=512, activation="relu", kernel_initializer="he_normal")(features)
    features = layers.Dropout(0.3)(features)
    outputs = layers.Dense(units=1, activation="sigmoid")(features)

    return keras.Model(inputs, outputs, name="3DCNN")

# Seed TensorFlow's global random generator before the model is created.
tf.random.set_seed(42)
# Build the network with the default input dimensions of `model_image`
# and print its layer-by-layer summary.
model = model_image()
model.summary()

## Compile and fit model

In [None]:
# Save the model after every epoch; the epoch number (zero-padded to three
# digits) becomes the file name inside the 'PATH' directory.
checkpoint_cb = keras.callbacks.ModelCheckpoint(
    filepath=os.path.join('PATH', '{epoch:03d}.h5'),
)

model.compile(
    loss="binary_crossentropy",
    optimizer="adam",
    metrics=["AUC", tf.keras.metrics.Recall()],
)

model.fit(
    x_train,
    y_train,
    validation_data=(x_val, y_val),
    batch_size=2,
    epochs=70,
    callbacks=[checkpoint_cb],
)

# **Test**

## Specify the path to the HDF5 checkpoint file that gave the best sensitivity and AUC on the validation set

In [None]:
# Replace "PATH" with the checkpoint file selected from the training run;
# the weights stored in that file are loaded into `model`.
model.load_weights("PATH")

## Results

In [None]:
# Predicted probabilities (sigmoid output), flattened to one value per case.
y_probability = model.predict(x_test, batch_size=2, verbose=1).ravel()
# Class decisions at the 0.5 threshold; used for the threshold-dependent metrics.
y_prediction = (y_probability >= 0.5).astype("int64")

result = confusion_matrix(y_test, y_prediction)
sensitivity = recall_score(y_test, y_prediction)
specificity = recall_score(y_test, y_prediction, pos_label=0)
accuracy = accuracy_score(y_test, y_prediction)
# The ROC AUC must be computed from the continuous scores. Passing the
# thresholded 0/1 predictions collapses the ROC curve to a single operating
# point and does not measure the ranking quality of the model.
auc = roc_auc_score(y_test, y_probability)

print(result)
print("sensitivity: {:.3f}".format(sensitivity))
print("specificity: {:.3f}".format(specificity))
print("accuracy: {:.3f}".format(accuracy))
print("AUC: {:.3f}".format(auc))