In [1]:
%env SM_FRAMEWORK=tf.keras
import zipfile, os, numpy as np, pickle, yaml, gc, tensorflow as tf
import segmentation_models as sm
import tensorflow_addons as tfa
sys.path.append("..")
from model.resnet3d import Resnet3DBuilder
from model.cnn_model import get_model
from tensorflow import keras
from keras import backend as K
K.clear_session()
nii_size = 384
from segmentation_models import Unet
datatype='3.0T'
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

env: SM_FRAMEWORK=tf.keras
Segmentation Models: using `tf.keras` framework.


Using TensorFlow backend.


S1 Testing Start!

In [2]:
# path test, train
top_layer_path = ['../my_data/datasets/all_seg_data_NL_zm_2/','../my_data/datasets/all_seg_data_NL_zm_2/']

# S1 [img, msk]
S1_img_stack =['T3_image_arr_384_valid.npy', 'T3_masks_arr_384_valid.npy',
               'T1_image_arr_384_valid.npy', 'T1_masks_arr_384_valid.npy',
               'T3_image_arr_384_train.npy', 'T3_masks_arr_384_train.npy']

In [3]:
def S1_dataloader(valid_data='Mix', tune_type='test'):
    if tune_type=='test':
        img_layer_path = top_layer_path[0]
        if valid_data == '3.0T':
            # loading valida data 3.0T + 1.5T: image / masks
            X_valid = np.load(img_layer_path +'/'+ S1_img_stack[0])
            y_valid = np.load(img_layer_path +'/'+ S1_img_stack[1])

        elif valid_data == '1.5T':
            X_valid = np.load(img_layer_path +'/'+ S1_img_stack[2])
            y_valid = np.load(img_layer_path +'/'+ S1_img_stack[3])
    elif tune_type=='train':
        img_layer_path = top_layer_path[1]
        # loading valida data 3.0T + 1.5T: image / masks
        X_valid = np.load(img_layer_path +'/'+ S1_img_stack[4])
        y_valid = np.load(img_layer_path +'/'+ S1_img_stack[5])


    X_valid = np.reshape(X_valid, (X_valid.shape[0]*32,384,384,1))
    y_valid = np.reshape(y_valid, (y_valid.shape[0]*32,384,384,1))
    return X_valid.astype(np.float32), y_valid.astype(np.int8)

In [4]:
def S1_model_loader(weight_path, backbone, mode):
    S1_X_valid, S1_y_valid= S1_dataloader(valid_data=datatype, tune_type=mode)
    print(f'S1 data shape: img {S1_X_valid.shape} msk {S1_y_valid.shape}')
    model = Unet(backbone, encoder_weights=None, input_shape=(None, None, 1))
    model.load_weights(weight_path)
    Results = model.predict(S1_X_valid, batch_size=10, verbose=1)
    return Results, S1_X_valid, S1_y_valid

In [5]:
import pandas as pd
if datatype=='1.5T':
    data_n = 3
elif datatype=='3.0T':
    data_n = 1
S1_model_df = pd.DataFrame(np.load(top_layer_path[0] + '/' + S1_img_stack[data_n]).flatten(), columns=['GT'])

S1_model_df_train = pd.DataFrame(np.load(top_layer_path[1] + '/' + S1_img_stack[5]).flatten(), columns=['GT'])

In [6]:
from scipy.stats import sem
from tqdm import tqdm
sys.path.append("..")
from utils.visual_plt import ClassReport
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score
def CI(y_pred, y_true, optimal_th, score_type):
    n_bootstraps = 1000
    rng_seed = 42  # control reproducibility
    bootstrapped_scores = []
    rng = np.random.RandomState(rng_seed)
    for i in tqdm(range(n_bootstraps)):
        indices = rng.randint(0, len(y_pred), len(y_pred))
        if len(np.unique(y_true[indices])) < 2:
            continue
        if score_type=='acc':
            score = accuracy_score(y_true[indices], y_pred[indices]>optimal_th)
        elif score_type=='auc':
            score = roc_auc_score(y_true[indices], y_pred[indices])
        bootstrapped_scores.append(score)
    sorted_scores = np.array(bootstrapped_scores)
    sorted_scores.sort()
    confidence_lower = sorted_scores[int(0.025 * len(sorted_scores))]
    confidence_upper = sorted_scores[int(0.975 * len(sorted_scores))]
    return confidence_lower, confidence_upper

In [7]:
import matplotlib.pyplot as plt
from sklearn import metrics
from sklearn.metrics import auc
from sklearn.metrics import plot_roc_curve, roc_curve, roc_auc_score, auc, roc_curve
from sklearn.metrics import precision_recall_curve, average_precision_score, confusion_matrix
def Find_Optimal_Cutoff(FPR, TPR, threshold):
    y = TPR - FPR
    
    Youden_index = np.argmax(y)  # Only the first occurrence is returned.
    optimal_threshold = threshold[Youden_index]
    point = [FPR[Youden_index], TPR[Youden_index]]
    return optimal_threshold, point


In [8]:
# S1 model weight path
S1_dense = '../my_data/my_weights/S1_DenseNet.hdf5'
S1_res = '../my_data/my_weights/S1_ResNet.hdf5'
S1_vgg = '../my_data/my_weights/S1_VGGNet.hdf5'
S1_weights=[S1_dense, S1_res, S1_vgg]
S1_backbone = ['GT', 'densenet121', 'resnet50', 'vgg16']

In [9]:
class metric:
    def __init__(self,y_true, y_pred):
        self.y_true_f = K.flatten(y_true)
        self.y_pred_f = K.flatten(y_pred)
        self.current = confusion_matrix(K.cast(self.y_true_f, dtype='int8'), K.cast(self.y_pred_f, dtype='int8'), labels=[0, 1])
    def dice(self):
        # intersection = K.sum(self.y_true_f* self.y_pred_f )
        intersection = np.diag(self.current)
        ground_truth_set = self.current.sum(axis=1)
        predicted_set = self.current.sum(axis=0)
        # dice = (2. * intersection) / (K.sum(self.y_true_f) + K.sum(self.y_pred_f ))
        dice = (2. * intersection) / (ground_truth_set + predicted_set)
        return np.mean(dice)
        
    def iou(self):
        # ytrue, ypred is a flatten vector
        # compute mean iou
        intersection = np.diag(self.current)
        ground_truth_set = self.current.sum(axis=1)
        predicted_set = self.current.sum(axis=0)
        union = ground_truth_set + predicted_set - intersection
        IoU = intersection / union.astype(np.float32)
        return np.mean(IoU)

In [10]:
def get_pred(mode_type, d): #d=train or test
    print(f'Start model = {d}')
    print(S1_backbone[i])
    S1_pred, S1_X_valid, S1_y_valid = S1_model_loader(S1_weights[i-1], S1_backbone[i], d)
    S1_pred = np.reshape(S1_pred, (S1_pred.shape[0]//32,32,384,384))

    return S1_pred

In [11]:
fig = plt.figure(figsize=(25, 8))

# grid = plt.GridSpec(nrows=2, ncols=3, figure=fig)
# ax1 = plt.subplot(grid[0, 0])
# ax2 = plt.subplot(grid[0, 1])
# ax3 = plt.subplot(grid[0, 2])
# ax4 = plt.subplot(grid[1, 0:3])

grid = plt.GridSpec(nrows=2, ncols=2, figure=fig)
ax2 = plt.subplot(grid[0, 0])
ax3 = plt.subplot(grid[0, 1])
ax4 = plt.subplot(grid[1, 0:2])


y_flatten = np.array(S1_model_df['GT'])
y_flatten_train = S1_model_df_train['GT'].astype(np.int8)
count=0
result_table1 = []
model_num = []
next_ = 0


color = ['tab:gray','tab:blue','tab:orange','tab:green','tab:red','tab:purple',
         'tab:brown','tab:pink','tab:olive','tab:cyan','b','mediumvioletred']
d_style = ['X','+','H','*','s','4','^','v','o','D','p','d']

for i, backbone in enumerate(S1_backbone):
    if backbone=='GT':
        p_flatten = y_flatten
        p_flatten_train = y_flatten_train
    else:
        p_flatten = (get_pred(i, 'test')).flatten()
        # p_flatten_train = (get_pred(i, 'train')).flatten()
    # print(p_flatten_train.shape)
# -------------------------------Calc iou and dice metrics ---------------------
    metric_class = metric(y_flatten,p_flatten)
    iou = metric_class.iou()
    dice = metric_class.dice()
    print('iou', iou, 'dice', dice)
# -------------------------------plot AUROC and Cut-off Point-------------------
    auc = roc_auc_score(y_flatten.astype(np.int8), p_flatten)
    fpr, tpr, ths = roc_curve(y_flatten.astype(np.int8), p_flatten)
    # fpr2, tpr2, ths2 = roc_curve(y_flatten_train.astype(np.int8), p_flatten_train)
    # optimal_th, optimal_point = Find_Optimal_Cutoff(TPR=tpr2, FPR=fpr2, threshold=ths2)
# -------------------------------plot AUPRC-----------------------------
    ap = average_precision_score(y_flatten.astype(np.int8), p_flatten)
    pre, rec, _ = precision_recall_curve(y_flatten.astype(np.int8), p_flatten)
    
    # ax1.plot(fpr2, tpr2, linestyle='dotted', color=color[count])


    # ----table----
    if backbone=='GT':
        # ax1.plot(optimal_point[0], optimal_point[1], color=color[count], marker=d_style[count], linestyle='dotted', label=f"GT")
        ax2.plot(fpr, tpr, lw=2, linestyle='dotted', color=color[count], label = f'GT')
        ax3.step(rec, pre, lw=2, linestyle='dotted', label = f'GT', alpha=0.7)
    elif backbone !='GT':
        if next_+1<10:
            model_ns = f'Model 0{next_+1}'
            model_num.append(model_ns)
        else:
            model_ns = f'Model {next_+1}'
            model_num.append(model_ns)
        # ax1.plot(optimal_point[0], optimal_point[1], color=color[count], marker=d_style[count], linestyle='dotted', label=f"{model_ns}")
        ax2.plot(fpr, tpr, lw=2, linestyle='dotted', color=color[count], label = f'{model_ns}')
        ax3.step(rec, pre, lw=2, linestyle='dotted', label = f'{model_ns}', alpha=0.7)
        result_table1.append([backbone])
        # p_flatten = S1_model_df[backbone]
        # cl1,cu1 =CI(p_flatten, y_flatten, optimal_th, score_type='auc')

        # cl2,cu2 =CI(p_flatten, y_flatten, optimal_th, score_type='acc')
        # y_proba_th = ((p_flatten > optimal_th).astype(np.int8))
        y_proba_th = ((p_flatten > 0.5).astype(np.int8))
        # print(optimal_th)
        acc = accuracy_score(y_flatten.astype(np.int8), y_proba_th)
        CM = confusion_matrix(y_flatten.astype(np.int8), y_proba_th)
        result_table1[next_].append(f'{auc:.3f}')

        
        # result_table1[next_].append(f'{cl1:.3f} - {cu1:.3f}')
        # result_table1[next_].append(f'{optimal_th:.3f}')
        result_table1[next_].append(f'{acc:.3f}')
        # result_table1[next_].append(f'{cl2:.3f} - {cu2:.3f}')
        result_table1[next_].append(f'{iou:.3f}')
        result_table1[next_].append(f'{dice:.3f}')
        result_table1[next_].append(f'{(CM[1,1]/(CM[1,1]+CM[1,0])):.3f}')
        result_table1[next_].append(f'{(CM[0,0]/(CM[0,0]+CM[0,1])):.3f}')
        result_table1[next_].append(f'{(CM[1,1]/(CM[1,1]+CM[0,1])):.3f}')
        result_table1[next_].append(f'{ap:.3f}')
    # ----table----
        next_+=1
    count+=1
# ax1.plot([0, 1], [0, 1], linestyle="--", lw=3, color='black', alpha=.7)
# ax1.set_xlabel('False Positive Rate', fontsize=12)
# ax1.set_ylabel('True Positive Rate', fontsize=12)
# ax1.set_title(f'Only S1 - Train Cut-Off Point Plot', fontsize=18)
# ax1.legend(fontsize=10, loc='lower right')

ax2.plot([0, 1], [0, 1], linestyle='--', lw=3, color='black', alpha=.7)
ax2.legend(fontsize=10, loc='lower right')
ax2.set_title(f'Only S1 - {datatype} - Test AUROC Plot', fontsize=18)
ax2.set_xlabel('False Positive Rate', fontsize=12)
ax2.set_ylabel('True Positive Rate', fontsize=12)
ax2.set_xlim([-0.05, 1.05])
ax2.set_ylim([-0.05, 1.05])

ax3.legend(fontsize=10, loc='lower right')
ax3.set_title(f'Only S1 - {datatype} - Test AUPRC Plot', fontsize=18)
ax3.set_xlabel('Recall', fontsize=12)
ax3.set_ylabel('Precision', fontsize=12)
ax3.set_xlim([-0.05, 1.05])
ax3.set_ylim([-0.05, 1.05])


# result_table1 = pd.DataFrame(result_table1, columns = ['Model Name', 'AUROC', 'AUC CI',
#                                                         'Threshold', 'Accuracy', 'Accuracy CI','Dice', 'IoU',
#                                                        'Sensitivity', 'Specificity', 'Precision','AUPRC'])

result_table1 = pd.DataFrame(result_table1, columns = ['Model Name', 'AUROC', 'Accuracy','Dice', 'IoU',
                                                       'Sensitivity', 'Specificity', 'Precision','AUPRC'])                                                       

ax4.axis('off')
ax4.axis('tight')
tab1 = ax4.table(cellText=result_table1.values, rowLabels = model_num, colLabels=result_table1.columns, cellLoc='center',loc='center')
tab1.scale(1.1,1)
tab1.auto_set_font_size(False)
tab1.set_fontsize(14)

# if not os.path.exists(save_path):
#     os.makedirs(save_path)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
fig.tight_layout()
plt.subplots_adjust(wspace=0.3, hspace=0)
plt.savefig(f'../plot_results/paper_viewer/Only S1 - {datatype} - Curve Plot .png', dpi=600)
# plt.savefig(f'{save_path}/{datatype} Lacune ALL model - Curve Plot .jpg')
plt.show()
plt.clf()

iou 0.9999999897320138 dice 1.0
Start model = test
densenet121
S1 data shape: img (992, 384, 384, 1) msk (992, 384, 384, 1)
