In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import VGG16, ResNet50V2, DenseNet121
from tensorflow.keras.layers import AveragePooling2D, Dropout, Flatten, Dense, Input, BatchNormalization, Activation
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from imutils import paths
import matplotlib.pyplot as plt
import numpy as np
import argparse
import cv2
import os
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, f1_score, precision_score, recall_score, classification_report
from datetime import datetime
import pickle as pkl

In [None]:
# Switch the working directory to the project folder so that the relative paths
# used below (the .pkl files and the checkpoint directories) resolve there.
# NOTE(review): presumably requires Google Drive to be mounted at /content/drive
# first; no mount step is visible in this notebook - TODO confirm.
%cd /content/drive/My Drive/hackathon

# Load and preprocess data

In [None]:
def to_one_hot(y):
  """Encode a class name as a length-3 one-hot vector.

  The positions follow the order covid, normal, pneumonia.

  Args:
    y: class name; one of 'covid', 'normal' or 'pneumonia'.

  Returns:
    np.ndarray of shape (3,) holding a single 1 at the index of the class.

  Raises:
    ValueError: if y is not one of the three known class names.
  """
  class_names = ('covid', 'normal', 'pneumonia')
  for index, name in enumerate(class_names):
    if y == name:
      one_hot = np.zeros(len(class_names), dtype=int)
      one_hot[index] = 1
      return one_hot
  # Format instead of concatenating: `y + '...'` raised TypeError for a
  # non-string label and hid the intended ValueError.
  raise ValueError('{} does not belong to any class'.format(y))

In [None]:
# Load and preprocess train data
train_data = []
train_label = []
train_path = '/content/drive/My Drive/hackathon/images/train'
#train_path = '/content/drive/My Drive/hackathon/images/chest-crops/train'

# Every sub-folder of train_path is named after the class of the images inside it.
# The listings are sorted because os.listdir returns entries in arbitrary order,
# which would make the sample order differ between runs.
for folder in sorted(os.listdir(train_path)):
  for img in tqdm(sorted(os.listdir(os.path.join(train_path, folder))), desc=folder):
    img_path = os.path.join(train_path, folder, img)

    # read, convert channels, and resize images
    image = cv2.imread(img_path)
    if image is None:
      # cv2.imread returns None instead of raising when a file cannot be decoded;
      # fail here with the offending path rather than inside cvtColor.
      raise IOError('Could not read image: ' + img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (224, 224))

    # Extract labels (the folder name is the class name)
    label = folder

    # Append to lists
    train_data.append(image)
    train_label.append(label)

# Convert to numpy array and normalize images to the [0, 1] range
x_train = np.array(train_data)/255.
y_train = np.array(train_label)
print(x_train.shape, y_train.shape)

# convert y to one-hot
y_train_one_hot = np.array([to_one_hot(y) for y in y_train])
y_train_one_hot.shape

In [None]:
# Load and preprocess validation data
val_data = []
val_label = []
val_path = '/content/drive/My Drive/hackathon/images/val'
#val_path = '/content/drive/My Drive/hackathon/images/chest-crops/val'

# Every sub-folder of val_path is named after the class of the images inside it.
# The listings are sorted because os.listdir returns entries in arbitrary order,
# which would make the sample order differ between runs.
for folder in sorted(os.listdir(val_path)):
  for img in tqdm(sorted(os.listdir(os.path.join(val_path, folder))), desc=folder):
    img_path = os.path.join(val_path, folder, img)

    # read, convert channels, and resize images
    image = cv2.imread(img_path)
    if image is None:
      # cv2.imread returns None instead of raising when a file cannot be decoded;
      # fail here with the offending path rather than inside cvtColor.
      raise IOError('Could not read image: ' + img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (224, 224))

    # Extract labels (the folder name is the class name)
    label = folder

    # Append to lists
    val_data.append(image)
    val_label.append(label)

# Convert to numpy array and normalize images to the [0, 1] range
x_val = np.array(val_data)/255.
y_val = np.array(val_label)
print(x_val.shape, y_val.shape)

# convert y to one-hot
y_val_one_hot = np.array([to_one_hot(y) for y in y_val])
y_val_one_hot.shape

In [None]:
# Save to pkl
# Each array is pickled to '<variable name>.pkl' in the current working directory,
# training arrays first and validation arrays second.
for stem, array in (
    ('x_train', x_train),
    ('y_train', y_train),
    ('y_train_one_hot', y_train_one_hot),
    ('x_val', x_val),
    ('y_val', y_val),
    ('y_val_one_hot', y_val_one_hot),
):
  with open(stem + '.pkl', 'wb') as file:
    pkl.dump(array, file)

In [None]:
# Load from pkl
def _load_pkl(filename):
  """Return the object unpickled from `filename` (relative to the working directory)."""
  with open(filename, 'rb') as file:
    return pkl.load(file)

# Training arrays
x_train = _load_pkl('x_train.pkl')
y_train = _load_pkl('y_train.pkl')
y_train_one_hot = _load_pkl('y_train_one_hot.pkl')

# Validation arrays
x_val = _load_pkl('x_val.pkl')
y_val = _load_pkl('y_val.pkl')
y_val_one_hot = _load_pkl('y_val_one_hot.pkl')

In [None]:
# Cropped version: Load and preprocess train data
train_data_crop = []
train_label_crop = []
#train_path = '/content/drive/My Drive/hackathon/images/train'
train_path = '/content/drive/My Drive/hackathon/images/chest-crops/train'

# Every sub-folder of train_path is named after the class of the images inside it.
# The listings are sorted because os.listdir returns entries in arbitrary order,
# which would make the sample order differ between runs.
for folder in sorted(os.listdir(train_path)):
  for img in tqdm(sorted(os.listdir(os.path.join(train_path, folder))), desc=folder):
    img_path = os.path.join(train_path, folder, img)

    # read, convert channels, and resize images
    image = cv2.imread(img_path)
    if image is None:
      # cv2.imread returns None instead of raising when a file cannot be decoded;
      # fail here with the offending path rather than inside cvtColor.
      raise IOError('Could not read image: ' + img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (224, 224))

    # Extract labels (the folder name is the class name)
    label = folder

    # Append to lists
    train_data_crop.append(image)
    train_label_crop.append(label)

# Convert to numpy array and normalize images to the [0, 1] range
x_train_crop = np.array(train_data_crop)/255.
y_train_crop = np.array(train_label_crop)
print(x_train_crop.shape, y_train_crop.shape)

# convert y to one-hot
y_train_one_hot_crop = np.array([to_one_hot(y) for y in y_train_crop])
y_train_one_hot_crop.shape

In [None]:
# Cropped version: Load and preprocess validation data
val_data_crop = []
val_label_crop = []
#val_path = '/content/drive/My Drive/hackathon/images/val'
val_path = '/content/drive/My Drive/hackathon/images/chest-crops/val'

# Every sub-folder of val_path is named after the class of the images inside it.
# The listings are sorted because os.listdir returns entries in arbitrary order,
# which would make the sample order differ between runs.
for folder in sorted(os.listdir(val_path)):
  for img in tqdm(sorted(os.listdir(os.path.join(val_path, folder))), desc=folder):
    img_path = os.path.join(val_path, folder, img)

    # read, convert channels, and resize images
    image = cv2.imread(img_path)
    if image is None:
      # cv2.imread returns None instead of raising when a file cannot be decoded;
      # fail here with the offending path rather than inside cvtColor.
      raise IOError('Could not read image: ' + img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (224, 224))

    # Extract labels (the folder name is the class name)
    label = folder

    # Append to lists
    val_data_crop.append(image)
    val_label_crop.append(label)

# Convert to numpy array and normalize images to the [0, 1] range
x_val_crop = np.array(val_data_crop)/255.
y_val_crop = np.array(val_label_crop)
print(x_val_crop.shape, y_val_crop.shape)

# convert y to one-hot
y_val_one_hot_crop = np.array([to_one_hot(y) for y in y_val_crop])
y_val_one_hot_crop.shape

In [None]:
# Cropped: Save to pkl
# Each array is pickled to '<variable name>.pkl' in the current working directory,
# training arrays first and validation arrays second.
for stem, array in (
    ('x_train_crop', x_train_crop),
    ('y_train_crop', y_train_crop),
    ('y_train_one_hot_crop', y_train_one_hot_crop),
    ('x_val_crop', x_val_crop),
    ('y_val_crop', y_val_crop),
    ('y_val_one_hot_crop', y_val_one_hot_crop),
):
  with open(stem + '.pkl', 'wb') as file:
    pkl.dump(array, file)

In [None]:
# Cropped: Load from pkl
def _load_pkl(filename):
  """Return the object unpickled from `filename` (relative to the working directory)."""
  with open(filename, 'rb') as file:
    return pkl.load(file)

# Training arrays
x_train_crop = _load_pkl('x_train_crop.pkl')
y_train_crop = _load_pkl('y_train_crop.pkl')
y_train_one_hot_crop = _load_pkl('y_train_one_hot_crop.pkl')

# Validation arrays
x_val_crop = _load_pkl('x_val_crop.pkl')
y_val_crop = _load_pkl('y_val_crop.pkl')
y_val_one_hot_crop = _load_pkl('y_val_one_hot_crop.pkl')

In [None]:
# Create uncropped train+val
# Stack the validation samples after the training samples, for images and labels alike.
x_trainval = np.concatenate((x_train, x_val))
y_trainval_one_hot = np.concatenate((y_train_one_hot, y_val_one_hot))

# Images and labels must stay aligned one-to-one.
assert len(x_trainval) == len(y_trainval_one_hot)

# Print the shapes explicitly: a bare `.shape` expression in the middle of a cell
# is evaluated and thrown away, so nothing was displayed before.
print(x_trainval.shape, y_trainval_one_hot.shape)

with open('x_trainval.pkl', 'wb') as file:
  pkl.dump(x_trainval, file)

with open('y_trainval_one_hot.pkl', 'wb') as file:
  pkl.dump(y_trainval_one_hot, file)

In [None]:
# Create cropped train+val
# Stack the validation samples after the training samples, for images and labels alike.
x_trainval_crop = np.concatenate((x_train_crop, x_val_crop))
y_trainval_one_hot_crop = np.concatenate((y_train_one_hot_crop, y_val_one_hot_crop))

# Images and labels must stay aligned one-to-one.
assert len(x_trainval_crop) == len(y_trainval_one_hot_crop)

# Print the shapes explicitly: a bare `.shape` expression in the middle of a cell
# is evaluated and thrown away, so nothing was displayed before.
print(x_trainval_crop.shape, y_trainval_one_hot_crop.shape)

with open('x_trainval_crop.pkl', 'wb') as file:
  pkl.dump(x_trainval_crop, file)

with open('y_trainval_one_hot_crop.pkl', 'wb') as file:
  pkl.dump(y_trainval_one_hot_crop, file)

# Model
- VGG 16
- ResNet50V2
- DenseNet121

In [None]:
# Load the ImageNet-pretrained backbone with its fully connected head left off.
# ResNet50V2 is the active choice; swap in one of the commented-out lines to try
# VGG16 or DenseNet121 instead.
#baseModel = VGG16(weights="imagenet", include_top=False, input_tensor=Input(shape=(224, 224, 3)))
baseModel = ResNet50V2(weights="imagenet", include_top=False, input_tensor=Input(shape=(224, 224, 3)))
#baseModel = DenseNet121(weights="imagenet", include_top=False, input_tensor=Input(shape=(224, 224, 3)))

# Layers of the new classification head, in the order they are applied on top of the base model
head_layers = [
    AveragePooling2D(pool_size=(4, 4)),
    Flatten(name="flatten"),
    Dense(64, activation="relu"),
    Dropout(0.5),
    Dense(3, activation="softmax"),  # one output per class
]

# Chain the head layers onto the output of the base model
headModel = baseModel.output
for head_layer in head_layers:
  headModel = head_layer(headModel)

# The full network (base + head) is the model that actually gets trained
model = Model(inputs=baseModel.input, outputs=headModel)

# Freeze every base-model layer so that only the head is updated during the first training process
for layer in baseModel.layers:
  layer.trainable = False

In [None]:
# Print a layer-by-layer summary of the assembled model (frozen base + new head).
model.summary()

We also tested a deeper classification head (kept below, commented out, for reference), but it performed worse.

In [None]:
# # load the VGG16 network, ensuring the head FC layer sets are left off
# # baseModel = VGG16(weights="imagenet", include_top=False, input_tensor=Input(shape=(224, 224, 3)))
# baseModel = ResNet50V2(weights="imagenet", include_top=False, input_tensor=Input(shape=(224, 224, 3)))

# # construct the head of the model that will be placed on top of the base model
# headModel = baseModel.output
# headModel = AveragePooling2D(pool_size=(4, 4))(headModel)
# headModel = Flatten(name="flatten")(headModel)

# headModel = Dense(1024)(headModel)
# headModel = BatchNormalization()(headModel)
# headModel = Activation('relu')(headModel)
# headModel = Dropout(0.5)(headModel)

# headModel = Dense(512)(headModel)
# headModel = BatchNormalization()(headModel)
# headModel = Activation('relu')(headModel)
# headModel = Dropout(0.5)(headModel)

# headModel = Dense(256)(headModel)
# headModel = BatchNormalization()(headModel)
# headModel = Activation('relu')(headModel)
# headModel = Dropout(0.5)(headModel)

# headModel = Dense(128)(headModel)
# headModel = BatchNormalization()(headModel)
# headModel = Activation('relu')(headModel)
# headModel = Dropout(0.5)(headModel)

# headModel = Dense(64)(headModel)
# headModel = BatchNormalization()(headModel)
# headModel = Activation('relu')(headModel)
# headModel = Dropout(0.5)(headModel)

# headModel = Dense(3, activation="softmax")(headModel)

# # place the head FC model on top of the base model (this will become the actual model we will train)
# model = Model(inputs=baseModel.input, outputs=headModel)

# # loop over all layers in the base model and freeze them so they will *not* be updated during the first training process
# for layer in baseModel.layers:
# 	layer.trainable = False

# Train

- Set hyperparameters
- Compile
- Create callbacks to save only the best checkpoint
- Augment data
- Train

In [None]:
# Set hyperparameters
INIT_LR = 1e-3
EPOCHS = 20
BS = 8

# compile our model
# `learning_rate` replaces the deprecated `lr` alias.
# NOTE(review): recent Keras optimizers reportedly no longer accept `decay`; if this
# line fails on a newer TensorFlow, switch to a learning-rate schedule - TODO confirm.
opt = Adam(learning_rate=INIT_LR, decay=INIT_LR / EPOCHS)
# Name the AUC metric explicitly: an unnamed AUC() gets a new numeric suffix
# (auc_1, auc_2, ...) every time this cell is re-run, which changes the history keys.
model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy", tf.keras.metrics.AUC(name="auc")])

# Save only best checkpoints
# The time-of-day suffix gives every training run its own checkpoint directory.
time = datetime.now().strftime('_%H-%M-%S')

checkpoint_filepath_best = 'resnet50_all_checkpoints_best'+time+'/'
os.makedirs(checkpoint_filepath_best, exist_ok=True)

model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath_best,
    save_weights_only=False,
    monitor='val_accuracy',  # alternative: 'val_auc'
    mode='max',
    save_best_only=True)

# train the head of the network
trainAug = ImageDataGenerator(rotation_range=15, fill_mode="nearest") # , horizontal_flip=True, vertical_flip=True

# NOTE(review): the model is trained on x_trainval_crop (cropped train + val images)
# but validated on the uncropped x_val, whose cropped counterparts are part of the
# training data. val_accuracy, and therefore the "best" checkpoint, is not a
# held-out estimate - confirm this is intended.
print("[INFO] training head...")
# Model.fit accepts generators directly (fit_generator is deprecated).
# validation_steps is omitted: the validation data are in-memory arrays that are
# evaluated in full, and the previous value was derived from the training set length.
# The callbacks argument is now inside the call; before, the call was closed one line
# early, which made the cell a syntax error and left the checkpoint callback unused.
H = model.fit(
    trainAug.flow(x_trainval_crop, y_trainval_one_hot_crop, batch_size=BS),
    steps_per_epoch=len(x_trainval_crop) // BS,
    validation_data=(x_val, y_val_one_hot),
    epochs=EPOCHS,
    callbacks=[model_checkpoint_callback])

In [None]:
# Plot training history
acc = H.history['accuracy']
val_acc = H.history['val_accuracy']
loss = H.history['loss']
val_loss = H.history['val_loss']
# Take the x-axis from the recorded history instead of EPOCHS, so the plot still
# works if EPOCHS was edited after training or fewer epochs were recorded.
epochs_range = range(len(acc))

fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(16, 4))

# Left panel: accuracy per epoch
ax_acc.plot(epochs_range, acc, label='Training Accuracy')
ax_acc.plot(epochs_range, val_acc, label='Validation Accuracy')
ax_acc.legend(loc='lower right')
ax_acc.set_title('Training and Validation Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')

# Right panel: loss per epoch
ax_loss.plot(epochs_range, loss, label='Training Loss')
ax_loss.plot(epochs_range, val_loss, label='Validation Loss')
ax_loss.legend(loc='upper right')
ax_loss.set_title('Training and Validation Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
plt.show()

# Benchmark
We focus mainly on the F1-score, averaged over the three classes.

In [None]:
def benchmark(model, save_path, x_val, y_val_one_hot):
  """Evaluate `model` on a labelled set and save a confusion matrix and a text report.

  Args:
    model: object whose predict(x_val) returns one row of per-class scores per sample.
    save_path: string prefix for the output files. 'benchmark.txt' is appended to it
      directly and it is passed as `path` to plot_confusion_matrix, so it should end
      with a path separator when it names a directory.
    x_val: input samples passed to model.predict.
    y_val_one_hot: one-hot ground-truth labels, columns ordered covid, normal, pneumonia.

  Side effects:
    Plots the confusion matrix via plot_confusion_matrix, prints the classification
    report and the average F1-score, and writes save_path + 'benchmark.txt'.
  """
  class_names = ['covid', 'normal', 'pneumonia']
  # Pass the class ids explicitly so the matrix and the report always cover all three
  # classes, even when one of them is absent from both the labels and the predictions.
  class_ids = list(range(len(class_names)))

  # Create prediction and reduce scores / one-hot rows to class ids
  predictions = model.predict(x_val)
  y_true = np.argmax(y_val_one_hot, axis=1)
  y_pred = np.argmax(predictions, axis=1)

  # Create Confusion Matrix
  cm = confusion_matrix(y_true, y_pred, labels=class_ids)

  # Plot Confusion Matrix
  plot_confusion_matrix(cm, normalize=False, target_names=class_names, title="Confusion Matrix", path=save_path)

  # Print classification_report
  report = classification_report(y_true, y_pred, labels=class_ids, target_names=class_names, output_dict=True)
  avg_f1 = (report['covid']['f1-score'] + report['normal']['f1-score'] + report['pneumonia']['f1-score']) / 3
  # Fraction of correctly classified samples
  avg_acc = float(np.mean(y_true == y_pred))
  print(classification_report(y_true, y_pred, labels=class_ids, target_names=class_names))
  print('Average F1-Score =', avg_f1)

  # Save avg_f1 and avg_acc to txt file
  with open(save_path+'benchmark.txt', 'w') as file:
    # The trailing newline keeps the report dict from running into the accuracy line.
    file.write('Average F1-Score: %.4f\nAverage Accuracy: %.4f\n' % (avg_f1, avg_acc))
    file.write(str(report))


In [None]:
# Benchmark the model
# Reload the model saved under checkpoint_filepath_best and score it on the uncropped
# validation arrays; the outputs are written with that same directory as path prefix.
# NOTE(review): benchmark() calls plot_confusion_matrix, which is only defined in the
# cell after this one - on a fresh kernel that cell has to be run first.
benchmark(tf.keras.models.load_model(checkpoint_filepath_best), save_path=checkpoint_filepath_best, x_val=x_val, y_val_one_hot=y_val_one_hot)

In [None]:
def plot_confusion_matrix(cm, target_names, path, title='Confusion matrix', cmap=None, normalize=True):
    """
    Draw a confusion matrix as an annotated heat map, save it as a PNG and show it.

    Arguments
    ---------
    cm:           confusion matrix from sklearn.metrics.confusion_matrix
                  (rows are true labels, columns are predicted labels)

    target_names: the class names used as tick labels, for example
                  ['high', 'medium', 'low']; pass None to keep the default ticks

    path:         prefix of the output file; 'confusion_matrix.png' is appended
                  to it directly, so end it with a path separator when it names
                  a directory

    title:        the text to display at the top of the matrix

    cmap:         the gradient of the values displayed from matplotlib.pyplot.cm
                  see http://matplotlib.org/examples/color/colormaps_reference.html
                  plt.get_cmap('jet') or plt.cm.Blues (default: 'Blues')

    normalize:    If False, plot the raw numbers
                  If True, plot the proportions of each true-label row

    Usage
    -----
    plot_confusion_matrix(cm           = cm,                  # confusion matrix created by
                                                              # sklearn.metrics.confusion_matrix
                          normalize    = True,                # show proportions
                          target_names = y_labels_vals,       # list of names of the classes
                          path         = 'results/',          # output prefix
                          title        = best_estimator_name) # title of graph

    Citation
    --------
    http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html

    """
    cm = np.asarray(cm)

    # Accuracy comes from the raw counts, so compute it before any normalisation.
    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy

    if cmap is None:
        cmap = plt.get_cmap('Blues')

    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True).astype('float')
        # A class without any true samples has an all-zero row; leave it at 0
        # instead of dividing by zero and filling the row with NaN.
        cm = np.divide(cm, row_totals,
                       out=np.zeros(cm.shape, dtype='float'),
                       where=row_totals != 0)

    # Draw the image from the same values that are written into the cells, so
    # the cell colours and the text-colour threshold below agree.
    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title, fontsize = 'xx-large')

    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)

    # Cells darker than the threshold get white text, lighter ones black text.
    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    cell_format = "{:0.4f}" if normalize else "{:,}"
    for i, j in np.ndindex(*cm.shape):
        plt.text(j, i, cell_format.format(cm[i, j]),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    # Fit the layout only after the axis labels exist, so they are not clipped in the saved file.
    plt.tight_layout()
    plt.savefig(path+'confusion_matrix.png')
    plt.show()