<a href="https://colab.research.google.com/github/icanardahan/COVID-19-Prediction-from-X-Ray-Pictures/blob/main/CovidXray.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Libraries
import os, glob, time, random

%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns; sns.set()

from PIL import Image

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
import tensorflow.keras.layers as Layers

In [None]:
# Helper Functions
def setupMatplotLib():
    """Apply the notebook-wide matplotlib defaults (figure size and font sizes)."""
    # (rc group, settings) pairs, applied in order.
    rc_settings = (
        ('figure', {'figsize': (10, 7)}),
        ('font', {'size': 14}),
        ('axes', {'titlesize': 18}),     # axes title
        ('axes', {'labelsize': 18}),     # x and y labels
        ('xtick', {'labelsize': 14}),    # x tick labels
        ('ytick', {'labelsize': 14}),    # y tick labels
        ('legend', {'fontsize': 12}),    # legend text
        ('figure', {'titlesize': 20}),   # figure title
    )
    for group, settings in rc_settings:
        plt.rc(group, **settings)


setupMatplotLib()

def _plot_metric(position, history, key, label):
    """Draw the train/validation curves of one metric; return True if anything was drawn.

    Args:
        position: ``(rows, cols, index)`` triple passed to ``plt.subplot``.
        history: object exposing ``.epoch`` and a ``.history`` dict of metric lists.
        key: metric name; its validation counterpart is looked up as ``'val_' + key``.
        label: prefix used for the legend entries.
    """
    logs = history.history
    val_key = 'val_' + key
    has_train = key in logs
    has_val = val_key in logs
    if not (has_train or has_val):
        # Do not create an (empty) subplot for a metric that was never recorded.
        return False

    ax = plt.subplot(*position)
    train_style = {'label': label + ' Train'}
    if has_val:
        val_line = ax.plot(history.epoch, logs[val_key], '--', label=label + ' Val')[0]
        # Draw the training curve in the same colour as its validation curve.
        train_style['color'] = val_line.get_color()
    if has_train:
        ax.plot(history.epoch, logs[key], **train_style)
    return True


def _loss_label(history):
    """Return a printable name for the loss stored on ``history.model``."""
    loss = getattr(getattr(history, 'model', None), 'loss', None)
    if isinstance(loss, str):
        return loss
    # The loss may also be an object or a plain function; try the usual name
    # attributes before falling back to a generic label.
    for attr in ('name', '__name__'):
        name = getattr(loss, attr, None)
        if isinstance(name, str):
            return name
    return 'loss'


def plot_history(histories, vertical=False, otherkey='accuracy'):
    """Plot the loss and one additional metric for one or more training runs.

    Panel 1 shows the loss and panel 2 shows ``otherkey``. For every run the
    validation curve (when recorded) is dashed and shares its colour with the
    solid training curve. Only panels that actually received data are labelled.

    Args:
        histories: iterable of ``(name, history)`` pairs, where ``history``
            exposes ``.epoch``, ``.history`` and ``.model.loss``.
        vertical: stack the two panels vertically instead of side by side.
        otherkey: key of the second metric in ``history.history``.
    """
    histories = list(histories)
    rows, cols = (2, 1) if vertical else (1, 2)
    drawn = set()

    # rc_context restores the caller's rc settings on exit, so the larger
    # figure size used here does not leak into later plots.
    with plt.rc_context({'figure.figsize': (14, 8)}):
        for name, history in histories:
            label = name.title()
            for index, key in ((1, 'loss'), (2, otherkey)):
                if _plot_metric((rows, cols, index), history, key, label):
                    drawn.add(index)

        if not drawn:
            return

        # The loss name is taken from the first run, as all runs share panel 1.
        ylabels = {1: _loss_label(histories[0][1]), 2: otherkey}
        for index in sorted(drawn):
            ax = plt.subplot(rows, cols, index)
            ax.set_xlabel('Epochs')
            ax.set_ylabel(ylabels[index].replace('_', ' ').title())
            ax.legend()
        plt.tight_layout()
        
def plotClassCounts(countDict):
    """Draw a bar chart with one bar per entry of ``countDict``.

    Args:
        countDict: mapping whose keys become the tick labels and whose values
            become the bar heights.
    """
    positions = range(len(countDict))
    plt.figure(figsize=(12, 8))
    plt.bar(positions, countDict.values())
    plt.xticks(positions, countDict.keys())
    # Axis labels are in Turkish: 'Sınıflar' = classes, 'Sayılar' = counts.
    plt.xlabel('Sınıflar')
    plt.ylabel('Sayılar')

# We assume that all files in the folder are images
def plotRandomImages(path, classNames=('COVID-19', 'normal', 'pneumonia')):
    """Show one randomly chosen image per class, side by side.

    Args:
        path: directory that contains one sub-directory per class.
        classNames: names of the class sub-directories to sample from; they
            are also used as the captions under the images.

    Every entry of a class directory is assumed to be an image file.
    """
    classNames = list(classNames)

    images = []
    for cn in classNames:
        classDir = os.path.join(path, cn)
        filename = random.choice(os.listdir(classDir))
        images.append(Image.open(os.path.join(classDir, filename)))

    # One 6x6 inch panel per class (18x6 for the default three classes).
    plt.figure(figsize=(6 * len(images), 6))
    for i, (cn, image) in enumerate(zip(classNames, images), start=1):
        plt.subplot(1, len(images), i)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(image, cmap=plt.cm.binary_r)
        plt.xlabel(cn)
    plt.show()

In [None]:
# The training and validation loaders are given the same seed so that both
# derive their subsets from the same shuffle.
seed = int(time.time())

# The pictures we have are 256x256. They are loaded as 128x128 so that training is a little faster.
imsize = (128, 128)

# Whether to apply random transformations (augmentation) to the training data.
# This generally helps when data is scarce, but does not seem necessary for this problem.
augment = False

datadir = 'data'
trainDir = os.path.join(datadir, 'train')
testDir = os.path.join(datadir, 'test')

if augment:

    # Chosen to fit in memory; you may need a different value on your machine.
    batch_size = 256

    # Augmenting generator, used for the training subset only.
    # (The misspelled name is kept as-is for compatibility.)
    traingGen = tf.keras.preprocessing.image.ImageDataGenerator(
        rotation_range=3,
        width_shift_range=0.1,
        height_shift_range=0.1,
        shear_range=0.1,
        zoom_range=0.1,
        rescale=1/255.,
        validation_split=0.2,
        dtype='float16'
    )

    # Validation images must not be randomly distorted, so they come from a
    # generator that only rescales. It uses the same validation_split so that
    # subset='validation' selects the held-out 20% of the training directory.
    valGen = tf.keras.preprocessing.image.ImageDataGenerator(
        rescale=1/255.,
        validation_split=0.2,
        dtype='float16'
    )

    testGen = tf.keras.preprocessing.image.ImageDataGenerator(
        rescale=1/255.,
        dtype='float16'
    )

    # Arguments shared by all three directory iterators.
    flowArgs = dict(
        color_mode="grayscale",
        batch_size=batch_size,
        target_size=imsize,
        class_mode="sparse"
    )

    trainData = traingGen.flow_from_directory(
        directory=trainDir,
        shuffle=True,
        subset='training',
        seed=seed,
        **flowArgs
    )

    valData = valGen.flow_from_directory(
        directory=trainDir,
        shuffle=True,
        subset='validation',
        seed=seed,
        **flowArgs
    )

    # The test set is not shuffled so that predictions line up with the labels.
    testData = testGen.flow_from_directory(
        directory=testDir,
        shuffle=False,
        **flowArgs
    )

else:

    # Chosen to fit in memory; you may need a different value on your machine.
    batch_size = 128

    # NOTE(review): unlike the augmented branch, no 1/255 rescale is applied
    # in this branch - confirm that this is intended.
    # Arguments shared by all three datasets.
    datasetArgs = dict(
        color_mode="grayscale",
        batch_size=batch_size,
        image_size=imsize
    )

    trainData = tf.keras.preprocessing.image_dataset_from_directory(
        directory=trainDir,
        shuffle=True,
        validation_split=0.2,
        subset='training',
        seed=seed,
        **datasetArgs
    )

    valData = tf.keras.preprocessing.image_dataset_from_directory(
        directory=trainDir,
        shuffle=True,
        validation_split=0.2,
        subset='validation',
        seed=seed,
        **datasetArgs
    )

    # The test set is not shuffled so that predictions line up with the labels.
    testData = tf.keras.preprocessing.image_dataset_from_directory(
        directory=testDir,
        shuffle=False,
        **datasetArgs
    )

In [None]:
# Let's look at the number of images per class.

# The labels could be counted from the loaded data as sketched below, but the
# counts are already known, so they are hard-coded instead.
#if augment:
#    yTrain = trainData.classes
#    yTest  = testData.classes
#else:
#    yTrain = np.concatenate([y for x, y in trainData], axis=0)
#    yTest  = np.concatenate([y for x, y in testData], axis=0)

# The data was split 90/10 into training and test sets. During training, 20% of
# the training set is held out for validation.
counts = {
    'train': {'COVID-19': 1593, 'normal': 7966, 'pneumonia': 5462},
    'test': {'COVID-19': 177, 'normal': 885, 'pneumonia': 607},
}

# Fraction of the training images that belongs to each class; the classes are
# clearly not balanced.
train_ratios = np.array(list(counts['train'].values()))
train_ratios = train_ratios / train_ratios.sum()

print('Class Proportions:', train_ratios)

In [None]:
# Bar chart of the number of training images per class.
plotClassCounts(counts['train'])

In [None]:
# Show one random training image from each class. The path is built from
# datadir so that it stays in sync with the directory used by the data loaders.
plotRandomImages(os.path.join(datadir, 'train'))

Now it is time to set up the model. We provide a very small model as a starting point, but this alone isn't enough.

In [None]:
# It's up to you to improve it!
def baseCnnClassifier(optimizer='adam'):
    """Build and compile a small CNN with a 3-way softmax output.

    The network consists of two convolutional groups, each made of
    Conv-ReLU-Conv-ReLU-BatchNorm-MaxPool, followed by global average pooling
    and a 3-unit softmax layer. It is compiled with sparse categorical
    cross-entropy and tracks accuracy.

    Args:
        optimizer: optimizer name or instance handed to ``model.compile``.

    Returns:
        The compiled ``Sequential`` model.
    """
    # (number of filters, kernel size) of each convolutional group
    conv_groups = ((16, (5, 5)), (32, (3, 3)))

    model = Sequential()
    for n_filters, kernel in conv_groups:
        # Two convolution + ReLU pairs per group
        for _ in range(2):
            model.add(Layers.Conv2D(filters=n_filters, kernel_size=kernel,
                                    strides=1, padding='same'))
            model.add(Layers.Activation('relu'))
        model.add(Layers.BatchNormalization())
        model.add(Layers.MaxPool2D(pool_size=(2, 2), strides=2))

    # Average every feature map down to a single value so the result can be
    # fed to the dense output layer.
    model.add(Layers.GlobalAveragePooling2D())

    model.add(Layers.Dense(3, activation='softmax', dtype='float32'))
    model.compile(loss='sparse_categorical_crossentropy',
                  optimizer=optimizer,
                  metrics=['accuracy'])

    return model

In [None]:
# Create the model with a slightly reduced Adam learning rate.
baseModel = baseCnnClassifier(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005)
)

# Stop early once validation accuracy has not improved for 5 epochs
# (guards against overfitting).
earlyStop = keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=5)

# Keep only the weights of the model that performs best on the validation set.
modelSave = keras.callbacks.ModelCheckpoint(
    './checkpoint',
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    save_weights_only=True,
)

epochs = 20

baseHist = baseModel.fit(
    trainData,
    # The validation data drives both model selection and early stopping.
    validation_data=valData,
    epochs=epochs,
    verbose=1,
    callbacks=[earlyStop, modelSave],
    # The classes are imbalanced, so class index 0 is weighted four times as
    # much as the others (presumably COVID-19 - TODO confirm the index order).
    class_weight={0: 4, 1: 1, 2: 1},
)

baseModel.summary()

In [None]:
# Plot the loss and accuracy curves recorded during training.
plot_history([('COVID',baseHist)]) 

In [None]:
# Restore the weights that gave the best validation result.
baseModel.load_weights('./checkpoint')
#print('Training performance', baseModel.evaluate(trainData))

# Evaluate on the test set, which was used neither for training nor for
# model selection.
testPerformance = baseModel.evaluate(testData)
print('Test performance', testPerformance)

Look at the results in more detail.

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
try:
    # plot_confusion_matrix was removed in scikit-learn 1.2. It is imported only
    # when available so that this cell also runs on newer versions.
    from sklearn.metrics import plot_confusion_matrix
except ImportError:
    plot_confusion_matrix = None

# Predicted class = index of the largest softmax output.
ypred = np.argmax(baseModel.predict(testData), axis=-1)

# Collect the true labels in the same order as the predictions; testData was
# created with shuffle=False, so iterating it again yields the same order.
if augment:
    ytrue = testData.classes
else:
    ytrue = np.concatenate([y for x, y in testData], axis=0)
print(classification_report(ytrue, ypred))

In [None]:
# Confusion matrix of the test predictions, drawn with readable class names.
classLabels = ['COVID-19', 'Normal', 'Pneumonia']
cm = confusion_matrix(ytrue, ypred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classLabels)
disp.plot(cmap='Blues')