In [None]:
import os
from dotenv import dotenv_values
import itertools
import cv2 #image operations

from imutils import paths

import numpy as np

# Display
#from IPython.display import Image, display
from PIL import Image
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import matplotlib.image as mpimg



import scikitplot as skplt
from sklearn.metrics import classification_report
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow import keras
from keras.preprocessing.image import ImageDataGenerator
from keras.utils import img_to_array, load_img, to_categorical
from tensorflow.keras.optimizers import Adam

In [None]:
# Load the notebook settings (data path, image size, batch size, epoch count)
# from the local .env file into a dict-like object.
config = dotenv_values(".env")

In [None]:
# Values read from the .env file that are reused throughout the notebook.
# Root folder under which the dataset lives.
raw_dir = config['RAW_PATH']

# .env values are strings, so the numeric settings are converted to int.
# img_dim is used for both the height and the width of the model input.
img_dim = int(config['DIM'])
batch_size = int(config['BATCH_SIZE'])
epoch_nbr = int(config['EPOCH'])

# Model input shape: img_dim x img_dim with 3 channels.
input_shape = (img_dim, img_dim, 3)

In [None]:
# Folders holding the original and the segmented images; each contains one
# sub-folder per class. Built with os.path.join (as in get_split_dataset) so
# the result is well-formed whether or not RAW_PATH ends with a separator.
original_data_path = os.path.join(raw_dir, 'dataset', 'Original')
segmented_data_path = os.path.join(raw_dir, 'dataset', 'Segmented')


In [None]:
# Class names. Each name is also a sub-folder of the dataset directories, and
# its position in this list is the integer label assigned to its images.
CATEGORIES = ["Benign", "Early", "Pre", "Pro"]

In [None]:
# helper functions

In [None]:
# Plot the training and validation curves in separate figures
def plot_loss_curves(history, metric, val_metric):
    """
    Plot loss and metric curves for training and validation.

    Two figures are drawn: loss per epoch, and the chosen metric per epoch.
    Nothing is returned.

    Args:
        history: object whose ``history`` attribute maps a name ('loss',
            'val_loss', metric names) to a list with one value per epoch,
            such as the object returned by ``model.fit``.
        metric: key of the training metric in ``history.history``,
            e.g. 'accuracy'.
        val_metric: key of the validation metric, e.g. 'val_accuracy'.

    Raises:
        KeyError: if 'loss', 'val_loss', ``metric`` or ``val_metric`` is not
            present in ``history.history``.
    """
    records = history.history
    epochs = range(len(records['loss']))

    # Open a fresh figure for the loss. Without it the curves are drawn on
    # whatever figure is current, e.g. the metric figure of a previous call.
    plt.figure()
    plt.plot(epochs, records['loss'], label='training_loss')
    plt.plot(epochs, records['val_loss'], label='val_loss')
    plt.title('Loss')
    plt.xlabel('Epochs')
    plt.legend()

    # Metric figure; labels and title follow the metric that was requested
    # ('accuracy' gives 'training_accuracy' / 'val_accuracy' / 'Accuracy').
    plt.figure()
    plt.plot(epochs, records[metric], label=f'training_{metric}')
    plt.plot(epochs, records[val_metric], label=val_metric)
    plt.title(metric.capitalize())
    plt.xlabel('Epochs')
    plt.legend();

In [None]:
# Image data generators for the train, validation and test datasets.
# Data augmentation is applied to the training dataset only.

def image_data_generator(preprocessing_fn = None):
    """
    Build the three ImageDataGenerator objects used by the notebook.

    Args:
        preprocessing_fn: optional function handed to every generator as
            ``preprocessing_function``.

    Returns:
        [train_datagen, validation_datagen, test_datagen]. All three rescale
        by 1/255; only the training generator adds random augmentation.
    """
    # Settings common to all three generators
    shared_settings = dict(
        rescale = 1./255,
        preprocessing_function = preprocessing_fn,
    )

    # Random transformations, used for training only
    augmentation_settings = dict(
        rotation_range = 20,
        horizontal_flip = True,
        shear_range = 0.2,         # shear the image
        zoom_range = 0.2,          # zoom into the image
        width_shift_range = 0.2,   # shift the image horizontally
        height_shift_range = 0.2,  # shift the image vertically
    )

    train_datagen = ImageDataGenerator(**shared_settings, **augmentation_settings)
    validation_datagen = ImageDataGenerator(**shared_settings)
    test_datagen = ImageDataGenerator(**shared_settings)

    return [train_datagen, validation_datagen, test_datagen]

In [None]:
def get_split_dataset(data_path, target_size=None):
    """
    Load every image below ``data_path`` and split it into train/val/test.

    ``data_path`` must contain one sub-folder per entry of ``CATEGORIES``.
    The label of an image is the index of its folder name in that list.

    Args:
        data_path: directory holding the per-category sub-folders.
        target_size: (height, width) each image is resized to on load.
            Defaults to ``(img_dim, img_dim)`` so the arrays always match the
            model's ``input_shape`` (previously hard-coded to 224x224).

    Returns:
        [train_x, train_y, val_x, val_y, test_x, test_y]. The ``*_x`` entries
        are float32 image arrays and the ``*_y`` entries are integer label
        arrays. 20% of the data is held out for test, then 20% of the
        remainder for validation, both with ``random_state=42``.
    """
    if target_size is None:
        target_size = (img_dim, img_dim)

    data = []
    labels = []
    for category_pos, category in enumerate(CATEGORIES):
        path = os.path.join(data_path, category)
        # os.listdir() returns names in arbitrary order. Sorting makes the
        # sample order reproducible, so two folders with matching file names
        # (original / segmented) yield identically ordered, identically
        # split datasets.
        for img_name in sorted(os.listdir(path)):
            image = load_img(os.path.join(path, img_name), target_size=target_size)
            data.append(img_to_array(image))
            labels.append(category_pos)

    data = np.array(data, dtype="float32")
    labels = np.array(labels)

    # First split off the test set, then carve the validation set out of
    # what remains.
    (training_x, test_x, training_y, test_y) = train_test_split(data, labels,
                                                      test_size=0.20,
                                                      random_state=42)
    (train_x, val_x, train_y, val_y) = train_test_split(training_x, training_y,
                                                      test_size=0.20,
                                                      random_state=42)

    return [train_x, train_y, val_x, val_y, test_x, test_y]

In [None]:
# Load and split both image sets. Both calls use the same split seed, so the
# i-th original image and the i-th segmented image stay paired as long as both
# folders list the same files in the same order.
[org_train_x, org_train_y, org_val_x, org_val_y, org_test_x, org_test_y] = get_split_dataset(original_data_path)
[seg_train_x, seg_train_y, seg_val_x, seg_val_y, seg_test_x, seg_test_y] = get_split_dataset(segmented_data_path)

# Only the labels of the original images are used downstream, so the two
# datasets must line up sample by sample.
assert (np.array_equal(org_train_y, seg_train_y)
        and np.array_equal(org_val_y, seg_val_y)
        and np.array_equal(org_test_y, seg_test_y)), \
    "original and segmented datasets are not aligned"

# ImageDataGenerator.flow([x, extra], ...) transforms only x; extra arrays are
# passed to the model unmodified. The segmented images are always the extra
# array, so they are rescaled here to the same [0, 1] range the generators
# produce for the original images.
seg_train_x, seg_val_x, seg_test_x = (
    x / 255.0 for x in (seg_train_x, seg_val_x, seg_test_x)
)

In [None]:
# build the model

In [None]:
# Create the data generators: an augmenting one for training and
# rescale-only ones for validation and test. No preprocessing function is
# passed, so the default (None) is used.
train_datagen, val_datagen, test_datagen = image_data_generator()

In [None]:
# feature block

In [None]:
# DenseNet201 feature extractor initialised with ImageNet weights.
# include_top=False leaves out the network's own classification layers and
# pooling=None requests no pooling argument on the output.
densenet_model = keras.applications.DenseNet201(
    include_top=False,
    weights="imagenet",
    input_tensor=None,
    input_shape=input_shape,
    pooling=None
)

In [None]:
# Freeze the pre-trained base: with trainable set to False its weights are
# not updated during training, so what the base model has already learned
# is not destroyed. Only the layers added on top of it are trained.
densenet_model.trainable = False

In [None]:
# # with training case
# densenet_model.trainable = True
# count = 0
# for layer in densenet_model.layers:
#     if 'conv5' in layer.name:
#         count = count + 1
#         layer.trainable = True
#     else:
#         layer.trainable = False

# count

In [None]:
# Two-input network: the segmented and the original image each go through the
# SAME densenet_model instance (shared weights); the two feature maps are
# concatenated and fed to a small fully-connected classifier.

# He-normal weight initialiser with a fixed seed, used by every Dense layer below
initializer = keras.initializers.he_normal(seed=32)

# Branch for the segmented image
seg_input = keras.Input(shape=input_shape)
seg_model = densenet_model(inputs= seg_input)

# Branch for the original image
org_input = keras.Input(shape=input_shape)
org_model = densenet_model(inputs= org_input)

# Merge both branches (original first, segmented second) and flatten
output = keras.layers.concatenate([org_model, seg_model])
output = keras.layers.Flatten()(output)
# batch normalization
output = keras.layers.BatchNormalization()(output)
    
#Fully-connected block 1
# NOTE(review): the Dense layer already applies relu and a LeakyReLU follows
# the batch normalization, so an activation is applied twice -- confirm this
# is intended.
output = keras.layers.Dense(units=32,
                             activation='relu',
                             kernel_initializer=initializer,
                             kernel_regularizer=keras.regularizers.l2(0.001)
                            )(output)
output = keras.layers.BatchNormalization()(output)
output = keras.layers.LeakyReLU()(output)
output = keras.layers.Dropout(0.2)(output)

#Fully-connected block 2
# NOTE(review): same double-activation pattern as block 1 (relu inside Dense,
# ReLU after the batch normalization); no dropout in this block.
output = keras.layers.Dense(units=32,
                             activation='relu',
                             kernel_initializer=initializer,
                             kernel_regularizer=keras.regularizers.l2(0.001)
                            )(output)

output = keras.layers.BatchNormalization()(output)
output = keras.layers.ReLU()(output)

#Classifier block
# 4 output units with softmax: one probability per entry of CATEGORIES
output = keras.layers.Dense(units=4,
                             kernel_initializer=initializer,
                             activation='softmax'
                            )(output)

In [None]:
# Assemble the model. The input order is [original, segmented]; the data
# passed to fit/predict is given in the same order.
two_inputs_model_densenet201 = keras.models.Model(inputs=[org_input, seg_input], outputs=[output])
# Print the layer-by-layer summary of the assembled model
two_inputs_model_densenet201.summary()

In [None]:
# The labels are integer class indices (0-3) and the network ends in a 4-way
# softmax, so the matching loss is sparse categorical cross-entropy.
# binary_crossentropy would treat the four outputs as independent binary
# targets and compare them against the raw class index.
two_inputs_model_densenet201.compile(loss='sparse_categorical_crossentropy',
            optimizer=keras.optimizers.Adam(),
            metrics=['accuracy'])

In [None]:
# Fit the model
# Batch size and number of epochs come from the .env configuration
# (batch_size, epoch_nbr) instead of being hard-coded here.
# NOTE: when flow() receives a list, Keras augments/rescales only the first
# array (the original images); the second array is passed through unchanged.
model_history = two_inputs_model_densenet201.fit(
    train_datagen.flow([org_train_x, seg_train_x],
                       org_train_y,
                       batch_size=batch_size,
                       shuffle=True),
    # whole batches only; a trailing partial batch is not counted
    steps_per_epoch=len(org_train_x) // batch_size,
    validation_data=val_datagen.flow([org_val_x, seg_val_x],
                                     org_val_y,
                                     batch_size=batch_size),
    validation_steps=len(org_val_x) // batch_size,
    epochs=epoch_nbr
)

In [None]:
# Show the loss and accuracy curves recorded during training
plot_loss_curves(model_history , 'accuracy' , 'val_accuracy') 

In [None]:
# Predict class probabilities for the test set.
# flow() shuffles by default; shuffle=False keeps the predictions in the same
# order as org_test_y, which they are compared against afterwards.
pred = two_inputs_model_densenet201.predict(
    test_datagen.flow([org_test_x, seg_test_x],
                      org_test_y,
                      batch_size=batch_size,
                      shuffle=False)
)

In [None]:
# Ground-truth class indices of the test set
correct_labels =  org_test_y
# Predicted class = index of the largest value in each prediction row
predicted_labels = np.argmax(pred,axis = 1)

In [None]:
# Confusion matrix of true vs. predicted class indices on the test set
skplt.metrics.plot_confusion_matrix(
    correct_labels, 
    predicted_labels,
    title = "two inputs densenet201 model confusion matrix")

In [None]:
# Text report comparing the true and the predicted test labels
print(classification_report(correct_labels, predicted_labels))

In [None]:
## Save a model
# Write the trained two-input model to an HDF5 (.h5) file
two_inputs_model_densenet201.save("saved_model/two_inputs_model.h5")

# Load the saved model back from disk (no evaluation is run in this cell)
loaded_model = tf.keras.models.load_model("saved_model/two_inputs_model.h5")