## SIIM Data Covid-19 ANN classifier
#### Configuration

In [None]:
# default config
model_name = "DenseNet121"
learning_rate = 0.001 # -------------------------- 0.001 - 0.0001
min_learning_rate = 1e-8
batch_size = 32# -------------------------------- 50 - 25
epochs = 10# ------------------------------------ 10 - 20
verbose = 1
img_process_function = "equalize_adapthist"
isKaggleData = True
classification_type = "multi" #------------------ multi - binary
classifier = "ann"

# for SVM classifier
train_num = 5500 / batch_size # 5500 train - validation data
val_num = 800 / batch_size   # 800 test data
show_cv_scores = False
feature_number = 128

use_fine_tuning = True #--------------------------- False - True
use_chex_weights = True

libraries = ["pandas","numpy","sklearn","tensorflow","keras","skimage","matplotlib","seaborn"]
show_versions = True
# svm_hyp_search = "bayes"

show_model_summary = False
save_weights = False

# typical-none, atypical-none, indeterminate-none, all
classes = "all"

#### Libraries

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score, classification_report, average_precision_score, f1_score, precision_score, recall_score
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.svm import SVC
from skimage import exposure

from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Dense, Flatten, Conv2D, MaxPooling2D, Dropout, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import models
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint,  EarlyStopping
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential

from tensorflow.keras.metrics import Recall,Precision

from tensorflow.keras.layers import InputLayer, BatchNormalization, Dropout, Flatten, Dense, Activation, MaxPool2D, Conv2D

from tensorflow.keras.applications import VGG16, VGG19, InceptionV3, NASNetMobile, NASNetLarge, DenseNet121, ResNet50, Xception, InceptionResNetV2, EfficientNetB7

import importlib
from skimage import exposure

import warnings
warnings.filterwarnings("ignore")

import os

In [None]:
def display_versions(libraries = None):
    
    from importlib import import_module
    
    for library in libraries:
        print(f"{library} version: {import_module(library).__version__}")

if show_versions:
    display_versions(libraries)

#### Transfer Models

In [None]:
from tensorflow.keras.applications import VGG16, VGG19, InceptionV3, NASNetMobile, NASNetLarge, DenseNet121, ResNet50, Xception, InceptionResNetV2, EfficientNetB7


def get_models():
    
    models_ = dict(
                   # this is used for ChexNet
                    DenseNet121=dict(
                        input_shape=(224, 224, 3),
                        module_name="densenet",
                        last_conv_layer="conv5_block16_concat",
                    ),
                    ResNet50=dict(
                        input_shape=(224, 224, 3),
                        module_name="resnet",
                        last_conv_layer="conv5_block3_out",
                    ),
                    InceptionV3=dict(
                        input_shape=(299, 299, 3),
                        module_name="inception_v3",
                        last_conv_layer="mixed10",
                    ),
                    Xception=dict(
                        input_shape=(299, 299, 3),
                        module_name="xception",
                        last_conv_layer="block14_sepconv2_act",
                    ),
                )
    
    return models_

In [None]:
def prepare_data_for_kaggle():
    
    df_image = pd.read_csv('../input/siim-covid19-detection/train_image_level.csv')
    df_study = pd.read_csv('../input/siim-covid19-detection/train_study_level.csv')
    df_study['id'] = df_study['id'].str.replace('_study',"")
    df_study.rename({'id': 'StudyInstanceUID'},axis=1, inplace=True)
    df_train = df_image.merge(df_study, on='StudyInstanceUID')
    df_train.loc[df_train['Negative for Pneumonia']==1, 'study_label'] = 'negative'
    df_train.loc[df_train['Typical Appearance']==1, 'study_label'] = 'typical'
    df_train.loc[df_train['Indeterminate Appearance']==1, 'study_label'] = 'indeterminate'
    df_train.loc[df_train['Atypical Appearance']==1, 'study_label'] = 'atypical'
    df_train.drop(['Negative for Pneumonia','Typical Appearance', 'Indeterminate Appearance', 'Atypical Appearance'], axis=1, inplace=True)
    df_train['id'] = df_train['id'].str.replace('_image', '.jpg')
    df_train['image_label'] = df_train['label'].str.split().apply(lambda x : x[0])
    df_size = pd.read_csv('../input/covid-jpg-512/size.csv')
    data = df_train.merge(df_size, on='id')
    data = data.drop(["boxes","label","StudyInstanceUID","dim0","dim1","split"], axis = 1)
    img_dir = "../input/covid-jpg-512/train"
    
    return data, img_dir

if isKaggleData:
    data, img_dir = prepare_data_for_kaggle()
else:
    data = pd.read_csv("../train_data.csv")
    img_dir = "../images/train"
    
df_data = data.copy()

#### Data Preprocessing

In [None]:
# drop images from dataframe not in images directory
files = os.listdir("../input/covid-jpg-512/train")

not_in_files_index = []

for file_id in df_data.id:
    if file_id in files:
        continue
    else:
        not_in_files_index.append(df_data[df_data["id"] == file_id].index[0])
        
df_data = df_data.drop(not_in_files_index, axis = 0)

In [None]:
# drop images that have unclear view
drop_df = pd.read_csv("../input/dropped-siim/dropped_image_IDs.csv") + ".jpg"
# splitting images train and test

drop_index = []
for row in drop_df.values:
    drop_index.append(df_data[df_data["id"] == row[0]].index[0])
            
df_data = df_data.drop(drop_index, axis = 0)

# other binary classification
if classes == "typical-none":
    df_data = df_data.drop(df_data[(df_data["study_label"] == "atypical") | (df_data["study_label"] == "indeterminate")].index, axis = 0)
    df_train = df_data.iloc[:int(len(df_data) * 0.80)]
    df_test = df_data.iloc[int(len(df_data) * 0.80):]
elif classes == "atypical-none":
    df_data = df_data.drop(df_data[(df_data["study_label"] == "typical") | (df_data["study_label"] == "indeterminate")].index, axis = 0)
    df_train = df_data.iloc[:int(len(df_data) * 0.80)]
    df_test = df_data.iloc[int(len(df_data) * 0.80):]
elif classes == "indeterminate-none":
    df_data = df_data.drop(df_data[(df_data["study_label"] == "typical") | (df_data["study_label"] == "atypical")].index, axis = 0)
    df_train = df_data.iloc[:int(len(df_data) * 0.85)]
    df_test = df_data.iloc[int(len(df_data) * 0.85):]
else:    
    df_train = df_data.iloc[:5500]
    df_test = df_data.iloc[5500:]

#### Model Building

In [None]:
models_ = get_models()

for model_name in models_.keys():
    
    input_shape = models_[model_name]["input_shape"]
    img_size = input_shape[0]
    
    def generate_images_for_model_training(classifier, classification_type, img_process_function, df_train, df_test, img_dir, img_size, batch_size, validation_split = 0.20):
    
        from tensorflow.keras.preprocessing.image import ImageDataGenerator

        from skimage import exposure

        # Defined image preprocessing functions

        def preprocess_function(img):

            if img_process_function == "equalize_adapthist":
                img = exposure.equalize_adapthist(img/255, clip_limit=0.03, kernel_size=24)
            elif img_process_function == "equalize_hist":
                img = exposure.equalize_hist(img/255, clip_limit=0.03, kernel_size=24)
            elif img_process_function == "rescale_intensity":
                img = exposure.rescale_intensity(img/255, clip_limit=0.03, kernel_size=24)

            return img

        if classification_type == "binary":
            y_col = "image_label"
        else:
            y_col = "study_label"


        image_generator_train = ImageDataGenerator(
                        featurewise_center=False,
                        samplewise_center=False,
                        featurewise_std_normalization=False,
                        samplewise_std_normalization=False,
                        zca_epsilon=1e-06,
                        zca_whitening=False,
                        width_shift_range=0.0,
                        height_shift_range=0.0,
                        brightness_range=[0.8, 1.1],
                        shear_range=0.1,
                        zoom_range=0.1,
                        channel_shift_range=0.0,
                        cval=0.0,
                        horizontal_flip=False,
                        vertical_flip=False,
                        rescale=None,
                        rotation_range=30,
                        preprocessing_function=preprocess_function,
                        validation_split=validation_split)

        image_generator_valid = ImageDataGenerator(validation_split=validation_split,
                                                   preprocessing_function=preprocess_function)


        train_generator = image_generator_train.flow_from_dataframe(
                    dataframe = df_train,
                    directory=img_dir,
                    x_col = 'id',
                    y_col =  y_col,  
                    target_size=(img_size, img_size),
                    batch_size=batch_size,
                    subset='training', 
                    seed = 42, 
                    class_mode = "categorical") 

        valid_generator = image_generator_valid.flow_from_dataframe(
                dataframe = df_train,
                directory=img_dir,
                x_col = 'id',
                y_col = y_col,
                target_size=(img_size, img_size),
                batch_size=batch_size,
                subset='validation', 
                shuffle=False,  
                seed=42, 
                class_mode = "categorical")

        return train_generator, valid_generator

    train_generator, valid_generator = generate_images_for_model_training( classifier = classifier, 
                                                                           classification_type = classification_type, 
                                                                           img_process_function = img_process_function, 
                                                                           df_train = df_train, 
                                                                           df_test = df_test, 
                                                                           img_dir = img_dir, 
                                                                           img_size = img_size, 
                                                                           batch_size = batch_size, 
                                                                           validation_split = 0.15)
    
    
    def get_last_conv_layer(base_model, model_name):
    
        models_ = get_models()
        layer = base_model.get_layer(models_[model_name]["last_conv_layer"])

        return layer
    

    base_model_class = getattr(
        importlib.import_module(
            f"keras.applications.{models_[model_name]['module_name']}"
            ),
            model_name)

    img_input = Input(shape = input_shape)

    base_model = base_model_class(
                include_top = False,
                input_tensor = img_input,
                input_shape = input_shape,
                weights = "imagenet",
                pooling = "avg")
    

    if (model_name == "DenseNet121") & use_chex_weights:

        chex_weights_path = '../input/chexnet-weights/brucechou1983_CheXNet_Keras_0.3.0_weights.h5'
        out = Dense(14, activation='sigmoid')(base_model.output)
        base_model = Model(inputs=base_model.input, outputs=out)
        base_model.load_weights(chex_weights_path)
        base_model.trainable = False
        x = get_last_conv_layer(base_model, model_name).output
        output = GlobalAveragePooling2D()(x)
        output = Dropout(0.1)(output)

    else:
        base_model.trainable = False
        x = get_last_conv_layer(base_model, model_name).output
        output = GlobalAveragePooling2D()(x)
        output = Dropout(0.1)(output)

    if use_fine_tuning:

        base_model.trainable = True

        if classification_type == "multi":
            predictions = Dense(len(df_train.study_label.unique()), activation = "softmax", name = "multi_predictions")(output)
            model = Model(base_model.input, predictions)
            model.compile(Adam(lr=learning_rate),loss='categorical_crossentropy',metrics=['accuracy',Precision(),Recall()])

        else:
            predictions = Dense(len(df_train.image_label.unique()), activation = "softmax", name = "binary_predictions")(output)
            model = Model(base_model.input, predictions)
            model.compile(Adam(lr=learning_rate),loss='binary_crossentropy',metrics=['accuracy',Precision(),Recall()])

        if show_model_summary:
            print(model.summary())

        # Keras callbacks
        rlr = ReduceLROnPlateau(monitor = 'val_loss', factor = 0.1, patience = 2, verbose = verbose, 
                                        min_delta = 1e-4, min_lr = min_learning_rate, mode = 'min')

        es = EarlyStopping(monitor = 'val_loss', min_delta = 1e-4, patience = 5, mode = 'min', 
                            restore_best_weights = True, verbose = verbose)

        ckp = ModelCheckpoint('model.h5',monitor = 'val_loss',
                              verbose = verbose, save_best_only = True, mode = 'min')
        
        print("Training and Validation........................")

        # Model fitting
        history = model.fit(
              train_generator,
              epochs= epochs,
              validation_data=valid_generator,
              callbacks=[es, rlr, ckp],
              verbose= verbose
              )

        if save_weights:
            model.save_weights(f"{model_name}-model.h5")

    else:
        if classification_type == "multi":

            # Building Connecting Model
            predictions = Dense(len(df_train.study_label.unique()), activation = "softmax", name = "multi_predictions")(output)
            model = Model(base_model.input, predictions)
            model.compile(Adam(lr=learning_rate),loss='categorical_crossentropy',metrics=['accuracy',Precision(),Recall()])

        else:
            predictions = Dense(len(df_train.image_label.unique()), activation = "softmax", name = "binary_predictions")(output)
            model = Model(base_model.input, predictions)
            model.compile(Adam(lr=learning_rate),loss='categorical_crossentropy',metrics=['accuracy',"AUC",Precision(),Recall()])

        # Keras callbacks
        rlr = ReduceLROnPlateau(monitor = 'val_loss', factor = 0.1, patience = 2, verbose = verbose, 
                                            min_delta = 1e-4, min_lr = min_learning_rate, mode = 'min')

        es = EarlyStopping(monitor = 'val_loss', min_delta = 1e-4, patience = 5, mode = 'min', 
                                restore_best_weights = True, verbose = verbose)

        ckp = ModelCheckpoint('model.h5',monitor = 'val_loss',
                                  verbose = verbose, save_best_only = True, mode = 'min')
        
        print("Training and Validation........................")

        # Model fitting
        history = model.fit(
                  train_generator,
                  epochs= epochs,
                  validation_data=valid_generator,
                  callbacks=[es, rlr, ckp],
                  verbose= verbose
                )


            
    def plot_tl_metrics(history, model_name):

        hist = pd.DataFrame(history.history)
        hist.index += 1

        fig, (ax1, ax2) = plt.subplots(figsize=(12,12),nrows=2, ncols=1)
        hist['loss'].plot(ax=ax1,c='k',label='Eğitim')
        hist['val_loss'].plot(ax=ax1,c='r',linestyle='--', label='Doğrulama')
        ax1.legend()

        hist['accuracy'].plot(ax=ax2,c='k',label='Eğitim')
        hist['val_accuracy'].plot(ax=ax2,c='r',linestyle='--',label='Doğrulama')
        ax2.legend()
        plt.suptitle(f"{model_name} Kayıp ve Doğruluk Grafikleri")
        plt.show()
        
        
    plot_tl_metrics(history, model_name)
    
    print("Testing.................")
    
    def generate_test_images(classifier, classification_type, 
                       img_process_function, df_train, df_test, img_dir, 
                       img_size, batch_size, validation_split = 0.20):

        from tensorflow.keras.preprocessing.image import ImageDataGenerator

        from skimage import exposure

        if classification_type == "binary":
            y_col = "image_label"
        else:
            y_col = "study_label"

        def preprocess_function(img):

            if img_process_function == "equalize_adapthist":
                img = exposure.equalize_adapthist(img/255, clip_limit=0.03, kernel_size=24)
            elif img_process_function == "equalize_hist":
                img = exposure.equalize_hist(img/255, clip_limit=0.03, kernel_size=24)
            elif img_process_function == "rescale_intensity":
                img = exposure.rescale_intensity(img/255, clip_limit=0.03, kernel_size=24)

            return img


        image_generator_test = ImageDataGenerator(preprocessing_function=preprocess_function)

        test_generator = image_generator_test.flow_from_dataframe(
                    dataframe = df_test,
                    directory=img_dir,
                    x_col = 'id',
                    y_col = y_col,
                    target_size=(img_size, img_size),
                    batch_size=batch_size,
                    shuffle=False,  
                    seed=42, 
                    class_mode = "categorical")

        return test_generator

    test_generator = generate_test_images(classifier, 
                                         classification_type, 
                                         img_process_function, 
                                         df_train, df_test, 
                                         img_dir, img_size, batch_size)
    

    
    actual =  test_generator.labels
    preds_ = model.predict(test_generator)
    preds = np.argmax(model.predict(test_generator), axis=1)
    cfmx = confusion_matrix(actual, preds)

    sns.heatmap(cfmx, annot=True, cmap='Blues',
        xticklabels=list(test_generator.class_indices.keys()),
            fmt='.0f', 
            yticklabels=list(test_generator.class_indices.keys())
            )

    plt.xlabel("Predictions")
    plt.ylabel("True Labels")
    plt.show()
    
    print("Results:")
    print("-------------------------")
    print("Model name:",model_name)
    print("Classification type:",classification_type)
    print("Classifier:",classifier)
    print("Image preprocess function:",img_process_function)

    print("-------------------------")
    print("Batch size:",batch_size)
    print("Learning rate:",learning_rate)
    print("Number of Epoch:",epochs)

    if model_name == "DenseNet121":
        print("Chexnet weights status:",use_chex_weights)

    print("Classification Metrics:")
    print("-------------------------")
    
    print(classification_report(actual, preds, digits = 3))
    
    if classification_type == "binary":
        def acc_score_manual(cfmx):
            TP = cfmx[1][1]
            FP = cfmx[1][0]
            FN = cfmx[0][1]
            TN = cfmx[0][0]
            return (TP + TN) / (TP + FN + FP + TN)

        def spe_score_manual(cfmx):
            TP = cfmx[1][1]
            FP = cfmx[1][0]
            FN = cfmx[0][1]
            TN = cfmx[0][0]
            return (TN) / (FP + TN)

        def sen_rec_score_manual(cfmx):
            TP = cfmx[1][1]
            FP = cfmx[1][0]
            FN = cfmx[0][1]
            TN = cfmx[0][0]
            return (TP) / (TP + FN)

        def pre_score_manual(cfmx):
            TP = cfmx[1][1]
            FP = cfmx[1][0]
            FN = cfmx[0][1]
            TN = cfmx[0][0]
            return (TP) / (TP + FP)

        def f1_score_manual(cfmx):
            TP = cfmx[1][1]
            FP = cfmx[1][0]
            FN = cfmx[0][1]
            TN = cfmx[0][0]
            return (2*TP) / (FP + FN + (2*TP))

        acc = acc_score_manual(cfmx)
        spe = spe_score_manual(cfmx)
        sen = sen_rec_score_manual(cfmx)
        pre = pre_score_manual(cfmx)
        f11 = f1_score_manual(cfmx)
        
        print("Accuracy: ",acc)
        print("Specificity:",spe)
        print("Precision:",pre)
        print("Sensitivity *Recall*:",sen)
        print("F1 score:",f11)
        print("ROC AUC Score",roc)
        
    print("Accuracy: ",accuracy_score)
    print("Weighted Precision:",precision_score(actual, preds, average = "weighted"))
    print("Weighted Sensitivity *Recall*:",recall_score(actual, preds, average = "weighted"))
    print("Weighted F1 score:",f1_score(actual, preds, average = "weighted"))
    if classification_type == "binary":
        print("Weighted ROC AUC Score",roc_auc_score(actual, preds, average="weighted"))
    else:
        print("Weighted ROC AUC Score",roc_auc_score(actual, preds_, average="weighted", multi_class="ovr"))