In [None]:
import sys, time, datetime, shutil, os

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
# --- Training hyper-parameters ---------------------------------------------
BATCH_SIZE = 32
NB_EPOCHS = 100
VERBOSE = 1  # not referenced by the cells visible in this notebook

# --- Input geometry (pixels) -----------------------------------------------
IMG_WIDTH = 96
IMG_HEIGHT = 96

# --- Dataset layout: one sub-directory per class under each split ----------
TRAIN_DATA_PATH = 'chest_xray/train'
TEST_DATA_PATH = 'chest_xray/test'
VAL_DATA_PATH = 'chest_xray/val'
CLASS_NAMES = ['NORMAL', 'BACTERIA', 'VIRUS']

# --- Miscellaneous ----------------------------------------------------------
AUTOTUNE = tf.data.experimental.AUTOTUNE
MODEL_NAME = 'resnet_18'

# Metrics attached to the evaluation model. Every metric is created with an
# explicit name and float32 dtype, in the order listed below.
METRICS = [
  metric_cls(name=metric_name, dtype=tf.float32)
  for metric_cls, metric_name in (
    (tf.keras.metrics.BinaryAccuracy, 'accuracy'),
    (tf.keras.metrics.TruePositives, 'true_positives'),
    (tf.keras.metrics.FalsePositives, 'false_positives'),
    (tf.keras.metrics.TrueNegatives, 'true_negatives'),
    (tf.keras.metrics.FalseNegatives, 'false_negatives'),
    (tf.keras.metrics.Precision, 'precision'),
    (tf.keras.metrics.Recall, 'recall'),
    (tf.keras.metrics.AUC, 'auc'),
  )
]

In [None]:
def save_model(model=None, model_name='vgg16'):
  """
  Save a model to ``saved_model/<model_name>/model.h5``.

  The destination directory is created when it does not exist yet.

  Args:
    model: object exposing a Keras-style ``save(filepath)`` method.
    model_name: name of the sub-directory created under ``saved_model/``.

  Raises:
    ValueError: if ``model`` is None.

  NOTE(review): Keras documents HDF5 (``.h5``) saving as supported for
  Functional/Sequential models only - confirm the model passed in is not a
  subclassed ``tf.keras.Model``, otherwise ``model.save`` is expected to raise.
  """
  if model is None:
    raise ValueError("save_model() requires a model instance, got None.")

  target_path = 'saved_model/{}/model.h5'.format(model_name)
  # Create the parent directory up front so the save cannot fail on a
  # missing folder during a first run.
  os.makedirs(os.path.dirname(target_path), exist_ok=True)
  model.save(target_path)
  print("Model saved successfully.")

In [None]:
def get_callbacks(model_name=None):
  """
  Define the callbacks for the ML model.

  Args:
    model_name: name used for the TensorBoard log sub-directory
      (``logs/<model_name>/<timestamp>``). Defaults to the notebook-level
      ``MODEL_NAME`` so the logs follow the configured model.

  Returns:
    A list with a single TensorBoard callback writing to a fresh,
    timestamped directory on every call.
  """
  if model_name is None:
    model_name = MODEL_NAME

  # One directory per run so successive trainings do not overwrite each other.
  run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
  log_dir = os.path.join("logs/{}".format(model_name), run_stamp)

  return [
    # Early stopping is currently disabled:
    # tf.keras.callbacks.EarlyStopping(monitor='loss', patience=10),
    tf.keras.callbacks.TensorBoard(log_dir, histogram_freq=1)
  ]

In [None]:
def get_label(file_path):
  """
  Derive the label of an image from the directory it is stored in.

  The path is split on the OS separator and the second-to-last component
  (the parent directory name) is compared against ``CLASS_NAMES``.

  Returns:
    The result of that comparison - presumably a boolean vector with one
    entry per class name (TODO confirm against the consumer).
  """
  path_components = tf.strings.split(file_path, os.path.sep)
  # <split>/<class-directory>/<file>: index -2 is the class directory.
  class_dir = path_components[-2]
  return class_dir == CLASS_NAMES

In [None]:
def decode_img(img):
  """
  Convert encoded JPEG bytes into a float image tensor of the model's input size.

  Args:
    img: scalar string tensor holding the JPEG-encoded file content.

  Returns:
    A float32 tensor with 3 channels, resized to ``IMG_HEIGHT`` x ``IMG_WIDTH``.
  """
  # Decode the compressed string into a 3-channel uint8 tensor.
  img = tf.image.decode_jpeg(img, channels=3)
  # `convert_image_dtype` rescales integer pixels to floats in the [0, 1] range.
  img = tf.image.convert_image_dtype(img, tf.float32)
  # `tf.image.resize` expects the size as [new_height, new_width]; passing
  # width first only worked because both constants are currently equal.
  return tf.image.resize(img, [IMG_HEIGHT, IMG_WIDTH])

In [None]:
def process_path(file_path):
  """
  Turn a file path into an ``(image, label)`` pair.

  The label comes from ``get_label`` (based on the path), the image from
  reading the file and passing its raw content through ``decode_img``.
  """
  label = get_label(file_path)
  # Read the file as a raw byte string, then decode/resize it.
  raw_bytes = tf.io.read_file(file_path)
  image = decode_img(raw_bytes)
  return image, label

In [None]:
class BasicBlock(layers.Layer):
  """
  Residual basic block: (conv 3x3 + BN + ReLU) -> (conv 3x3 + BN), added to a
  shortcut of the input, followed by a final ReLU.

  The shortcut is the identity when the block keeps both the spatial size
  (``stride == 1``) and the channel count (input channels == ``filter_num``);
  otherwise a 1x1 convolution + batch normalization projects the input to the
  shape of the main path so the two can be summed.

  Args:
    filter_num: number of output channels of both convolutions.
    stride: stride of the first convolution (and of the projection shortcut).
  """
  def __init__(self, filter_num, stride=1):
    super(BasicBlock, self).__init__()
    self.filter_num = filter_num
    self.stride = stride
    self.conv_b_1 = layers.Conv2D(filters=filter_num, kernel_size=(3, 3), strides=stride, padding='same')
    self.bn_b_1 = layers.BatchNormalization()
    self.conv_b_2 = layers.Conv2D(filters=filter_num, kernel_size=(3, 3), strides=1, padding='same')
    self.bn_b_2 = layers.BatchNormalization()
    # Projection shortcut. Whether it is needed depends on the number of input
    # channels, which is only known in build(); None means identity shortcut.
    self.downsample = None

  def build(self, input_shape):
    in_channels = tf.TensorShape(input_shape).as_list()[-1]
    # Project whenever the main path changes the spatial size or the channel
    # count; a plain identity could not be added to the main path otherwise.
    if self.stride != 1 or in_channels != self.filter_num:
      self.downsample = tf.keras.Sequential()
      self.downsample.add(layers.Conv2D(filters=self.filter_num, kernel_size=(1, 1), strides=self.stride))
      self.downsample.add(layers.BatchNormalization())
    super(BasicBlock, self).build(input_shape)

  def call(self, inputs, **kwargs):
    if self.downsample is None:
      residual = inputs
    else:
      residual = self.downsample(inputs)
    x = self.conv_b_1(inputs)
    x = self.bn_b_1(x)
    x = tf.nn.relu(x)
    x = self.conv_b_2(x)
    x = self.bn_b_2(x)
    # The skip connection: without this sum the block is a plain conv stack.
    output = tf.nn.relu(x + residual)
    return output


class BottleNeck(layers.Layer):
  """
  Residual bottleneck block.

  Main path: 1x1 conv -> 3x3 conv (carries ``stride``) -> 1x1 conv expanding
  to ``filter_num * 4`` channels, each followed by batch normalization.
  Shortcut: always a 1x1 conv (``filter_num * 4`` channels, same ``stride``)
  plus batch normalization. Both paths are summed and passed through ReLU.
  """
  def __init__(self, filter_num, stride=1):
    super().__init__()
    expanded_filters = filter_num * 4

    # Main path (creation order kept: conv/bn 1, conv/bn 2, conv/bn 3).
    self.conv_b_1 = layers.Conv2D(filters=filter_num, kernel_size=(1, 1), strides=1, padding='same')
    self.bn_b_1 = layers.BatchNormalization()
    self.conv_b_2 = layers.Conv2D(filters=filter_num, kernel_size=(3, 3), strides=stride, padding='same')
    self.bn_b_2 = layers.BatchNormalization()
    self.conv_b_3 = layers.Conv2D(filters=expanded_filters, kernel_size=(1, 1), strides=1, padding='same')
    self.bn_b_3 = layers.BatchNormalization()

    # Projection shortcut matching the main path's output shape.
    self.downsample = tf.keras.Sequential([
      layers.Conv2D(filters=expanded_filters, kernel_size=(1, 1), strides=stride),
      layers.BatchNormalization(),
    ])

  def call(self, inputs, **kwargs):
    shortcut = self.downsample(inputs)
    features = tf.nn.relu(self.bn_b_1(self.conv_b_1(inputs)))
    features = tf.nn.relu(self.bn_b_2(self.conv_b_2(features)))
    features = self.bn_b_3(self.conv_b_3(features))
    return tf.nn.relu(layers.add([shortcut, features]))


def make_basic_block_layer(filter_num, blocks, stride=1):
  """
  Build one stage made of ``BasicBlock`` layers.

  The first block receives ``stride``; every following block (``blocks - 1``
  of them) uses stride 1. At least one block is always created.

  Returns:
    A ``tf.keras.Sequential`` containing the blocks in order.
  """
  # Stride of each block in the stage: the requested one, then ones.
  block_strides = [stride]
  block_strides.extend(1 for _ in range(1, blocks))

  stage = tf.keras.Sequential()
  for block_stride in block_strides:
    stage.add(BasicBlock(filter_num, stride=block_stride))
  return stage


def make_bottleneck_layer(filter_num, blocks, stride=1):
  """
  Build one stage made of ``BottleNeck`` layers.

  The first block receives ``stride``; every following block (``blocks - 1``
  of them) uses stride 1. At least one block is always created.

  Returns:
    A ``tf.keras.Sequential`` containing the blocks in order.
  """
  # Stride of each block in the stage: the requested one, then ones.
  block_strides = [stride]
  block_strides.extend(1 for _ in range(1, blocks))

  stage = tf.keras.Sequential()
  for block_stride in block_strides:
    stage.add(BottleNeck(filter_num, stride=block_stride))
  return stage

In [None]:
NUM_CLASSES = 3  # number of units of the final Dense (prediction) layer


class ResNetTypeI(tf.keras.Model):
  """
  ResNet variant assembled from basic-block stages.

  Pipeline: stem convolution + batch norm + ReLU, max pooling, four stages
  built with ``make_basic_block_layer`` (64, 128, 256 and 512 filters), global
  average pooling and a Dense layer with ``NUM_CLASSES`` units.

  Args:
    nodes: indexable giving the number of blocks of stages 1 to 4
      (``nodes[0]`` .. ``nodes[3]``).
    name: optional Keras model name.
    final_activation: activation of the prediction layer.
  """
  def __init__(self, nodes, name=None, final_activation='softmax'):
    super().__init__(name=name)
    # Stem: strided 3x3 convolution, batch normalization, max pooling.
    self.conv_1 = layers.Conv2D(
      filters=16,
      kernel_size=3,
      strides=2,
      padding='same',
      input_shape=(IMG_HEIGHT, IMG_WIDTH , 3),
    )
    self.bn_1 = layers.BatchNormalization()
    self.pooling_1 = layers.MaxPooling2D(pool_size=(3, 3), strides=2, padding='same')
    # Residual stages: the first keeps the resolution, the others use stride 2.
    self.layer_1 = make_basic_block_layer(filter_num=64, blocks=nodes[0])
    self.layer_2 = make_basic_block_layer(filter_num=128, blocks=nodes[1], stride=2)
    self.layer_3 = make_basic_block_layer(filter_num=256, blocks=nodes[2], stride=2)
    self.layer_4 = make_basic_block_layer(filter_num=512, blocks=nodes[3], stride=2)
    # Head: global average pooling followed by the prediction layer.
    self.avg_pool = layers.GlobalAveragePooling2D()
    self.pred_layer = layers.Dense(units=NUM_CLASSES, activation=final_activation, name='predictions')

  def call(self, inputs):
    features = tf.nn.relu(self.bn_1(self.conv_1(inputs)))
    features = self.pooling_1(features)
    for stage in (self.layer_1, self.layer_2, self.layer_3, self.layer_4):
      features = stage(features)
    pooled = self.avg_pool(features)
    return self.pred_layer(pooled)

class ResNetTypeII(tf.keras.Model):
  """
  ResNet variant assembled from bottleneck stages.

  Pipeline: stem convolution + batch norm + ReLU, max pooling, four stages
  built with ``make_bottleneck_layer`` (base filters 64, 128, 256 and 512),
  global average pooling and a Dense layer with ``NUM_CLASSES`` units.

  Args:
    nodes: indexable giving the number of blocks of stages 1 to 4
      (``nodes[0]`` .. ``nodes[3]``).
    name: optional Keras model name.
    final_activation: activation of the prediction layer.
  """
  def __init__(self, nodes, name=None, final_activation='softmax'):
    super().__init__(name=name)
    # Stem: strided 3x3 convolution, batch normalization, max pooling.
    self.conv_1 = layers.Conv2D(
      filters=16,
      kernel_size=3,
      strides=2,
      padding='same',
      input_shape=(IMG_HEIGHT, IMG_WIDTH , 3),
    )
    self.bn_1 = layers.BatchNormalization()
    self.pooling_1 = layers.MaxPooling2D(pool_size=(3, 3), strides=2, padding='same')
    # Bottleneck stages: the first keeps the resolution, the others use stride 2.
    self.layer_1 = make_bottleneck_layer(filter_num=64, blocks=nodes[0])
    self.layer_2 = make_bottleneck_layer(filter_num=128, blocks=nodes[1], stride=2)
    self.layer_3 = make_bottleneck_layer(filter_num=256, blocks=nodes[2], stride=2)
    self.layer_4 = make_bottleneck_layer(filter_num=512, blocks=nodes[3], stride=2)
    # Head: global average pooling followed by the prediction layer.
    self.avg_pool = layers.GlobalAveragePooling2D()
    self.pred_layer = layers.Dense(units=NUM_CLASSES, activation=final_activation, name='predictions')

  def call(self, inputs):
    features = tf.nn.relu(self.bn_1(self.conv_1(inputs)))
    features = self.pooling_1(features)
    for stage in (self.layer_1, self.layer_2, self.layer_3, self.layer_4):
      features = stage(features)
    pooled = self.avg_pool(features)
    return self.pred_layer(pooled)

def get_model_resnet(model='resnet_18', optimizer='adam', loss='binary_crossentropy', final_activation='softmax', metrics='accuracy'):
  """
  Build and compile one of the supported ResNet variants.

  Args:
    model: variant name, either ``'resnet_18'`` or ``'resnet_34'``.
    optimizer: forwarded to ``compile``.
    loss: forwarded to ``compile``.
    final_activation: activation of the network's prediction layer.
    metrics: forwarded to ``compile``.

  Returns:
    The compiled model.

  Raises:
    ValueError: if ``model`` is not a supported variant name (previously the
      function silently returned None, which only failed later at the caller).
  """
  if model == 'resnet_18':
    build_network = resnet_18
  elif model == 'resnet_34':
    build_network = resnet_34
  else:
    raise ValueError(
      "Unsupported model '{}': expected 'resnet_18' or 'resnet_34'.".format(model)
    )

  network = build_network(final_activation=final_activation)
  network.compile(optimizer=optimizer,
                  loss=loss,
                  metrics=metrics)
  return network

def resnet_18(final_activation):
    """Return a ``ResNetTypeI`` named 'resnet1_18' with two blocks in each of its four stages."""
    blocks_per_stage = [2, 2, 2, 2]
    return ResNetTypeI(
        nodes=blocks_per_stage,
        final_activation=final_activation,
        name='resnet1_18',
    )


def resnet_34(final_activation):
    """Return a ``ResNetTypeI`` named 'resnet1_34' with 3, 4, 6 and 3 blocks in its four stages."""
    blocks_per_stage = [3, 4, 6, 3]
    return ResNetTypeI(
        nodes=blocks_per_stage,
        final_activation=final_activation,
        name='resnet1_34',
    )


In [None]:
"""
Main function
"""
modelLoaded = False
model = None
model_path = 'saved_model/{}/model.h5'.format(MODEL_NAME)

# If a saved model already exists, load it; otherwise build a fresh one.
if os.path.isfile(model_path):
    model = tf.keras.models.load_model(model_path)
    modelLoaded = True
    print('Model successfully loaded.')
else:
    # Three mutually exclusive classes with a softmax head: the matching loss
    # is categorical cross-entropy on one-hot labels (see class_mode below).
    model = get_model_resnet(
        model=MODEL_NAME,
        optimizer='adam',
        loss=tf.keras.losses.CategoricalCrossentropy(),
        final_activation='softmax',
        metrics=None
    )

# Count the files of every class directory to derive the number of steps.
nb_normal_tr = len(os.listdir('{}/NORMAL'.format(TRAIN_DATA_PATH)))
nb_bacteria_tr = len(os.listdir('{}/BACTERIA'.format(TRAIN_DATA_PATH)))
nb_virus_tr = len(os.listdir('{}/VIRUS'.format(TRAIN_DATA_PATH)))
nb_normal_val = len(os.listdir('{}/NORMAL'.format(VAL_DATA_PATH)))
nb_bacteria_val = len(os.listdir('{}/BACTERIA'.format(VAL_DATA_PATH)))
nb_virus_val = len(os.listdir('{}/VIRUS'.format(VAL_DATA_PATH)))
total_train = nb_normal_tr + nb_bacteria_tr + nb_virus_tr
total_val = nb_normal_val + nb_bacteria_val + nb_virus_val

# Data generators: augmentation for training only, plain rescaling elsewhere.
train_image_generator = ImageDataGenerator(
    rescale=1./255,
    rotation_range=45,
    width_shift_range=.15,
    height_shift_range=.15,
    horizontal_flip=True,
    zoom_range=0.5
)
validation_image_generator = ImageDataGenerator(rescale=1./255)  # validation data
test_image_generator = ImageDataGenerator(rescale=1./255)  # test data

# class_mode='categorical' yields one-hot labels matching the 3-unit softmax
# output; classes=CLASS_NAMES pins the label indices to the CLASS_NAMES order
# instead of the alphabetical order of the directories.
train_data_gen = train_image_generator.flow_from_directory(
    batch_size=BATCH_SIZE,
    directory=TRAIN_DATA_PATH,
    shuffle=True,
    target_size=(IMG_HEIGHT, IMG_WIDTH),
    classes=CLASS_NAMES,
    class_mode='categorical'
)
val_data_gen = validation_image_generator.flow_from_directory(
    batch_size=BATCH_SIZE,
    directory=VAL_DATA_PATH,
    target_size=(IMG_HEIGHT, IMG_WIDTH),
    classes=CLASS_NAMES,
    class_mode='categorical'
)
# No shuffling on the test set so predictions stay aligned with the files.
test_data_gen = test_image_generator.flow_from_directory(
    batch_size=BATCH_SIZE,
    directory=TEST_DATA_PATH,
    shuffle=False,
    target_size=(IMG_HEIGHT, IMG_WIDTH),
    classes=CLASS_NAMES,
    class_mode='categorical'
)

# Train the model. max(1, ...) keeps the step counts valid when a split holds
# fewer images than one batch (integer division would otherwise give 0).
model.fit(
    train_data_gen,
    callbacks=get_callbacks(),
    steps_per_epoch=max(1, total_train // BATCH_SIZE),
    epochs=NB_EPOCHS,
    validation_data=val_data_gen,
    validation_steps=max(1, total_val // BATCH_SIZE)
)
# NOTE(review): the ResNet classes above are subclassed Keras models and Keras
# documents HDF5 saving as unsupported for those - confirm this call succeeds
# or switch to another save format.
save_model(model, MODEL_NAME)

model.summary()

# Wrap the trained model to evaluate it with the detailed METRICS. The model
# already ends with a softmax, so no extra Softmax layer is stacked on top
# (a second softmax would flatten the predicted probabilities).
testing_model = tf.keras.Sequential([model])
testing_model.compile(
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
    optimizer='adam',
    metrics=METRICS
)

# Display metrics for testing purpose
print('Normal, Virus or Bacteria resnet 18 trained model : ')
results = testing_model.evaluate(test_data_gen)
for name, value in zip(testing_model.metrics_names, results):
    print(f'{name} : {value}')

# predictions
predictions = testing_model.predict(test_data_gen)
for predict in predictions:
    print('Predictions : ', predict)