In [None]:
from keras.applications import VGG19, VGG16, Xception, InceptionV3
from keras.layers import Dense, GlobalAveragePooling2D, Dropout
from keras.models import Model
from tensorflow import keras
import keras
import numpy as np

# Metrics for evaluating a binary classifier. The `name` given to each metric
# is the identifier other cells rely on: plot_metrics looks up 'auc',
# 'precision' and 'recall' (plus their 'val_'-prefixed counterparts) in the
# training history. The list order also matters to any code that reads
# evaluate() results by position (loss first, then these metrics in order).
METRICS = [
      keras.metrics.TruePositives(name='tp'),
      keras.metrics.FalsePositives(name='fp'),
      keras.metrics.TrueNegatives(name='tn'),
      keras.metrics.FalseNegatives(name='fn'), 
      keras.metrics.BinaryAccuracy(name='accuracy'),
      keras.metrics.Precision(name='precision'),
      keras.metrics.Recall(name='recall'),
      keras.metrics.AUC(name='auc'),
]


def generate_model(model, input_shape, fully_n=128, opt='sgd', lr=0.01, metrics=None):
    """Build and compile a binary classifier on top of a pretrained Keras backbone.

    Only the requested backbone and optimizer are instantiated, so a call no
    longer loads the ImageNet weights of every supported architecture.

      model: backbone name: 'vgg19', 'vgg16', 'xception' or 'inception'.
      input_shape: input shape of the images fed to the backbone.
      fully_n: number of neurons of the fully connected layer.
      opt: optimizer name: 'sgd', 'adam', 'rmsprop' or 'adadelta'.
      lr: optimizer learning rate.
      metrics: optional list of metrics forwarded to `compile` (e.g. METRICS).
        None (the default) compiles with the loss only.

    Returns the compiled model: backbone -> global average pooling ->
    Dense(fully_n, relu) -> Dropout(0.2) -> Dense(1, sigmoid), with every
    layer trainable.

    Raises ValueError if `model` or `opt` is not one of the supported names.
    """
    # Factories instead of instances: nothing is built until one is selected.
    backbone_kwargs = dict(weights='imagenet', include_top=False, input_shape=input_shape)
    backbones = {
        'vgg19': lambda: VGG19(**backbone_kwargs),
        'vgg16': lambda: VGG16(**backbone_kwargs),
        'xception': lambda: Xception(**backbone_kwargs),
        'inception': lambda: InceptionV3(**backbone_kwargs),
    }
    optimizers = {
        'sgd': lambda: keras.optimizers.SGD(learning_rate=lr),
        'adam': lambda: keras.optimizers.Adam(learning_rate=lr),
        'rmsprop': lambda: keras.optimizers.RMSprop(learning_rate=lr),
        'adadelta': lambda: keras.optimizers.Adadelta(learning_rate=lr),
    }

    # Validate both names up front so a typo fails before any weights are loaded.
    if model not in backbones:
        raise ValueError("Unknown model {!r}; expected one of {}".format(model, sorted(backbones)))
    if opt not in optimizers:
        raise ValueError("Unknown optimizer {!r}; expected one of {}".format(opt, sorted(optimizers)))

    chosen_optimizer = optimizers[opt]()
    print("CHOOSED OPTIMIZER:", str(chosen_optimizer))

    # Create the base model
    base_model = backbones[model]()

    # Classification head: pooling, fully connected layer, dropout, sigmoid output
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(fully_n, activation='relu')(x)
    x = Dropout(0.2)(x)
    predictions = Dense(units=1, activation='sigmoid')(x)

    classifier = Model(inputs=base_model.input, outputs=predictions)

    # Fine-tune the whole network, backbone included.
    for layer in classifier.layers:
        layer.trainable = True

    classifier.compile(optimizer=chosen_optimizer, loss='binary_crossentropy', metrics=metrics)
    classifier.summary()

    return classifier

In [None]:
import pandas as pd 
import os
import matplotlib.pyplot as plt
from keras.preprocessing.image import ImageDataGenerator


# Directories of the training and test splits (relative paths).
TRAIN_PATH = '../input/chest-xray-pneumonia/chest_xray/train'
TEST_PATH = '../input/chest-xray-pneumonia/chest_xray/test'
# Order matters: the generator helpers read paths[0] as the training directory
# and create_generators reads paths[1] as the test directory.
PATHS = [TRAIN_PATH, TEST_PATH]
# Passed to flow_from_directory as target_size.
IMG_SIZE = (400 , 400)
# Passed to flow_from_directory as color_mode.
COLOR = "rgb"
# Passed to flow_from_directory as batch_size.
BATCH_S = 16
# Passed to flow_from_directory as class_mode.
CLASS_M = 'binary'

def create_generators(paths, img_size, color, batch_s, class_m, seed=1):
    """Return the training and test directory iterators (no augmentation).

      paths: sequence whose first item is the training folder and whose
        second item is the test folder.
      img_size: target size the images are resized to.
      color: color mode of the loaded images (e.g. "rgb").
      batch_s: batch size.
      class_m: class mode, 'binary' or 'categorical'.
      seed: seed for shuffling the training data.

    Returns a tuple (train_generator, test_generator).
    """
    # Both splits only get their pixel values multiplied by 1/255.
    train_datagen = ImageDataGenerator(rescale=1./255)
    test_datagen = ImageDataGenerator(rescale=1./255)

    # Training generator: shuffled, reproducibly thanks to the seed.
    train_generator = train_datagen.flow_from_directory(directory=paths[0],
                                                        target_size=img_size,
                                                        color_mode=color,
                                                        batch_size=batch_s,
                                                        class_mode=class_m,
                                                        shuffle=True,
                                                        seed=seed)

    # Test generator: not shuffled, so predictions can be compared against
    # test_generator.classes element by element.
    test_generator = test_datagen.flow_from_directory(directory=paths[1],
                                                      target_size=img_size,
                                                      color_mode=color,
                                                      batch_size=batch_s,
                                                      class_mode=class_m,
                                                      shuffle=False)

    return train_generator, test_generator



def create_generators_DA(paths, img_size, color, batch_s, class_m, seed=1):
    """Return the training directory iterator with data augmentation enabled.

      paths: training and test folder paths; only paths[0] (training) is read.
      img_size: target size the images are resized to.
      color: color mode of the loaded images.
      batch_s: batch size.
      class_m: class mode, 'binary' or 'categorical'.
      seed: seed for shuffling and for the random transformations.
    """
    # Random transformations applied on top of the 1/255 rescaling.
    augmentation = dict(rescale=1. / 255,
                        shear_range=0.2,
                        zoom_range=0.2,
                        horizontal_flip=True,
                        brightness_range=[0.5, 1.5])

    # How the training folder is read.
    flow_options = dict(directory=paths[0],
                        target_size=img_size,
                        color_mode=color,
                        batch_size=batch_s,
                        class_mode=class_m,
                        shuffle=True,
                        seed=seed)

    return ImageDataGenerator(**augmentation).flow_from_directory(**flow_options)


In [None]:
from sklearn.metrics import confusion_matrix, roc_curve

def conf_matrix(model, identifier, generator=None):
    """Plot, save and return the confusion matrix of a binary classifier.

      model: trained model with a single sigmoid output.
      identifier: text used as plot title and as prefix of the saved file
        ('<identifier>_MatrizConfusion.png').
      generator: directory iterator to evaluate on. Defaults to the
        notebook-level `test_generator` when omitted. It is assumed not to
        shuffle, so that predictions line up with `generator.classes`.

    Returns the 2x2 confusion matrix (rows: true label, columns: prediction).
    """
    if generator is None:
        # Backward-compatible default: fall back to the global test generator.
        generator = test_generator

    generator.reset()
    true_labels = np.asarray(generator.classes)

    # predict_generator is missing in recent Keras releases, where predict
    # accepts generators directly; use whichever the model provides.
    predict = getattr(model, 'predict_generator', None) or model.predict
    # ravel (not squeeze) keeps a 1-D array even for a single prediction.
    scores = np.asarray(predict(generator)).ravel()
    predicted_labels = (scores >= 0.5).astype(int)

    # Fix the label set so the matrix is 2x2 even if one class never appears,
    # matching the two tick labels below.
    cm = confusion_matrix(true_labels, predicted_labels, labels=[0, 1])

    # 'seaborn-ticks' was renamed to 'seaborn-v0_8-ticks' in matplotlib 3.6.
    style = next((name for name in ('seaborn-ticks', 'seaborn-v0_8-ticks')
                  if name in plt.style.available), None)
    if style is not None:
        plt.style.use(style)

    fig, ax = plt.subplots()
    im = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    ax.figure.colorbar(im, ax=ax)
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=['Normal', 'Pneumonia'], yticklabels=['Normal', 'Pneumonia'],
           title=identifier,
           ylabel='Valor verdadero',
           xlabel='Predicción')

    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")

    # Write each count in its cell, in white on dark cells for readability.
    fmt = '.0f'
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")
    fig.tight_layout()
    plt.savefig(('{}_MatrizConfusion.png').format(identifier), dpi=300)
    plt.show()
    return cm

    
def plot_roc_curve(identifier, fpr, tpr, label=None):
    """Draw a ROC curve on the current figure and save it as PNG.

      identifier: prefix of the saved file ('<identifier>_plot_ROC.png').
      fpr: false positive rates.
      tpr: true positive rates.
      label: optional legend entry for the curve; the legend is only drawn
        when a label is given.

    The figure is left open so the caller can add more curves or show it.
    """
    plt.plot(fpr, tpr, linewidth=2, label=label)
    # Diagonal reference line.
    plt.plot([0, 1], [0, 1], 'k--')
    plt.grid(True)
    plt.xlabel('Falsos positivos [%]')
    plt.ylabel('Verdaderos positivos (Recall) [%]')
    if label is not None:
        # Without a legend the label passed to plot() would never be visible.
        plt.legend()
    plt.savefig(('{}_plot_ROC.png').format(identifier), dpi=300)


In [None]:
import pandas as pd
from sklearn.metrics import f1_score
import time

class TimeHistory(keras.callbacks.Callback):
    """Keras callback that measures the duration of every epoch.

    After training, `times` holds the duration in seconds of each epoch and
    `total_time` their sum.
    """

    def on_train_begin(self, logs=None):
        # Start from scratch on every training run.
        self.times = []
        self.total_time = 0

    def on_epoch_begin(self, batch, logs=None):
        # `batch` receives the epoch index; the name is kept for compatibility.
        # perf_counter is monotonic, unlike the wall clock.
        self.epoch_time_start = time.perf_counter()

    def on_epoch_end(self, batch, logs=None):
        # Read the clock once so that sum(times) matches total_time.
        elapsed = time.perf_counter() - self.epoch_time_start
        self.times.append(elapsed)
        self.total_time += elapsed


def train_and_save(opt, model_name, lr, epochs, train_generator, test_generator, input_shape=None, fully_n=None, weights=None):
    """Build a model, train it and save it to '<identifier>.h5'.

    The identifier is '<model_name>_<epochs>_<opt>_<lr>'.

      opt: optimizer name understood by generate_model.
      model_name: backbone name understood by generate_model.
      lr: optimizer learning rate.
      epochs: number of training epochs.
      train_generator: generator yielding the training batches.
      test_generator: generator used as validation data on every epoch.
      input_shape: input shape of the images, forwarded to generate_model.
      fully_n: neurons of the fully connected layer; when None the default
        of generate_model is used.
      weights: optional class weights, forwarded to training.
    """
    # Only forward fully_n when it was given: passing None along would
    # override generate_model's default and build a Dense layer with no units.
    model_kwargs = {'input_shape': input_shape, 'opt': opt, 'lr': lr}
    if fully_n is not None:
        model_kwargs['fully_n'] = fully_n
    model = generate_model(model_name, **model_kwargs)

    identifier = '{}_{}_{}_{}'.format(model_name, epochs, opt, lr)
    print("Model: {}.\nNumber of epochs: {}. \nOptimizer: {}\nLearning rate: {}".format(model_name, epochs, opt, lr))

    # Model train
    history = train(model, train_generator, epochs, val_gen=test_generator, weights=weights) #, callbacks=[time_callback]

    test_generator.reset()

    # NOTE: the evaluation and reporting steps below are disabled. They read
    # metrics (accuracy, precision, recall, auc) and positional evaluate()
    # results that are only available when the model is compiled with
    # metrics, and `time_callback` must be a TimeHistory instance passed to
    # train() through `callbacks`.

    # Printing metrics
    #baseline_results = model.evaluate_generator(test_generator, verbose=0)
    #for name, value in zip(model.metrics_names, baseline_results):
    #      print(name, ': ', value)

    # Calculating F1-Score
    #test_generator.reset()
    #Y_test =  test_generator.classes.tolist()
    #Y_pred = model.predict_generator(test_generator)
    #Y_pred_class = [1 if i[0] >=0.5 else 0 for i in Y_pred]
    #f1 = f1_score(Y_test, Y_pred_class)
    #print("f1-score: ", f1)

    # Plot metrics
    #plot_metrics(history, identifier)

    # Confusion matrix and ROC
    #colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
    #conf_matrix(model, identifier)

    #fpr, tpr, thresholds = roc_curve(Y_test, Y_pred)
    #plot_roc_curve(identifier, fpr, tpr)
    #plt.show()

    # Save history as np object
    #np.save('{}_acc.npy'.format(identifier), history.history['val_accuracy'])
    #np.save('{}_recall.npy'.format(identifier), history.history['val_recall'])
    #np.save('{}_precision.npy'.format(identifier), history.history['val_precision'])
    #np.save('{}_AUC.npy'.format(identifier), history.history['val_auc'])

    # Save training in csv file
    #csv_columns = ["Modelo","Optimizador", "Tasa de aprendizaje", "Duración del entrenamiento", "Pérdida", "Precisión", "Recall", "AUC", "Tasa de aciertos", "Puntuación F1"]
    #results_df = pd.DataFrame(columns=csv_columns)
    #results_df.loc[len(results_df)] = [
    #    model_name,
    #    opt,
    #    lr,
    #    "{} s".format(str(int(time_callback.total_time))),
    #    round(baseline_results[0], 3),
    #    round(baseline_results[6], 3),
    #    round(baseline_results[7], 3),
    #    round(baseline_results[8], 3),
    #    round(baseline_results[5],3),
    #    round(f1,3)
    #]
    #results_df.to_csv('{}_df.csv'.format(identifier), index=False, encoding='utf-8-sig')

    # Save the trained model
    model.save('{}.h5'.format(identifier))
    

In [None]:
def plot_metrics(history, identifier):
    """Plot loss, AUC, precision and recall curves and save them as a PNG file.

    history: History object returned by training.
    identifier: String. Prefix of the saved file
        ('<identifier>_plot_metricas.png').

    Each metric gets its own panel with the training curve (solid) and the
    validation curve (dashed). A series that is absent from the history
    (model compiled without that metric, or trained without validation data)
    is skipped instead of raising KeyError.
    """
    colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
    metrics = ['loss', 'auc', 'precision', 'recall']
    plt.figure(figsize=(11, 9))
    for n, metric in enumerate(metrics):
        name = metric.replace("_", " ").capitalize()
        plt.subplot(2, 2, n + 1)

        drew_curve = False
        for key, linestyle, label in ((metric, '-', 'Entrenamiento'),
                                      ('val_' + metric, '--', 'Validación')):
            if key not in history.history:
                continue
            plt.plot(history.epoch, history.history[key],
                     color=colors[0], linestyle=linestyle, label=label)
            drew_curve = True

        plt.xlabel('Epoch')
        plt.ylabel(name)
        if metric == 'loss':
            # Anchor the loss axis at zero, keep the automatic upper limit.
            plt.ylim([0, plt.ylim()[1]])
        elif metric == 'auc':
            plt.ylim([0.8, 1])
        else:
            plt.ylim([0, 1])

        if drew_curve:
            plt.legend()
    plt.tight_layout()
    plt.savefig(('{}_plot_metricas.png').format(identifier), dpi=300)


In [None]:
def train(model, tr_gen, epochs, val_gen=None, callbacks=None, weights=None):
    """Train a model from a generator and return the training history.

    model: model that will be trained.
    tr_gen: train generator.
    epochs: number of iterations.
    val_gen: generator used to validate the model after every epoch.
    callbacks: list of Keras callbacks.
    weights: weight for each class.

    Uses `fit_generator` when the model provides it and falls back to `fit`
    otherwise, since recent Keras releases dropped `fit_generator` and accept
    generators in `fit`.
    """
    fit = getattr(model, 'fit_generator', None) or model.fit
    hist = fit(
            tr_gen,
            epochs=epochs,
            verbose=1,
            validation_data=val_gen,
            callbacks=callbacks,
            class_weight=weights)
    return hist

# Entrenamiento básico

In [None]:
# Training parameters
opt = 'adam'  # sgd, adam, adadelta, rmsprop
lr = 0.0001  # 0.01, 0.001, 0.0001
epochs = 10
model = 'inception'

INPUT_SHAPE = (400, 400, 3)
train_generator, test_generator = create_generators(PATHS, IMG_SIZE,
                                                    COLOR, BATCH_S,
                                                    CLASS_M)

# lr and epochs are required positional parameters of train_and_save; without
# them the generators would be taken as the learning rate and epoch count.
train_and_save(opt,
               model,
               lr,
               epochs,
               train_generator,
               test_generator,
               input_shape=INPUT_SHAPE,
               fully_n=256)


# Data augmentation

In [None]:
# Hyperparameters for the data-augmentation run
opt = 'sgd'  # one of: sgd, adam, adadelta, rmsprop
lr = 0.001  # tried values: 0.01, 0.001, 0.0001
epochs = 2
model = 'vgg16'

INPUT_SHAPE = (400, 400, 3)

# Augmented batches for training
train_generator_DA = create_generators_DA(paths=PATHS,
                                          img_size=IMG_SIZE,
                                          color=COLOR,
                                          batch_s=BATCH_S,
                                          class_m=CLASS_M)

# Only the (non-augmented) test generator is kept from this call
_, test_generator = create_generators(paths=PATHS,
                                      img_size=IMG_SIZE,
                                      color=COLOR,
                                      batch_s=BATCH_S,
                                      class_m=CLASS_M)

train_and_save(opt=opt,
               model_name=model,
               lr=lr,
               epochs=epochs,
               train_generator=train_generator_DA,
               test_generator=test_generator,
               input_shape=INPUT_SHAPE,
               fully_n=256)
test_generator.reset()

# Data augmentation + pesos de clase

In [None]:
from sklearn.utils import class_weight
# Training parameters
opt = 'rmsprop'  # adam, adadelta, rmsprop
lr = 0.0001  # 0.01, 0.001, 0.0001
epochs = 10
model = 'inception'

INPUT_SHAPE = (400, 400, 3)
train_generator_DA = create_generators_DA(PATHS, IMG_SIZE,
                                          COLOR, BATCH_S,
                                          CLASS_M)

_, test_generator = create_generators(PATHS, IMG_SIZE,
                                      COLOR, BATCH_S,
                                      CLASS_M)

# Generating weights. They must reflect the class balance of the data the
# model is trained on, so they are computed from the training labels (not
# from the test labels).
train_labels = train_generator_DA.classes
class_labels = np.unique(train_labels)
# Keyword arguments: recent scikit-learn releases reject positional ones here.
# The result gets its own name so the imported class_weight module is not shadowed.
class_weights = class_weight.compute_class_weight(class_weight='balanced',
                                                  classes=class_labels,
                                                  y=train_labels)

# Converting the weights to a {class index: weight} dict (Keras requirement)
class_weight_dict = {int(label): float(weight)
                     for label, weight in zip(class_labels, class_weights)}

print("Choosed weigths: ", class_weight_dict)

train_and_save(opt,
               model,
               lr,
               epochs,
               train_generator_DA,
               test_generator,
               input_shape=INPUT_SHAPE,
               fully_n=256,
               weights=class_weight_dict)