In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os 
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt 
from sklearn.model_selection import train_test_split 
from tensorflow import keras
from tensorflow.keras.layers import Dense, Activation
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Model
from tensorflow.keras.applications import imagenet_utils
from sklearn.metrics import confusion_matrix
import itertools
import os
import shutil
import random
import cv2
import matplotlib.pyplot as plt
from IPython.display import Image
%matplotlib inline

import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
import numpy as np
from itertools import cycle

from sklearn import svm, datasets
from sklearn.metrics import roc_curve, auc
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import label_binarize
from sklearn.multiclass import OneVsRestClassifier
from scipy import interp
from sklearn.metrics import roc_auc_score

from sklearn.datasets import make_circles
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score

In [None]:
directory='/content/Pediatric Chest X-ray Pneumonia'
train_directory=os.path.join(directory,'train')
test_directory=os.path.join(directory,'test')
val_directory=os.path.join(directory,'val')

In [None]:
train_datagen = ImageDataGenerator(rescale=1/255,
                                   shear_range=0.2,
                                   zoom_range=0.2,
                                   horizontal_flip=True,
                                   vertical_flip=True)

validation_datagen = ImageDataGenerator(rescale=1/255)

test_datagen = ImageDataGenerator(rescale=1/255)

# Flow training images in batches of 32 using train_datagen generator
train_generator = train_datagen.flow_from_directory( train_directory, 
        classes = ['NORMAL', 'PNEUMONIA'],
        target_size=(224, 224),  
        batch_size=32,
        class_mode='binary')

# Flow validation images in batches of 32 using valid_datagen generator
validation_generator = validation_datagen.flow_from_directory( val_directory,
        classes = ['NORMAL', 'PNEUMONIA'],
        target_size=(224, 224), 
        batch_size=32,
        class_mode='binary',
        shuffle=False)

test_generator = test_datagen.flow_from_directory( test_directory,
        classes = ['NORMAL', 'PNEUMONIA'],
        target_size=(224, 224), 
        batch_size=32,
        class_mode='binary',
        shuffle=False)

STEP_SIZE_TRAIN=train_generator.n//train_generator.batch_size
STEP_SIZE_VAL=validation_generator.n//validation_generator.batch_size
STEP_SIZE_TEST=test_generator.n//test_generator.batch_size
print("STEP_SIZE_TRAIN: ",STEP_SIZE_TRAIN)
print("STEP_SIZE_TEST: ",STEP_SIZE_TEST)

In [None]:
base_model= tf.keras.applications.Xception(include_top = False, weights="imagenet", input_shape=(224,224,3))
base_model.summary()

In [None]:
def residual_unit(residual_input_data, filters):
    identity_x = residual_input_data
    
    filter1,filter2,filter3 = filters
    
    conv_op_1 = tf.keras.layers.Conv2D(filters=filter1,
                      kernel_size=(1,1),
                      strides=(1,1),
                      dilation_rate = 1,
                      padding='same')(residual_input_data)
    
    batch_norm_op_2 = tf.keras.layers.BatchNormalization()(conv_op_1)
    activation_op_2 = Activation('relu')(batch_norm_op_2)
    conv_op_2 = tf.keras.layers.Conv2D(filters=filter2,
                      kernel_size=(3,3),
                      strides=(1,1),
                      dilation_rate = 1,
                      padding='same')(activation_op_2)

    batch_norm_op_3 = tf.keras.layers.BatchNormalization()(conv_op_2)
    activation_op_3 = Activation('relu')(batch_norm_op_3)
    conv_op_3 = tf.keras.layers.Conv2D(filters=filter3,
                      kernel_size=(1,1),
                      strides=(1,1),
                      dilation_rate = 1,
                      padding='same')(activation_op_3)
    
    batch_norm_op_4 = tf.keras.layers.BatchNormalization()(conv_op_3)
    
    if identity_x.shape[-1] != conv_op_3.shape[-1]:
        filter_n = conv_op_3.shape[-1]
        
        identity_x = tf.keras.layers.Conv2D(filters=filter_n,
                          kernel_size=(1,1),
                          strides=(1,1),
                          dilation_rate=1,
                          padding='same')(identity_x)


        identity_x = tf.keras.layers.BatchNormalization()(identity_x)
        
    output = tf.keras.layers.Add()([identity_x, batch_norm_op_4])

    return output


In [None]:
def attention_module(attention_input_data, filters,p=1):
    p_res_unit_op_1 = attention_input_data
    for _ in range(p):
        p_res_unit_op_1 = residual_unit(p_res_unit_op_1, filters=filters)

    #print("pres", p_res_unit_op_1.shape)

    trunk_branch_op = trunk_branch(trunk_input_data=p_res_unit_op_1, filters=filters)

    #print("trunk_branch_op", trunk_branch_op.shape)

    mask_branch_op = mask_branch(mask_input_data=p_res_unit_op_1, filters=filters)

    print("mask_branch_op", mask_branch_op.shape)

    ar_learning_op = attention_residual_learning(mask_input=mask_branch_op, trunk_input=trunk_branch_op)
    print("ar_learning_op", ar_learning_op.shape)

    p_res_unit_op_2 = ar_learning_op
    for _ in range(p):
        p_res_unit_op_2 = residual_unit(p_res_unit_op_2, filters=filters)
    print("pres2", p_res_unit_op_2.shape)
    return p_res_unit_op_2

def trunk_branch(trunk_input_data, filters,t=2):
    
    t_res_unit_op = trunk_input_data
    for _ in range(t):
        t_res_unit_op = residual_unit(t_res_unit_op, filters=filters)

    return t_res_unit_op

def mask_branch(mask_input_data, filters, m=1,r=1):
    
    print(mask_input_data.shape)

    downsampling = tf.keras.layers.MaxPool2D(pool_size=(2, 2), 
                                  strides=(2, 2),
                                  padding='same')(mask_input_data)
    print('down', downsampling.shape)
    for _ in range(m):
        for j in range(r):
            downsampling = residual_unit(residual_input_data=downsampling, filters=filters)

        downsampling = tf.keras.layers.MaxPool2D(pool_size=(2, 2), 
                                      strides=(2, 2),
                                      padding='same')(downsampling)
        print(downsampling.shape)
    #===================================================================================================

    middleware = downsampling
    for _ in range(2 *r):
        middleware = residual_unit(residual_input_data=middleware, filters=filters)

    #===================================================================================================

    upsampling = tf.keras.layers.UpSampling2D(size=(2, 2))(middleware)


    for _ in range(m):
        for j in range(r):
            upsampling = residual_unit(residual_input_data=upsampling, filters=filters)

        upsampling = tf.keras.layers.UpSampling2D(size=(2, 2))(upsampling)
   # print(upsampling.shape)

    #resized_images= tf.image.resize(upsampling, tf.constant([mask_input_data.shape[1], mask_input_data.shape[2]])) 
    
    #print(resized_images.shape)

    conv_filter = upsampling.shape[-1]
    
    conv1 = tf.keras.layers.Conv2D(filters=conv_filter,
                      kernel_size=(1,1),
                      strides=(1,1),
                      dilation_rate=1,
                      padding='same')(upsampling)
    
    conv2 = tf.keras.layers.Conv2D(filters=conv_filter,
                      kernel_size=(1,1),
                      strides=(1,1),
                      dilation_rate=1,
                      padding='same')(conv1)

    sigmoid = Activation('sigmoid')(conv2)

    return sigmoid

def attention_residual_learning(mask_input, trunk_input):
    Mx = tf.keras.layers.Lambda(lambda x: 1 + x)(mask_input) # 1 + mask
    print("Mx:", Mx.shape) # (None, 56, 56, 256)
    print("trunk_input:", trunk_input.shape) # (None, 55, 55, 256)

    
    mask_dim = Mx.shape[1]
    trunk_dim = trunk_input.shape[1]

    print("Mask:", mask_dim)
    print("Trunk:", trunk_dim)

    # If dimensions do not match, then perform reshaping ...
    if (mask_dim > trunk_dim):
        diff = mask_dim - trunk_dim
        n_pair_pads = np.floor(diff/2)
        n_single_pads = diff % 2
        
        #print("trunk_input[0]", trunk_input[0].shape) # (55, 55, 256)
        #print(type(trunk_input[0])) # <class 'keras.engine.keras_tensor.KerasTensor'> 

        # ZeroPadding()'s arguments have to be integers or tuple(s) of integers 
        n_pair_pads = int(n_pair_pads)
        n_single_pads = int(n_single_pads)

        x = tf.reshape(trunk_input[0], (1,) + trunk_input.shape[1:])

        # Same padding on top, bottom, left & right ...
        x = tf.keras.layers.ZeroPadding2D( padding = n_pair_pads )(x)

        # Specifying padding in the manner ((top,bottom), (left,right)) ...
        x = tf.keras.layers.ZeroPadding2D( padding = ((0,n_single_pads),(0,n_single_pads))
                                        ) (x)
                                        
        # Converting from (1,x1,x2,x3) to (x1,x2,x3) ... 
        trunk_input = x

    elif (trunk_dim > mask_dim):
        diff = trunk_dim - mask_dim # 14-13=1
        n_pair_pads = np.floor(diff/2) # 1/2 = 0
        n_single_pads = diff % 2 # 1%2 = 1
        
        # ZeroPadding()'s arguments have to be integers or tuple(s) of integers 
        n_pair_pads = int(n_pair_pads)
        n_single_pads = int(n_single_pads)

        x = tf.reshape(Mx[0], (1,) + Mx.shape[1:])

        # Same padding on top, bottom, left & right ...
        x = tf.keras.layers.ZeroPadding2D( padding = n_pair_pads )(x)

        # Specifying padding in the manner ((top,bottom), (left,right)) ...
        x = tf.keras.layers.ZeroPadding2D( padding = ((0,n_single_pads),(0,n_single_pads))
                                        ) (x)
                                        
        # Converting from (1,x1,x2,x3) to (x1,x2,x3) ... 
        Mx = x

    print("Mx after zp:", Mx.shape)
    print("trunk_input after zp:", trunk_input.shape)   

    return tf.keras.layers.Multiply()([Mx, trunk_input]) # M(x) * T(x)  

In [None]:
layer_name = 'block13_sepconv2_act'   
layer_output = base_model.get_layer(layer_name).output
new_base_model = keras.Model(base_model.input, layer_output)

input1 = layer_output
att_mod_1 = attention_module(new_base_model.output, filters=[728,728,1024]) # 32, 64, 128
print('att_mod_1', att_mod_1.shape)

avg_pool_layer = tf.keras.layers.GlobalAveragePooling2D()(att_mod_1)
fc1 = tf.keras.layers.Dense(512, activation = 'relu')(avg_pool_layer)
dp = tf.keras.layers.Dropout(0.2)(fc1)
fully_connected_layer_last = Dense(1, activation='sigmoid')(dp)

pre = Model(inputs=input1, outputs=fully_connected_layer_last)

In [None]:
pre.summary()

In [None]:
final_model = pre(layer_output) 
pipe_model = Model(base_model.input, final_model)

In [None]:
pipe_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.00001),
              loss='binary_crossentropy',
              metrics=['accuracy'])

history = pipe_model.fit(train_generator,
steps_per_epoch=STEP_SIZE_TRAIN,
epochs=30,
class_weight= {0:1.939,1:0.674},
verbose=1,
validation_data = validation_generator,
validation_steps=STEP_SIZE_VAL)        

In [None]:
preds = xx.predict(test_generator,verbose=1)
final_preds=[]
for i in preds:
    if(i >= 0.5):
        final_preds.append(1)
    else:
        final_preds.append(0)
accuracy = accuracy_score(list(test_generator.classes), final_preds)
print('Accuracy: %f' % accuracy)
# precision tp / (tp + fp)
precision = precision_score(list(test_generator.classes), final_preds)
print('Precision: %f' % precision)
# recall: tp / (tp + fn)
recall = recall_score(list(test_generator.classes), final_preds)
print('Recall: %f' % recall)
# f1: 2 tp / (2 tp + fp + fn)
f1 = f1_score(list(test_generator.classes), final_preds)
print('F1 score: %f' % f1)
fpr, tpr, _ = roc_curve(test_generator.classes, final_preds)
roc_auc = auc(fpr, tpr)
print(roc_auc)
pr, tpr, _ = roc_curve(test_generator.classes, final_preds)
roc_auc = auc(fpr, tpr)
plt.figure()
lw = 2
plt.plot(fpr, tpr, color='darkorange',
lw=lw, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()

In [None]:
history.history

In [None]:
pipe_model.save("/content/drive/MyDrive/xception_ran.h5")

In [None]:
plt.plot(history.history['accuracy'],marker = 'v')
plt.plot(history.history['val_accuracy'],marker = '*')
plt.title('Model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(history.history['loss'],marker = 'v')
plt.plot(history.history['val_loss'],marker = '*')
plt.title('Model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
xception_ran = keras.models.load_model('/content/drive/MyDrive/xception_ran.h5')

In [None]:
directory='/content/Pediatric Chest X-ray Pneumonia'
train_directory=os.path.join(directory,'train')
test_directory=os.path.join(directory,'test')
val_directory=os.path.join(directory,'val')

train_datagen = ImageDataGenerator(rescale=1/255)

validation_datagen = ImageDataGenerator(rescale=1/255)

test_datagen = ImageDataGenerator(rescale=1/255)

train_generator = train_datagen.flow_from_directory( train_directory, 
        classes = ['NORMAL', 'PNEUMONIA'],
        target_size=(224, 224),  
        batch_size=1,
        class_mode='binary',
        shuffle=False)

validation_generator = validation_datagen.flow_from_directory( val_directory,
        classes = ['NORMAL', 'PNEUMONIA'],
        target_size=(224,224), 
        batch_size=1,
        class_mode='binary',
        shuffle=False)

test_generator = test_datagen.flow_from_directory( test_directory,
        classes = ['NORMAL', 'PNEUMONIA'],
        target_size=(224, 224), 
        batch_size=1,
        class_mode='binary',
        shuffle=False)

STEP_SIZE_TRAIN=train_generator.n//train_generator.batch_size
STEP_SIZE_VAL=validation_generator.n//validation_generator.batch_size
STEP_SIZE_TEST=test_generator.n//test_generator.batch_size
print("STEP_SIZE_TRAIN: ",STEP_SIZE_TRAIN)
print("STEP_SIZE_TEST: ",STEP_SIZE_TEST)

In [None]:
modelx = tf.keras.models.load_model('/content/drive/MyDrive/xception_ran.h5')

In [None]:
modelx.summary()

In [None]:
modelx.get_layer(name = 'model_1').summary()

In [None]:
features_layer1 = tf.keras.models.Model(
    inputs=modelx.inputs,
    outputs=modelx.get_layer(name = 'block13_sepconv2_act').output
)

In [None]:
f1train = features_layer1.predict(train_generator,verbose=1)
f1test = features_layer1.predict(test_generator,verbose=1)

In [None]:
features_layer2 = tf.keras.models.Model(
    inputs=modelx.get_layer('model_1').input,
    outputs=modelx.get_layer(name = 'model_1').get_layer(name = 'dense').output
)

In [None]:
f2train = features_layer2.predict(f1train,verbose=1)
f2test = features_layer2.predict(f1test,verbose=1)

In [None]:
train_labels = train_generator.classes
test_labels = test_generator.classes

In [None]:
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
%matplotlib inline
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import seaborn as sns

tsne = TSNE(n_components=2, verbose=1, perplexity=40, n_iter=300)
tsne_results = tsne.fit_transform(f2test)
df = pd.DataFrame()
df["y"] = test_labels
df["Component-1"] = tsne_results[:,0]
df["Component-2"] = tsne_results[:,1]
category_to_label = {0: 'Normal', 1:'Pneumonia'}

#plt.figure(figsize=(16,9))
sns.scatterplot(x="Component-1", y="Component-2" ,legend="full",alpha=1, hue= map(lambda x: "Normal" if x == 0 else "Pneumonia", df.y.tolist()),palette=sns.color_palette("hls", 2), data=df).set(title="Pediatric Pneumonia Test data T-SNE projection")