Covid-19 X-ray predictor
========================

In [1]:
import os
import pandas as pd
import shutil
import numpy as np
from imutils import paths
import matplotlib.pyplot as plt
import cv2
import os
pd.options.display.max_colwidth = 10000

Preparing the data
------------------

In [2]:
def not_found(path: str) -> bool:
    """Return True when nothing exists on disk at ``path``."""
    exists = os.path.exists(path)
    return not exists

In [3]:
# Project root is one level above the notebook; all data lives under <root>/data.
base = ".."
data_path = os.path.join(base, "data")

# Per-image metadata table (one row per image).
metadata_csv = os.path.join(data_path, "metadata.csv")
metadata = pd.read_csv(metadata_csv)


In [4]:
# Keep only posteroanterior (PA) X-ray rows, and of those only the two columns
# needed downstream; rows missing either value are dropped.
is_pa_xray = (metadata["view"] == "PA") & (metadata["modality"] == "X-ray")
pa = metadata.loc[is_pa_xray, ["finding", "path"]].dropna()
print(pa.shape)
pa.head(10)

(99, 2)


Unnamed: 0,finding,path
0,COVID-19,/data/sources/covid-19-cv/data/PA/COVID-19/auntminnie-a-2020_01_28_23_51_6665_2020_01_28_Vietnam_coronavirus.jpeg
1,COVID-19,/data/sources/covid-19-cv/data/PA/COVID-19/auntminnie-b-2020_01_28_23_51_6665_2020_01_28_Vietnam_coronavirus.jpeg
2,COVID-19,/data/sources/covid-19-cv/data/PA/COVID-19/auntminnie-c-2020_01_28_23_51_6665_2020_01_28_Vietnam_coronavirus.jpeg
3,COVID-19,/data/sources/covid-19-cv/data/PA/COVID-19/auntminnie-d-2020_01_28_23_51_6665_2020_01_28_Vietnam_coronavirus.jpeg
4,COVID-19,/data/sources/covid-19-cv/data/PA/COVID-19/nejmc2001573_f1a.jpeg
5,COVID-19,/data/sources/covid-19-cv/data/PA/COVID-19/nejmc2001573_f1b.jpeg
6,ARDS,/data/sources/covid-19-cv/data/PA/ARDS/ARDSSevere.png
7,COVID-19,/data/sources/covid-19-cv/data/PA/COVID-19/lancet-case2a.jpg
8,COVID-19,/data/sources/covid-19-cv/data/PA/COVID-19/lancet-case2b.jpg
9,SARS,/data/sources/covid-19-cv/data/PA/SARS/SARS-10.1148rg.242035193-g04mr34g0-Fig8a-day0.jpeg


In [5]:
def get_image(image_path: str):
    """Read an image file and return it as a 224x224 RGB array.

    Args:
        image_path: Path of the image file to load.

    Returns:
        The image converted from OpenCV's BGR channel order to RGB and
        resized to 224x224.

    Raises:
        OSError: if OpenCV could not load the file (missing file, or a file
            that is not a decodable image).
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread reports failure by returning None instead of raising;
        # without this check the failure surfaces later as an opaque
        # cv2.error inside cvtColor.
        raise OSError(
            f"cv2.imread could not load {image_path!r} "
            "(missing file or unsupported/corrupt image)"
        )
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return cv2.resize(rgb, (224, 224))

def image_loader(row):
    """Load the image referenced by a DataFrame row.

    Args:
        row: A row (or any mapping) with a ``"path"`` entry holding the
            location of the image file.

    Returns:
        Whatever ``get_image`` returns for that path.

    Raises:
        FileNotFoundError: if the path does not exist. Previously this case
            only printed "NOT FOUND" (without the path) and then failed
            inside OpenCV anyway.
    """
    image_path = row["path"]
    if not_found(image_path):
        raise FileNotFoundError(f"Image not found: {image_path}")
    return get_image(image_path)

In [6]:
# Attach the decoded image to every PA row.
pa["image"] = pa.apply(image_loader, axis=1)

# Collapse the findings into a binary target: COVID-19 vs. everything else.
pa["label"] = pa["finding"].map(
    lambda finding: "COVID-19" if finding == "COVID-19" else "other"
)
pa.shape

(99, 4)

Adding normal data
------------------

In [7]:
# Index every file in the NORMAL folder. os.listdir returns entries in
# arbitrary order, so sort them to make the sample below reproducible.
normal_path = os.path.join(data_path, "PA", "NORMAL")
normal_folder = sorted(os.listdir(normal_path))
normal_files = np.array([os.path.join(normal_path, d) for d in normal_folder])

nf = pd.DataFrame(data=normal_files, columns=["path"])
nf["finding"] = "NORMAL"
nf["label"] = "other"

# Draw (at most) 100 healthy images with a fixed seed so that Restart & Run All
# always trains on the same subset; min() avoids a ValueError when the folder
# holds fewer than 100 files.
normal_100 = nf.sample(min(100, len(nf)), random_state=42)
normal_100["image"] = normal_100.apply(image_loader, axis=1)
normal_100.shape

(100, 4)

In [8]:
# `pa` already received its "image" and "label" columns right after the loader
# functions were defined. Recomputing them here only re-read every image from
# disk, so this cell now just verifies the columns and shows the shape.
assert {"image", "label"}.issubset(pa.columns), "run the PA image-loading cell first"
pa.shape

(99, 4)

In [9]:
# Stack the sampled NORMAL rows and the PA rows into one table. Columns are
# aligned by name. `keys=` (used previously) labels the *objects* being
# concatenated, not the columns, and produced a meaningless MultiIndex; a fresh
# 0..n-1 index is what the downstream cells need.
data_frame = pd.concat([normal_100, pa], ignore_index=True)
data_frame.shape

(199, 4)

Preparing the model
-------------------

In [140]:
# import the necessary packages
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import *
from tensorflow.keras.applications.densenet import *
from tensorflow.keras.models import Model
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
import tensorflow_addons as tfa
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

In [107]:
# Pull the string labels out of the table as a flat NumPy array.
label_column = data_frame["label"]
lbs = label_column.to_numpy()
lbs.shape

(199,)

In [108]:
# One-hot encode the labels in two steps: LabelBinarizer maps the two class
# names to a single 0/1 column, to_categorical expands that into two columns.
lb = LabelBinarizer()
binary_labels = lb.fit_transform(lbs)
labels = to_categorical(binary_labels)
labels.shape

(199, 2)

In [109]:
# Stack the per-row images into a single array and rescale the pixel
# intensities from [0, 255] down to [0, 1].
pixel_stack = np.array(data_frame["image"].to_list())
data = pixel_stack / 255.0
data.shape

(199, 224, 224, 3)

In [110]:
# Hold out 20% of the samples for evaluation, stratified so both splits keep
# the same class balance; the seed makes the split repeatable.
trainX, testX, trainY, testY = train_test_split(
    data,
    labels,
    test_size=0.2,
    stratify=labels,
    random_state=42,
)

In [134]:
# Training hyper-parameters.
INIT_LR = 1e-3  # initial learning rate handed to the optimizer
EPOCHS = 100    # number of passes requested from model.fit
BS = 32         # mini-batch size

# The augmentation pipeline (`trainAug`) is defined once, in the next cell.
# The rotation-only generator that used to be created here was overwritten
# there before it was ever used, so it has been removed.

In [135]:
trainAug = ImageDataGenerator(
    # Per-image standardisation (zero mean, unit std for every sample).
    # NOTE: any data fed to the model outside this generator must be
    # standardised the same way.
    samplewise_center=True,
    samplewise_std_normalization=True,
    # Mild geometric augmentation.
    rotation_range=5,
    width_shift_range=0.1,
    height_shift_range=0.05,
    shear_range=0.1,
    zoom_range=0.15,
    horizontal_flip=True,
    vertical_flip=False,
    # Pixels exposed by a transform are filled by mirroring the image edge.
    fill_mode="reflect",
)

In [142]:

# Input tensor for 224x224 RGB images (note: despite its name this is a Keras
# tensor, not a shape tuple).
input_shape = Input(shape=(224, 224, 3))

# ImageNet-pretrained DenseNet121 backbone without its classification top.
# Alternatives that were tried:
#baseModel = inception_resnet_v2.InceptionResNetV2(weights="imagenet", include_top=False,input_tensor= input_shape)
#baseModel = VGG16(weights="imagenet", include_top=False,input_tensor= input_shape)
baseModel = DenseNet121(
    weights="imagenet",
    include_top=False,
    input_tensor=input_shape,
)

# Classification head: pool -> flatten -> 96-unit ELU layer -> dropout ->
# 2-way softmax.
pooled = AveragePooling2D(pool_size=(4, 4))(baseModel.output)
flattened = Flatten(name="flatten")(pooled)
hidden = Dense(96, activation="elu")(flattened)
regularised = Dropout(0.5)(hidden)
headModel = Dense(2, activation="softmax")(regularised)

In [143]:
# Join the backbone input to the new classification head; this is the model
# that actually gets trained.
model = Model(
    inputs=baseModel.input,
    outputs=headModel,
)

In [144]:
# Freeze every backbone layer so only the newly added head is updated
# during training.
for base_layer in baseModel.layers:
    base_layer.trainable = False

In [146]:

# compile our model
print("[INFO] compiling model...")

# `learning_rate` is the supported keyword; `lr` is only a deprecated alias.
opt = tfa.optimizers.RectifiedAdam(learning_rate=INIT_LR, decay=1e-6)

# The head is a 2-unit softmax trained on one-hot labels, so the matching loss
# is categorical cross-entropy. For two classes it yields the same value as the
# binary cross-entropy used before, but it states the intent and stays correct
# if more classes are ever added.
model.compile(
    loss="categorical_crossentropy",
    optimizer=opt,
    metrics=["accuracy"],
)


[INFO] compiling model...


In [147]:
# Weight errors on class index 0 twice as heavily as errors on index 1.
# Index 0 is intended to be COVID-19 (LabelBinarizer orders classes
# alphabetically -- confirm with lb.classes_).
COVID_WEIGHT, OTHER_WEIGHT = 2.0, 1.0
class_weight = {0: COVID_WEIGHT, 1: OTHER_WEIGHT}

In [None]:
# train the head of the network
print("[INFO] training head...")

# trainAug standardises every training image (samplewise centre + std
# normalisation). The validation images must get the same treatment, minus the
# random augmentation, otherwise validation metrics are computed on inputs from
# a different distribution than the one the model is trained on.
valAug = ImageDataGenerator(
    samplewise_center=True,
    samplewise_std_normalization=True,
)

H = model.fit(
    trainAug.flow(trainX, trainY, batch_size=BS),
    steps_per_epoch=len(trainX) // BS,
    # The iterator knows its own length, so the whole validation set is used
    # (len(testX) // BS would silently drop the last partial batch).
    validation_data=valAug.flow(testX, testY, batch_size=BS, shuffle=False),
    epochs=EPOCHS,
    class_weight=class_weight,
)

[INFO] training head...
  ...
    to  
  ['...']
  ...
    to  
  ['...']
Train for 4 steps, validate on 40 samples
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100

In [None]:
# make predictions on the testing set
print("[INFO] evaluating network...")

# The network was trained on samplewise-standardised images (see trainAug), so
# the test images are passed through the same standardisation, without any
# random augmentation. shuffle=False keeps the predictions aligned with testY.
evalAug = ImageDataGenerator(
    samplewise_center=True,
    samplewise_std_normalization=True,
)
predIdxs = model.predict(evalAug.flow(testX, batch_size=BS, shuffle=False))

In [None]:
# Turn each row of class probabilities into the index of its most likely class.
# (Run once per prediction: the result is 1-D and cannot be reduced again.)
predIdxs = predIdxs.argmax(axis=1)

In [None]:

# Per-class precision / recall / F1, with rows named after the original
# string labels.
true_idxs = testY.argmax(axis=1)
report = classification_report(true_idxs, predIdxs, target_names=lb.classes_)
print(report)

In [None]:

# Compute the confusion matrix and use it to derive the raw accuracy,
# sensitivity, and specificity. Rows are true classes, columns predicted ones;
# class 0 is treated as the positive class.
cm = confusion_matrix(testY.argmax(axis=1), predIdxs)
total = cm.sum()
acc = (cm[0, 0] + cm[1, 1]) / total
sensitivity = cm[0, 0] / cm[0].sum()
specificity = cm[1, 1] / cm[1].sum()


In [None]:

# Report the confusion matrix followed by the three summary metrics,
# each rounded to four decimals.
print(cm)
print(f"acc: {acc:.4f}")
print(f"sensitivity: {sensitivity:.4f}")
print(f"specificity: {specificity:.4f}")

In [None]:
# plot the training loss and accuracy
# Use the number of epochs that actually ran, not the requested EPOCHS: if
# training was interrupted or stopped early, the history is shorter and
# plotting it against np.arange(EPOCHS) raises a length-mismatch error.
N = len(H.history["loss"])
epochs_run = np.arange(0, N)

plt.style.use("ggplot")
plt.figure()
for history_key, curve_label in (
    ("loss", "train_loss"),
    ("val_loss", "val_loss"),
    ("accuracy", "train_acc"),
    ("val_accuracy", "val_acc"),
):
    plt.plot(epochs_run, H.history[history_key], label=curve_label)
plt.title("Training Loss and Accuracy on COVID-19 Dataset")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend(loc="lower left");

In [None]:
# serialize the model to disk
print("[INFO] saving COVID-19 detector model...")

# `args` (a command-line argument dict in the script this cell was adapted
# from) is never defined in this notebook, so the original call raised a
# NameError. Save to an explicit path under the project root instead.
MODEL_PATH = os.path.join(base, "covid19_detector.h5")
model.save(MODEL_PATH, save_format="h5")