In [2]:
!pip install Medpy



In [18]:
import copy
import os
import numpy as np
from medpy.io import load, save
import nibabel
from keras import backend as K
import keras
import tensorflow as tf
from keras.layers import Add, Concatenate, Multiply, Reshape, Dense
from keras.layers import Input
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.layers.convolutional import UpSampling2D
from keras.layers.core import Activation
from keras.layers.normalization import BatchNormalization
from keras.models import Model
from keras.optimizers import Adam, Nadam
from skimage.transform import resize
from keras.callbacks import ModelCheckpoint
from sklearn.metrics import recall_score, precision_score
import cv2
from tensorflow.keras.metrics import Precision, Recall

image_rows = 256
image_cols = 256
smooth = 1.
# Download the weight and adjust the path accordingly
weight_path = "/content/drive/My Drive/3DIRCAD-Result/weight/weights.100-0.01.h5"

K.set_image_data_format('channels_last') 


In [10]:
#-----Create folders-----
# Original test folder, this is where original 3DIRCAD dataset is put.
testing_folder = '/content/drive/My Drive/3DIRCAD/Data'
# After 3DIRCAD dataset is processed, it is saved here.
processed_testing_folder = '/content/drive/My Drive/3DIRCAD-Preprocessed-test'
# This is where 3DIRCAD dataset is transformed into numpy array file saved. 
numpy_folder = '/content/drive/My Drive/3DIRCAD-Preprocessed-test/numpy'

if not os.path.exists(testing_folder):
  print('Creat test folder')
  os.mkdir(testing_folder)
else:
  print('Test folder exists')

if not os.path.exists(processed_testing_folder):
  print('Creat processed test folder')
  os.mkdir(processed_testing_folder)
else:
  print('Processed test folder exists')

if not os.path.exists(numpy_folder):
  print('Creat numpy array folder')
  os.mkdir(numpy_folder)
else:
  print('Numpy array folder exists')

Test folder exists
Processed test folder exists
Creat numpy array folder


In [4]:
# Use for truncate HU value of 3DIRCAD dataset
def truncate_HU_value(range1, range2, img_path, save_path):
    print("*** Truncating HU value to eliminate superfluous information ***")
    for idx in range(range1, range2):
      # Due to the naming convention of 3DIRCAD dataset
        if idx < 10:
          img, img_header = load(img_path + '/ircad_e0' + str(idx) + '_orig.nii')
          img[img < -200] = -200
          img[img > 250] = 250
          img = np.array(img, dtype='int16')
          print('Saving image ' + str(idx))
          save(img, save_path + '/' + 'cad-volume-' + str(idx) + '.nii')
        else:
          img, img_header = load(img_path + '/ircad_e' + str(idx) + '_orig.nii')
          img[img < -200] = -200
          img[img > 250] = 250
          img = np.array(img, dtype='int16')
          print('Saving image ' + str(idx))
          save(img, save_path + '/' + 'cad-volume-' + str(idx) + '.nii')



# Remove tumor label of 3DIRCAD dataset
def remove_tumor_label(range1, range2, img_path, save_path):
    print("*** Removing tumor label ***")
    for idx in range(range1, range2):
        # Due to the naming convention of 3DIRCAD dataset
        if idx < 10:
          img, img_header = load(img_path + '/ircad_e0' + str(idx) + '_liver.nii')
          img[img == 2] = 1
          img = np.array(img, dtype='uint8')
          print('Saving image ' + str(idx))
          save(img, save_path + '/' + 'cad-segmentation-' + str(idx) + '.nii')
        else:
          img, img_header = load(img_path + '/ircad_e' + str(idx) + '_liver.nii')
          img[img == 2] = 1
          img = np.array(img, dtype='uint8')
          print('Saving image ' + str(idx))
          save(img, save_path + '/' + 'cad-segmentation-' + str(idx) + '.nii')



truncate_HU_value(range1=1, range2=21, img_path=testing_folder, save_path=processed_testing_folder)
remove_tumor_label(range1=1, range2=21, img_path=testing_folder, save_path=processed_testing_folder)

*** Truncating HU value to eliminate superfluous information ***
Saving image 1
Saving image 2
Saving image 3
Saving image 4
Saving image 5
Saving image 6
Saving image 7
Saving image 8
Saving image 9
Saving image 10
Saving image 11
Saving image 12
Saving image 13
Saving image 14
Saving image 15
Saving image 16
Saving image 17
Saving image 18
Saving image 19
Saving image 20
*** Removing tumor label ***
Saving image 1
Saving image 2
Saving image 3
Saving image 4
Saving image 5
Saving image 6
Saving image 7
Saving image 8
Saving image 9
Saving image 10
Saving image 11
Saving image 12
Saving image 13
Saving image 14
Saving image 15
Saving image 16
Saving image 17
Saving image 18
Saving image 19
Saving image 20


In [12]:
def create_test_data():
    print('-' * 30)
    print('Creating test data...')
    print('-' * 30)

    imgs_test = []
    masks_test = []
    testing_images_files = []
    testing_masks_files = []
    
    # List out all of the item available in the folder
    item_list = os.listdir(testing_folder)
    np.sort(item_list)
    
    # Adjust the range based on which CT scan you want to test
    for idx in range(1,2):
      testing_images_files.append('cad-volume-' + str(idx) + '.nii')
      testing_masks_files.append('cad-segmentation-' + str(idx) + '.nii')

    # Load up the CT scan and transformed them into arrays
    for orig, liver in zip(testing_images_files, testing_masks_files):
        print('Processing: ' + orig)
        print('Processing: ' + liver)
        testing_image = nibabel.load(os.path.join(processed_testing_folder, orig))
        testing_mask = nibabel.load(os.path.join(processed_testing_folder, liver))
        
        print("Total testing slices before eliminate non-liver slice: " + str(testing_image.shape[2]))
        for k in range(testing_image.shape[2]):
            image_2d = np.array(testing_image.get_fdata()[::2, ::2, k])
            mask_2d = np.array(testing_mask.get_fdata()[::2, ::2, k])
            # If you want to test with uncropped data, comment out the line below
            if len(np.unique(mask_2d)) != 1:
              masks_test.append(mask_2d)
              imgs_test.append(image_2d)
        
    
    print("Total testing slices after eliminate non-liver slice: " + str(len(imgs_test)))

    # Transform orginal array (0,1,2) into array with format (2,0,1)
    imgst = np.ndarray((len(imgs_test), image_rows, image_cols), dtype=np.uint8)
    maskst = np.ndarray((len(masks_test), image_rows, image_cols), dtype=np.uint8)

    for index, img in enumerate(imgs_test):
        imgst[index, :, :] = img

    for index, mask in enumerate(masks_test):
        maskst[index, :, :] = mask

    np.save(numpy_folder + '/' + 'imgs_test.npy', imgst)
    np.save(numpy_folder + '/' + 'imgs_mask.npy', maskst)
    print('Saving to .npy files done.')
    

# Load up the numpy array
def load_test_data():
    print('--- Loading test images ---')
    imgs_test = np.load(numpy_folder + '/' + 'imgs_test.npy')
    masks_test = np.load(numpy_folder + '/' + 'imgs_mask.npy')
    return imgs_test, masks_test



if __name__ == '__main__':
    create_test_data()


------------------------------
Creating test data...
------------------------------
Processing: cad-volume-1.nii
Processing: cad-segmentation-1.nii
Total testing slices before eliminate non-liver slice: 129
Total testing slices after eliminate non-liver slice: 98
Saving to .npy files done.


In [20]:
# ResUNet model
def dice_coef(y_true, y_pred):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth)


def dice_coef_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)

def bn_act(x, act=True):
    x = BatchNormalization()(x)
    if act == True:
        x = Activation("relu")(x)
    return x


def conv_block(x, filters, kernel_size=(3, 3), padding="same", strides=1):
    conv = bn_act(x)
    conv = Conv2D(filters, kernel_size, padding=padding, strides=strides)(conv)
    return conv


def stem(x, filters, kernel_size=(3, 3), padding="same", strides=1):
    conv = Conv2D(filters, kernel_size, padding=padding, strides=strides)(x)
    conv = conv_block(conv, filters, kernel_size=kernel_size, padding=padding, strides=strides)

    shortcut = Conv2D(filters, kernel_size=(1, 1), padding=padding, strides=strides)(x)
    shortcut = bn_act(shortcut, act=False)

    output = Add()([conv, shortcut])
    return output


def residual_block(x, filters, kernel_size=(3, 3), padding="same", strides=1):
    res = conv_block(x, filters, kernel_size=kernel_size, padding=padding, strides=strides)
    res = conv_block(res, filters, kernel_size=kernel_size, padding=padding, strides=1)

    shortcut = Conv2D(filters, kernel_size=(1, 1), padding=padding, strides=strides)(x)
    shortcut = bn_act(shortcut, act=False)

    output = Add()([shortcut, res])
    return output


def upsample_concat_block(x, xskip):
    u = UpSampling2D((2, 2))(x)
    c = Concatenate()([u, xskip])
    return c


def ResUNet():
    f = [16, 32, 64, 128, 256]
    inputs = Input((image_rows, image_cols, 1))

    ## Encoder
    e0 = inputs
    e1 = stem(e0, f[0])
    e2 = residual_block(e1, f[1], strides=2)
    e3 = residual_block(e2, f[2], strides=2)
    e4 = residual_block(e3, f[3], strides=2)
    e5 = residual_block(e4, f[4], strides=2)

    ## Bridge
    b0 = conv_block(e5, f[4], strides=1)
    b1 = conv_block(b0, f[4], strides=1)

    ## Decoder
    u1 = upsample_concat_block(b1, e4)
    d1 = residual_block(u1, f[4])

    u2 = upsample_concat_block(d1, e3)
    d2 = residual_block(u2, f[3])

    u3 = upsample_concat_block(d2, e2)
    d3 = residual_block(u3, f[2])

    u4 = upsample_concat_block(d3, e1)
    d4 = residual_block(u4, f[1])

    outputs = Conv2D(1, (1, 1), padding="same", activation="sigmoid")(d4)

    model = Model(inputs=[inputs], outputs=[outputs])

    metrics = [dice_coef, Recall(), Precision()]

    model.compile(optimizer=Adam(lr=1e-4), loss="binary_crossentropy", metrics=metrics)

    return model


# Preprocess test data by adding a new dimension in order to feed the data to network
def preprocess(imgs):
    print('--- Preprocessing images ---')
    imgs_p = np.ndarray((imgs.shape[0], image_rows, image_cols), dtype=np.uint8)
    for i in range(imgs.shape[0]):
        imgs_p[i] = resize(imgs[i], (image_rows, image_cols), preserve_range=True)

    imgs_p = imgs_p[..., np.newaxis]
    return imgs_p


def get_dice_coef(y_true, y_pred):
    intersection = np.sum(y_true * y_pred)
    return (2. * intersection + smooth) / (np.sum(y_true) + np.sum(y_pred) + smooth)

def get_recall(y_true, y_pred):
    y_pred = y_pred > 0.5
    y_pred = y_pred.astype(np.int32)
    m = tf.keras.metrics.Recall()
    m.update_state(y_true, y_pred)
    r = m.result().numpy()
    m.reset_states()
    return r

def get_precision(y_true, y_pred):
    y_pred = y_pred > 0.5
    y_pred = y_pred.astype(np.int32)
    m = tf.keras.metrics.Precision()
    m.update_state(y_true, y_pred)
    r = m.result().numpy()
    m.reset_states()
    return r

def get_metrics(y_true, y_pred):
    y_pred = y_pred.flatten()
    y_true = y_true.flatten()

    dice_coef_val = get_dice_coef(y_true, y_pred)

    y_true = y_true.astype(np.int32)
   
    recall_value = get_recall(y_true, y_pred)
    precision_value = get_precision(y_true, y_pred)

    return [dice_coef_val,recall_value, precision_value]


def predict():
    for idx in range(1):
        print('-' * 30)
        print('Loading model and preprocessing test data...' + str(idx))
        print('-' * 30)

        model = ResUNet()
        model.load_weights(weight_path)

        #  load 3D data
        img_test = np.load('/content/drive/My Drive/3DIRCAD-Preprocessed-test/numpy/imgs_test.npy')

        img_test = preprocess(img_test)
        img_test = img_test.astype('float32')

        mean = np.mean(img_test)  # mean for data centering
        std = np.std(img_test)  # std for data normalization

        img_test -= mean
        img_test /= std

        
        #  load liver mask
        mask = np.load('/content/drive/My Drive/3DIRCAD-Preprocessed-test/numpy/imgs_mask.npy')
        mask = preprocess(mask)
        mask = mask.astype('float32')
        
        print('-' * 30)
        print('Predicting masks on test data...' + str(idx))
        print('-' * 30)

        imgs_mask_test_result = model.predict(img_test, verbose=1)

        #for k in range(len(imgs_mask_test_result)):
            #imgs_mask_test_result[k][:, :, :] = imgs_mask_test_result[k][:, ::-1, :]
        
        result = get_metrics(mask, imgs_mask_test_result)
    
        print(result)
        
        
if __name__ == '__main__':
    predict()

------------------------------
Loading model and preprocessing test data...0
------------------------------
--- Preprocessing images ---
--- Preprocessing images ---
------------------------------
Predicting masks on test data...0
------------------------------
[0.986997569787921, 0.98160905, 0.99244964]
