In [None]:
import os
import tarfile
import fnmatch

import numpy as np
import pandas as pd
import nibabel as nib
from collections import Counter

from sklearn.preprocessing import normalize as sklearn_normalize
from skimage.morphology import binary_opening


from matplotlib import pyplot as plt


import tensorflow as tf
import keras
from keras.models import *
from keras.layers import *
from keras.optimizers import *
from keras.callbacks import ModelCheckpoint

# Preprocessing

### Data loading

### Preprocessing pipeline

`DatasetLoader` is a general data preprocessing pipeline class shared by all tasks.

In [None]:
from src.loader import DatasetLoader

# Build the preprocessing pipeline for the heart task and pull out the arrays used below.
pipeline = DatasetLoader("Task02_Heart")
x_train = pipeline.get_x_train()
y_train = pipeline.get_y_train()
x_test = pipeline.get_x_test()
# pipeline.display_train_set()

### Unbalanced Dataset Handling

In [None]:
from sklearn.utils.class_weight import compute_class_weight

# Balanced weights for the two label values (0 and 1), computed over every label
# in y_train and stored as a {class_index: weight} dict.
class_wights = dict(enumerate(
    compute_class_weight(class_weight='balanced',
                         classes=np.array([0, 1]),
                         y=y_train.flatten())
))

### Create Custom Metrics/Loss - Dice coefficient loss

In [None]:
from src.model import get_unet, dice_coef, dice_coef_loss

# Single-channel input whose height/width come from the preprocessing pipeline.
input_img = Input((pipeline.img_shape[0], pipeline.img_shape[1], 1), name='img')
model = get_unet(input_img, n_filters=16, dropout=0.05, batchnorm=True)
# Compile with the Dice loss and track the Dice coefficient: the training-history
# plots read 'dice_coef' / 'val_dice_coef', and the saved checkpoint is reloaded
# with both of these custom objects.
model.compile(optimizer=legacy.Adam(), loss=dice_coef_loss, metrics=[dice_coef])
model.summary()

# Training Model

In [None]:
from datetime import datetime

# Root directory for logs and checkpoints. Defined here because this is the first
# cell that uses it; without it a fresh-kernel run fails with NameError.
PATH = './'

epoch = 50
batch_size = 32

# TensorBoard log directory, made unique per run by the timestamp suffix.
logdir = PATH + "Model/logs/unet-batch_size-{}-epochs-{}-loss-{}.h5".format(batch_size, epoch, 'dice_coef_loss') + datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
# Append a trailing channel axis of size 1 to the images and the masks.
x_train_4d = x_train.reshape((x_train.shape[0], x_train.shape[1], x_train.shape[2], 1))
y_train_4d = y_train.reshape((y_train.shape[0], y_train.shape[1], y_train.shape[2], 1))

# With save_best_only=True only the best full model (not just weights) is kept.
checkpoint_callback = ModelCheckpoint(
    filepath=PATH + "Model/unet-pipeline-batch_size-{}-epochs-{}-loss-{}.h5".format(batch_size, epoch, 'dice_coef_loss'),
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
)

# NOTE(review): some tf.keras versions reject `class_weight` for targets with
# 3+ dimensions - confirm this call runs on the installed version.
history = model.fit(
    x=x_train_4d,
    y=y_train_4d,
    batch_size=batch_size,
    epochs=epoch,
    validation_split=0.2,
    class_weight=class_wights,
    callbacks=[tensorboard_callback, checkpoint_callback],
)

# Training Metrics Result

In [None]:
print(history.history.keys())

# One figure per tracked quantity, train vs. validation. The first curve is the
# Dice coefficient, so it is labelled as such rather than as "accuracy".
for metric_key, metric_label in (('dice_coef', 'Dice coefficient'), ('loss', 'loss')):
  fig, ax = plt.subplots()
  ax.plot(history.history[metric_key], label='train')
  ax.plot(history.history['val_' + metric_key], label='validation')
  ax.set(title='model ' + metric_label, xlabel='epoch', ylabel=metric_label)
  ax.legend(loc='upper left')
  plt.show()

# Load Model and Predict on Test Data

In [None]:
from src.model import dice_coef_loss, dice_coef

PATH = './'
# The custom loss and metric are passed by name so Keras can resolve them
# while reloading the saved model.
checkpoint_file = PATH + 'Model/unet-pipeline-batch_size-32-epochs-50-loss-dice_coef_loss.h5'
model_loaded = load_model(
    checkpoint_file,
    custom_objects=dict(dice_coef_loss=dice_coef_loss, dice_coef=dice_coef),
)

In [None]:
# Predict on the training slices with the reloaded model. The predictions get
# their own variable so the ground-truth `y_train` is not overwritten and the
# earlier cells (class weights, training) stay re-runnable.
y_train_pred = model_loaded.predict(x_train.reshape((x_train.shape[0], x_train.shape[1], x_train.shape[2], 1)))
# Drop the trailing channel axis again.
y_train_pred = y_train_pred.reshape((y_train_pred.shape[0], y_train_pred.shape[1], y_train_pred.shape[2]))
# NOTE(review): despite its name, this holds post-processed predictions for
# x_train; the name is kept because later cells read it.
y_test_post_process = pipeline.postprocess(y_train_pred)

In [None]:
def display_result(x_dataset, y_dataset, nb_patient=10, slice_index=30, highlight_value=3000) :
  """Plot, per patient, one image slice next to the same slice with the mask painted on.

  Args:
    x_dataset: indexable of per-patient image volumes, sliced as [:, :, slice_index].
    y_dataset: indexable of per-patient label volumes, sliced the same way.
    nb_patient: number of patients to draw, one row each.
    slice_index: index along the last axis of the slice to show.
    highlight_value: intensity written into the image copy where the label equals 1.

  Returns:
    None. The figure is drawn through matplotlib.
  """
  if nb_patient < 1:
    return

  # The grid follows nb_patient instead of a fixed 10 rows, so more than 10
  # patients can be drawn. squeeze=False keeps `axes` 2-D even for one row.
  fig, axes = plt.subplots(nb_patient, 2, figsize=(7, 5 * nb_patient), dpi=90, squeeze=False)
  fig.subplots_adjust(wspace=0)

  for i in range(nb_patient):
    original = x_dataset[i][:, :, slice_index]
    axes[i, 0].imshow(original)
    axes[i, 0].set_title("original")
    axes[i, 0].axis('off')

    # Paint the predicted region onto a copy so the input volume is untouched.
    mask = y_dataset[i][:, :, slice_index] == 1
    overlay = np.copy(original)
    overlay[mask] = highlight_value

    axes[i, 1].imshow(overlay)
    axes[i, 1].set_title("PREDICTION")
    axes[i, 1].axis('off')

def convert_to_dataset(data: np.array, data_size: list, nb_patient=10):
  """Split a stacked array into one transposed chunk per patient.

  Patient i receives the next `data_size[i] + 1` entries along axis 0 of
  `data`; each chunk is returned transposed (`.T`, all axes reversed).

  NOTE(review): the `+ 1` implies `data_size` stores a last index rather than
  a count - confirm against the loader that produces it.

  Args:
    data: array whose first axis stacks the entries of all patients.
    data_size: per-patient sizes, indexed 0..nb_patient-1.
    nb_patient: number of patients to extract.

  Returns:
    List of `nb_patient` arrays.
  """
  volumes = []
  offset = 0
  for patient in range(nb_patient):
    span = data_size[patient] + 1
    volumes.append(data[offset:offset + span].T)
    offset += span
  return volumes

# Split the stacked, post-processed predictions back into one volume per patient.
# `y_test_post_process` comes from predictions on x_train, hence the train sizes.
data_size = pipeline.x_train_len
y_train_dataset = convert_to_dataset(data=y_test_post_process, data_size=data_size)

In [None]:
# NOTE(review): this redefines `display_result` from the previous cell; keep a
# single definition once the notebook is cleaned up.
def display_result(x_dataset, y_dataset, nb_patient=10, slice_index=30, highlight_value=3000) :
  """Plot, per patient, one image slice next to the same slice with the mask painted on.

  Args:
    x_dataset: indexable of per-patient image volumes, sliced as [:, :, slice_index].
    y_dataset: indexable of per-patient label volumes, sliced the same way.
    nb_patient: number of patients to draw, one row each.
    slice_index: index along the last axis of the slice to show.
    highlight_value: intensity written into the image copy where the label equals 1.

  Returns:
    None. The figure is drawn through matplotlib.
  """
  if nb_patient < 1:
    return

  # The grid follows nb_patient instead of a fixed 10 rows, so more than 10
  # patients can be drawn. squeeze=False keeps `axes` 2-D even for one row.
  fig, axes = plt.subplots(nb_patient, 2, figsize=(7, 5 * nb_patient), dpi=90, squeeze=False)
  fig.subplots_adjust(wspace=0)

  for i in range(nb_patient):
    original = x_dataset[i][:, :, slice_index]
    axes[i, 0].imshow(original)
    axes[i, 0].set_title("original")
    axes[i, 0].axis('off')

    # Paint the predicted region onto a copy so the input volume is untouched.
    mask = y_dataset[i][:, :, slice_index] == 1
    overlay = np.copy(original)
    overlay[mask] = highlight_value

    axes[i, 1].imshow(overlay)
    axes[i, 1].set_title("PREDICTION")
    axes[i, 1].axis('off')


# NOTE(review): this redefines `convert_to_dataset` from the previous cell; keep
# a single definition once the notebook is cleaned up.
def convert_to_dataset(data: np.array, data_size: list, nb_patient=10):
  """Split a stacked array into one transposed chunk per patient.

  Patient i receives the next `data_size[i] + 1` entries along axis 0 of
  `data`; each chunk is returned transposed (`.T`, all axes reversed).

  NOTE(review): the `+ 1` implies `data_size` stores a last index rather than
  a count - confirm against the loader that produces it.

  Args:
    data: array whose first axis stacks the entries of all patients.
    data_size: per-patient sizes, indexed 0..nb_patient-1.
    nb_patient: number of patients to extract.

  Returns:
    List of `nb_patient` arrays.
  """
  volumes = []
  offset = 0
  for patient in range(nb_patient):
    span = data_size[patient] + 1
    volumes.append(data[offset:offset + span].T)
    offset += span
  return volumes

# Predict on the test slices here: `y_test_post_process` was computed from
# x_train, so splitting it by test sizes and overlaying it on test images would
# pair masks with the wrong images.
x_test_4d = x_test.reshape((x_test.shape[0], x_test.shape[1], x_test.shape[2], 1))
y_test_pred = model_loaded.predict(x_test_4d)
# Drop the trailing channel axis before post-processing.
y_test_pred = y_test_pred.reshape((y_test_pred.shape[0], y_test_pred.shape[1], y_test_pred.shape[2]))
y_test_pred_post = pipeline.postprocess(y_test_pred)

# Split images and predictions back into one volume per test patient, then plot.
data_size = pipeline.x_test_size
x_test_dataset = convert_to_dataset(x_test, data_size)
y_test_dataset = convert_to_dataset(y_test_pred_post, data_size)
display_result(x_test_dataset, y_test_dataset)