# Development of CNN Model to Classify Chest X-Ray Images of COVID-19 Patients

In [None]:
# For opening folders
import glob

# For image processing
from PIL import Image

# For data handling
import numpy as np
import pandas as pd

In [None]:
# Names of folders that contain the images
folders =['covid', 'normal', 'viral pneumonia']

In [None]:
# Reading in the data

data = []
labels = []
data = np.asarray(data)


for folder in folders:
    counter = 0
    for filepath in glob.glob('{}/*'.format(folder)):
        
        # Labels
        labels.append(folder)
        
        # Images
        image = Image.open(filepath)
        image = image.resize((256,256))  # Resizing the images
        image = np.asarray(image)
        
        if len(image.shape) != 2:        # Multichannel images converted to single channel
            image2 = np.zeros((256,256))
            image2 = image[:,:,0]
            data = np.append(data, image2)
        else:
            data = np.append(data, image)

cleaned_xray_images = data.reshape(362,256,256,1)

# Data is cleaned locally and stored so that a small file can be uploaded to the cloud server

pd.DataFrame(cleaned_xray_images).to_csv("x_ray_dataset_input.csv", header = None, index = None)
pd.DataFrame(labels).to_csv("x_ray_dataset_output.csv", header = None, index = None)


In [None]:
# Since the current version of Keras Vis has not been maintained,
# it needs to be installed from a repository github that contains 
# an updated version [https://github.com/raghakot/keras-vis]

# Uninstalling older version of keras-vis
!pip uninstall vis
!pip uninstall keras-vis
!pip uninstall keras-vis-temp

# Installing the new vis library 
!pip install git+https://github.com/raghakot/keras-vis.git -U


# For plotting
import matplotlib.pyplot as plt
% matplotlib inline

# For data preprocessing
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical
from keras.preprocessing.image import ImageDataGenerator

# For model development
from keras.layers import Input, Dense, Conv2D, MaxPool2D, Flatten, Dropout
from keras.models import Model
from keras import optimizers

# For model training
from keras.callbacks import ModelCheckpoint

# For clustering and its analysis
from sklearn.cluster import AgglomerativeClustering as ac
from sklearn.metrics import accuracy_score

# For visualizing CNN CAM and Saliency
from vis.visualization import visualize_saliency, visualize_activation
from vis.visualization import visualize_cam, overlay
from vis.utils import utils
from keras import activations
from scipy import ndimage

In [None]:
# Reading in data

data = pd.read_csv("/x_ray_dataset_input.csv 2.zip",header = None).values
y = pd.read_csv("/x_ray_dataset_output.csv 2.zip",header = None).values

In [None]:
# Reshaping the data and plotting a sample image in the dataset

data = data.reshape(362,256,256,1)
y = np.asarray(y)

plt.imshow(data[1,:,:,0],cmap = 'gray')

In [None]:
# Reshuffling the dataset 

new_sequence = np.random.permutation(data.shape[0])
data = data[new_sequence]
y = y[new_sequence] 

In [None]:
# Encoding Y labels

enc = LabelEncoder()
y = enc.fit_transform(y)
y = to_categorical(y, num_classes = 3)

print(y[:10])

# Labels coded as:
# 0: Covid
# 1: Normal
# 2: Viral Pneumonia

In [None]:
# Splitting Training and Testing data

x_train, x_test, y_train, y_test = train_test_split(data, 
                                                    y, 
                                                    test_size = 0.25, 
                                                    shuffle = True)

In [None]:
# Setting up the data generators

input_data = ImageDataGenerator(rescale = 1./255)

training_data = input_data.flow(x_train, 
                                y_train, 
                                batch_size = 10) 
                    
testing_data = input_data.flow(x_test, 
                               y_test, 
                               batch_size = 10) 

In [None]:
# Developing CNN model

#=============================================================================
input_img = Input(shape=(256, 256, 1)) 

# CNN part of the model

output = Conv2D(32, kernel_size = (3,3), activation = 'relu')(input_img)
output = MaxPool2D((2, 2))(output)

output = Conv2D(64, kernel_size = (3,3), activation = 'relu')(output)
output = Conv2D(128, kernel_size = (3,3), activation = 'relu')(output)
output = MaxPool2D((2, 2))(output)

output = Flatten()(output)

# Dense layers of the model

output = Dense(512,activation = 'relu')(output)
output = Dropout(0.2)(output)

prob_output = Dense(3, activation = 'softmax')(output)
#=============================================================================

# Model
mdl = Model(input_img, prob_output)

# Optimizer
adm = optimizers.adam(lr = 0.000001)

# Compiling the model
mdl.compile(optimizer=adm, loss='categorical_crossentropy', metrics = ['accuracy'])

In [None]:
# Keeping track of the validation accuracy and storing the best weights
checkpoints = ModelCheckpoint(filepath="weights.hdf5", 
                               monitor = 'val_accuracy',
                               verbose=0, 
                               save_best_only=True)

In [None]:
# Training the model
history = mdl.fit_generator(training_data,
                            epochs = 200,
                            callbacks=[checkpoints],
                            validation_data = testing_data)
                                   

In [None]:
# Loading the best weights and saving the model

mdl.load_weights('weights.hdf5')
mdl.save('covid19_cnn.h5')

In [None]:
# Visualizing training and testing accuracy over the epochs

train_acc = history.history['accuracy']
test_acc = history.history['val_accuracy']
train_loss = history.history['loss']
test_loss = history.history['val_loss']

epochs = range(1,len(train_acc)+1)

plt.plot(epochs,train_acc, 'b', label = 'Training accuracy')
plt.plot(epochs,test_acc, 'orange',label = 'Testing accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title("Model Evaluation")
plt.legend()

plt.show()

plt.plot(epochs,train_loss, 'b', label = 'Training loss')
plt.plot(epochs,test_loss, 'orange',label = 'Testing loss')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title("Model Evaluation")
plt.legend()


In [None]:
# Function to plot the activaitons

def activation_plotter(mdl, image):

  l1 = 2
  
  l2 = 6

  
  # list of layers and their names

  list_layers = [layers.output for layers in mdl.layers[l1-1:l2]]
  layer_names = [layers.name for layers in  mdl.layers[l1-1:l2]]

  
  # Model that will capture the activations at the end of each layer  
  mdl_activation = Model(outputs = list_layers, inputs = mdl.input)

 
  # Activations of the image passed to the function
  activations = mdl_activation.predict((image/255).reshape(1,256,256,1))

  
  # For loop to plot activation for each layer
  for idx, layer_name in enumerate(layer_names):
 
    filter_image_shape = activations[idx].shape[1:3]
    no_of_filters = activations[idx].shape[-1]

    activ_val = []
    
    # Activation value for each filter in a given layer
    for f in range(no_of_filters):
      activ_val.append(activations[idx][0,:,:,f].sum())
    
    # Plotting the activation value of the filters in this layer

    fig1 = plt.figure(figsize = (20,5))
    plt.bar(x = list(range(1,no_of_filters+1)), 
            height = activ_val, 
            width = 0.5,
            color = 'gray',
            edgecolor = 'black')
    plt.xlabel('Filters')
    plt.ylabel('Activation')
    plt.title("Activations for {}".format(layer_name))

    images_per_row = 16

    # Compiling the feature maps into a grid so that it is easy to display
    
    mega_array_shape_columns = filter_image_shape[1]*images_per_row
    mega_array_shape_rows = int(filter_image_shape[0]*(no_of_filters/images_per_row))
    mega_array = np.zeros((mega_array_shape_rows, mega_array_shape_columns))

    col = 0
    row = 0
    a = filter_image_shape[0]
    b = filter_image_shape[1]
    for i in range(int(no_of_filters/images_per_row)):
      for j in range(images_per_row):
        mega_array[i*a:(i+1)*(a),j*b:(j+1)*b] = activations[idx][0,:,:,(i*images_per_row)+j]
    
    # PLottng the grid of the feature maps for this layer
    
    fig2 = plt.figure(figsize = (20,10))
    plt.imshow(mega_array, cmap = 'gray')
    plt.title(layer_name)
    plt.show()
  
  return activations, mdl_activation, layer_names

In [None]:
# Calling function to plot activations 

activation_val, mdl_activation, layer_names = activation_plotter(mdl,x_train[8])

In [None]:
# This function returns the activation value for a given layer and image 

def layerwise_activation_vector(mdl_activation,image, name_of_layer):
  
  # Activations of the image passed to the function
  activations = mdl_activation.predict((image/255).reshape(1,256,256,1))

  for idx, layer_name in enumerate(layer_names):
    if layer_name == name_of_layer:
      
      no_of_filters = activations[idx].shape[-1]
      
      activ_val = []
      
      for f in range(no_of_filters):
        activ_val.append(activations[idx][0,:,:,f].sum())
      
      return activ_val
    

In [None]:
# This function compiles the activation values obtained from layerwise_activation_vector()
# for the dataset of images and required layer name passed to it, into a matrix of activations where each row 
# corresponds to the activation for a given image and layer

def activation_matrix(mdl_activation,data, name_of_layer):
  
  active_matrix = []
  
  for i in range(data.shape[0]):
    active_matrix.append(layerwise_activation_vector(mdl_activation,data[i],name_of_layer))

  return np.asarray(active_matrix)

In [None]:
# Function to generate clusters from activation values and check how those clusters perform against the performance of CNNs

def cluster_analysis(mdl, activations, y_test,x_test):
  
  # Hirarchical clustering of the matrix of activations
  cluster_mdl  = ac(n_clusters = 3)

  # Predictions of the clustering algorithm
  predicted_classes = cluster_mdl.fit_predict(activations)

  # Real classes in the test dataset
  original_classes = np.argmax(y_test,axis = 1)

  # Predictions made by the CNN model
  cnn_predictions = np.argmax(mdl.predict(x_test), axis = 1)

  # Calssification accuracies of:

  # Cluster v/s Real Class
  print(accuracy_score(predicted_classes,original_classes))

  # CNN predictions v/s Real Class
  print(accuracy_score(cnn_predictions,original_classes))

  # Cluster v/s CNN predictions
  print(accuracy_score(predicted_classes,cnn_predictions))

  return

In [None]:
# Layer wise activation
matrix_of_activations = activation_matrix(mdl_activation, x_test, 'conv2d_1')

# Hirarchical clustering of the layerwise activation
cluster_analysis(mdl,matrix_of_activations, y_test,x_test)

In [None]:
# Calling the clustering function on all the activations 

full_mat = activation_matrix(mdl_activation, x_test, mdl.layers[1].name)

for layers in  mdl.layers[2:6]:
  full_mat = np.concatenate((full_mat,activation_matrix(mdl_activation, x_test, layers.name)), axis = 1)

cluster_analysis(mdl,full_mat, y_test,x_test)

In [None]:
# Confusion Matrix to understand the distribution of correct and incorrect predicitons

confusion_matrix = pd.DataFrame(index = ['t_covid','t_normal','t_vp'], columns = ['p_covid','p_normal','p_vp'])

classification_list = [[],[],[],[],[],[],[],[],[]]
for i in range(x_test.shape[0]):
  original_class = np.argmax(y_test[i])
  predicted_class = np.argmax(mdl.predict(np.expand_dims(x_test[i],axis = 0)))
  classification_list[(original_class*4)+predicted_class-original_class].append(i)
  
for i in range(3):
  for j in range(3):
    confusion_matrix.iat[i,j] =classification_list[(i*3)+j]

In [None]:
confusion_matrix

In [None]:
#  need to convert the softmax output to a linear output
from keras import activations
layer_idx = utils.find_layer_idx(mdl, 'dense_2')
mdl.layers[layer_idx].activation = activations.relu
mdl_linear_op = utils.apply_modifications(mdl)

In [None]:
# A function to take in the images and generate the grad_cam and saliency maps

def cnn_visualizer(images, mdl_linear_op,layer_idx, input_class, predicted_class, row_name, col_name):
 
  no_of_images = len(images)
  images_per_row = 5

  image_array_columns = 256*images_per_row
  image_array_rows = int(np.ceil(no_of_images/images_per_row))*256
  
  image_array = np.zeros((image_array_rows, image_array_columns))
  salient_map_array = np.zeros((image_array_rows, image_array_columns))
  cam_actual_array = np.zeros((image_array_rows, image_array_columns))
  cam_pred_array = np.zeros((image_array_rows, image_array_columns))


  for idx,image in enumerate(images):
    a = int(np.floor(idx / 5))
    b = idx - (5*a)

    image_array[(a*256):((a+1)*256), b*256:(b+1)*256] = image.reshape(256,256) 

    gradient = visualize_saliency(mdl_linear_op, 
                              layer_idx,
                              filter_indices = None,
                              seed_input = image)
    gaus_filter = ndimage.gaussian_filter(gradient, sigma = 5)
    salient_map_array[a*256:(a+1)*256, b*256:(b+1)*256] = gaus_filter 

    visualization = visualize_cam(mdl_linear_op, layer_idx, filter_indices=input_class[idx], seed_input=image)
    cam_actual_array[a*256:(a+1)*256, b*256:(b+1)*256] = visualization

    visualization = visualize_cam(mdl_linear_op, layer_idx, filter_indices=predicted_class[idx], seed_input=image)
    cam_pred_array[a*256:(a+1)*256, b*256:(b+1)*256] = visualization

  
    
  # Plottng the grid of the feature maps for this layer
  fig, ax = plt.subplots(3,1,figsize=(20,15), constrained_layout = True)
  fig.suptitle('{}+{}'.format(row_name,col_name))

  ax[0].imshow(image_array, cmap = 'gray')
  ax[0].imshow(salient_map_array, alpha = 0.6)
  ax[0].set(title ='Saliency Map')

  ax[1].imshow(image_array, cmap = 'gray')
  ax[1].imshow(cam_actual_array,cmap = 'RdYlBu',alpha = 0.6)
  ax[1].set(title ='CAM Map Corresponding to True Classes')

  ax[2].imshow(image_array, cmap = 'gray')
  ax[2].imshow(cam_pred_array,cmap = 'RdYlBu', alpha = 0.6)
  ax[2].set(title='CAM Map Corresponding to Predicted Classes')

  plt.show()

  return

In [None]:
# Plottiing the different visualizzations by predicted and actual class
# to make an attempt at understanding visually how the CNN makes predictions

for i in range(3):
  for j in range(3):

    row = i
    col = j
    image_list = confusion_matrix.iloc[row,col]
    if len(image_list) != 0:
      images = x_test[image_list]
      input_class = np.argmax(y_test[image_list], axis = 1)

      predicted_class =np.argmax(mdl.predict(images), axis =1)
      row_name = confusion_matrix.index[row]
      col_name = confusion_matrix.columns[col]
      cnn_visualizer(images,mdl_linear_op,layer_idx,input_class,predicted_class,row_name,col_name)