## Check GPU Use:

In [None]:
import tensorflow as tf
# Print True when TensorFlow lists at least one physical GPU device, else False:
print(len(tf.config.list_physical_devices('GPU')) > 0)

In [None]:
# Print the version string of the imported TensorFlow package:
print(tf.__version__)

## Import Libraries:

In [None]:
import warnings
# Install a filter that ignores all warnings (this also hides deprecation notices):
warnings.simplefilter('ignore')

In [None]:
# Basic libraries:
import os
import copy
import random
import gc

import pandas as pd
import numpy as np

# Plotting and Image Display / Manipulation libraries:
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from matplotlib.pyplot import imshow

from skimage.segmentation import mark_boundaries
from skimage import morphology, io, color, exposure, img_as_float, transform

# Image processing libraries:
from PIL import Image, ImageOps, ImageFilter

# General machine learning libraries:
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import roc_auc_score, f1_score

# Neural Network libraries:
import keras
from keras import backend 
from tensorflow.keras.models import load_model

from keras.preprocessing.image import load_img 
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import array_to_img
from keras.preprocessing.image import ImageDataGenerator

from keras.models import Model
from keras.layers import Input, Flatten, Dense

from keras.optimizers import SGD

from keras.applications.vgg16 import VGG16 as VGG
from keras.applications.vgg16 import preprocess_input as VGG_preprocess_input

from keras.applications import DenseNet121 as DenseNet
from keras.applications.densenet import preprocess_input as DenseNet_preprocess_input

from keras.applications import ResNet50V2 as ResNet
from keras.applications.resnet import preprocess_input as ResNet_preprocess_input

# Explanation libraries:
import lime
from lime import lime_image


## Prepare Images:

In [None]:
# Define data directory (contains the "train" and "test" image folders and the split files):
data_dir_path = "../../group/hedge/Data/data"

# Define path to meta data files (derived from the data directory so they cannot drift apart):
train_meta_data = data_dir_path + "/train_split_v3.txt"
test_meta_data = data_dir_path + "/test_split_v3.txt"

# Define directory to save models & metrics into:
model_dir_path = "../results/models"
metric_dir_path = "../results/metrics"

# Define path to lung segmentation model file (stored next to the trained models):
lung_seg_model_path = model_dir_path + "/trained_model.hdf5"


In [None]:
# Load all images from a data directory:
def load_all_images_from_dir(data_dir, target_size):
    """Load every JPEG/PNG image found directly inside ``data_dir``.

    Files are selected by extension (``.jpeg``, ``.jpg``, ``.png``), compared
    case-insensitively so that e.g. ``scan.JPG`` is not silently skipped.
    File names are processed in sorted order, because ``os.listdir`` returns
    entries in arbitrary order and the result should be reproducible.

    Args:
        data_dir: Directory whose entries are scanned (not recursive).
        target_size: Passed through to ``load_img`` as ``target_size``.

    Returns:
        List of images as returned by ``load_img``, one per matching file.
    """
    supported_extensions = (".jpeg", ".jpg", ".png")

    image_list = []
    for file_name in sorted(os.listdir(data_dir)):

        if file_name.lower().endswith(supported_extensions):
            image = load_img(os.path.join(data_dir, file_name), target_size=target_size)
            image_list.append(image)

    return image_list


In [None]:
# Load images of a certain class from a directory (according to a space-separated meta-data file with "file_name" and "label" columns):
def load_type_images_from_dir(data_dir, meta_data_file, label, target_size):
    """Load all images whose meta-data row carries the given ``label``.

    The meta-data file is read with ``pd.read_csv(..., sep=" ")`` and must
    provide the columns ``label`` and ``file_name``.

    Args:
        data_dir: Directory that contains the image files.
        meta_data_file: Path to the space-separated meta-data file.
        label: Value of the ``label`` column to select.
        target_size: Passed through to ``load_img`` as ``target_size``.

    Returns:
        List of images as returned by ``load_img``, in meta-data row order.
    """
    meta_data_df = pd.read_csv(meta_data_file, sep=" ")

    # Keep only the rows of the requested class:
    label_rows = meta_data_df.loc[meta_data_df['label'] == label]

    return [
        load_img(os.path.join(data_dir, file_name), target_size=target_size)
        for file_name in label_rows["file_name"].values
    ]


In [None]:
# Combine the images of the different classes into one np-array and build the matching one-hot class labels:
def get_dp_label_pair(class_image_lists, nr_samples = None, delete_class_image_lists = True):
    """Convert per-class image lists into one data array plus one-hot labels.

    Args:
        class_image_lists: List with one list of images per class. The
            position of a class list is its class index.
            NOTE: this list is modified. With ``nr_samples`` set, each entry
            is replaced by its first ``nr_samples`` images; with
            ``delete_class_image_lists`` set, every converted image is
            replaced by ``None`` so that its memory can be reclaimed.
        nr_samples: Optional maximum number of images taken from each class.
        delete_class_image_lists: Release each image right after conversion.

    Returns:
        Tuple ``(datapoints, labels)``. ``datapoints`` stacks the uint8 image
        arrays along a new first axis, class after class. ``labels`` is a
        uint8 one-hot array of shape ``(nr_images, nr_classes)`` in the same
        order (also for a single class, where the previous implementation
        returned a 1-D array).

    Raises:
        ValueError: From numpy if there is no image at all to stack.
    """
    nr_classes = len(class_image_lists)

    # Get "nr_samples" images from each class:
    if nr_samples is not None:
        for i in range(nr_classes):
            class_image_lists[i] = class_image_lists[i][:nr_samples]

    # Remember the class sizes; list lengths do not change when entries are set to None:
    class_sizes = [len(class_images) for class_images in class_image_lists]

    # Get datapoints as np-arrays:
    datapoints = []
    for class_images in class_image_lists:

        for z in range(len(class_images)):

            # Convert current image into a np-array:
            datapoints.append(img_to_array(class_images[z], dtype=np.uint8))

            if delete_class_image_lists:
                # Drop the list's reference so the source image can be freed:
                class_images[z] = None

            # Collect periodically to keep peak memory low:
            if z % 100 == 0:
                gc.collect()

    datapoints = np.stack(datapoints)

    # One-hot encode the class index of every datapoint:
    class_indices = np.repeat(np.arange(nr_classes), class_sizes)
    labels = np.eye(nr_classes, dtype=np.uint8)[class_indices]

    return datapoints, labels


In [None]:
def preprocess_images(images, resize_scale=None, blur_radius=None, deep_copy=True):
    """Apply greyscale conversion, histogram equalization and optional resize/blur.

    Each image is converted to greyscale and back to RGB (so it keeps three
    channels), then its histogram is equalized.

    Args:
        images: A list of PIL images, or a single image. Anything that is
            not a list is treated as one image.
        resize_scale: Optional size passed to ``image.resize``.
        blur_radius: Optional radius for a Gaussian blur.
        deep_copy: If True, the input is left untouched and a new list is
            returned. If False, the entries of the given list are replaced
            in place and that same list is returned.

    Returns:
        List of preprocessed images (a one-element list for a single image).
    """
    # Wrap a single image BEFORE choosing the output container; otherwise the
    # in-place branch would try to index into the image itself.
    if not isinstance(images, list):
        images = [images]

    preprocessed_images = [] if deep_copy else images

    for z, image in enumerate(images):

        if deep_copy:
            image = image.copy()

        # Resize images, if specified:
        if resize_scale is not None:
            image = image.resize(resize_scale)

        # Convert images to greyscale (kept as 3-channel RGB):
        image = image.convert('L').convert('RGB')

        # Equalize the image histogram:
        image = ImageOps.equalize(image)

        # Use blur, if specified:
        if blur_radius is not None:
            image = image.filter(ImageFilter.GaussianBlur(radius=blur_radius))

        if deep_copy:
            preprocessed_images.append(image)
        else:
            preprocessed_images[z] = image

    return preprocessed_images
 

In [None]:
def segment_lung_area(images, model_path = lung_seg_model_path):
    """Predict a boolean lung mask for every image with a saved segmentation model.

    Each image is reduced to its first channel, resized to 256x256 and
    histogram-equalized; the whole batch is then standardized (zero mean,
    unit standard deviation). The returned masks are therefore 256x256
    regardless of the input image size.

    Args:
        images: Sequence of images (at least 3-dimensional when converted to
            an array, since the first channel is selected).
        model_path: Path of the saved segmentation model.

    Returns:
        List of 2-D boolean arrays; ``lung_masks[i]`` belongs to ``images[i]``.
        An empty input yields an empty list.
    """
    if len(images) == 0:
        return []

    # Preprocess the images for the segmentation model:
    X = []
    for image in images:

        temp_image = img_as_float(image)[...,0]
        temp_image = transform.resize(temp_image, (256, 256))
        temp_image = exposure.equalize_hist(temp_image)
        temp_image = np.expand_dims(temp_image, -1)

        X.append(temp_image)

    X = np.array(X)
    X -= X.mean()
    X /= X.std()

    # Load model
    UNet = load_saved_model(model_path)

    # Regions/holes smaller than 2% of the image area are treated as noise:
    size = 0.02 * np.prod(X.shape[1:3])

    # shuffle=False is essential: flow() shuffles by default, which would
    # return the masks in a random order that no longer matches `images`.
    batches = ImageDataGenerator(rescale=1.).flow(X, batch_size=1, shuffle=False)

    lung_masks = []

    # The generator loops forever; zip with a range stops after one pass.
    for _, xx in zip(range(X.shape[0]), batches):

        pred_lung_area = UNet.predict(xx)[..., 0].reshape(X[0].shape[:2])

        pred_lung_area = pred_lung_area > 0.5
        pred_lung_area = morphology.remove_small_objects(pred_lung_area, size)
        pred_lung_area = morphology.remove_small_holes(pred_lung_area, size)

        lung_masks.append(pred_lung_area)

    return lung_masks
    

In [None]:
def invert_masks_area(masks):
    """Return a list holding the element-wise logical negation of each mask."""
    return [np.logical_not(single_mask) for single_mask in masks]


In [None]:
def mask_lung_area(images, masks):
    """Black out the masked region of every image.

    For image ``i`` all pixels where ``masks[i]`` is True are set to 0 in the
    first three channels; the result is converted back with ``array_to_img``.

    Args:
        images: Sequence of 3-channel images.
        masks: Sequence of 2-D masks, ``masks[i]`` matching ``images[i]``.

    Returns:
        List of the blacked-out images.
    """
    blacked_out_images = []

    for idx, image in enumerate(images):

        # Expand the 2-D mask to all three colour channels:
        pixel_mask = np.zeros_like(image, dtype=bool)
        for channel in (0, 1, 2):
            pixel_mask[:, :, channel] = masks[idx].copy()

        pixels = img_to_array(image)
        pixels[pixel_mask] = 0

        blacked_out_images.append(array_to_img(pixels))

    return blacked_out_images


## Get and Retrain Model:

In [None]:
def redefine_model_wo_top(model, input_shape = (224,224,3), nr_classes = 2, fc_layer_structure = [2048, 2048]):
    """Put a new fully connected classification head on top of a base model.

    Args:
        model: Base model that is called on the new input tensor.
        input_shape: Shape of one input sample, e.g. ``(224, 224, 3)``.
        nr_classes: Number of units of the final softmax layer.
        fc_layer_structure: Number of neurons of each ReLU dense layer that
            is inserted between the flattened base output and the softmax.

    Returns:
        A new (uncompiled) ``Model`` mapping the input to class probabilities.
    """
    # Define the new input format for the given model (e.g. 3 channels - 224 width x 224 height):
    model_input = Input(shape=input_shape, name = 'model_input')

    # Reuse the given model as feature extractor:
    model_output = model(model_input)

    # Define new fully connected layer structure:
    x = Flatten(name='flatten')(model_output)

    for i, fc_layer_neurons in enumerate(fc_layer_structure):
        x = Dense(fc_layer_neurons, activation='relu', name='fc' + str(i))(x)

    x = Dense(nr_classes, activation='softmax', name='predictions')(x)

    # `inputs`/`outputs` are the supported keyword names; the legacy
    # `input`/`output` spelling is rejected by current Keras versions.
    new_model = Model(inputs = model_input, outputs = x)

    return new_model


In [None]:
# Define custom weighted categorical crossentropy loss:
def weighted_categorical_crossentropy(w):
    """Build a categorical crossentropy loss with one weight per class.

    Args:
        w: Class weights; they are multiplied element-wise with the per-class
            log-likelihood terms before summing over the last axis.

    Returns:
        A function ``loss(y, y_pred)`` usable as a Keras loss.
    """
    class_weights = backend.variable(w)

    def loss(y, y_pred):
        # Renormalize the predictions and keep them away from 0 and 1 so the
        # logarithm stays finite:
        normalized = y_pred / backend.sum(y_pred, axis=-1, keepdims=True)
        clipped = backend.clip(normalized, backend.epsilon(), 1 - backend.epsilon())

        # Weighted negative log-likelihood, summed over the class axis:
        return -backend.sum(y * backend.log(clipped) * class_weights, -1)

    return loss

# Train a model for classification with one-hot labels (loss = categorical crossentropy, optionally class-weighted):
def train_model(model, x_train, y_train, x_test = None, y_test = None, weighted_loss=None, batch_size = 1, epochs = 10, shuffle = True, optimizer = None):
    """Compile and fit a model, optionally validating and scoring on test data.

    Args:
        model: Keras model to compile and train.
        x_train, y_train: Training data and one-hot labels.
        x_test, y_test: Optional data used as ``validation_data`` during
            fitting and for the final ``model.evaluate`` call.
        weighted_loss: Optional class weights. If given, the weighted
            categorical crossentropy is used instead of the plain one.
        batch_size, epochs, shuffle: Passed through to ``model.fit``.
        optimizer: Optimizer to compile with. If None, a fresh SGD optimizer
            (lr=0.0001, decay=1e-6, momentum=0.01, nesterov=True) is created
            for this call; a default instance built in the signature would be
            shared between every call and model.

    Returns:
        Tuple ``(model, score)``; ``score`` is the result of
        ``model.evaluate`` on the test data, or ``[]`` without test data.
    """
    if optimizer is None:
        optimizer = SGD(lr = 0.0001, decay = 1e-6, momentum = 0.01, nesterov = True)

    if weighted_loss is None:
        loss_function = 'categorical_crossentropy'
    else:
        loss_function = weighted_categorical_crossentropy(weighted_loss)

    model.compile(loss=loss_function, optimizer = optimizer, metrics=['accuracy'])

    if x_test is None:
        model.fit(x_train, y_train, batch_size = batch_size, epochs = epochs, shuffle = shuffle)
        return model, []

    model.fit(x_train, y_train, validation_data=(x_test,y_test), batch_size = batch_size, epochs = epochs, shuffle = shuffle)
    score = model.evaluate(x_test, y_test, batch_size = batch_size)

    return model, score


In [None]:
def evaluate_model(model, x_test, y_test, label_list = ["Normal", "Pneumonia"]):
    """Compute classification metrics of a model on one-hot labelled test data.

    Args:
        model: Model whose ``predict`` returns one probability per class.
        x_test: Test data passed to ``model.predict``.
        y_test: One-hot encoded true labels, shape ``(n, nr_classes)``.
        label_list: Class names; position ``i`` names class index ``i``.

    Returns:
        Tuple ``(weighted_f1, roc_auc, classification_report, confusion_matrix)``.
    """
    class_ids = list(range(len(label_list)))

    y_true = np.argmax(y_test, axis=1)

    y_pred_prob = np.asarray(model.predict(x_test))
    y_pred = np.argmax(y_pred_prob, axis=1)

    # Pin the label set so report rows and matrix shape follow `label_list`
    # even when a class does not occur in this test set:
    classification_report_results = classification_report(y_true, y_pred, labels=class_ids, target_names=label_list)
    confusion_matrix_results = confusion_matrix(y_true, y_pred, labels=class_ids)

    if y_pred_prob.shape[1] == 2:
        # Binary targets need a 1-D score: the probability of the positive class.
        roc_auc_score_result = roc_auc_score(y_true, y_pred_prob[:, 1])
    else:
        roc_auc_score_result = roc_auc_score(y_true, y_pred_prob, multi_class="ovr")

    weighted_f1_score_results = f1_score(y_true, y_pred, average='weighted')

    return weighted_f1_score_results, roc_auc_score_result, classification_report_results, confusion_matrix_results


## Predict Data:

In [None]:
def prepare_images_for_model(loaded_images, preprocess_input_function = None):
    """Convert images into one stacked array that can be fed to a model.

    Args:
        loaded_images: Iterable of images accepted by ``img_to_array``.
        preprocess_input_function: Optional function applied to each
            single-image batch (e.g. a Keras ``preprocess_input``).

    Returns:
        Array with one entry per image along the first (batch) axis.
    """
    single_image_batches = []

    for loaded_image in loaded_images:

        # Array conversion plus a leading axis for the batch size:
        single_batch = np.expand_dims(img_to_array(loaded_image), axis=0)

        if preprocess_input_function is not None:
            single_batch = preprocess_input_function(single_batch)

        single_image_batches.append(single_batch)

    return np.vstack(single_image_batches)



In [None]:
def predict_xray_image(model, image, label_list = ["Normal", "Pneumonia"], display_results = False, return_prob = False):
    """Predict the class label of a single image.

    Args:
        model: Model whose ``predict`` returns one probability per class.
        image: Single image; it is converted without any model-specific
            input preprocessing.
        label_list: Class names; position ``i`` names class index ``i``.
        display_results: If True, show the image and print the label.
        return_prob: If True, also return the list of class probabilities.

    Returns:
        The predicted label, or ``(label, probabilities)`` if ``return_prob``.
    """
    batch = prepare_images_for_model([image], preprocess_input_function = None)

    # Class probabilities of the only image in the batch:
    class_probabilities = list(model.predict(batch)[0])
    best_index = class_probabilities.index(max(class_probabilities))
    predicted_label = label_list[best_index]

    if display_results:
        plt.imshow(image)
        plt.show()
        print("Label:", predicted_label, "\n")

    if not return_prob:
        return predicted_label

    return predicted_label, class_probabilities
    
    

## Use LIME to explain predictions:

In [None]:
def explain_predictions(model, explainer, x_dataset, y_labels, nr_samples = None, indecies = None, classes = ["Normal", "Pneumonia"], num_samples_lime = 1000, num_features_lime = 10, figwidth = 10, figheight = 10, print_explanations = True):
    """Explain model predictions with LIME and collect the explanation masks.

    For every selected sample the mask of the PREDICTED class is collected.

    Args:
        model: Model whose ``predict`` is handed to LIME.
        explainer: LIME image explainer providing ``explain_instance``.
        x_dataset: Dataset of images, indexable by sample index.
        y_labels: One-hot labels, indexable like ``x_dataset``.
        nr_samples: Number of randomly drawn samples; only used when
            ``indecies`` is None.
        indecies: Explicit sample indices to explain (takes precedence).
        classes: Class names; position ``i`` names class index ``i``.
        num_samples_lime: ``num_samples`` passed to ``explain_instance``.
        num_features_lime: ``num_features`` passed to ``get_image_and_mask``.
        figwidth, figheight: Figure size when plotting.
        print_explanations: If True, plot each image next to the explanation
            of every class.

    Returns:
        ``(masks, predicted_class_indices, real_class_indices)`` with one
        entry per explained sample, or ``-1`` if neither ``nr_samples`` nor
        ``indecies`` is given.
    """
    masks = []
    masks_pred_labels = []
    masks_real_labels = []

    if (nr_samples is None) and (indecies is None):
        return -1

    # Get number of classes:
    nr_classes = len(classes)

    # If no indices are given, select "nr_samples" samples from the GIVEN dataset:
    if indecies is None:
        indecies = random.sample(range(len(x_dataset)), nr_samples)
    else:
        nr_samples = len(indecies)

    if print_explanations:
        # squeeze=False keeps `ax` two-dimensional even for a single sample:
        fig, ax = plt.subplots(nr_samples, nr_classes + 1, sharex='col', sharey='row', squeeze=False)
        fig.set_figwidth(figwidth)
        fig.set_figheight(figheight)

    # Get explanations for all selected images:
    for j, sample_index in enumerate(indecies):

        # Convert only the selected image (not the whole dataset) to uint8:
        x = np.array(prepare_images_for_model([x_dataset[sample_index]]), dtype=np.uint8)[0]

        # The real class belongs to the sample index, not to the loop position:
        real_label = int(np.argmax(y_labels[sample_index]))

        explanation = explainer.explain_instance(x, model.predict,
                                                 top_labels=nr_classes, hide_color=0, num_samples=num_samples_lime)

        if print_explanations:
            ax[j,0].imshow(x)
            ax[j,0].set_title(classes[real_label])

        xs_pred_label, xs_pred_prob = predict_xray_image(model, x, label_list = classes,
                                                         display_results = False, return_prob = True)
        pred_label = classes.index(xs_pred_label)

        if print_explanations:

            for i in range(nr_classes):
                temp, mask = explanation.get_image_and_mask(i, positive_only=True,
                                                            num_features=num_features_lime, hide_rest=False)

                if i == pred_label:
                    masks.append(mask)
                    masks_pred_labels.append(pred_label)
                    masks_real_labels.append(real_label)

                temp = np.array(temp, dtype=np.uint8)

                ax[j,i+1].imshow(mark_boundaries(temp, mask))
                ax[j,i+1].set_title('p({}) = {:.4f}'.format(classes[i], xs_pred_prob[i]))

        else:

            temp, mask = explanation.get_image_and_mask(pred_label, positive_only=True,
                                                        num_features=num_features_lime, hide_rest=False)

            masks.append(mask)
            masks_pred_labels.append(pred_label)
            masks_real_labels.append(real_label)

    return masks, masks_pred_labels, masks_real_labels
    

In [None]:
def calculate_lung_overlay(explanation_masks, lung_segmentation_masks, labels=None):
    """Compute how much of each explanation mask lies inside its lung mask.

    For sample ``i`` the overlay is the number of pixels set in both
    ``explanation_masks[i]`` and ``lung_segmentation_masks[i]`` divided by
    the number of pixels set in the explanation mask. An explanation mask
    without any set pixel counts as overlay 0.0.

    Args:
        explanation_masks: Sequence of explanation masks (non-zero = set).
        lung_segmentation_masks: Sequence of lung masks of the same shapes,
            aligned by index with ``explanation_masks``.
        labels: Optional class index per sample. If given, the mean overlay
            is reported per class index ``0 .. max(labels)``.

    Returns:
        The mean overlay over all samples, or (with ``labels``) a list with
        the mean overlay of each class; a class without samples yields NaN.
    """
    overlay_percentages = []

    for i, explanation in enumerate(explanation_masks):

        explanation = np.asarray(explanation, dtype=bool)

        # Compare with the lung mask of THIS sample only:
        lung_mask = np.asarray(lung_segmentation_masks[i], dtype=bool)

        explanation_area_px_count = np.count_nonzero(explanation)
        overlay_area_px_count = np.count_nonzero(np.logical_and(lung_mask, explanation))

        if explanation_area_px_count == 0:
            overlay_percentages.append(0.0)
        else:
            overlay_percentages.append(overlay_area_px_count / explanation_area_px_count)

    if labels is not None:
        nr_classes = int(np.max(labels)) + 1
        overlay_labels_percentages = []

        for j in range(nr_classes):

            class_percentages = [percentage for percentage, label in zip(overlay_percentages, labels) if label == j]

            if class_percentages:
                overlay_labels_percentages.append(np.mean(class_percentages))
            else:
                overlay_labels_percentages.append(float("nan"))

        return overlay_labels_percentages

    return np.mean(overlay_percentages)
    

## Save and Load Models & Save Metrics:

In [None]:
def save_model(model, file_name):
    # Persist the model by delegating to its own `save` method; the target
    # path (and thereby the file name) is whatever the caller passes in.
    model.save(file_name)

def load_saved_model(file_name):
    """Load and return the model stored at ``file_name`` via ``load_model``."""
    return load_model(file_name)


In [None]:
def save_metrics(file_name, model, x_test, y_test, label_list, overlay_metric = None, overlay_label_metric = None):
    """Evaluate ``model`` on the test data and write the metrics to a text file.

    Args:
        file_name: Path of the text file; an existing file is overwritten.
        model: Model to evaluate (the passed model, not a notebook global).
        x_test, y_test: Test data and one-hot labels for ``evaluate_model``.
        label_list: Class names for ``evaluate_model``.
        overlay_metric: Optional overall overlay value to append.
        overlay_label_metric: Optional per-class overlay values to append.
    """
    w_f1_score, roc_auc, class_report, conf_matrix = evaluate_model(model, x_test, y_test, label_list = label_list)

    # The context manager closes the file even if a write fails:
    with open(file_name, "w") as f:
        f.write("\nWeighted F1-Score: " + str(w_f1_score) + "; ROC Curve and AUC: " + str(roc_auc))
        f.write("\nClassification Report:\n\n" + str(class_report))
        f.write("\nConfusion Matrix:\n\n" + str(conf_matrix))

        if overlay_metric is not None:
            f.write("\n\nOverlay Metric: " + str(overlay_metric))

        if overlay_label_metric is not None:
            f.write("\nOverlay Classes: " + str(overlay_label_metric))


# Pipeline - Classification Model:

The first part of the project focuses on training a model for classifying images as "normal", "pneumonia" or "COVID-19". In addition, an explainer will be used to calculate the areas "relevant" to the model that are located in the lung area of the images.

In [None]:
# Set random seeds (Python, NumPy and TensorFlow) so that sampling, weight
# initialization and shuffling start from the same state on every run:
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)

# -------------------------------------------------
# Set the parameters of the following pipeline run:
# -------------------------------------------------

# Image parameters (None = use all available images):
nr_train_samples = None
nr_test_samples = None

image_size = (256, 256)
blur_radius_parameter = None # integer: e.g. 2

# Model parameters:
# NOTE: "ResNet52" selects the imported ResNet50V2 and "ResNet121" selects
# the imported DenseNet121 in the model-loading cell below.
selected_model = "ResNet52" # , "VGG16", "ResNet121"

nr_classes = 3
input_shape = (image_size[0],image_size[1],3)
fc_layer_structure = [2048, 2048]

# Training parameters:
shuffle = True
batch_size = 1
epochs = 3 # 5

# Loss parameters (number of images per class):
nr_normal = 7966
nr_pneumonia = 5459
nr_covid = 473
nr_data_samples = nr_normal + nr_pneumonia + nr_covid

# Class weights relative to the "normal" class count (rarer classes weigh more); None = unweighted loss:
weighted_loss = [nr_normal/nr_normal, nr_normal/nr_pneumonia, nr_normal/nr_covid] # None

# Evaluation parameters:
data_label_list = ["Normal", "Pneumonia", "Covid"]

nr_explanation_regions = 3
nr_explanation_iter = 10

overlay_metric = None
overlay_label_metric = None

Load the images:

In [None]:
# Get images from train data set, one list per class label
# ("normal", "pneumonia", "COVID-19"), loaded in that order:
(train_normal_image_list_full,
 train_pneumonia_image_list_full,
 train_covid19_image_list_full) = (
    load_type_images_from_dir(data_dir_path + "/train", train_meta_data, class_label, image_size)
    for class_label in ("normal", "pneumonia", "COVID-19")
)


In [None]:
# Get images from test data set, one list per class label
# ("normal", "pneumonia", "COVID-19"), loaded in that order:
(test_normal_image_list_full,
 test_pneumonia_image_list_full,
 test_covid19_image_list_full) = (
    load_type_images_from_dir(data_dir_path + "/test", test_meta_data, class_label, image_size)
    for class_label in ("normal", "pneumonia", "COVID-19")
)


Preprocess the loaded images:

In [None]:
# Preprocess loaded training images (deep_copy=False replaces the list entries in place):
(train_normal_image_list_full,
 train_pneumonia_image_list_full,
 train_covid19_image_list_full) = (
    preprocess_images(class_images, blur_radius=blur_radius_parameter, deep_copy=False)
    for class_images in (train_normal_image_list_full,
                         train_pneumonia_image_list_full,
                         train_covid19_image_list_full)
)

# Preprocess loaded test images the same way:
(test_normal_image_list_full,
 test_pneumonia_image_list_full,
 test_covid19_image_list_full) = (
    preprocess_images(class_images, blur_radius=blur_radius_parameter, deep_copy=False)
    for class_images in (test_normal_image_list_full,
                         test_pneumonia_image_list_full,
                         test_covid19_image_list_full)
)


Convert the images of the training and test data sets into data arrays and one-hot labels:

In [None]:
# Get datapoints and labels from images (class order: normal, pneumonia, COVID-19):
x_data, y_data = get_dp_label_pair(
    [
        train_normal_image_list_full,
        train_pneumonia_image_list_full,
        train_covid19_image_list_full,
    ],
    nr_samples = nr_train_samples,
)
x_test, y_test = get_dp_label_pair(
    [
        test_normal_image_list_full,
        test_pneumonia_image_list_full,
        test_covid19_image_list_full,
    ],
    nr_samples = nr_test_samples,
)


In [None]:
from numpy import save
path_to_arrays = "../results/image_arrays"

# Saving fails if the target directory is missing, so create it first:
os.makedirs(path_to_arrays, exist_ok=True)

# Cache the converted training and test arrays on disk:
save(path_to_arrays + '/x_data_model_1.npy', x_data)
save(path_to_arrays + '/y_data_model_1.npy', y_data)

save(path_to_arrays + '/x_test_model_1.npy', x_test)
save(path_to_arrays + '/y_test_model_1.npy', y_test)

In [None]:
from numpy import load
path_to_arrays = "../results/image_arrays"

# Reload the cached training and test arrays from disk:
x_data, y_data, x_test, y_test = (
    load(path_to_arrays + file_suffix)
    for file_suffix in ('/x_data_model_1.npy',
                        '/y_data_model_1.npy',
                        '/x_test_model_1.npy',
                        '/y_test_model_1.npy')
)

Load pretrained models:

In [None]:
# Load pretrained Keras CNNs without the fully connected layer weights (trained on imagenet-data):

# Strings must be compared with `==`; `is` tests object identity and only
# matches by accident of string interning.
if selected_model == "VGG16":
    model_wo_top = VGG(weights='imagenet', include_top=False)

elif selected_model == "ResNet52":
    model_wo_top = ResNet(weights='imagenet', include_top=False)

elif selected_model == "ResNet121":
    model_wo_top = DenseNet(weights='imagenet', include_top=False)

else:
    # Fail early instead of continuing with an undefined or stale `model_wo_top`:
    raise ValueError("Unknown selected_model: " + repr(selected_model))


Train the loaded pretrained models: 

In [None]:
# Attach a new classification head to the pretrained base model:
set_model = redefine_model_wo_top(
    model = model_wo_top,
    input_shape = input_shape,
    nr_classes = nr_classes,
    fc_layer_structure = fc_layer_structure,
)

# Train it; the test data serves as validation data and for the final score:
set_model, score = train_model(
    set_model,
    x_data,
    y_data,
    x_test = x_test,
    y_test = y_test,
    weighted_loss = weighted_loss,
    batch_size = batch_size,
    epochs = epochs,
    shuffle = shuffle,
)


Evaluate the loaded pretrained models on the test data set: 

In [None]:
# Compute the evaluation metrics of the trained model on the test data:
w_f1_score, roc_auc, class_report, conf_matrix = evaluate_model(set_model, x_test, y_test, label_list = data_label_list)

# Print the metrics:
print("\nWeighted F1-Score:", w_f1_score, "; ROC Curve and AUC:", roc_auc)
print("\nClassification Report:\n\n", class_report)
print("\nConfusion Matrix:\n\n", conf_matrix)


Use an explainer (= LIME) to create masks of relevant areas for each (preprocessed) test image:

In [None]:
# Create the LIME image explainer:
explainer = lime_image.LimeImageExplainer()

# Explain every test sample, in data set order:
indecies = [*range(len(x_test))]

In [None]:
# Get explanation masks (without plotting) for the trained model:
(explanation_masks,
 explanation_masks_pred_labels,
 explanation_masks_real_labels) = explain_predictions(
    set_model,
    explainer,
    x_test,
    y_test,
    indecies = indecies,
    classes = data_label_list,
    num_features_lime = nr_explanation_regions,
    num_samples_lime = nr_explanation_iter,
    figheight = 10,
    figwidth = 15,
    print_explanations = False,
)


Load (non preprocessed) test images:

In [None]:
# Load fresh (not preprocessed) images from the test data set, one list per class:
(test_normal_image_list_full_for_lg,
 test_pneumonia_image_list_full_for_lg,
 test_covid19_image_list_full_for_lg) = (
    load_type_images_from_dir(data_dir_path + "/test", test_meta_data, class_label, image_size)
    for class_label in ("normal", "pneumonia", "COVID-19")
)

# Concatenate the class lists; the label of an image is the position of its class list:
test_images = [
    image
    for class_images in (test_normal_image_list_full_for_lg,
                         test_pneumonia_image_list_full_for_lg,
                         test_covid19_image_list_full_for_lg)
    for image in class_images
]
test_labels = [
    class_index
    for class_index, class_images in enumerate((test_normal_image_list_full_for_lg,
                                                test_pneumonia_image_list_full_for_lg,
                                                test_covid19_image_list_full_for_lg))
    for _ in class_images
]


Use the pretrained lung segmentation model to create masks of the lung areas for each (non-preprocessed) test image:

In [None]:
# Predict one lung mask per reloaded test image (default segmentation model path):
lung_masks_test_images = segment_lung_area(test_images)


Calculate the overlay area: 

In [None]:
# Overall share of the explanation areas that lies inside the lung masks:
overlay_metric = calculate_lung_overlay(explanation_masks, lung_masks_test_images)

# The same share, reported separately for each class index in `test_labels`:
overlay_label_metric = calculate_lung_overlay(
    explanation_masks,
    lung_masks_test_images,
    labels=test_labels,
)


In [None]:
# Print the overall and the per-class overlay values:
print("\nOverlay Metric:", overlay_metric)
print("\nOverlay Classes:", overlay_label_metric)


Store trained model in the model directory and store evaluation metrics in the metrics directory:

In [None]:
# Save trained Keras-Models:

# Strings must be compared with `==`; with `is` (object identity) a branch
# may silently not match, so that nothing is saved.
if selected_model == "VGG16":
    save_model(set_model, model_dir_path + '/' + "vgg_model.h5")
    save_metrics(metric_dir_path +  '/' + "vgg_metrics.txt", set_model, x_test, y_test, data_label_list, overlay_metric = overlay_metric, overlay_label_metric = overlay_label_metric)

elif selected_model == "ResNet52":
    save_model(set_model, model_dir_path + '/' + "resnet_model.h5")
    save_metrics(metric_dir_path +  '/' + "resnet_metrics.txt", set_model, x_test, y_test, data_label_list, overlay_metric = overlay_metric, overlay_label_metric = overlay_label_metric)

elif selected_model == "ResNet121":
    save_model(set_model, model_dir_path + '/' + "densenet_model.h5")
    save_metrics(metric_dir_path +  '/' + "densenet_metrics.txt", set_model, x_test, y_test, data_label_list, overlay_metric = overlay_metric, overlay_label_metric = overlay_label_metric)


# Pipeline - Evaluation Model:

In [None]:
# Set random seeds (Python, NumPy and TensorFlow) so that sampling, weight
# initialization and shuffling start from the same state on every run:
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)

# -------------------------------------------------
# Set the parameters of the following pipeline run:
# -------------------------------------------------

# Image parameters (None = use all available images):
nr_train_samples = None
nr_test_samples = None

image_size = (256, 256)
blur_radius_parameter = None # integer: e.g. 2

# Model parameters:
# NOTE: "ResNet52" selects the imported ResNet50V2 and "ResNet121" selects
# the imported DenseNet121 in the model-loading cell below.
selected_model = "VGG16" # ,"ResNet52", "ResNet121"

nr_classes = 3
input_shape = (image_size[0],image_size[1],3)
fc_layer_structure = [2048, 2048]

# Training parameters:
shuffle = True
batch_size = 1
epochs = 5

# Loss parameters (None = unweighted categorical crossentropy):
weighted_loss = None

# Evaluation parameters:
data_label_list = ["Normal", "Pneumonia", "Covid"]


Load the images:

In [None]:
# Get images from train data set, one list per class label
# ("normal", "pneumonia", "COVID-19"), loaded in that order:
(train_normal_image_list_full,
 train_pneumonia_image_list_full,
 train_covid19_image_list_full) = (
    load_type_images_from_dir(data_dir_path + "/train", train_meta_data, class_label, image_size)
    for class_label in ("normal", "pneumonia", "COVID-19")
)


In [None]:
# Get images from test data set, one list per class label
# ("normal", "pneumonia", "COVID-19"), loaded in that order:
(test_normal_image_list_full,
 test_pneumonia_image_list_full,
 test_covid19_image_list_full) = (
    load_type_images_from_dir(data_dir_path + "/test", test_meta_data, class_label, image_size)
    for class_label in ("normal", "pneumonia", "COVID-19")
)


Use pretrained lung segmentation model to create masks of lung areas for each (non preprocessed) train & test image:

In [None]:
# Predict the lung masks of the training images (the `masked_*` names hold masks, not images):
(masked_train_normal_image_list_full,
 masked_train_pneumonia_image_list_full,
 masked_train_covid19_image_list_full) = (
    segment_lung_area(class_images)
    for class_images in (train_normal_image_list_full,
                         train_pneumonia_image_list_full,
                         train_covid19_image_list_full)
)

# Predict the lung masks of the test images:
(masked_test_normal_image_list_full,
 masked_test_pneumonia_image_list_full,
 masked_test_covid19_image_list_full) = (
    segment_lung_area(class_images)
    for class_images in (test_normal_image_list_full,
                         test_pneumonia_image_list_full,
                         test_covid19_image_list_full)
)


Preprocess the loaded images:

In [None]:
# Preprocess loaded training images (deep_copy=False replaces the list entries in place):
(train_normal_image_list_full,
 train_pneumonia_image_list_full,
 train_covid19_image_list_full) = (
    preprocess_images(class_images, blur_radius=blur_radius_parameter, deep_copy=False)
    for class_images in (train_normal_image_list_full,
                         train_pneumonia_image_list_full,
                         train_covid19_image_list_full)
)

# Preprocess loaded test images the same way:
(test_normal_image_list_full,
 test_pneumonia_image_list_full,
 test_covid19_image_list_full) = (
    preprocess_images(class_images, blur_radius=blur_radius_parameter, deep_copy=False)
    for class_images in (test_normal_image_list_full,
                         test_pneumonia_image_list_full,
                         test_covid19_image_list_full)
)


Mask lung areas of preprocessed images:

In [None]:
# Black out the segmented lung area in every image. Each class list must be
# paired with ITS OWN masks; the COVID-19 training list was previously built
# from the normal images/masks and the normal test list from the pneumonia
# images, which mislabeled the data.
train_normal_image_list_full = mask_lung_area(train_normal_image_list_full, masked_train_normal_image_list_full)
train_pneumonia_image_list_full = mask_lung_area(train_pneumonia_image_list_full, masked_train_pneumonia_image_list_full)
train_covid19_image_list_full = mask_lung_area(train_covid19_image_list_full, masked_train_covid19_image_list_full)

test_normal_image_list_full = mask_lung_area(test_normal_image_list_full, masked_test_normal_image_list_full)
test_pneumonia_image_list_full = mask_lung_area(test_pneumonia_image_list_full, masked_test_pneumonia_image_list_full)
test_covid19_image_list_full = mask_lung_area(test_covid19_image_list_full, masked_test_covid19_image_list_full)


Split images into training and test data sets:

In [None]:
# Pair the train list (position 0) and the test list (position 1) of each class:
normal_image_list_full = [train_normal_image_list_full, test_normal_image_list_full]
pneumonia_image_list_full = [train_pneumonia_image_list_full, test_pneumonia_image_list_full]
covid19_image_list_full = [train_covid19_image_list_full, test_covid19_image_list_full]

# Regroup into one list of class lists for training and one for testing
# (class order: normal, pneumonia, COVID-19):
train_combined_lists_full = [
    class_pair[0]
    for class_pair in (normal_image_list_full, pneumonia_image_list_full, covid19_image_list_full)
]
test_combined_lists_full = [
    class_pair[1]
    for class_pair in (normal_image_list_full, pneumonia_image_list_full, covid19_image_list_full)
]

# Get datapoints and labels from images:
x_data, y_data = get_dp_label_pair(
    train_combined_lists_full,
    nr_samples = nr_train_samples,
)
x_test, y_test = get_dp_label_pair(
    test_combined_lists_full,
    nr_samples = nr_test_samples,
)


Load pretrained models:

In [None]:
# Load pretrained Keras CNNs without the fully connected layer weights (trained on imagenet-data):

# Strings must be compared with `==`; `is` tests object identity and only
# matches by accident of string interning.
if selected_model == "VGG16":
    model_wo_top = VGG(weights='imagenet', include_top=False)

elif selected_model == "ResNet52":
    model_wo_top = ResNet(weights='imagenet', include_top=False)

elif selected_model == "ResNet121":
    model_wo_top = DenseNet(weights='imagenet', include_top=False)

else:
    # Fail early instead of continuing with an undefined or stale `model_wo_top`:
    raise ValueError("Unknown selected_model: " + repr(selected_model))


Train the loaded pretrained model:

In [None]:
# Attach a new classification head to the pretrained base model:
set_model = redefine_model_wo_top(
    model = model_wo_top,
    input_shape = input_shape,
    nr_classes = nr_classes,
    fc_layer_structure = fc_layer_structure,
)

# Train it; the test data serves as validation data and for the final score:
set_model, score = train_model(
    set_model,
    x_data,
    y_data,
    x_test = x_test,
    y_test = y_test,
    weighted_loss = weighted_loss,
    batch_size = batch_size,
    epochs = epochs,
    shuffle = shuffle,
)


Evaluate the loaded pretrained models on the test data set: 

In [None]:
# Compute the evaluation metrics of the trained model on the test data:
w_f1_score, roc_auc, class_report, conf_matrix = evaluate_model(set_model, x_test, y_test, label_list = data_label_list)

# Print the metrics:
print("\nWeighted F1-Score:", w_f1_score, "; ROC Curve and AUC:", roc_auc)
print("\nClassification Report:\n\n", class_report)
print("\nConfusion Matrix:\n\n", conf_matrix)


Store trained model in the model directory and store evaluation metrics in the metrics directory:

In [None]:
# Save trained Keras-Models:

# Strings must be compared with `==`; with `is` (object identity) a branch
# may silently not match, so that nothing is saved.
if selected_model == "VGG16":
    save_model(set_model, model_dir_path + '/' + "vgg_evaluation_model.h5")
    save_metrics(metric_dir_path +  '/' + "vgg_evaluation_metrics.txt", set_model, x_test, y_test, data_label_list)

elif selected_model == "ResNet52":
    save_model(set_model, model_dir_path + '/' + "resnet_evaluation_model.h5")
    save_metrics(metric_dir_path +  '/' + "resnet_evaluation_metrics.txt", set_model, x_test, y_test, data_label_list)

elif selected_model == "ResNet121":
    save_model(set_model, model_dir_path + '/' + "densenet_evaluation_model.h5")
    save_metrics(metric_dir_path +  '/' + "densenet_evaluation_metrics.txt", set_model, x_test, y_test, data_label_list)
