Import the essential libraries.


In [None]:
import os
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, img_to_array, load_img
from tensorflow.keras.applications.xception import Xception
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, MaxPooling2D, Dropout, Flatten, MaxPool2D, BatchNormalization, GlobalAveragePooling2D, Input, Activation
from tensorflow.keras.applications.resnet_v2 import ResNet50V2
from tensorflow.keras.applications.vgg16 import VGG16

Mount your Google Drive so that the dataset can be cloned into it from the Git repository.

In [None]:
# Colab-only: mount Google Drive at /content/gdrive so later cells can read and write under it.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


A small shell script that checks whether the dataset already exists on the drive and, if it does not, clones it from Git.



In [None]:
# Create the dataset folder on the mounted drive unless it is already there.
![ -d '/content/gdrive/MyDrive/pneumoniaDataset' ] || mkdir '/content/gdrive/MyDrive/pneumoniaDataset'

# Clone the X-ray images only when the folder is not already a git checkout.
![ -d '/content/gdrive/MyDrive/pneumoniaDataset/.git' ] || git clone "https://github.com/Amzo/xray_images" '/content/gdrive/MyDrive/pneumoniaDataset/'

Load our data and generate additional augmented training data, because the data set is small. Since the data is already structured into train, test and validation folders, we don't need to split the data here.


In [None]:
def _listClassDirs(directory):
    """Return the sorted names of the sub-directories of `directory` (one per class)."""
    return sorted(
        entry for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )


def _printClassCounts(header, directory, class_names):
    """Print `header`, then one `name:count` line per class folder under `directory`."""
    print(header)
    for name in class_names:
        print(name + ':' + str(len(os.listdir(os.path.join(directory, name)))))


def getData(trainDir, testDir, valDir):
    """Print a summary of the dataset and build the train/test/validation iterators.

    Args:
        trainDir: directory holding one sub-folder of images per class (training split).
        testDir: directory with the same class sub-folders for the test split.
        valDir: directory with the same class sub-folders for the validation split.

    Returns:
        A tuple ``(xTrainGen, xTestGen, xValGen)`` of iterators created with
        ``flow_from_directory`` (224x224 targets, ``class_mode='binary'``).

    Only the training iterator applies random augmentation. The test and
    validation iterators merely rescale pixel values, so that evaluation
    metrics are measured on unmodified images.
    """
    # Only sub-directories count as classes; stray files (e.g. a README) are ignored.
    class_names = _listClassDirs(trainDir)
    class_types = len(class_names)

    print('Number of classes for Classification: ',class_types)
    # join() instead of indexing [0]/[1] so that fewer than two classes cannot raise IndexError.
    print(f'The class names are {" and ".join(class_names)}')

    _printClassCounts('--> Count of Train Images <--', trainDir, class_names)
    _printClassCounts('--> Count of Test Images <--', testDir, class_names)
    _printClassCounts('--> Count of Validation Images <---', valDir, class_names)

    # Random augmentation, applied to the training images only.
    train_datagen = ImageDataGenerator(
        rescale=1/255.0,
        rotation_range=7,
        width_shift_range=0.5,
        height_shift_range=0.45,
        shear_range=0.2,
        zoom_range=0.45,
        horizontal_flip=True
    )

    # Rescale-only pipeline shared by the evaluation splits (test and validation).
    test_datagen = ImageDataGenerator(rescale=1./255)

    xTrainGen = train_datagen.flow_from_directory(
        trainDir,
        target_size=(224,224),
        shuffle=True,
        batch_size=24,
        class_mode='binary'
    )

    xTestGen = test_datagen.flow_from_directory(
        testDir,
        target_size=(224,224),
        batch_size=16,
        shuffle=True,
        class_mode='binary'
    )

    # Validation must not be augmented: use the rescale-only generator.
    xValGen = test_datagen.flow_from_directory(
        valDir,
        target_size=(224,224),
        batch_size=32,
        class_mode='binary'
    )

    return xTrainGen, xTestGen, xValGen

In [None]:
def myModel(inputShape=(224, 224, 3)):
  """Build and compile the hand-written baseline CNN for binary classification.

  Args:
    inputShape: shape of one input image as (height, width, channels).
      Defaults to (224, 224, 3), the size produced by the data generators.

  Returns:
    A compiled ``Sequential`` model ending in a single sigmoid unit, using
    RMSprop (learning rate 0.0001), binary cross-entropy loss and accuracy.
  """
  print("Defaulting to basic CNN")

  def conv(filters, **extra):
    # Every convolution in this network shares the same 3x3 / ReLU / he_uniform / same-padding setup.
    return Conv2D(filters, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same', **extra)

  model = Sequential([
    # Block 1
    conv(64, input_shape=inputShape),
    MaxPooling2D((2, 2)),
    # Block 2
    conv(64),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.5),
    # Block 3
    conv(128),
    BatchNormalization(),
    conv(128),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.6),
    # Block 4
    conv(256),
    BatchNormalization(),
    conv(256),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.7),
    # Classifier head
    Flatten(),
    Dense(128, activation='relu', kernel_initializer='he_uniform'),
    BatchNormalization(),
    Dropout(0.2),
    Dense(1,activation='sigmoid'),
  ])

  # `learning_rate` is the supported keyword; the old `lr` alias is deprecated
  # and rejected by newer Keras releases.
  model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.0001),
    loss="binary_crossentropy",
    metrics=["accuracy"])

  return model

Create an Xception model. This model needs a lot of resources and will not run on my system.

We use the RMSprop optimiser with a learning rate of 0.0001. As our classification is either true or false, i.e. the patient either has the condition or does not, we use binary_crossentropy as the loss.


In [None]:
def modelBuild(inputShape, modelType):
  """Build and compile a transfer-learning classifier on an ImageNet backbone.

  Args:
    inputShape: shape of one input image as (height, width, channels).
    modelType: which backbone to use: "xception", "resnet" (ResNet50V2)
      or "vgg16". The comparison is case-sensitive.

  Returns:
    A compiled ``Sequential`` model: backbone (without its top), Flatten,
    Dense(512, relu), Dense(1, sigmoid); RMSprop with learning rate 0.0001,
    binary cross-entropy loss and accuracy metric.

  NOTE(review): for any other ``modelType`` no backbone is added and the
  returned model consists of the dense head only. Confirm whether callers
  rely on that or whether it should be rejected.
  """
  # modelType -> (name used in the log line, backbone constructor)
  backbones = {
    "xception": ("xception", Xception),
    "resnet": ("resnet50", ResNet50V2),
    "vgg16": ("vgg16", VGG16),
  }

  model = Sequential()
  if modelType in backbones:
    label, buildBackbone = backbones[modelType]
    print("Setting up " + label + " model")
    model.add(buildBackbone(include_top=False,
      weights='imagenet',
      input_shape=inputShape,
    ))

  model.add(Flatten())
  model.add(Dense(512, activation="relu"))
  model.add(Dense(1,activation="sigmoid"))

  # `learning_rate` is the supported keyword; the old `lr` alias is deprecated
  # and rejected by newer Keras releases.
  model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.0001),
    loss="binary_crossentropy",
    metrics=["accuracy"])

  return model


In [None]:
def modelTrain(model, xTrain, xVal, batchSize=None, epochs=10):
  """Fit `model` on `xTrain`, validating on `xVal`, and return the same model.

  Args:
    model: a compiled Keras model (anything exposing a Keras-style ``fit``).
    xTrain: training data that yields ``(inputs, targets)`` batches itself,
      e.g. the iterators returned by ``flow_from_directory``. No separate
      targets are passed to ``fit``.
    xVal: validation data in the same form.
    batchSize: accepted for backward compatibility and ignored. The batch
      size is fixed by the generator; Keras documents that ``batch_size``
      must not be given to ``fit`` for generator, Sequence or dataset inputs.
    epochs: number of passes over the training data (default 10).

  Returns:
    The model that was passed in, after training.
  """
  model.fit(xTrain,
    epochs=epochs,
    validation_data=xVal,
    verbose=1
  )

  return model


In [None]:
# Stop right away when the runtime has no GPU attached.
physical_devices = tf.config.experimental.list_physical_devices('GPU')
assert physical_devices, "Not enough GPU hardware devices available"

# Input shape handed to the model builders: height, width, channels.
imageSize = (224, 224, 3)

# Dataset splits inside the repository cloned onto the mounted drive.
inputTrain = '/content/gdrive/MyDrive/pneumoniaDataset/train'
inputTest = '/content/gdrive/MyDrive/pneumoniaDataset/test'
inputValidate = '/content/gdrive/MyDrive/pneumoniaDataset/val'

xTrain, xTest, xVal = getData(
    trainDir=inputTrain,
    testDir=inputTest,
    valDir=inputValidate,
)


Number of classes for Classification:  2
The class names are NORMAL and PNEUMONIA
--> Count of Train Images <--
NORMAL:1305
PNEUMONIA:3851
--> Count of Test Images <--
NORMAL:234
PNEUMONIA:390
--> Count of Validation Images <---
NORMAL:44
PNEUMONIA:32
Found 5156 images belonging to 2 classes.
Found 624 images belonging to 2 classes.
Found 76 images belonging to 2 classes.


In [None]:
# Three pretrained backbones plus the hand-written baseline, all on the same input shape.
xception = modelBuild(inputShape=imageSize, modelType='xception')
vgg16 = modelBuild(inputShape=imageSize, modelType='vgg16')
resnet = modelBuild(inputShape=imageSize, modelType='resnet')
myCNN = myModel()



Setting up xception model
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/xception/xception_weights_tf_dim_ordering_tf_kernels_notop.h5
Setting up vgg16 model
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5
Setting up resnet50 model
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50v2_weights_tf_dim_ordering_tf_kernels_notop.h5
Defaulting to basic CNN


In [None]:
# Train the VGG16-based model on the training generator, validating on xVal.
trainedVGG = modelTrain(model=vgg16, xTrain=xTrain, xVal=xVal, batchSize=32)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10

In [None]:
# Train the ResNet50V2-based model on the training generator, validating on xVal.
trainedResnet = modelTrain(model=resnet, xTrain=xTrain, xVal=xVal, batchSize=32)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Train the baseline CNN on the training generator, validating on xVal.
trainedCNN = modelTrain(model=myCNN, xTrain=xTrain, xVal=xVal, batchSize=32)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Train the Xception-based model on the training generator, validating on xVal.
trainedXception = modelTrain(model=xception, xTrain=xTrain, xVal=xVal, batchSize=32)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Evaluate every trained model on the held-out test generator and report the
# second entry of evaluate()'s result under the name Keras gives it.
# One loop replaces four copy-pasted stanzas; the printed text is unchanged.
for label, trainedModel in (
    ("Xception", trainedXception),
    ("VGG16", trainedVGG),
    ("resnet", trainedResnet),
    ("myCNN", trainedCNN),
):
    print("Running %s model against test set" % label)
    ev = trainedModel.evaluate(xTest)
    print("\n%s: %.f%%" % (trainedModel.metrics_names[1], ev[1]*100))

Running Xception model against test set


NameError: ignored

In [None]:
# Google Drive is mounted at /content/gdrive (see drive.mount above), not
# /content/drive. Saving under the old prefix would write to the ephemeral VM
# disk and the models would be lost when the runtime is recycled.
# NOTE(review): the Xception model is stored under the name "inception";
# kept as-is in case other code loads it by that name.
trainedXception.save('/content/gdrive/MyDrive/inception')
trainedVGG.save('/content/gdrive/MyDrive/VGG')
trainedResnet.save('/content/gdrive/MyDrive/Resnet')
trainedCNN.save('/content/gdrive/MyDrive/myCNN')

NameError: ignored