In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
import nibabel
import imageio
from skimage.transform import resize
from skimage.io import imsave
from skimage.segmentation import mark_boundaries
from skimage.color import gray2rgb
from cv2 import bitwise_and, addWeighted
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, concatenate, Conv2D, MaxPooling2D, Conv2DTranspose
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import backend as K
from skimage.exposure import rescale_intensity
from sklearn.model_selection import train_test_split

In [2]:
data_path = 'E:/Task02_Heart/'
# downsample by half of the resolution in 2d 
image_rows = 320//2
image_cols = 320//2
Num_Classes = 3 # { 0 -> background, 1 -> heart, 2 -> tumour}

# loading data 

In [3]:
TrainImages_data_path = os.path.join(data_path, 'imagesTr')
TrainMasks_data_path = os.path.join(data_path, 'labelsTr')
file_list = os.listdir(TrainImages_data_path)
imgs_train = []
masks_train = []
for file_name in file_list:
    training_mask = nibabel.load(os.path.join(TrainImages_data_path,file_name))
    training_image = nibabel.load(os.path.join(TrainMasks_data_path,file_name))
    for k in range(training_mask.shape[2]):
        # take the axial cut at z=k plane (xy plane @ k=0,1,...,N) as numpy ndarray
        # downsample each image (slice) by helf, i.e. take every second pixel lengthwise and widthwise of the image
        # resize each image to size: (image_rows, image_cols)
        mask_2d = training_mask.get_fdata()[::2, ::2, k]
        mask_2d = resize(mask_2d, (image_rows, image_cols), preserve_range=True)
        image_2d = training_image.get_fdata()[::2, ::2, k]
        image_2d = resize(image_2d, (image_rows, image_cols), preserve_range=True)
        # if mask_2d contains only one gray level (only '0' values, i.e. black image), it means that there is no mask (organ+tumor)
        if len(np.unique(mask_2d)) != 1:
             masks_train.append(mask_2d)
             imgs_train.append(image_2d)

In [4]:
imgs = np.ndarray(
            (len(imgs_train), image_rows, image_cols), dtype=np.uint8
            )

In [5]:
imgs_mask = np.ndarray(
            (len(masks_train), image_rows, image_cols), dtype=np.uint8
            )

In [6]:
for index, img in enumerate(imgs_train):
        imgs[index, :, :] = img

In [7]:
for index, mask in enumerate(masks_train):
        imgs_mask[index, :, :] = mask

In [8]:
np.save('HrtImgs_train.npy', imgs)
np.save('HrtLbls_train.npy', imgs_mask)
# saved the training data 

# load training data

In [9]:
def load_train_data():
    imgs_train = np.load('LvrImgs_train.npy')
    masks_train = np.load('LvrLbls_train.npy')
    return imgs_train, masks_train

In [10]:
K.set_image_data_format('channels_last')

In [11]:
def gen_dice_coef(y_true, y_pred, smooth=1e-7):
    '''
    Dice coefficient for num_classes labels (classes). Ignores background pixel label 0
    Pass to model as metric during compile statement
    '''
    y_true_f = K.flatten(K.one_hot(K.cast(y_true, 'int32'), num_classes = Num_Classes)[...,1:])
    y_pred_f = K.flatten(y_pred[...,1:])
    intersect = K.sum(y_true_f * y_pred_f, axis=-1)
    denom = K.sum(y_true_f + y_pred_f, axis=-1)
    return K.mean((2. * intersect / (denom + smooth)))

In [12]:
def gen_dice_coef_loss(y_true, y_pred):
    '''
    Dice loss to minimize. Pass to model as loss during compile statement
    '''
    return 1 - gen_dice_coef(y_true, y_pred)

# creating a unet model

In [13]:
inputs = Input((image_rows, image_cols, 1))
conv1 = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
conv1 = Conv2D(32, (3, 3), activation='relu', padding='same')(conv1)
pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

conv2 = Conv2D(64, (3, 3), activation='relu', padding='same')(pool1)
conv2 = Conv2D(64, (3, 3), activation='relu', padding='same')(conv2)
pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

conv3 = Conv2D(128, (3, 3), activation='relu', padding='same')(pool2)
conv3 = Conv2D(128, (3, 3), activation='relu', padding='same')(conv3)
pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

conv4 = Conv2D(256, (3, 3), activation='relu', padding='same')(pool3)
conv4 = Conv2D(256, (3, 3), activation='relu', padding='same')(conv4)
pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)

conv5 = Conv2D(512, (3, 3), activation='relu', padding='same')(pool4)
conv5 = Conv2D(512, (3, 3), activation='relu', padding='same')(conv5)

up6 = concatenate([Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(conv5), conv4], axis=3)
conv6 = Conv2D(256, (3, 3), activation='relu', padding='same')(up6)
conv6 = Conv2D(256, (3, 3), activation='relu', padding='same')(conv6)

up7 = concatenate([Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(conv6), conv3], axis=3)
conv7 = Conv2D(128, (3, 3), activation='relu', padding='same')(up7)
conv7 = Conv2D(128, (3, 3), activation='relu', padding='same')(conv7)

up8 = concatenate([Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv7), conv2], axis=3)
conv8 = Conv2D(64, (3, 3), activation='relu', padding='same')(up8)
conv8 = Conv2D(64, (3, 3), activation='relu', padding='same')(conv8)

up9 = concatenate([Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv8), conv1], axis=3)
conv9 = Conv2D(32, (3, 3), activation='relu', padding='same')(up9)
conv9 = Conv2D(32, (3, 3), activation='relu', padding='same')(conv9)

PredictedMask = Conv2D(Num_Classes, (1, 1), activation='sigmoid')(conv9)
    # last layer is the predicted mask/label image (organ+tumor), each pixel in the set {0,1}
    
model = Model(inputs=[inputs], outputs=[PredictedMask])

model.compile(optimizer=Adam(learning_rate=1e-4), loss=gen_dice_coef_loss, metrics=[gen_dice_coef])

# proceeding to train

In [14]:
imgs_train, imgs_mask_train = load_train_data()

In [15]:
imgs_train = imgs_train[..., np.newaxis]
imgs_mask_train = imgs_mask_train[..., np.newaxis]

In [16]:
imgs_train = imgs_train.astype('float32')
mean = np.mean(imgs_train)
std = np.std(imgs_train)
imgs_train -= mean
imgs_train /= std

In [17]:
imgs_mask_train = imgs_mask_train.astype('float32')

In [18]:
model_checkpoint = ModelCheckpoint('weights_heart.h5', monitor='val_loss', save_best_only=True)

imgs_train, imgs_test, imgs_mask_train, imgs_mask_test = train_test_split(imgs_train,imgs_mask_train,shuffle = True )

In [None]:
 history=model.fit(imgs_train, imgs_mask_train, verbose=1, shuffle=True,
              validation_split=0.2,
              callbacks=[model_checkpoint])

11/57 [====>.........................] - ETA: 5:21 - loss: 1.0000 - gen_dice_coef: 0.0000e+00

model.evaluate(imgs_test, imgs_mask_test)

# testing data

In [None]:
TestImages_data_path = os.path.join(data_path, 'imagesTs')
file_list = os.listdir(TestImages_data_path)
imgs_test = []
for file_name in file_list:
    img = nibabel.load(os.path.join(TestImages_data_path,file_name))
    for k in range(img.shape[2]):
        img_2d = np.array(img.get_fdata()[::2, ::2, k])
        img_2d = resize(img_2d, (image_rows, image_cols), preserve_range=True)
        imgs_test.append(img_2d)

In [None]:
imgst = np.ndarray(
            (len(imgs_test), image_rows, image_cols), dtype=np.uint8
            )

In [None]:
for index, imge in enumerate(imgs_test):
    imgst[index, :, :] = imge

In [None]:
 np.save('HrtImgs_test.npy', imgst)

In [None]:
def load_test_data():
    imgs_test = np.load('HrtImgs_test.npy')
    return imgs_test

In [None]:
imgs_test = load_test_data()

In [None]:
imgs_test = imgs_test[..., np.newaxis]

In [None]:
imgs_test = imgs_test.astype('float32')
mean = np.mean(imgs_test)
std = np.std(imgs_test)
imgs_test -= mean
imgs_test /= std

In [None]:
model.load_weights('weights_heart.h5')

In [None]:
imgs_mask_test = model.predict(imgs_test, verbose=1)

In [None]:
np.save('Predicted_HrtMasks.npy', imgs_mask_test)

In [None]:
pred_dir = 'Predicted_Heart_Masks'
if not os.path.exists(pred_dir):
    os.mkdir(pred_dir)

In [None]:
for k in range(len(imgs_test)):        
        pred_mask = model.predict(imgs_test[k,:,:,:][np.newaxis,...])   # shape: (1, 256, 256, 3)
        pred_mask = tf.argmax(pred_mask, axis=-1)
        pred_mask = pred_mask[..., tf.newaxis]
        pred_mask = pred_mask[0]
        pred_mask = tf.keras.preprocessing.image.array_to_img(pred_mask)
        pred_mask = tf.keras.preprocessing.image.img_to_array(pred_mask)[:,:,0]
        mask = pred_mask.astype('uint8')

        testImg = rescale_intensity(imgs_test[k,:,:,0], out_range=(0,255))
        testImg = testImg.astype('uint8')
        testImgRGB = gray2rgb(testImg)

        blueImg = np.zeros(testImgRGB.shape, testImgRGB.dtype)
        blueImg[:,:] = (0, 0, 255)
        blueMask = bitwise_and(blueImg, blueImg, mask = mask)
        blendos = addWeighted(blueMask, 1, testImgRGB, 1, 0)
        sgmntdImg = mark_boundaries(blendos, mask, color = (0.8, 0.5, 0.38))
        imsave(os.path.join(pred_dir, str(k) + '_pred.png'), sgmntdImg)
    

In [None]:
ImgsNames = sorted(os.listdir(pred_dir), key=len)
ImgsNames = ImgsNames[2513:2597:]
Imgs = []
for image_name in ImgsNames:
    Imgs.append(imageio.imread(os.path.join(pred_dir,image_name)))

In [None]:
List = [0.1]*len(ImgsNames)
List[13] = 1
imageio.mimwrite('animated_heart.gif', Imgs, duration = List)

In [None]:
plt.figure()
plt.plot(history.history['gen_dice_coef'])
plt.plot(history.history['val_gen_dice_coef'])
plt.title('Generalized Dice Coefficient (Liver)')
plt.ylabel('Dice Coeff')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

In [None]:
plt.figure()
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Generalized Dice Loss (Liver)')
plt.ylabel('Dice Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper right')
plt.show()