In [None]:
pip install wandb --upgrade

In [None]:
%matplotlib inline

import os
from glob import glob
import pickle

import numpy as np
import math

from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import seaborn as sns

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.applications.densenet import DenseNet121
from itertools import cycle
from tensorflow.keras.metrics import MeanIoU
from tqdm import tqdm

import wandb
from wandb.keras import WandbCallback

In [None]:
wandb.login()

# 1. Data Load

In [None]:
data_dir = '../'

with open(os.path.join(data_dir, 'x_train.pickle'), 'rb') as f:
    x_train = pickle.load(f)
with open(os.path.join(data_dir, 'x_valid.pickle'), 'rb') as f:
    x_valid = pickle.load(f)

with open(os.path.join(data_dir, 'x_test.pickle'), 'rb') as f:
    x_test = pickle.load(f)
with open(os.path.join(data_dir, 'x_ext.pickle'), 'rb') as f:
    x_ext = pickle.load(f)
   
with open(os.path.join(data_dir, 'y_train.pickle'), 'rb') as f:
    y_train = pickle.load(f)
with open(os.path.join(data_dir, 'y_valid.pickle'), 'rb') as f:
    y_valid = pickle.load(f)

with open(os.path.join(data_dir, 'y_test.pickle'), 'rb') as f:
    y_test = pickle.load(f)
with open(os.path.join(data_dir, 'y_ext.pickle'), 'rb') as f:
    y_ext = pickle.load(f)

In [None]:
y_train = keras.utils.to_categorical(y_train)
y_valid = keras.utils.to_categorical(y_valid)
y_test = keras.utils.to_categorical(y_test)
y_ext = keras.utils.to_categorical(y_ext)

In [None]:
# Check shape
x_train.shape, y_train.shape, x_valid.shape, y_valid.shape, x_test.shape, y_test.shape, x_ext.shape, y_ext.shape

# 2. Data Visualization with Augmentation

## 2.1. Data Augmentation

In [None]:
train_datagen = ImageDataGenerator(rotation_range=90,
                                   zoom_range=[0.8, 1.0], 
                                   horizontal_flip=True,
                                   vertical_flip=True, 
                                   fill_mode='constant', cval=0)

test_datagen = ImageDataGenerator()

## 2.1. Data Visualization

In [None]:
# x_train visualization
fig = plt.figure(figsize=(16, 8))
axs = []

for x_data in train_datagen.flow(x_train, seed=42):
    for idx, img in enumerate(x_data):
        axs.append(fig.add_subplot(4,8,idx+1))
        plt.imshow(img)
        axs[idx].axis('off')
    break

In [None]:
# x_valid visualization
fig = plt.figure(figsize=(20, 10))
axs = []

for x_data in test_datagen.flow(x_valid, shuffle=False, seed=42):
    for idx, img in enumerate(x_data):
        axs.append(fig.add_subplot(5,10,idx+1))
        plt.imshow(img)
        axs[idx].axis('off')
    break

In [None]:
# x_test visualization
fig = plt.figure(figsize=(20, 10))
axs = []

for x_data in test_datagen.flow(x_test, shuffle=False, seed=42):
    for idx, img in enumerate(x_data):
        axs.append(fig.add_subplot(5,10,idx+1))
        plt.imshow(img)
        axs[idx].axis('off')
    break

In [None]:
# x_ext visualization
fig = plt.figure(figsize=(20, 10))
axs = []

for x_data in test_datagen.flow(x_ext, shuffle=False, seed=42):
    for idx, img in enumerate(x_data):
        axs.append(fig.add_subplot(5,10,idx+1))
        plt.imshow(img)
        axs[idx].axis('off')
    break

# 3. Modeling

## 3.1. Deep Ensembles Train  
log synced with wandb

In [None]:
save_model = os.path.join(os.getcwd(), 'BestModelSaved')
model_num = ''

In [None]:
# DECREASE LEARNING RATE EACH EPOCH
annealer = LearningRateScheduler(lambda x: 1e-3 * 0.95 ** x)

#train networks
nets = 5
model = [0] *nets
single_proba=[]
ext_single_proba=[]

for j in range(nets):
    tf.keras.backend.clear_session()
    
    model_name= model_num + str(j+1)
    model_path = os.path.join(save_model, model_name + '_bestmodel.h5')

    checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=model_path,
                                                    monitor='val_accuracy',
                                                    verbose=1,
                                                    save_weights_only=False,
                                                    save_best_only=True,
                                                    mode='auto',
                                                    save_freq='epoch')
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=15)
    
    
    run = wandb.init(project='Brain_GBMmeta', group=model_num, 
                     config={
                         'batch_size':32, 
                         'epochs':50,
                         'loss_function':'categorical_crossentropy', 
                         'architecture':'DenseNet121 pretrained', 
                         'dataset':'DataProcessed/final_twoT2'
                     }, 
                    name=model_name)
    config = wandb.config
    
    
    densenet121=DenseNet121()
    
    model[j] = densenet121.layers[-2].output
    model[j] = tf.keras.layers.Dense(2, activation='softmax')(model[j])
    model[j] = tf.keras.models.Model(inputs=densenet121.input, outputs=model[j])
    
    model[j].compile(loss=config.loss_function, 
                     optimizer='adam',
                     metrics=['accuracy'])
    
    
    _ = model[j].fit(train_datagen.flow(x_train, y_train, batch_size=config.batch_size, seed=42),
                     validation_data=test_datagen.flow(x_valid, y_valid, shuffle=False, batch_size=config.batch_size, seed=42),
                     epochs=config.epochs,
                     steps_per_epoch=x_train.shape[0]//32, 
                     callbacks=[annealer, checkpoint, early_stopping, WandbCallback(monitor='val_accuracy')],
                     verbose=1)
    
    print("CNN {0:d}: Epochs={1:d}, Train accuracy={2:.5f}, Validation accuracy={3:.5f} \n".format(
        j+1,config.epochs,max(_.history['accuracy']),max(_.history['val_accuracy']) ))
    
    run.finish()

In [None]:
'''
# Load saved model
nets = 5
de_model = [0]*5
de_model[0] = keras.models.load_model("0_bestmodel.h5")
de_model[1] = keras.models.load_model("1_bestmodel.h5")
de_model[2] = keras.models.load_model("2_bestmodel.h5")
de_model[3] = keras.models.load_model("3_bestmodel.h5")
de_model[4] = keras.models.load_model("4_bestmodel.h5")
'''

## 3.2. Sinlge Model Train

In [None]:
save_model = os.path.join(os.getcwd(), 'BestModelSaved')
model_num = ''

In [None]:
# DECREASE LEARNING RATE EACH EPOCH
annealer = LearningRateScheduler(lambda x: 1e-3 * 0.95 ** x)

tf.keras.backend.clear_session()

model_name= model_num
model_path = os.path.join(save_model, model_name + '_bestmodel.h5')

checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=model_path,
                                                monitor='val_accuracy',
                                                verbose=1,
                                                save_weights_only=False,
                                                save_best_only=True,
                                                mode='auto',
                                                save_freq='epoch')
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=15)


run = wandb.init(project='Brain_GBMmeta', group=model_num, 
                 config={
                     'batch_size':32, 
                     'epochs':50,
                     'loss_function':'categorical_crossentropy', 
                     'architecture':'DenseNet121 pretrained', 
                     'dataset':'DataProcessed/final_twoT2'
                 }, 
                name=model_name)
config = wandb.config


densenet121=DenseNet121()

model = densenet121.layers[-2].output
model = tf.keras.layers.Dense(2, activation='softmax')(model)
model = tf.keras.models.Model(inputs=densenet121.input, outputs=model)

model.compile(loss=config.loss_function, 
                 optimizer='adam',
                 metrics=['accuracy'])


_ = model.fit(train_datagen.flow(x_train, y_train, batch_size=config.batch_size, seed=42),
                 validation_data=test_datagen.flow(x_valid, y_valid, shuffle=False, batch_size=config.batch_size, seed=42),
                 epochs=config.epochs,
                 steps_per_epoch=x_train.shape[0]//32, 
                 callbacks=[annealer, checkpoint, early_stopping, WandbCallback(monitor='val_accuracy')],
                 verbose=1)

print("CNN Single: Epochs={0:d}, Train accuracy={1:.5f}, Validation accuracy={2:.5f} \n".format(
    config.epochs,max(_.history['accuracy']),max(_.history['val_accuracy']) ))

run.finish()

# 4. Test

In [None]:
from sklearn.metrics import confusion_matrix, auc, roc_auc_score, recall_score, f1_score, balanced_accuracy_score, classification_report, roc_curve, precision_score

In [None]:
def matrix(y_true, y_pred, y_proba):
    conf_mat = confusion_matrix(y_true.argmax(axis=1), y_pred.argmax(axis=1))
    TP = conf_mat[1][1]
    TN = conf_mat[0][0]
    FN = conf_mat[1][0]
    FP = conf_mat[0][1]

    acc = round((TP+TN)/(TP+TN+FP+FN), 4)
    sens = round(TP/(TP+FN), 4)
    spec = round(TN/(TN+FP), 4)
    auroc = round(roc_auc_score(y_true, y_proba), 4)
    precision = round(precision_score(y_true.argmax(axis=1), y_pred.argmax(axis=1)), 4)
    recall = round(recall_score(y_true.argmax(axis=1), y_pred.argmax(axis=1)), 4)
    f1score = round(f1_score(y_true.argmax(axis=1), y_pred.argmax(axis=1)), 4)
    w_acc = round(balanced_accuracy_score(y_true.argmax(axis=1), y_pred.argmax(axis=1)), 4)

    print(conf_mat, "\n")
    print("Acc", acc)
    print("Sensitivity", sens)
    print("Specificity", spec)
    print("AUROC", auroc)
    print('Precision', precision)
    print("Recall", recall)
    print("F1",  f1score)
    print("Weighted accuracy", w_acc, "\n")
    print(classification_report(y_true, y_pred))
    
    # ROC & AUC
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    for i in range(y_true.shape[1]):
        fpr[i], tpr[i], _ = roc_curve(y_true[:,i], y_proba[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])
    
    plt.figure(figsize=(5, 5))
    colors = cycle(['blue', 'red'])
    for i, color in zip(range(y_true.shape[1]), colors):
        if i == 0:
            pass
        else:
            plt.plot(fpr[i], tpr[i], color=color, lw=1.5, label='ROC curve of {0} (area = {1:0.2f})' ''.format(label[str(i)], roc_auc[i]))
    plt.plot([0, 1], [0, 1], 'k-', lw=1.5)
    plt.xlim([-0.05, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic for multi-class data')
    plt.legend(loc="lower right")
    plt.show()

In [None]:
label = {'0': 'GBM', 
        '1':'meta'}

## 4.1. Internal Test

### 4.1.1. Deep Ensembles Model

In [None]:
# Single
de_sing_proba= []
for m in de_model:
    proba = m.predict(test_datagen.flow(x_test, shuffle=False, seed=42))
    de_sing_proba.append(proba)

# Ensemble
results = np.zeros( (y_test.shape[0],2) )
for s in de_sing_proba:
    results = results + s

de_proba = results/5
de_pred = (de_proba > 0.5).astype(np.int64)

In [None]:
# Esemble Result
matrix(y_test, de_pred, de_proba)

### 4.1.2. Single Model

In [None]:
# Single
sing_proba = sing_model.predict(test_datagen.flow(x_test, shuffle=False, seed=42))
sing_pred = (sing_proba > 0.5).astype(np.int64)

In [None]:
y_test.shape, sing_pred.shape, sing_proba.shape

In [None]:
matrix(y_test, sing_pred, sing_proba)

In [None]:
matrix(y_test, sing_pred, sing_proba)

## 4.2. External Test

### 4.2.1. Deep Ensembles Model

In [None]:
# Deep Ensemble
ext_de_sing_proba= []
for m in de_model:
    proba = m.predict(test_datagen.flow(x_ext, shuffle=False, seed=42))
    ext_de_sing_proba.append(proba)
    
# Ensemble
ext_results = np.zeros( (y_ext.shape[0],2) )
for s in ext_de_sing_proba:
    ext_results = ext_results + s

ext_de_proba = ext_results/5
ext_de_pred = (ext_de_proba > 0.5).astype(np.int64)

In [None]:
matrix(y_ext, ext_de_pred, ext_de_proba)

### 4.2.2. Single Model

In [None]:
# ext_Single
ext_sing_proba = sing_model.predict(test_datagen.flow(x_ext, shuffle=False, seed=42))
ext_sing_pred = (ext_sing_proba > 0.5).astype(np.int64)

In [None]:
matrix(y_ext, ext_sing_pred, ext_sing_proba)

## 4.3. Uncertainty (Entropy) calculation

### 4.3.1. Internal test

In [None]:
# Deep Ensembles
int_de_entropy_list = []
for i in range(len(de_proba)):
    pq = de_proba[i] * y_test[i] #정답에 대한 예측 확률 사용하는거 맞는지 확인
    index = np.argmax(pq)
    p = pq[index]
    entropy = -p*math.log(p) - (1-p)*math.log(1-p+ 0.00000000001)
    int_de_entropy_list.append(entropy)

In [None]:
# Single 
int_sing_entropy_list = []
for i in range(len(sing_proba)):
    pq = sing_proba[i] * y_test[i] #정답에 대한 예측 확률 사용하는거 맞는지 확인
    index = np.argmax(pq)
    p = pq[index]
    entropy = -p*math.log(p) - (1-p)*math.log(1-p+ 0.00000000001)
    int_sing_entropy_list.append(entropy)

In [None]:
# plot distribution
sns.kdeplot(int_de_entropy_list, label='Deep Ensembles', color='r')
sns.kdeplot(int_sing_entropy_list, label='Single', color='g')
plt.xlabel('Internal Validation Entropy Values')
plt.legend()
plt.show()

### 4.3.2. External Test

In [None]:
# Deep Ensembles
ext_de_entropy_list = []
for i in range(len(ext_de_proba)):
    pq = ext_de_proba[i] * y_ext[i] #정답에 대한 예측 확률 사용하는거 맞는지 확인
    index = np.argmax(pq)
    p = pq[index]
    entropy = -p*math.log(p) - (1-p)*math.log(1-p+ 0.00000000001)
    ext_de_entropy_list.append(entropy)

In [None]:
# Single
ext_sing_entropy_list = []
for i in range(len(ext_sing_proba)):
    pq = ext_sing_proba[i] * y_ext[i]
    index = np.argmax(pq)
    p = pq[index]
    entropy = -p*math.log(p) - (1-p)*math.log(1-p+ 0.00000000001)
    ext_sing_entropy_list.append(entropy)

In [None]:
# plot distribution
sns.kdeplot(ext_de_entropy_list, label='Deep Ensembles', color='r')
sns.kdeplot(ext_sing_entropy_list, label='Single', color='g')
plt.xlabel('External Validation Entropy Values')
plt.legend()
plt.show()

In [None]:
# plot all together
sns.kdeplot(int_de_entropy_list, label='Int - Deep Ensembles', color='r')
sns.kdeplot(int_sing_entropy_list, label='Int - Single', color='g')
sns.kdeplot(ext_de_entropy_list, label='Ext - Deep Ensembles', color='r')
sns.kdeplot(ext_sing_entropy_list, label='Ext - Single', color='g')
plt.xlabel('Validation Entropy Values')
plt.legend()
plt.show()

### 4.3.3. Divide Groups and Check Each Group's Results
Uncertainty cutoff 0.25

In [None]:
def uc_grouping(entropy_list, cutoff):
    high_idx = []
    low_idx = []
    for idx, entropy in enumerate(entropy_list):
        if entropy >= cutoff:
            high_idx.append(idx)
        else:
            low_idx.append(idx)
            
    return high_idx, low_idx

#### 4.3.3.1 Internal Test

##### 4.3.3.1.1 Deep Ensembles

In [None]:
int_de_high, int_de_low = uc_grouping(int_de_entropy_list, cutoff=0.25)
print(len(int_de_high), len(int_de_low))

print("Internal Uncertainty High: %.2f%%" % (len(int_de_high) / len(int_de_entropy_list) * 100.0))
print("Internal Uncertainty Low: %.2f%%" % (len(int_de_low) / len(int_de_entropy_list) * 100.0))

In [None]:
# Matrix 위해 ensemble 결과 필요
de_test_high = y_test[int_de_high]
de_pred_high = de_pred[int_de_high]
de_proba_high = de_proba[int_de_high]

de_test_low = y_test[int_de_low]
de_pred_low = de_pred[int_de_low]
de_proba_low = de_proba[int_de_low]

In [None]:
#cutoff 0.25
matrix(de_test_high, de_pred_high, de_proba_high)
matrix(de_test_low, de_pred_low, de_proba_low)

##### 4.3.3.1.2. Single

In [None]:
int_sing_high, int_sing_low = uc_grouping(int_sing_entropy_list, cutoff=0.25)
print(len(int_sing_high), len(int_sing_low))

print("Internal Uncertainty High: %.2f%%" % (len(int_sing_high) / len(int_sing_entropy_list) * 100.0))
print("Internal Uncertainty Low: %.2f%%" % (len(int_sing_low) / len(int_sing_entropy_list) * 100.0))

In [None]:
# Matrix 위해 ensemble 결과 필요
sing_test_high = y_test[int_sing_high]
sing_pred_high = sing_pred[int_sing_high]
sing_proba_high = sing_proba[int_sing_high]

sing_test_low = y_test[int_sing_low]
sing_pred_low = sing_pred[int_sing_low]
sing_proba_low = sing_proba[int_sing_low]

In [None]:
#cutoff 0.25
matrix(sing_test_high, sing_pred_high, sing_proba_high)
matrix(sing_test_low, sing_pred_low, sing_proba_low)

#### 4.3.3.2. External Test

##### 4.3.3.2.1. Deep Ensembles

In [None]:
ext_de_high, ext_de_low = uc_grouping(ext_de_entropy_list, cutoff=0.25)
print(len(ext_de_high), len(ext_de_low))

print("External Uncertainty High: %.2f%%" % (len(ext_de_high) / len(ext_de_entropy_list) * 100.0))
print("External Uncertainty Low: %.2f%%" % (len(ext_de_low) / len(ext_de_entropy_list) * 100.0))

In [None]:
# Matrix 위해 ensemble 결과 필요
ext_de_test_high = y_ext[ext_de_high]
ext_de_pred_high = ext_de_pred[ext_de_high]
ext_de_proba_high = ext_de_proba[ext_de_high]

ext_de_test_low = y_ext[ext_de_low]
ext_de_pred_low = ext_de_pred[ext_de_low]
ext_de_proba_low = ext_de_proba[ext_de_low]

In [None]:
#cutoff 0.25
matrix(ext_de_test_high, ext_de_pred_high, ext_de_proba_high)
matrix(ext_de_test_low, ext_de_pred_low, ext_de_proba_low)

##### 4.3.3.2.2. Single

In [None]:
ext_sing_high, ext_sing_low = uc_grouping(ext_sing_entropy_list, cutoff=0.25)
print(len(ext_sing_high), len(ext_sing_low))

print("Internal Uncertainty High: %.2f%%" % (len(ext_sing_high) / len(ext_sing_entropy_list) * 100.0))
print("Internal Uncertainty Low: %.2f%%" % (len(ext_sing_low) / len(ext_sing_entropy_list) * 100.0))

In [None]:
# Matrix 위해 ensemble 결과 필요
ext_sing_test_high = y_ext[ext_sing_high]
ext_sing_pred_high = ext_sing_pred[ext_sing_high]
ext_sing_proba_high = ext_sing_proba[ext_sing_high]

ext_sing_test_low = y_ext[ext_sing_low]
ext_sing_pred_low = ext_sing_pred[ext_sing_low]
ext_sing_proba_low = ext_sing_proba[ext_sing_low]

In [None]:
#cutoff 0.25
matrix(ext_sing_test_high, ext_sing_pred_high, ext_sing_proba_high)
matrix(ext_sing_test_low, ext_sing_pred_low, ext_sing_proba_low)

# 5. Grad-CAM

In [None]:
mask_path = '../'

with open(f'{mask_path}/test_mask.pickle', 'rb') as f:
    test_mask = pickle.load(f)
    
with open(f'{mask_path}/ext_mask.pickle', 'rb') as f:
    ext_mask = pickle.load(f)


print(test_mask.shape)
print(ext_mask.shape)

In [None]:
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    # First, we create a model that maps the input image to the activations
    # of the last conv layer as well as the output predictions
    grad_model = tf.keras.models.Model(
        [model.inputs], [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Then, we compute the gradient of the top predicted class for our input image
    # with respect to the activations of the last conv layer
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0]) # model이 predict 한 값의 gradient 계산
        class_channel = preds[:, pred_index]

    # This is the gradient of the output neuron (top predicted or chosen)
    # with regard to the output feature map of the last conv layer
    grads = tape.gradient(class_channel, last_conv_layer_output)

    # This is a vector where each entry is the mean intensity of the gradient
    # over a specific feature map channel
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # We multiply each channel in the feature map array
    # by "how important this channel is" with regard to the top predicted class
    # then sum all the channels to obtain the heatmap class activation
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # For visualization purpose, we will also normalize the heatmap between 0 & 1
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    heatmap = np.uint8(heatmap * 255) # Scale between 0-255 to visualize
    heatmap = np.uint8(Image.fromarray(heatmap).resize((img_array.shape[1], img_array.shape[2]), Image.ANTIALIAS))/255
    return heatmap, preds.numpy()

In [None]:
def get_de_heatmap_prob(img_array, de_model, last_conv_layer):
    gradimg = np.expand_dims(img_array, axis=0) #(1, 224, 224, 3)
    heatmap_shape = (img_array.shape[0], img_array.shape[1])

    de_heatmap = np.zeros(heatmap_shape)
    de_proba = np.zeros((1, 2)) # proba output shape

    for index, m in enumerate(de_model):
        heatmap, proba = make_gradcam_heatmap(gradimg, m, last_conv_layer)
        de_heatmap = de_heatmap + heatmap
        de_proba = de_proba + proba

    de_heatmap = de_heatmap/5 #(224, 224) 0-1
    de_proba = de_proba/5 #(1, 2)
    return de_heatmap, de_proba

In [None]:
def save_display_gradcam_get_iou(img_dataset, last_conv_layer, label_dataset, de_model, mask_dataset, savepath, alpha=0.8):
    total_iou = []
    for num, img in enumerate(tqdm(img_dataset)):
        
        de_heatmap, de_proba = get_de_heatmap_prob(img, de_model, last_conv_layer)

        predictions = "Meta" if de_proba.argmax(-1) == 1 else "GBM"
        label = "Meta" if label_dataset[num].argmax(-1) == 1 else "GBM"

        # Rescale img, heatmap to a range 0-255
        img = Image.fromarray(img[:,:,0].astype(np.float32) * 255).convert('RGB')
        img = keras.preprocessing.image.img_to_array(img)

        heatmap = np.uint8(255 * de_heatmap)

        # Get IoU
        hm_label = np.where(heatmap <= 127.5, 0, heatmap)
        hm_label = np.where(hm_label > 127.5, 1, hm_label)

        mask_label = np.where(mask_dataset[num] > 0, 1, mask_dataset[num])
        mask_label = np.ceil(mask_label)

        m = MeanIoU(num_classes=2)
        m.update_state(mask_label, hm_label)
        iou = m.result().numpy()
        total_iou.append(iou)

        # Use jet colormap to colorize heatmap
        jet = cm.get_cmap("jet")

        # Use RGB values of the colormap
        jet_colors = jet(np.arange(256))[:, :3]
        jet_heatmap = jet_colors[heatmap] #range 0-1
        jet_heatmap = np.uint8(255 * jet_heatmap)

        # Superimpose the heatmap on original image
        superimposed_img = jet_heatmap * alpha + img
        superimposed_img = keras.preprocessing.image.array_to_img(superimposed_img)

        fig, ax = plt.subplots(1, 5, figsize=(20, 4))

        ax[0].imshow(img, cmap='gray')
        ax[0].set_title("T2")
        ax[1].imshow(superimposed_img)
        ax[1].set_title("GradCAM - T2")
        ax[2].imshow(np.array(hm_label))
        ax[2].set_title("Area of Pred")
        ax[3].imshow(np.array(mask_label))
        ax[3].set_title("Area of Ground Truth")
        ax[4].imshow(mask_dataset[num])
        ax[4].set_title("Mask")

        plt.suptitle(f'[{num}] Prediction - {predictions} / Ground Truth - {label} / IoU - {iou:.4f}',fontsize=15)
        plt.savefig(os.path.join(savepath, f'{num}.png'))
        #plt.show()
    return total_iou

In [None]:
# Internal Test
last_conv_layer = 'conv5_block16_2_conv'
savepath = '/home/ubuntu/SJEOM/Brain-GBMmeta/GitHub/ModelingTesting/Results/220506_CAM/int_test'

int_test_total_iou = save_display_gradcam_get_iou(x_test, last_conv_layer, y_test, de_model, test_mask, savepath, alpha=0.8)

with open('int_test_total_iou.pickle', 'wb') as f:
    pickle.dump(int_test_total_iou, f)

In [None]:
len(int_test_total_iou)

In [None]:
# External Test
last_conv_layer = 'conv5_block16_2_conv'
savepath = '/home/ubuntu/SJEOM/Brain-GBMmeta/GitHub/ModelingTesting/Results/220506_CAM/ext_test'

ext_test_total_iou = save_display_gradcam_get_iou(x_ext, last_conv_layer, y_ext, de_model, ext_mask, savepath, alpha=0.8)

with open('ext_test_total_iou.pickle', 'wb') as f:
    pickle.dump(ext_test_total_iou, f)

In [None]:
len(ext_test_total_iou)

# 6. Uncertainty x IoU

## 6.1. Internal Test

In [None]:
# int de uncertainty high group -> iou high/low
int_high_un_high_iou_idx=[]
int_high_un_low_iou_idx=[]

for i in int_de_high:
    if int_test_total_iou[i] >= 0.5:
        int_high_un_high_iou_idx.append(i)
    else:
        int_high_un_low_iou_idx.append(i)
        
# int de uncertainty low group -> iou high/low
int_low_un_high_iou_idx=[]
int_low_un_low_iou_idx=[]

for i in int_de_low:
    if int_test_total_iou[i] >= 0.5:
        int_low_un_high_iou_idx.append(i)
    else:
        int_low_un_low_iou_idx.append(i)

In [None]:
print(len(int_high_un_high_iou_idx), len(int_high_un_low_iou_idx))
print(len(int_low_un_high_iou_idx), len(int_low_un_low_iou_idx))

In [None]:
int_test_high_un_high_iou = y_test[int_high_un_high_iou_idx]
int_pred_high_un_high_iou = de_pred[int_high_un_high_iou_idx]
int_proba_high_un_high_iou = de_proba[int_high_un_high_iou_idx]

int_test_high_un_low_iou = y_test[int_high_un_low_iou_idx]
int_pred_high_un_low_iou = de_pred[int_high_un_low_iou_idx]
int_proba_high_un_low_iou = de_proba[int_high_un_low_iou_idx]

int_test_low_un_high_iou = y_test[int_low_un_high_iou_idx]
int_pred_low_un_high_iou = de_pred[int_low_un_high_iou_idx]
int_proba_low_un_high_iou = de_proba[int_low_un_high_iou_idx]

int_test_low_un_low_iou = y_test[int_low_un_low_iou_idx]
int_pred_low_un_low_iou = de_pred[int_low_un_low_iou_idx]
int_proba_low_un_low_iou = de_proba[int_low_un_low_iou_idx]

In [None]:
matrix(int_test_high_un_high_iou, int_pred_high_un_high_iou, int_proba_high_un_high_iou)
matrix(int_test_high_un_low_iou, int_pred_high_un_low_iou, int_proba_high_un_low_iou)
matrix(int_test_low_un_high_iou, int_pred_low_un_high_iou, int_proba_low_un_high_iou)
matrix(int_test_low_un_low_iou, int_pred_low_un_low_iou, int_proba_low_un_low_iou)

## 6.2. External Test

In [None]:
# ext de uncertainty high group -> iou high/low
ext_high_un_high_iou_idx=[]
ext_high_un_low_iou_idx=[]

for i in ext_de_high:
    if ext_test_total_iou[i] >= 0.5:
        ext_high_un_high_iou_idx.append(i)
    else:
        ext_high_un_low_iou_idx.append(i)
        
# ext de uncertainty low group -> iou high/low
ext_low_un_high_iou_idx=[]
ext_low_un_low_iou_idx=[]

for i in ext_de_low:
    if ext_test_total_iou[i] >= 0.5:
        ext_low_un_high_iou_idx.append(i)
    else:
        ext_low_un_low_iou_idx.append(i)

In [None]:
print(len(ext_high_un_high_iou_idx), len(ext_high_un_low_iou_idx))
print(len(ext_low_un_high_iou_idx), len(ext_low_un_low_iou_idx))

In [None]:
ext_test_high_un_high_iou = y_ext[ext_high_un_high_iou_idx]
ext_pred_high_un_high_iou = ext_de_pred[ext_high_un_high_iou_idx]
ext_proba_high_un_high_iou = ext_de_proba[ext_high_un_high_iou_idx]

ext_test_high_un_low_iou = y_ext[ext_high_un_low_iou_idx]
ext_pred_high_un_low_iou = ext_de_pred[ext_high_un_low_iou_idx]
ext_proba_high_un_low_iou = ext_de_proba[ext_high_un_low_iou_idx]


ext_test_low_un_high_iou = y_ext[ext_low_un_high_iou_idx]
ext_pred_low_un_high_iou = ext_de_pred[ext_low_un_high_iou_idx]
ext_proba_low_un_high_iou = ext_de_proba[ext_low_un_high_iou_idx]

ext_test_low_un_low_iou = y_ext[ext_low_un_low_iou_idx]
ext_pred_low_un_low_iou = ext_de_pred[ext_low_un_low_iou_idx]
ext_proba_low_un_low_iou = ext_de_proba[ext_low_un_low_iou_idx]

In [None]:
matrix(ext_test_high_un_high_iou, ext_pred_high_un_high_iou, ext_proba_high_un_high_iou)
matrix(ext_test_high_un_low_iou, ext_pred_high_un_low_iou, ext_proba_high_un_low_iou)
matrix(ext_test_low_un_high_iou, ext_pred_low_un_high_iou, ext_proba_low_un_high_iou)
matrix(ext_test_low_un_low_iou, ext_pred_low_un_low_iou, ext_proba_low_un_low_iou)

# 7. OOD Validation

## 7.1. Data Load

In [None]:
data_dir= '../'

In [None]:
with open(os.path.join(data_dir, 'x_ewha.pickle'), 'rb') as f:
    x_ewha = pickle.load(f)
    
with open(os.path.join(data_dir, 'y_ewha.pickle'), 'rb') as f:
    y_ewha = pickle.load(f)
    
with open(os.path.join(data_dir, 'x_sev.pickle'), 'rb') as f:
    x_sev = pickle.load(f)
    
with open(os.path.join(data_dir, 'y_sev.pickle'), 'rb') as f:
    y_sev = pickle.load(f)

In [None]:
# Ewha Data Visualization
n=0
f, axarr = plt.subplots(10, 10, sharex=True, sharey=True, figsize=(20, 20))
for i in range(10):
    for j in range(10):
        axarr[i, j].imshow(x_ewha[n+50*i+5*j], cmap='gray') #3 channel 한 번에 띄우면 이미지가 이상해ㅜ
        axarr[i, j].axis('off')
        axarr[i, j].set_title(n+50*i+5*j)
plt.show()

In [None]:
# Severance Data Visualization
n=0
f, axarr = plt.subplots(10, 10, sharex=True, sharey=True, figsize=(20, 20))
for i in range(10):
    for j in range(10):
        axarr[i, j].imshow(x_sev[n+50*i+5*j], cmap='gray') #3 channel 한 번에 띄우면 이미지가 이상해ㅜ
        axarr[i, j].axis('off')
        axarr[i, j].set_title(n+50*i+5*j)
plt.show()

## 7.2. Test

In [None]:
y_ewha = keras.utils.to_categorical(y_ewha)
y_sev = keras.utils.to_categorical(y_sev)

In [None]:
x_ewha.shape, y_ewha.shape, x_sev.shape, y_sev.shape

### 7.2.1. Ewha

#### 7.2.1.1. Deep Ensembles

In [None]:
# Deep Ensemble
ewha_de_sing_proba= []
for m in de_model:
    proba = m.predict(test_datagen.flow(x_ewha, shuffle=False, seed=42))
    ewha_de_sing_proba.append(proba)
    
# Ensemble
ewha_results = np.zeros( (y_ewha.shape[0],2) )
for s in ewha_de_sing_proba:
    ewha_results = ewha_results + s

ewha_de_proba = ewha_results/5
ewha_de_pred = (ewha_de_proba > 0.5).astype(np.int64)

In [None]:
matrix(y_ewha, ewha_de_pred, ewha_de_proba)

#### 7.2.1.2. Single

In [None]:
# Single
ewha_sing_proba = sing_model.predict(test_datagen.flow(x_ewha, shuffle=False, seed=42))
ewha_sing_pred = (ewha_sing_proba > 0.5).astype(np.int64)

In [None]:
y_ewha.shape, ewha_sing_pred.shape, ewha_sing_proba.shape

In [None]:
matrix(y_ewha, ewha_sing_pred, ewha_sing_proba)

#### 7.2.1.3. Uncertainty (Entropy) Calculation

In [None]:
ewha_de_entropy = []
for i in range(len(ewha_de_proba)):
    pq = ewha_de_proba[i] * y_ewha[i]
    index = np.argmax(pq)
    p = pq[index]
    entropy = -p*math.log(p) - (1-p)*math.log(1-p+ 0.00000000001)
    ewha_de_entropy.append(entropy)
    
ewha_sing_entropy = []
for i in range(len(ewha_sing_proba)):
    pq = ewha_sing_proba[i] * y_ewha[i]
    index = np.argmax(pq)
    p = pq[index]
    entropy = -p*math.log(p+ 0.00000000001) - (1-p)*math.log(1-p+ 0.00000000001)
    ewha_sing_entropy.append(entropy)

#### 7.2.1.4. Divide Groups with Entropy Values

In [None]:
# Deep Ensemble
ewha_de_high, ewha_de_low = uc_grouping(ewha_de_entropy, cutoff=0.25)
print(len(ewha_de_high), len(ewha_de_low))

print("Ewha Uncertainty High: %.2f%%" % (len(ewha_de_high) / len(ewha_de_entropy) * 100.0))
print("Ewha Uncertainty Low: %.2f%%" % (len(ewha_de_low) / len(ewha_de_entropy) * 100.0))

In [None]:
# Single
ewha_sing_high, ewha_sing_low = uc_grouping(ewha_sing_entropy, cutoff=0.25)
print(len(ewha_sing_high), len(ewha_sing_low))

print("Ewha Uncertainty High: %.2f%%" % (len(ewha_sing_high) / len(ewha_sing_entropy) * 100.0))
print("Ewha Uncertainty Low: %.2f%%" % (len(ewha_sing_low) / len(ewha_sing_entropy) * 100.0))

In [None]:
# plot distribution
sns.kdeplot(np.array(ewha_de_entropy), label='ensemble', color='r')
sns.kdeplot(np.array(ewha_sing_entropy), label='single', color='g')
plt.xlabel('OOD; Ewha Entropy Values')
plt.legend()
plt.show()

### 7.2.2. Severance

#### 7.2.2.1. Deep Ensembles

In [None]:
sev_de_sing_proba= []
for m in de_model:
    proba = m.predict(test_datagen.flow(x_sev, shuffle=False, seed=42))
    sev_de_sing_proba.append(proba)
    
# Ensemble
sev_results = np.zeros( (y_sev.shape[0],2) )
for s in sev_de_sing_proba:
    sev_results = sev_results + s

sev_de_proba = sev_results/5
sev_de_pred = (sev_de_proba > 0.5).astype(np.int64)

In [None]:
y_sev.shape, sev_de_proba.shape, sev_de_proba.shape

In [None]:
matrix(y_sev, sev_de_pred, sev_de_proba)

#### 7.2.2.2. Single

In [None]:
sev_sing_proba = sing_model.predict(test_datagen.flow(x_sev, shuffle=False, seed=42))
sev_sing_pred = (sev_sing_proba > 0.5).astype(np.int64)

In [None]:
y_sev.shape, sev_sing_proba.shape, sev_sing_pred.shape

In [None]:
matrix(y_sev, sev_sing_pred, sev_sing_proba)

#### 7.2.2.3. Uncertainty (Entropy) Calculation

In [None]:
sev_de_entropy = []
for i in range(len(sev_de_proba)):
    pq = sev_de_proba[i] * y_sev[i]
    index = np.argmax(pq)
    p = pq[index]
    entropy = -p*math.log(p) - (1-p)*math.log(1-p+ 0.00000000001)
    sev_de_entropy.append(entropy)
    
sev_sing_entropy = []
for i in range(len(sev_sing_proba)):
    pq = sev_sing_proba[i] * y_sev[i]
    index = np.argmax(pq)
    p = pq[index]
    entropy = -p*math.log(p+ 0.00000000001) - (1-p)*math.log(1-p+ 0.00000000001)
    sev_sing_entropy.append(entropy)

#### 7.2.2.4. Divide Groups with Entropy Values

In [None]:
# Deep Ensembles
sev_de_high, sev_de_low = uc_grouping(sev_de_entropy, cutoff=0.25)
print(len(sev_de_high), len(sev_de_low))

print("Severance Uncertainty High: %.2f%%" % (len(sev_de_high) / len(sev_de_entropy) * 100.0))
print("Severance Uncertainty Low: %.2f%%" % (len(sev_de_low) / len(sev_de_entropy) * 100.0))

In [None]:
# Single
sev_sing_high, sev_sing_low = uc_grouping(sev_sing_entropy, cutoff=0.25)
print(len(sev_sing_high), len(sev_sing_low))

print("Severance Uncertainty High: %.2f%%" % (len(sev_sing_high) / len(sev_sing_entropy) * 100.0))
print("Severance Uncertainty Low: %.2f%%" % (len(sev_sing_low) / len(sev_sing_entropy) * 100.0))

In [None]:
# Plot Distribution
sns.kdeplot(np.array(sev_de_entropy), label='ensemble', color='r')
sns.kdeplot(np.array(sev_sing_entropy), label='single', color='g')
plt.xlabel('OOD; Severance Entropy Values')
plt.legend()
plt.show()