In [None]:
!pip install tensorflow==1.15.0

In [None]:
!pip install keras==2.1.2

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Global

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import keras
# Display Image
from PIL import Image
# computer vision package to read dataset
import cv2
import os
from __future__ import print_function
from keras import backend as K
from keras import activations
from keras import utils
from keras.models import Model
from keras.layers import *
from keras.preprocessing.image import ImageDataGenerator
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.applications.resnet50 import ResNet50
from keras.applications.xception import Xception
from keras.optimizers import RMSprop, Adam, SGD, Nadam
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping
from keras import regularizers
from keras.layers import Input

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from PIL import Image 
from keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from numpy import expand_dims

import tensorflow as tf

IMG_SIZE = 224


train_folder = "/content/drive/MyDrive/train_test_val_ultrasound/train"
test_folder = '/content/drive/MyDrive/train_test_val_ultrasound/test'
val_folder = '/content/drive/MyDrive/train_test_val_ultrasound/val'

#train_c = os.path.join(train_folder, 'COVID-19')
#train_n = os.path.join(train_folder , 'normal')


train_datagen = ImageDataGenerator(rescale = 1./255, 
                                   shear_range = 0.2, zoom_range=0.2, height_shift_range=0.1,
                                   width_shift_range=0.1,fill_mode='nearest')
test_datagen = ImageDataGenerator(rescale=1./255)

train_set = train_datagen.flow_from_directory(train_folder, 
                                              target_size =(224,224),
                                              batch_size=32, 
                                              class_mode='categorical')

test_set = test_datagen.flow_from_directory(test_folder, 
                                            target_size=(224,224),
                                            batch_size=1, 
                                            class_mode='categorical')

val_set = test_datagen.flow_from_directory(val_folder, 
                                           target_size=(224,224),
                                           batch_size=32, 
                                           class_mode='categorical')

# Common

In [None]:
#=========================================================================================================================
#======================= NEURAL NETWORK PERFORMANCE MEASURES
#=========================================================================================================================
# 3.3 :: define performance evaluation functions

def get_performance(conf_matrix):
    #how many classes? = len of conf_matril
    nos_class = len(conf_matrix[0,:]) # len of 0th row
    res = np.zeros((0,9),dtype ='float64')
    for i in range(0,nos_class):
        # for each class calculate 4 performance measure - ACC, PRE, SEN, SPF, 
        # first compute TP, TN, FP, FN
        TP = conf_matrix[i,i]
        FP = np.sum(conf_matrix[:,i]) - TP
        FN = np.sum(conf_matrix[i,:]) - TP
        TN = np.sum(conf_matrix) - FN - FP - TP

        ACC = (TP+TN)   /   (TP+FP+FN+TN)
        PRE = (TP)      /   (TP+FP)
        SEN = (TP)      /   (TP+FN)
        SPF = (TN)      /   (TN+FP)
        F1S = (TP)      /   (TP + (FP+FN)*0.5)

        res_i = np.array([TP, FN, FP, TN, ACC, PRE, SEN, SPF, F1S])
        res = np.vstack((res,res_i))
    return res


#------------------------------------------------------------------PRINTING

def print_lstr(class_labels):
    g_LSTR=''   # HEADER ROW for printing confusing matrix
    for i in range(0,len(class_labels)):
        g_LSTR+='\t'+str(class_labels[i])
    return  g_LSTR

def print_cf_row(cf_row,nos_labels):
    res = ''
    for j in range(0,nos_labels):
        res += '\t'+ str(cf_row[j])
    return res
def print_conf_matrix(conf_matrix, suffix, class_labels):
    res=(suffix+'A\\P ' + print_lstr(class_labels)+'\n')
    nos_l=len(class_labels)
    for i in range(0,nos_l):
        res+=(suffix+str(class_labels[i]) + print_cf_row(conf_matrix[i],nos_l )+'\n')
    return res
def print_performance(perf_measures, class_labels):
    nos_class = len(perf_measures[:,0])

    print('Performance for '+str(nos_class)+' classes')
    print ('Class    \tACC\tPRE\tSEN\tSPF\tF1S\tALL\tT.P\tF.N\tF.P\tT.N')
    for i in range(0, nos_class):
        perf_i = np.round(perf_measures[i,:],2)
        print(str(class_labels[i])+'\t'+str(perf_i[4])+'\t'+str(perf_i[5])+'\t'+str(perf_i[6])+'\t'+str(perf_i[7])+'\t'+str(perf_i[8]) + \
              '\t'+str(np.sum(perf_i[0:2]))+'\t'+str(perf_i[0])+'\t'+str(perf_i[1]), '\t'+str(perf_i[2])+'\t'+str(perf_i[3])
              
              )
    return
#------------------------------------------------------------------


# Model

In [None]:
def squash(x, axis=-1):
    s_squared_norm = K.sum(K.square(x), axis, keepdims=True) + K.epsilon()
    scale = K.sqrt(s_squared_norm) / (0.5 + s_squared_norm)
    return scale * x

def softmax(x, axis=-1):
    ex = K.exp(x - K.max(x, axis=axis, keepdims=True))
    return ex / K.sum(ex, axis=axis, keepdims=True)


def margin_loss(y_true, y_pred):
    lamb, margin = 0.5, 0.1 
    return K.sum(y_true * K.square(K.relu(1 - margin - y_pred)) + lamb * (
        1 - y_true) * K.square(K.relu(y_pred - margin)), axis=-1)
    

class Capsule(Layer):
    def __init__(self,
                 num_capsule,
                 dim_capsule,
                 routings=3, 
                 share_weights=True,
                 activation='squash',
                 **kwargs):
        super(Capsule, self).__init__(**kwargs)
        self.num_capsule = num_capsule
        self.dim_capsule = dim_capsule
        self.routings = routings
        self.share_weights = share_weights
        if activation == 'squash':
            self.activation = squash
        else:
            self.activation = activations.get(activation)

    def build(self, input_shape):
        input_dim_capsule = input_shape[-1]
        if self.share_weights:
            self.kernel = self.add_weight(
                name='capsule_kernel',
                shape=(1, input_dim_capsule,
                       self.num_capsule * self.dim_capsule),
                initializer='glorot_uniform',
                trainable=True)
        else:
            input_num_capsule = input_shape[-2]
            self.kernel = self.add_weight(
                name='capsule_kernel',
                shape=(input_num_capsule, input_dim_capsule,
                       self.num_capsule * self.dim_capsule),
                initializer='glorot_uniform',
                trainable=True)

    def call(self, inputs):
        if self.share_weights:
            hat_inputs = K.conv1d(inputs, self.kernel)
        else:
            hat_inputs = K.local_conv1d(inputs, self.kernel, [1], [1])

        batch_size = K.shape(inputs)[0]
        input_num_capsule = K.shape(inputs)[1]
        hat_inputs = K.reshape(hat_inputs,
                               (batch_size, input_num_capsule,
                                self.num_capsule, self.dim_capsule))
        hat_inputs = K.permute_dimensions(hat_inputs, (0, 2, 1, 3))

        b = K.zeros_like(hat_inputs[:, :, :, 0])
        for i in range(self.routings):
            c = softmax(b, 1)
            o = self.activation(K.batch_dot(c, hat_inputs, [2, 2]))
            if i < self.routings - 1:
                b = K.batch_dot(o, hat_inputs, [2, 3])
                if K.backend() == 'theano':
                    o = K.sum(o, axis=1)

        return o

    def compute_output_shape(self, input_shape):
        return (None, self.num_capsule, self.dim_capsule)
    


## build

In [None]:

input_image = Input(shape=(224,224, 3))
#base_model = ResNet50(include_top=False, weights='imagenet', input_tensor=Input(shape=(224,224,3)))
#base_model = Xception(include_top=False, weights='imagenet', input_tensor=Input(shape=(224,224,3)))
base_model = VGG16(include_top=False, weights='imagenet', input_tensor=Input(shape=(224,224,3)))
base_model.summary()


vgg_out = base_model.get_layer(name = 'block5_pool').output

for layer in base_model.layers:
    layer.trainable = True
    print(layer, layer.trainable)

output = Conv2D(256, kernel_size=(7,7), strides=(1, 1), activation='relu')(vgg_out)
x = Reshape((-1, 256))(output)
capsule = Capsule(3, 16, 3, True)(x)
output = Lambda(lambda z: K.sqrt(K.sum(K.square(z), 2)))(capsule)
VGGCapsnet_Binarymodel = Model(base_model.input, outputs=output)


lr=0.00001
VGGCapsnet_Binarymodel.compile(loss='categorical_crossentropy', optimizer=Adam(lr=lr), metrics=['accuracy'])

VGGCapsnet_Binarymodel.summary()

# Training

In [None]:

train_batch = 32
val_batch = 16
epochs=10

filepath = '/content/drive/MyDrive/train_test_val_ultrasound/model.hdf5'
checkpoint = ModelCheckpoint(filepath, 
                             monitor='val_loss', 
                             verbose=1, 
                             save_best_only=True, 
                             save_weights_only=False, 
                             mode='min')


logdir = os.path.join('/content/drive/MyDrive/ultra_train_test_val/logdir',datetime.datetime.now().strftime("%Y%m%d-%H%M%s"))
tensorboard_callback = keras.callbacks.TensorBoard(logdir)

## start



In [None]:
hist=VGGCapsnet_Binarymodel.fit_generator(generator = train_set,
                    epochs=epochs,
                    shuffle = True,
                    validation_data=val_set, 
                    validation_steps = len(val_set.classes)//val_batch,
                    steps_per_epoch=len(train_set.classes)//train_batch,
                    callbacks=[checkpoint, tensorboard_callback],
                    verbose = 1)

In [None]:

#results = VGGCapsnet_Binarymodel.evaluate_generator(test_set)
# yhat = np.argmax(VGGCapsnet_Binarymodel.predict_generator(test_set),  axis=1)


# Testing & Performance

In [None]:
our_labels =   ['0-Covid   ', '1-Normal  ', '2-Pneumonia']
conf_matrix = np.zeros((len(our_labels),len(our_labels)),dtype='int32') # confusion matrix Overall


#for all_set in [test_set]: # [train_set, test_set, val_set]:
all_set = test_set 
max_samples = len(all_set)-1
verbose=1
do_plot=False

print('\n++++\n')
print('Test samples:', max_samples+1,'of',len(all_set))
print('\n++++\n')
y_hat, y_true = [], []
y_test, y_score = [], [] # true, pred

for z,(x,y) in enumerate(all_set):
  truth_class = np.argmax(y[0])

  if do_plot:
    plt.figure()
    plt.imshow(x[0])
    plt.title('Sample:'+ str(z)+'::'+str(y)+'::'+str(our_labels[truth_class]))
    plt.show()
    #print(i, '\nX:', , '\nY:', y, '\n\n\n')
  
  
  predicted = VGGCapsnet_Binarymodel.predict(x)
  pred_class = np.argmax(predicted[0])
  if verbose>0:
    print('Sample:'+ str(z), 'True Class:', str(y), str(our_labels[truth_class]), '[predicted-class]', str(our_labels[pred_class]))
  conf_matrix[truth_class, pred_class]+=1  # true class / pred class



  yH = predicted #VGGCapsnet_Binarymodel.predict(x)
  y_score.extend(yH)
  y_test.extend(y)
  yhat = np.argmax(yH, axis=1)
  ytrue = np.argmax(y, axis=1)
  y_hat.extend(yhat)
  y_true.extend(ytrue)
# end for


  if z==max_samples: 
    break



y_hat, y_true = np.array(y_hat), np.array(y_true)
y_test, y_score =  np.array(y_test),  np.array(y_score)
#print(y_test)
#print(y_score)

## Performance

In [None]:
print('\tGlobal Confusion Matrix [Overall]')
print(print_conf_matrix( conf_matrix,'', our_labels)) #logit('\t'+str(cmx))
print_performance( get_performance(conf_matrix) ,our_labels ) 

## Performace (sklearn)

In [None]:
#import sklearn.metrics
from sklearn.metrics import confusion_matrix, classification_report

ytest, yhat = y_true, y_hat
print(classification_report(ytest, yhat))
cf_matrix = confusion_matrix(ytest, yhat)
ax= plt.subplot()
sns.heatmap(cf_matrix, annot = True, ax=ax,fmt = ".1f")
plt.ylabel('Actal Values')
plt.xlabel('Predicted Values')
ax.set_title('Confusion Matrix'); 
ax.xaxis.set_ticklabels(['COVID-19', 'Normal', 'Pneumonia']); ax.yaxis.set_ticklabels(['COVID-19', 'Normal', 'Pneumonia']);
# our_labels =   ['0-Covid   ', '1-Normal  ', '2-Pneumonia']
plt.show()

sns.heatmap(cf_matrix/np.sum(cf_matrix), annot=True, 
            fmt='.2%', cmap='Blues')




# Training History

In [None]:
accuracy = hist.history['acc']
val_accuracy = hist.history['val_acc']
loss = hist.history['loss']
val_loss = hist.history['val_loss']
epochs = range(len(accuracy))

plt.plot(epochs, accuracy, 'go-', label='Training accuracy')
plt.plot(epochs, val_accuracy, 'ms-', label='Validation accuracy')
plt.legend()
plt.show()

In [None]:
plt.plot(epochs, accuracy, 'go-', label='Training accuracy')
plt.plot(epochs, val_accuracy, 'ms-', label='Validation accuracy')
plt.title('Training and validation accuracy curves for VGGCapsNet Model')
plt.legend()
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.grid(True)
plt.figure()
plt.plot(epochs, loss, 'bo-', label='Training loss')
plt.plot(epochs, val_loss, 'rs-', label='Validation loss')
plt.title('Training and validation loss curves for VGGCapsNet Model')
plt.legend()
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.grid(True)
plt.show()
plt.savefig('foo.png')

# RoC

In [None]:
from sklearn.metrics import roc_curve, auc

# Compute ROC curve and ROC area for each class
fpr = dict()
tpr = dict()
roc_auc = dict()

n_classes = 3
#y_test, y_score = y_true, y_hat
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_test[:, i], y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Compute micro-average ROC curve and ROC area
fpr["micro"], tpr["micro"], _ = roc_curve(y_test.ravel(), y_score.ravel())
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

# Plot of a ROC curve for a specific class
plt.figure()
plt.plot(fpr[2], tpr[2], label='ROC curve (area = %0.2f)' % roc_auc[2])
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()

# Plot ROC curve
plt.figure()
plt.plot(fpr["micro"], tpr["micro"],
         label='micro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["micro"]))
for i in range(n_classes):
    plt.plot(fpr[i], tpr[i], label='ROC curve of class {0} (area = {1:0.2f})'
                                   ''.format(i, roc_auc[i]))

plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Some extension of Receiver operating characteristic to multi-class')
plt.legend(loc="lower right")
plt.show()
plt.savefig('line_plot.jpg', dpi=300, quality=80, optimize=True, progressive=True)