In [3]:
import pandas as pd
import numpy as np
import os
import tensorflow as tf
import cv2 as cv
import warnings
from tensorflow.keras import layers, models
from tensorflow.keras import backend as K
import itertools
import matplotlib.cm as cm
import skimage.transform as st
from enum import Enum
from sklearn.metrics import roc_curve, auc, classification_report, roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.ndimage.filters import gaussian_filter
import gc

print(tf.__version__)
warnings.filterwarnings("ignore")

# gpus = tf.config.list_physical_devices(device_type='GPU')
# tf.config.set_visible_devices(devices=gpus[0], device_type='GPU')

from google.colab import drive
drive.mount('/content/drive')

2.5.0
Mounted at /content/drive


In [2]:
def calculateBMIInterval(height, weight):
    bmi = weight/(height*height)
    if (bmi < 24):
        return 0
    elif (bmi >= 24 and bmi < 30):
        return 1
    else:
        return 2
    
df_weight = pd.read_csv('/content/drive/MyDrive/works/Data/chartevents_weight.csv')
df_height = pd.read_csv('/content/drive/MyDrive/works/Data/chartevents_height.csv')

In [5]:
def get_data(train=True, target='NULL'):
    X_test = []
    y_test = []

    count = 0
    
    if (train):
        filename = '/content/drive/MyDrive/works/Data/mimic_train.tfrecords'
    else:
        filename = '/content/drive/MyDrive/works/Data/mimic_test.tfrecords'
    
    raw_dataset = tf.data.TFRecordDataset(filename)
    for raw_record in raw_dataset:
        sub_y = []

        example = tf.train.Example()
        example.ParseFromString(raw_record.numpy())
                
        ethnicity = example.features.feature['race'].float_list.value[0] 
        
        if (target != 'NULL'):
            id = example.features.feature['id'].float_list.value[0]

            # separate by bmi interval
            weight = df_weight.loc[df_weight['subject_id'] == id, 'valuenum'].values
            height = df_height.loc[df_height['subject_id'] == id, 'valuenum'].values

            try:
                bmi_interval = calculateBMIInterval(int(height)/100, int(weight))
            except:
                continue

            if not (bmi_interval == target):
                continue
            
        if (tf.math.equal(ethnicity, 0)):
            label = tf.constant([1, 0, 0])
        elif (tf.math.equal(ethnicity, 1)):
            label = tf.constant([0, 1, 0])
        elif (tf.math.equal(ethnicity, 4)):
            label = tf.constant([0, 0, 1])
        else:
            continue

        # nparr = np.fromstring(example.features.feature['jpg_bytes'].bytes_list.value[0], np.uint8)
        # img_np = cv.imdecode(nparr, cv.IMREAD_GRAYSCALE)

        # X_test.append(st.resize(img_np, (256, 256)))

        # y_test.append(label)

        count+=1
    print(count)
        
    # return np.array(X_test), np.array(y_test)

In [None]:
INPUT_SHAPE = (256, 256, 1)

def swish_activation(x):
    return (K.sigmoid(x) * x)

def define_model():
    base_model = tf.keras.applications.densenet.DenseNet121(
            include_top=False, weights=None, input_shape=INPUT_SHAPE, pooling='max')
         
    pred_layer = tf.keras.layers.Dense(3, activation='softmax')(base_model.output)
 
    model = tf.keras.Model(inputs=base_model.input, outputs=pred_layer)    
  
    return model

In [None]:
model = define_model()

def scheduler(epoch, lr):
    if epoch % 2 == 0:
        return lr * tf.math.exp(-0.05)
    else:
        return lr

callback = [tf.keras.callbacks.LearningRateScheduler(scheduler)]

print(model.summary())

In [None]:
model.compile(loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
                 optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3))

X_train, y_train = get_data()

model.fit(X_train, y_train, epochs=10, shuffle=True, callbacks=callback)

In [7]:
def plot_confusion_matrix(cm,
                          target_names,
                          title='Confusion matrix',
                          cmap=None,
                          normalize=True):
    """
    given a sklearn confusion matrix (cm), make a nice plot
 
    Arguments
    ---------
    cm:           confusion matrix from sklearn.metrics.confusion_matrix
 
    target_names: given classification classes such as [0, 1, 2]
                  the class names, for example: ['high', 'medium', 'low']
 
    title:        the text to display at the top of the matrix
 
    cmap:         the gradient of the values displayed from matplotlib.pyplot.cm
                  see http://matplotlib.org/examples/color/colormaps_reference.html
                  plt.get_cmap('jet') or plt.cm.Blues
 
    normalize:    If False, plot the raw numbers
                  If True, plot the proportions
 
    Usage
    -----
    plot_confusion_matrix(cm           = cm,                  # confusion matrix created by
                                                              # sklearn.metrics.confusion_matrix
                          normalize    = True,                # show proportions
                          target_names = y_labels_vals,       # list of names of the classes
                          title        = best_estimator_name) # title of graph
 
    Citiation
    ---------
    http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html
 
    """
 
    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy
 
    if cmap is None:
        cmap = plt.get_cmap('Blues')
 
    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
 
    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)
 
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
 
 
    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        if normalize:
            plt.text(j, i, "{:0.4f}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")
        else:
            plt.text(j, i, "{:,}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")
 
    
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    
    plt.show()
    
def plot_roc(y_test, preds, title, label):
    fig = plt.figure(figsize=(8,6))

    for i in range(len(label)):
        fpr, tpr, _ = roc_curve(y_test[:, i], preds[:, i])
        roc_auc = auc(fpr, tpr)
        # plot the roc curve for the model
        plt.plot(fpr, tpr, linestyle='solid', label='{} AUC={:.3f}'.format(label[i], roc_auc))

    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(title)
    plt.legend(loc="lower right")
    plt.plot([0,1], [0,1], color='orange', linestyle='--')
#     filename = title + '.svg'
#     plt.savefig(filename)
    plt.show()

In [8]:
def test(y_preds, y_test):
    
    n_bootstraps = 1000
    rng_seed = 42  # control reproducibility
    bootstrapped_scores = []

    rng = np.random.RandomState(rng_seed)
    for i in range(n_bootstraps):
        # bootstrap by sampling with replacement on the prediction indices
        indices = rng.randint(0, len(y_preds), len(y_preds))
        if len(np.unique(y_test[indices])) < 2:
            # We need at least one positive and one negative sample for ROC AUC
            # to be defined: reject the sample
            continue

        score = roc_auc_score(y_test[indices], y_preds[indices])
        bootstrapped_scores.append(score)
        
    plt.hist(bootstrapped_scores, bins=100)
    plt.title('Histogram of the bootstrapped ROC AUC scores')
    plt.show()
    
    auc_score = np.array(bootstrapped_scores)
    
    mean_score = auc_score.mean()
    std_dev = auc_score.std()
    std_error = std_dev / np.math.sqrt(1)
    ci =  2.262 * std_error
    lower_bound = mean_score - ci
    upper_bound = mean_score + ci

    print("Sample auc mean: {:0.2f}". format(mean_score))
    print("Samole auc std: {:0.2f}".format(std_dev))
    print("Sample auc CI: {:0.2f}". format(ci))
    print("Confidence interval for the score: [{:0.2f} - {:0.2f}]".format(
        lower_bound, upper_bound))

In [9]:
Labels_race = ['WHITE', 'AFRICAN AMERICAN', 'ASIA']

def plot(y_preds, y_test):

    plot_roc(y_test, y_preds, 'ROC', Labels_race)

    cm_race = [0 for i in range(len(Labels_race))]
    for i in range(len(Labels_race)):
        cm_race[i] = [0 for j in range(len(Labels_race))]

    preds = tf.one_hot(tf.argmax(y_preds, axis=1), len(Labels_race))
    for i in range(len(y_test)):
        cm_race[np.argmax(preds[i])][np.argmax(y_test[i])] += 1

    plot_confusion_matrix(np.array(cm_race).transpose(), 
                          normalize = False,
                          target_names = Labels_race,
                          title = 'CM')

In [None]:
!unzip '/content/drive/MyDrive/works/Dnet_race_WBA_V2.zip'

In [None]:
X_test, y_test = get_data(train=False)

model = tf.keras.models.load_model('/content/saved_model/Dnet_race_split_256')

print(X_test.shape)
y_preds = model.predict(X_test)
test(y_preds, y_test)
plot(y_preds, y_test)

## AUC of different BMI category

In [None]:
# bmi 0
X_test, y_test = get_data(train=False, target=0)

print(X_test.shape)
y_preds = model.predict(X_test)
test(y_preds, y_test)
plot(y_preds, y_test)

In [None]:
# bmi 1
X_test, y_test = get_data(train=False, target=1)

print(X_test.shape)
y_preds = model.predict(X_test)
test(y_preds, y_test)
plot(y_preds, y_test)

In [None]:
# bmi 2
X_test, y_test = get_data(train=False, target=2)

print(X_test.shape)
y_preds = model.predict(X_test)
test(y_preds, y_test)
plot(y_preds, y_test)

## Noisy and Blurred

In [None]:
INPUT_SHAPE = (256, 256, 1)

def noisy(noise_type, image, sig):
    _, row, col, ch= image.shape
    if (noise_type == 0):
        mean = 0
        var = 0.1
        sigma = var**sig
        gauss = np.random.normal(mean, sigma,(row,col,ch))
        gauss = gauss.reshape(row,col,ch)
        noisy = image + gauss
        return noisy
    elif (noise_type == 1):
        blurred = gaussian_filter(np.float32(image), sigma=sig)
        return blurred

In [None]:
# noisy
# X_test, y_test = get_data(train=False)

print(X_test.shape)
y_preds = model.predict(noisy(0, X_test.reshape(len(X_test), 256, 256, 1), 1))
test(y_preds, y_test)
plot(y_preds, y_test)

In [None]:
# blurred

y_preds = model.predict(noisy(1, X_test.reshape(len(X_test), 256, 256, 1), 2))
test(y_preds, y_test)
plot(y_preds, y_test)

## Masked CXRs

In [None]:
# model = tf.keras.models.load_model('Dnet_race_WBA')

last_conv_layer_name = 'relu'
classifier_layer_names = ['max_pool', 'dense']

def make_gradcam_heatmap(
    img_array, model, last_conv_layer_name, classifier_layer_names
):
    # First, we create a model that maps the input image to the activations
    # of the last conv layer
    last_conv_layer = model.get_layer(last_conv_layer_name)
    last_conv_layer_model = tf.keras.Model(model.inputs, last_conv_layer.output)

    # Second, we create a model that maps the activations of the last conv
    # layer to the final class predictions
    classifier_input = tf.keras.Input(shape=last_conv_layer.output.shape[1:])
    x = classifier_input
    for layer_name in classifier_layer_names:
        x = model.get_layer(layer_name)(x)
    classifier_model = tf.keras.Model(classifier_input, x)

    # Then, we compute the gradient of the top predicted class for our input image
    # with respect to the activations of the last conv layer
    with tf.GradientTape() as tape:
        # Compute activations of the last conv layer and make the tape watch it
        last_conv_layer_output = last_conv_layer_model(img_array)
        tape.watch(last_conv_layer_output)
        # Compute class predictions
        preds = classifier_model(last_conv_layer_output)
        top_pred_index = tf.argmax(preds[0])
        top_class_channel = preds[:, top_pred_index]

    # This is the gradient of the top predicted class with regard to
    # the output feature map of the last conv layer
    grads = tape.gradient(top_class_channel, last_conv_layer_output)

    # This is a vector where each entry is the mean intensity of the gradient
    # over a specific feature map channel
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # We multiply each channel in the feature map array
    # by "how important this channel is" with regard to the top predicted class
    last_conv_layer_output = last_conv_layer_output.numpy()[0]
    pooled_grads = pooled_grads.numpy()
    for i in range(pooled_grads.shape[-1]):
        last_conv_layer_output[:, :, i] *= pooled_grads[i]

    # The channel-wise mean of the resulting feature map
    # is our heatmap of class activation
    heatmap = np.mean(last_conv_layer_output, axis=-1)

    # For visualization purpose, we will also normalize the heatmap between 0 & 1
    heatmap = np.maximum(heatmap, 0) / np.max(heatmap)
    
    return heatmap

In [None]:
def show_heatmap(img_array, last_conv_layer_name, classifier_layer_names):

    heatmap = make_gradcam_heatmap(
    img_array, model, last_conv_layer_name, classifier_layer_names
    )

    # Display heatmap
    # plt.matshow(heatmap)
    # plt.show()

    heatmap = np.uint8(255 * heatmap)

    jet = cm.get_cmap("jet")

    # We use RGB values of the colormap
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # We create an image with RGB colorized heatmap
    jet_heatmap = tf.keras.preprocessing.image.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((256, 256))
    jet_heatmap = tf.keras.preprocessing.image.img_to_array(jet_heatmap)

    return jet_heatmap

In [None]:
def get_mask_img(img, threshold):
    
    jet_heatmap = show_heatmap(np.reshape(img, (1, 256, 256, 1)), last_conv_layer_name, classifier_layer_names)
    
    hottest_areas = np.ma.MaskedArray(jet_heatmap[:, :, 2], jet_heatmap[:, : ,2] < threshold)

    where = np.array(np.where(hottest_areas.mask))
    
    img_ = np.repeat(img, 3, -1)
    
    try:
        x1, y1 = np.amin(where, axis=1)
        x2, y2 = np.amax(where, axis=1)

        img_rec = cv.rectangle(img_, (y1, x1), (y2, x2), (0, 0, 0), -1)
        
    except:
        img_rec = img_


    return img_rec[:, :, 2]

In [None]:
X_masked_img = [] 
for i in range(len(X_test)):
    X_masked_img.append(get_mask_img(X_test[i].reshape(256, 256, 1), 0.1))

In [None]:
y_preds = model.predict(np.reshape(X_masked_img, (len(X_masked_img), 256, 256, 1)))
test(y_preds, y_test)
plot(y_preds, y_test)

## Show CXRs

In [None]:
for i in range(5):
    fig, axs = plt.subplots(1, 4)
    fig.set_size_inches(18.5, 10.5)
    
    axs[0].set_title('Normal CXR')
    axs[0].imshow(X_test[i], cmap='gray')
    axs[1].set_title('Noisy CXR')
    axs[1].imshow(noisy(0, X_test[i], 1), cmap='gray')
    axs[2].set_title('Blurred CXR')
    axs[2].imshow(noisy(1, X_test[i], 2), cmap='gray')
    axs[3].set_title('Masked CXR')
    axs[3].imshow(X_masked_img[i], cmap='gray')
    plt.show()

In [None]:
from sklearn.metrics import auc, accuracy_score, recall_score, precision_score, f1_score, confusion_matrix
import seaborn as sns

In [None]:
labels = ['WHITE', 'BLACK', 'ASIA']
modified_y_test = []
X_test, y_test = get_data(train=False)

result = model.predict(X_test)
labels = np.argmax(result, axis=1)

for i in range(len(y_test)):
    if (np.array_equal([1, 0, 0], y_test[i])):
        modified_y_test.append(0)
    if (np.array_equal([0, 1, 0], y_test[i])):
        modified_y_test.append(1)
    if (np.array_equal([0, 0, 1], y_test[i])):
        modified_y_test.append(2)

print (classification_report(modified_y_test, labels))
class_matrix = confusion_matrix(modified_y_test, labels)
sns.heatmap(class_matrix, annot=True, fmt='d', cmap='Blues')