**Imports**

In [None]:
import os, shutil, PIL, keras
import matplotlib.pyplot as plt
import tensorflow as tf
import keras.backend as K
import random
import numpy as np

from pathlib import Path
from keras import layers, models, optimizers, regularizers

from keras.preprocessing.image import ImageDataGenerator
from keras.preprocessing import image
from keras.applications.resnet50 import ResNet50
from keras.applications.densenet import DenseNet121
from keras.applications.nasnet import NASNetMobile
from keras.models import Model, Input, Sequential,load_model
from keras.layers import AveragePooling2D, Dense, Dropout
from keras.callbacks import Callback, ModelCheckpoint
from keras.wrappers.scikit_learn import KerasClassifier


Using TensorFlow backend.


**Mounting Drive**

In [None]:
# Mount Google Drive at /content/drive (Colab-only helper module); the dataset
# directory configured in `data_dir` lives underneath this mount point.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


**Getting paths and directories, setting global variables**

In [None]:
# --- Training hyper-parameters ---------------------------------------------
train_batch_size = 10
val_batch_size = 10
num_epochs = 20
image_size = (224, 224)  # (height, width) every image is resized to

# --- Dataset layout on the mounted drive -----------------------------------
data_dir = '/content/drive/My Drive/CVT'

# Image folders, one per split, plus a folder for augmented samples.
train_dir = os.path.join(data_dir, 'train')
validation_dir = os.path.join(data_dir, 'validation')
test_dir = os.path.join(data_dir, 'test')
augmented_dir = os.path.join(data_dir, 'augmented')

# Folders holding the text files that list the file names of each split.
train_txt = os.path.join(data_dir, 'TrainingTXT')
validation_txt = os.path.join(data_dir, 'ValidationTXT')
test_txt = os.path.join(data_dir, 'TestingTXT')

# Make sure every folder exists (no-op when it is already there).
for _required_dir in (train_dir, validation_dir, test_dir,
                      train_txt, validation_txt, test_txt,
                      augmented_dir):
    Path(_required_dir).mkdir(parents=True, exist_ok=True)

**Reading Data**

In [None]:
def read(datadir, fname):
    """Return the lines of ``datadir/fname`` as a list of strings.

    The trailing newline of each line is removed; nothing else is stripped,
    so blank lines come back as empty strings.
    """
    list_path = os.path.join(datadir, fname)
    entries = []
    with open(list_path, 'r') as handle:
        for raw_line in handle:
            entries.append(raw_line.rstrip('\n'))
    return entries

def createData(dirname, arr, original_data_dir):
    """Copy the listed files into ``dirname``, creating it if necessary.

    Args:
        dirname: destination directory.
        arr: iterable of file names to copy.
        original_data_dir: name of the source folder, relative to the
            module-level ``data_dir``.
    """
    Path(dirname).mkdir(parents=True, exist_ok=True)
    source_root = os.path.join(data_dir, original_data_dir)
    for file_name in arr:
        shutil.copyfile(os.path.join(source_root, file_name),
                        os.path.join(dirname, file_name))

def initData():
    """Read the split listings and count training/validation samples.

    All six listing files (COVID and NonCOVID for train, validation and test)
    are read, so a missing file raises here. Only the training and validation
    counts are returned.

    To (re)build the per-class image folders from these listings, pass a
    listing to ``createData``, e.g.
    ``createData(os.path.join(train_dir, 'COVID'), names, 'CT_COVID')``
    (and likewise for 'NonCOVID' / 'CT_NonCOVID' and the other splits).

    Returns:
        tuple: ``(num_train_samples, num_val_samples)``.
    """
    # (listing directory, COVID listing, NonCOVID listing), read in this order.
    split_listings = (
        (train_txt, 'trainCT_COVID.txt', 'trainCT_NonCOVID.txt'),
        (validation_txt, 'valCT_COVID.txt', 'valCT_NonCOVID.txt'),
        (test_txt, 'testCT_COVID.txt', 'testCT_NonCOVID.txt'),
    )

    sample_counts = []
    for listing_dir, covid_file, non_covid_file in split_listings:
        covid_names = read(listing_dir, covid_file)
        non_covid_names = read(listing_dir, non_covid_file)
        sample_counts.append(len(covid_names) + len(non_covid_names))

    num_train_samples, num_val_samples = sample_counts[0], sample_counts[1]
    return (num_train_samples, num_val_samples)

**Preprocessing and Data Augmentation**

In [None]:
def add_noise_contrast(img):
    """Add zero-mean Gaussian noise of random strength to an image.

    The noise standard deviation is drawn uniformly from [0, 50) for each
    call. The result is clipped to [0, 255] (the clip bounds assume pixel
    values on a 0-255 scale) and returned as a new array with the same shape
    and dtype as ``img``; the input array is left untouched.

    Args:
        img: numpy array holding the image.

    Returns:
        numpy array: the noisy, clipped image.
    """
    VARIABILITY = 50
    deviation = VARIABILITY * random.random()
    noise = np.random.normal(0, deviation, img.shape)
    # np.clip returns a new array; the original code discarded that result,
    # so out-of-range pixel values were passed on unclipped.
    noisy = np.clip(img + noise, 0., 255.)
    return noisy.astype(img.dtype, copy=False)

def preprocess_data():
    """Create the directory-backed generators for train/validation/test.

    Training images are rescaled to [0, 1] and randomly augmented (rotation,
    shifts, brightness, zoom, flips, additive noise). Validation and test
    images are only rescaled, so that the metrics measured on them are not
    perturbed by random augmentation.

    Returns:
        tuple: ``(train_generator, validation_generator, test_generator)``,
        all using ``class_mode='binary'`` and ``target_size=image_size``.
    """
    train_datagen = ImageDataGenerator(
                rescale=1./255,
                rotation_range=40,
                width_shift_range=0.2,
                height_shift_range=0.2,
                brightness_range=[0.4, 1.0],
                # shear_range=0.2,
                zoom_range=0.4,
                horizontal_flip=True,
                vertical_flip=True,
                fill_mode="nearest",
                preprocessing_function=add_noise_contrast
                )

    # Evaluation data (validation and test) must not be augmented.
    eval_datagen = ImageDataGenerator(rescale=1./255)

    test_generator = eval_datagen.flow_from_directory(
        test_dir,
        target_size=image_size,
        class_mode='binary'
    )

    train_generator = train_datagen.flow_from_directory(
        train_dir,
        target_size=image_size,
        batch_size=train_batch_size,
        class_mode='binary'
    )

    # Previously built from train_datagen, which augmented validation images.
    validation_generator = eval_datagen.flow_from_directory(
        validation_dir,
        target_size=image_size,
        batch_size=val_batch_size,
        class_mode='binary'
    )

    return (train_generator, validation_generator, test_generator)

def test_augmentation():
    """Show ten randomly augmented versions of one training image.

    Lists the files under the COVID and NonCOVID training folders, prints how
    many there are, picks the second one, prints its path and batch shape,
    and draws ten augmented samples in separate matplotlib figures.
    """
    augmenter = ImageDataGenerator(
        rescale=1./255,
        rotation_range=20,
        width_shift_range=0.2,
        height_shift_range=0.2,
        brightness_range=[0.4, 1.0],
        zoom_range=0.4,
        horizontal_flip=True,
        vertical_flip=True,
        fill_mode="nearest",
        preprocessing_function=add_noise_contrast,
    )

    # Collect the training files, COVID first, then NonCOVID.
    fnames = []
    for class_name in ('COVID', 'NonCOVID'):
        class_dir = os.path.join(train_dir, class_name)
        fnames.extend(os.path.join(class_dir, entry)
                      for entry in os.listdir(class_dir))
    print(len(fnames))

    img_path = fnames[1]
    print(img_path)

    # Load the image and give it a leading batch axis of size 1.
    pixels = image.img_to_array(image.load_img(img_path, target_size=image_size))
    x = np.expand_dims(pixels, axis=0)
    print(x.shape)

    # The flow is endless; zipping with range(10) stops after ten samples.
    for figure_index, batch in zip(range(10), augmenter.flow(x, batch_size=1)):
        plt.figure(figure_index)
        plt.imshow(image.array_to_img(batch[0]))
    plt.show()

**Function to calculate f1 metric**

In [None]:
#to be used to calculate f1 metric
def get_f1(y_true, y_pred):
    """F1 score computed with Keras backend ops (adapted from old Keras source).

    Both tensors are clipped to [0, 1] and rounded before counting, and
    ``K.epsilon()`` is added to each denominator to avoid division by zero.

    Args:
        y_true: tensor of ground-truth labels.
        y_pred: tensor of predictions.

    Returns:
        Scalar tensor: ``2 * precision * recall / (precision + recall)``.
    """
    def _rounded_count(tensor):
        # Clip to [0, 1], round to the nearest integer and sum.
        return K.sum(K.round(K.clip(tensor, 0, 1)))

    hits = _rounded_count(y_true * y_pred)
    actual_positives = _rounded_count(y_true)
    flagged_positives = _rounded_count(y_pred)

    precision = hits / (flagged_positives + K.epsilon())
    recall = hits / (actual_positives + K.epsilon())
    return 2*(precision*recall)/(precision+recall+K.epsilon())

**Model build without transfer learning**

In [None]:
#original model build without transfer learning
def build():
    """Build and compile a small CNN trained from scratch (no transfer learning).

    Four Conv2D/MaxPooling2D stages are followed by Flatten, Dropout(0.5), a
    512-unit dense layer and a single sigmoid output for binary
    classification. The model is compiled with binary cross-entropy, RMSprop
    (learning rate 1e-4) and the 'acc' and ``get_f1`` metrics, i.e. the
    metrics that ``graph_metrics`` reads from the training history.

    Returns:
        The compiled Keras model.
    """
    model = models.Sequential()
    # Input resolution follows the module-level image_size, with 3 channels.
    model.add(layers.Conv2D(32, (3, 3), activation='relu',
                            input_shape=(image_size[0], image_size[1], 3)))
    model.add(layers.MaxPooling2D(2, 2))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(2, 2))
    model.add(layers.Conv2D(128, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(2, 2))
    model.add(layers.Conv2D(128, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(2, 2))
    model.add(layers.Flatten())
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.Dense(1, activation='sigmoid'))

    # `learning_rate` is the current argument name (as used in build_transfer2);
    # get_f1 is tracked so this model works with graph_metrics as well.
    model.compile(loss='binary_crossentropy',
                  optimizer=optimizers.RMSprop(learning_rate=1e-4),
                  metrics=['acc', get_f1])
    return model

**Model build using DenseNet121 architecture.**

In [None]:
def build_transfer():
    """Build and compile a DenseNet121-based binary classifier.

    The ImageNet-pretrained DenseNet121 backbone (without its top) feeds a
    head of Flatten -> Dense(512, relu) -> Dropout(0.5) -> Dense(1, sigmoid).
    The model is compiled with binary cross-entropy, Adam (learning rate
    1e-4) and the 'acc' and ``get_f1`` metrics.

    Returns:
        The compiled Keras model.
    """
    IMG_SHAPE = (224, 224, 3)

    # weights='imagenet' is the library default; spelled out for clarity.
    pre_trained_model = DenseNet121(input_shape=IMG_SHAPE, include_top=False,
                                    weights='imagenet')

    regularizer = tf.keras.regularizers.l1_l2(l1=0.01, l2=0.01)

    # NOTE(review): assigning kernel_regularizer on the layers of an
    # already-built model probably does not register any regularization loss
    # (Keras adds it when the layer weights are created) - confirm by
    # inspecting model.losses before relying on this.
    for layer in pre_trained_model.layers:
        if hasattr(layer, 'kernel_regularizer'):
            setattr(layer, 'kernel_regularizer', regularizer)

    # Take the backbone features from the layer named 'relu'.
    last_layer = pre_trained_model.get_layer('relu')
    last_output = last_layer.output

    x = layers.Flatten()(last_output)
    x = layers.Dense(512, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    # x = layers.SpatialDropout2D(0.5)(x)
    x = layers.Dense(1, activation='sigmoid')(x)

    model = Model(pre_trained_model.input, x)

    # `learning_rate` replaces the older `lr` alias, matching build_transfer2.
    model.compile(loss='binary_crossentropy',
                  optimizer=optimizers.Adam(learning_rate=1e-4),
                  metrics=['acc', get_f1])
    return model

**Model build using NASNetMobile architecture.**

In [None]:
def build_transfer2():
    """Build and compile a NASNetMobile-based binary classifier.

    The ImageNet-pretrained NASNetMobile backbone (without its top, marked
    trainable) feeds a head of Flatten -> Dense(512, relu) -> Dropout(0.5) ->
    Dense(1, sigmoid). The model is compiled with binary cross-entropy, Adam
    (learning rate 0.0001) and the 'acc', ``get_f1`` and BinaryAccuracy
    metrics.

    Returns:
        The compiled Keras model.
    """
    backbone = NASNetMobile(input_shape=(224, 224, 3),
                            include_top=False,
                            weights='imagenet')
    backbone.trainable = True

    l1_l2_penalty = tf.keras.regularizers.l1_l2(l1=0.01, l2=0.01)

    # Set the regularizer attribute on every layer that exposes one.
    # NOTE(review): whether this has any effect on an already-built model is
    # unverified - confirm by inspecting model.losses.
    for backbone_layer in backbone.layers:
        if hasattr(backbone_layer, 'kernel_regularizer'):
            setattr(backbone_layer, 'kernel_regularizer', l1_l2_penalty)

    # Classification head on top of the backbone features.
    head = layers.Flatten()(backbone.output)
    head = layers.Dense(512, activation='relu')(head)
    head = layers.Dropout(0.5)(head)
    prediction = layers.Dense(1, activation='sigmoid')(head)

    classifier = Model(backbone.input, prediction)
    classifier.compile(
        loss='binary_crossentropy',
        optimizer=optimizers.Adam(learning_rate=0.0001),
        metrics=['acc', get_f1, keras.metrics.BinaryAccuracy()],
    )
    return classifier

**Function to graph model metrics (acc, loss, f1)**

In [None]:
def graph_metrics(history):
    """Plot training vs. validation accuracy, loss and (if tracked) F1.

    Args:
        history: object returned by ``model.fit``; its ``history`` dict must
            contain 'acc', 'val_acc', 'loss' and 'val_loss'. The F1 figure is
            drawn only when 'get_f1' and 'val_get_f1' are present, so models
            compiled without the ``get_f1`` metric can be plotted too.

    Raises:
        KeyError: if an accuracy or loss series is missing.
    """
    recorded = history.history
    acc = recorded['acc']
    val_acc = recorded['val_acc']
    loss = recorded['loss']
    val_loss = recorded['val_loss']
    f1 = recorded.get('get_f1')
    val_f1 = recorded.get('val_get_f1')

    epochs = range(1, len(acc) + 1)

    def _plot_pair(train_values, val_values, label, title):
        # Draw one training/validation curve pair on the current figure.
        plt.plot(epochs, train_values, 'bo', label='Training ' + label)
        plt.plot(epochs, val_values, 'b', label='Validation ' + label)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.legend()

    _plot_pair(acc, val_acc, 'acc', 'Training and validation accuracy')

    plt.figure()
    _plot_pair(loss, val_loss, 'loss', 'Training and validation loss')

    if f1 is not None and val_f1 is not None:
        plt.figure()
        # The original code referenced plt.legend without calling it here,
        # so the F1 figure had no legend.
        _plot_pair(f1, val_f1, 'f1', 'Training and validation f1')

    plt.show()

**Train and Save Model**

In [None]:
def save(model):
    """Persist the model to the current working directory.

    The architecture is written as JSON to ``model.json`` and the weights as
    HDF5 to ``model_no_overfit.h5``; a confirmation line is printed afterwards.

    Args:
        model: object providing ``to_json()`` and ``save_weights(path)``.
    """
    # Architecture -> JSON
    Path("model.json").write_text(model.to_json())
    # Weights -> HDF5
    model.save_weights("model_no_overfit.h5")
    print("Saved model to disk")

def train(model, train_generator, validation_generator, num_train_samples, num_val_samples):
    """Fit ``model`` on the training generator and validate every epoch.

    Step counts are the sample counts floor-divided by the module-level
    batch sizes; the number of epochs comes from ``num_epochs``.

    Args:
        model: compiled model exposing ``fit``.
        train_generator: generator yielding training batches.
        validation_generator: generator yielding validation batches.
        num_train_samples: number of training samples.
        num_val_samples: number of validation samples.

    Returns:
        Whatever ``model.fit`` returns (the training history).
    """
    fit_options = dict(
        steps_per_epoch=num_train_samples // train_batch_size,
        epochs=num_epochs,
        validation_data=validation_generator,
        validation_steps=num_val_samples // val_batch_size,
    )
    return model.fit(train_generator, **fit_options)

In [None]:
# Choose the architecture to train: exactly one builder should be active.
# Swap the commented lines to try the scratch CNN or the NASNetMobile variant.
# model = build()
# model = build_transfer2()
model = build_transfer()

In [None]:
# Full training run: count samples, build generators, fit, plot, then save.
# Sample counts come from the split listing files and set the steps per epoch.
num_train_samples, num_val_samples = initData()
train_generator, validation_generator, test_generator = preprocess_data()
history = train(model, train_generator, validation_generator, num_train_samples, num_val_samples)
graph_metrics(history)
# Writes model.json and model_no_overfit.h5 to the current working directory.
save(model)

Found 203 images belonging to 2 classes.
Found 425 images belonging to 2 classes.
Found 118 images belonging to 2 classes.
Epoch 1/20
 3/42 [=>............................] - ETA: 14:44 - loss: 2.1717 - acc: 0.4667 - get_f1: 0.4665

KeyboardInterrupt: ignored

**Evaluate Model**

In [None]:
# Evaluate on the held-out test generator. The number of values returned by
# evaluate() depends on the metrics the model was compiled with, so the
# results are looked up by name instead of unpacking a fixed-length tuple
# (the active build_transfer() model reports only loss, acc and get_f1).
test_results = model.evaluate(test_generator, verbose=1)
test_metrics = dict(zip(model.metrics_names, test_results))

test_loss = test_metrics['loss']
test_acc = test_metrics['acc']
test_f1 = test_metrics.get('get_f1')  # None when the model does not track F1

print(test_loss)
print(test_acc)
print(test_f1)

In [None]:
# Visual sanity check of the augmentation pipeline: plots ten augmented
# variants of a single training image (requires the earlier cells to have run).
test_augmentation()

NameError: ignored

**Currently unsuccessful model builds using ResNet**

In [None]:
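# Unsuccessful builds with ResNet transfer learning (kept for reference only).
#
# NOTE(review): the first variant below ends in Dense(1, activation="softmax");
# softmax over a single unit always outputs 1.0, so sigmoid was likely intended.
#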
# def build_transfer2():
#   baseModel = ResNet50(weights="imagenet", include_top=False,
# 	input_tensor=Input(shape=(224, 224, 3)))
#   # construct the head of the model that will be placed on top of the
#   # the base model
#   headModel = baseModel.output
#   headModel = layers.AveragePooling2D(pool_size=(7, 7))(headModel)
#   headModel = layers.Flatten(name="flatten")(headModel)
#   headModel = layers.Dense(256, activation="relu")(headModel)
#   headModel = layers.Dropout(0.5)(headModel)
#   headModel = layers.Dense(1, activation="softmax")(headModel)
#   # place the head FC model on top of the base model (this will become
#   # the actual model we will train)
#   model = Model(inputs=baseModel.input, outputs=headModel)
#   # loop over all layers in the base model and freeze them so they will
#   # *not* be updated during the training process
#   for layer in baseModel.layers:
# 	  layer.trainable = False
#   #opt = optimizers.Adam(lr=1e-4, decay=1e-4 / 50)
#   #opt = optimizer=optimizers.RMSprop(lr=1e-4)
#   model.compile(loss="binary_crossentropy", optimizer='adam',
# 	metrics=["accuracy", get_f1])
#   return model
# def build_transfer():
#   input_tuple = (224, 224, 3)
#   resnet = ResNet50(include_top=False, weights='imagenet', input_shape= input_tuple)
#   #resnet.summary()
#   # for layer in resnet.layers:
#   # layer.trainable = False
#   model = models.Sequential()
#   model.add(resnet)
#   model.add(layers.Conv2D(32, (3, 3), activation='relu', padding='same'))
#                              # input_shape=(image_size[0], image_size[1], 3)))
#   model.add(layers.MaxPooling2D(2, 2, padding='same'))
#   model.add(layers.Dropout(0.5))
#     # model.summary()
#   model.add(layers.Conv2D(16, (3, 3), activation='relu', padding='same'))
#   model.add(layers.MaxPooling2D(2, 2, padding='same'))
#     # model.add(layers.Conv2D(128, (3, 3), activation='relu', padding='same'))
#     # model.add(layers.MaxPooling2D(2, 2, padding='same'))
#     # model.add(layers.Conv2D(128, (3, 3), activation='relu', padding='same'))
#     # model.add(layers.MaxPooling2D(2, 2, padding='same'))
#   model.add(layers.Flatten())
#   model.add(layers.Dropout(0.5))
#   model.add(layers.Dense(512, activation='relu'))
#   model.add(layers.Dense(1, activation='sigmoid'))

#   opt = optimizers.Adam(lr=1e-4, decay=1e-4 / 50)
#   # opt = optimizer=optimizers.RMSprop(lr=1e-4)
#   model.compile(loss='binary_crossentropy',
#                   optimizer=opt,
#                   metrics=[get_f1, 'acc'])
#   return model