In [1]:
import os
import cv2
import numpy as np
import nibabel as nib
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.layers import *
import keras.backend as k
from keras.models import *
from keras.optimizers import *
from keras.losses import categorical_crossentropy
from keras.callbacks import ModelCheckpoint, LearningRateScheduler, EarlyStopping

## Mounting Google Drive

In [2]:
# Mount Google Drive at /content/drive so the dataset folders referenced by
# the path variables below are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Setting paths

In [3]:
# All three dataset folders live under one root on the mounted drive.
_heart_data_root = '/content/drive/MyDrive/heart_data'

train_imgs_pth = f'{_heart_data_root}/imagesTr'   # training volumes
train_lbs_pth = f'{_heart_data_root}/labelsTr'    # training ground-truth volumes
test_pth = f'{_heart_data_root}/imagesTs'         # test volumes

## Extracting each batch (the images are read from NIfTI files)

In [4]:
# Directory listings are sorted so that the load order is deterministic.
# Training images and labels are paired by position further down, so both
# lists must be ordered the same way (presumably the label files share the
# image file names -- verify against the dataset).
TR_List = sorted(os.listdir(train_imgs_pth))
Lb_List = sorted(os.listdir(train_lbs_pth))
# The test listing is sorted as well; os.listdir() alone returns entries in
# arbitrary order, which made test-slice indices vary between runs.
TST_List = sorted(os.listdir(test_pth))

# nib.load() is called once per file; each list holds one loaded volume per entry.
# Comprehensions are used (instead of `for k in ...`) so that no module-level
# loop variable overwrites the `k` alias of keras.backend imported at the top.
train_images_batch = [nib.load(os.path.join(train_imgs_pth, name)) for name in TR_List]
train_labels_batch = [nib.load(os.path.join(train_lbs_pth, name)) for name in Lb_List]
test_images_batch = [nib.load(os.path.join(test_pth, name)) for name in TST_List]

print(f'{len(train_images_batch)} train images Batch loaded successfully.')
print(f'{len(train_labels_batch)} train labels Batch loaded successfully.')
print(f'{len(test_images_batch)} test images Batch loaded successfully.')

20 train images Batch loaded successfully.
20 train labels Batch loaded successfully.
10 test images Batch loaded successfully.


### Extracting Each Batch's Data

In [5]:
# Flat lists of 2-D slices: X_Train[i] is an image slice and Y_Train[i] is the
# label slice taken from the same volume at the same index along the last axis.
X_Train = []
Y_Train = []
X_Test = []   # filled by the test-extraction cell further down

for idx, im in enumerate(train_images_batch):
  lb = train_labels_batch[idx]
  # Fetch each volume's voxel array once per volume instead of once per slice.
  img_data = im.get_fdata()
  lbl_data = lb.get_fdata()
  for j in range(im.shape[-1]):
    X_Train.append(img_data[:, :, j])
    Y_Train.append(lbl_data[:, :, j])

## Checking that each batch has an equal number of images and ground-truth slices (no missing data)

In [6]:
# For every volume pair, print the size of the last axis of the image next to
# that of its label so mismatches are visible at a glance.
# The backslash is written as `\\`: a bare `\ ` is an invalid escape sequence
# and triggers a warning, while the printed text stays exactly the same.
for i in range(len(train_images_batch)):
  print(f'batch number {i} ===> img {np.shape(train_images_batch[i])[-1]}  \\   {np.shape(train_labels_batch[i])[-1]}')


batch number 0 ===> img 130  \   130
batch number 1 ===> img 110  \   110
batch number 2 ===> img 120  \   120
batch number 3 ===> img 130  \   130
batch number 4 ===> img 100  \   100
batch number 5 ===> img 120  \   120
batch number 6 ===> img 120  \   120
batch number 7 ===> img 120  \   120
batch number 8 ===> img 90  \   90
batch number 9 ===> img 120  \   120
batch number 10 ===> img 122  \   122
batch number 11 ===> img 100  \   100
batch number 12 ===> img 110  \   110
batch number 13 ===> img 100  \   100
batch number 14 ===> img 110  \   110
batch number 15 ===> img 110  \   110
batch number 16 ===> img 120  \   120
batch number 17 ===> img 120  \   120
batch number 18 ===> img 109  \   109
batch number 19 ===> img 110  \   110


## Extracting Test Data

In [7]:
# Append every slice (last axis) of every test volume to X_Test.
# Loop names avoid `k` (the keras.backend alias imported at the top) and the
# easily misread `l`.
for tst_img in test_images_batch:
  tst_data = tst_img.get_fdata()   # fetch the voxel array once per volume
  for slice_idx in range(tst_img.shape[-1]):
    X_Test.append(tst_data[:, :, slice_idx])

In [8]:
# len() is used rather than np.shape(...)[0]: np.shape first has to pack the
# whole list of slices into one array, which warns (or fails on newer NumPy)
# when the slices do not all have the same shape.
# The last line counts test *images*; there are no test labels in this notebook.
print(f'Number of Train images : {len(X_Train)}\nNumber of Train Labels : {len(Y_Train)}\nNumber of Test Images : {len(X_Test)}')

Number of Train images : 2271
Number of Train Labels : 2271
Number of Test Labels : 1297


  result = asarray(a).shape


## Function for implementing data augmentation

In [9]:
def augment(input_image, input_mask):
   """Randomly mirror an image and its mask together.

   One uniform sample is drawn; when it is greater than 0.5 both inputs are
   flipped left-to-right with ``tf.image.flip_left_right``. Otherwise both are
   returned unchanged, so the image and mask always stay aligned.

   Returns:
       The ``(input_image, input_mask)`` pair, flipped or not.
   """
   should_flip = tf.random.uniform(()) > 0.5
   if should_flip:
       # Apply the identical horizontal flip to the image and to its mask.
       input_mask = tf.image.flip_left_right(input_mask)
       input_image = tf.image.flip_left_right(input_image)
   return input_image, input_mask

## Preprocessing Function

In [10]:
size = 256
def preprocessing(input_tensor, Type):
  """Resize one 2-D slice to ``size`` x ``size`` and prepare it for the network.

  Args:
    input_tensor: 2-D array holding an image slice or a label slice.
    Type: ``'img'`` for an image or ``'lbl'`` for a ground-truth mask
      (case-insensitive).

  Returns:
    For ``'img'``: the resized slice as float, divided by its maximum value.
    A slice whose maximum is 0 is returned unscaled instead of being divided
    by zero.
    For ``'lbl'``: the resized mask, using nearest-neighbour interpolation so
    that no new, in-between label values are created at object borders.

  Raises:
    ValueError: if ``Type`` is neither ``'img'`` nor ``'lbl'``.
  """
  kind = Type.lower()
  # Validate first: previously an unknown Type fell through both branches and
  # failed with an unrelated UnboundLocalError on `final`.
  if kind not in ('img', 'lbl'):
    raise ValueError(f"Type must be 'img' or 'lbl', got {Type!r}")

  if kind == 'lbl':
    # The default (bilinear) interpolation blends neighbouring labels into
    # fractional values; nearest-neighbour keeps the original label set.
    return cv2.resize(input_tensor, (size, size), interpolation=cv2.INTER_NEAREST)

  resized = cv2.resize(input_tensor, (size, size)).astype('float')
  peak = np.max(resized)
  if peak == 0:
    # Nothing to scale by; dividing would fill the slice with NaNs.
    return resized
  return resized / peak


### PreProcessing Images and Labels

In [11]:
Train_images = []
Train_masks = []
Test_images = []

# Keep only the slices whose mask contains at least one non-zero pixel.
# The two lists are walked in lockstep (they are filled pairwise above) rather
# than indexed through np.shape(X_Train)[0], which has to pack the whole list
# into one array and breaks when the slices differ in shape.
for raw_img, raw_lbl in zip(X_Train, Y_Train):
  p_lb = preprocessing(raw_lbl, 'lbl')
  if not np.any(p_lb != 0):
    continue   # empty mask: skip before spending time on the image
  Train_images.append(preprocessing(raw_img, 'img'))
  Train_masks.append(p_lb)


## Correcting the data dimensions

In [12]:
# Give every grayscale slice a trailing channel axis and copy it three times,
# so the images end up with 3 identical channels in the last dimension.
Train_images = np.repeat(np.asarray(Train_images)[..., np.newaxis], 3, axis=-1)
# Masks only get the trailing singleton channel axis.
Train_masks = np.asarray(Train_masks)[..., np.newaxis]

In [13]:
# Display the shape of the stacked training images (last axis = channels).
np.shape(Train_images)

(1351, 256, 256, 3)

In [None]:
# Display the shape of the training masks (a trailing axis was added by expand_dims).
np.shape(Train_masks)

(1351, 256, 256, 1)

## Defining and Creating the U-Net Model

In [14]:
IMG_HEIGHT = 256
IMG_WIDTH = 256
IMG_CHANNELS = 3

# (filters, dropout rate) for each resolution level, shallowest first.
# The decoder walks the same table in reverse order.
_UNET_STAGES = ((16, 0.1), (32, 0.1), (64, 0.2), (128, 0.2))


def _double_conv(x, filters, dropout_rate):
    """Apply Conv2D -> Dropout -> Conv2D, the unit repeated at every U-Net level."""
    conv_args = dict(activation='elu', kernel_initializer='he_normal', padding='same')
    x = Conv2D(filters, (3, 3), **conv_args)(x)
    x = Dropout(dropout_rate)(x)
    return Conv2D(filters, (3, 3), **conv_args)(x)


def U_Net_Segmentation(input_size=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)):
    """Build, compile and summarise a U-Net with a single sigmoid output channel.

    The encoder has four levels (16, 32, 64 and 128 filters), each followed by
    2x2 max pooling. A 256-filter bottleneck follows, and the decoder mirrors
    the encoder with 2x2 transposed convolutions whose output is concatenated
    with the matching encoder feature map.

    Args:
        input_size: shape of one input sample, passed straight to ``Input``.

    Returns:
        The model, compiled with the Adam optimizer, binary cross-entropy loss
        and a ``BinaryIoU`` metric over class ids 0 and 1. ``model.summary()``
        is printed as a side effect.
    """
    inputs = Input(input_size)

    # Contracting path: remember each level's output for the skip connections.
    skips = []
    x = inputs
    for filters, rate in _UNET_STAGES:
        x = _double_conv(x, filters, rate)
        skips.append(x)
        x = MaxPooling2D((2, 2))(x)

    # Bottleneck
    x = _double_conv(x, 256, 0.3)

    # Expanding path: upsample, merge with the matching skip, convolve.
    for (filters, rate), skip in zip(reversed(_UNET_STAGES), reversed(skips)):
        x = Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding='same')(x)
        x = concatenate([x, skip])
        x = _double_conv(x, filters, rate)

    outputs = Conv2D(1, (1, 1), activation='sigmoid')(x)

    model = Model(inputs=[inputs], outputs=[outputs])
    model.compile(optimizer='adam', loss='binary_crossentropy',
                  metrics=[tf.keras.metrics.BinaryIoU(target_class_ids=[0, 1])])
    model.summary()
    return model

In [15]:
# Build the network with the default input size; this also prints the summary.
model = U_Net_Segmentation()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 256, 256, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 256, 256, 16  448         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 dropout (Dropout)              (None, 256, 256, 16  0           ['conv2d[0][0]']                 
                                )                                                             

In [16]:
# Naming scheme for saved weights: model-<imageset>-<backbone>-<version>.h5
imageset, backbone, version = 'LA', 'UNET', 'v1.0'
model_h5 = f'model-{imageset}-{backbone}-{version}.h5'
model_h5_checkpoint = f'{model_h5}.checkpoint'

# Training callbacks. The checkpointer writes to the literal 'segmodel.h5',
# not to model_h5 / model_h5_checkpoint built above.
checkpointer = ModelCheckpoint('segmodel.h5', verbose=1, save_best_only=True)
earlystopper = EarlyStopping(patience=7, verbose=1)

## Training

In [None]:
# Train for at most 50 epochs in batches of 16, holding out 10% of the samples
# for validation. The two callbacks stop training early and save checkpoints.
# NOTE(review): Keras documents validation_split as taking the *last* samples
# before any shuffling -- confirm that this is the intended hold-out set.
results = model.fit(Train_images, Train_masks, 
                    validation_split=0.1, 
                    batch_size=16,
                    epochs=50,
                    callbacks=[earlystopper, checkpointer])

## Loading the model

In [None]:
# Reload the model stored at /content/segmodel.h5.
# NOTE(review): the checkpoint callback writes to the relative path
# 'segmodel.h5'; this absolute path presumably matches only when the working
# directory is /content -- confirm.
new_model = tf.keras.models.load_model('/content/segmodel.h5')

## Preparing test images to be fed to model

In [17]:
Test_images = []

# Preprocess every test slice that contains at least one non-zero pixel.
# Iterating over the list directly avoids np.shape(X_Test)[0], which has to
# pack all slices into one array and breaks when their shapes differ.
for raw_slice in X_Test:
  if np.any(raw_slice != 0):
    Test_images.append(preprocessing(raw_slice, 'img'))


## Preparing test image dimensions

In [None]:
# Add a trailing channel axis to the grayscale test slices and repeat it three
# times so they match the 3-channel input the model was built with.
Test_images = np.repeat(np.asarray(Test_images)[..., np.newaxis], 3, axis=-1)
np.shape(Test_images)

## Prediction on test images

In [None]:
# Run the reloaded model on one test slice (index 40) and strip the extra axes
# from the prediction so `output` has two fewer dimensions than `pred`.
test = np.expand_dims(Test_images[40], axis = 0)   # add a leading batch axis
pred = new_model.predict(test)
temp = np.squeeze(pred, axis = -1)   # drop the trailing (size-1) axis
output = np.squeeze(temp, axis = 0)   # drop the leading batch axis

