In [None]:
import os
import sys
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
from xception_benchmark_utility import plot_cnn_performance, plot_cnn_performance_wo_lr

from matplotlib.backends.backend_pdf import PdfPages

from pathlib import Path

#os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
from tensorflow import keras
from keras import layers
from livelossplot import PlotLossesKeras


# Report the library versions so that a run can be reproduced later.
print(tf.__version__)
print(keras.__version__)

# Query the GPUs once and reuse the result below.
physical_devices = tf.config.list_physical_devices('GPU')

# Call the function; printing the bare attribute only showed the function object.
print(tf.config.list_logical_devices())
print('GPU name: ', physical_devices)

Path locations for project root & data

In [None]:
# Derive every location from the directory the notebook is started in.
WORKING_DIR = Path.cwd().resolve()
ROOT_PATH = WORKING_DIR.parent
# The data directory is resolved two levels above the project root.
DATA_PATH = str(ROOT_PATH.parent.parent / "Data" / "Deep_Data")

print(f"Working directory: {WORKING_DIR}")
print(f"Project root path: {ROOT_PATH}")
print(f"Data path: {DATA_PATH}")

In [None]:
# Folder for saved models, located inside the working directory.
SAVED_MODELS_DIR = str(WORKING_DIR / "saved_models")
print(SAVED_MODELS_DIR)

Used Model:

In [None]:
# Label of the backbone architecture this notebook trains (keras.applications.Xception).
used_model = "Xception"

Image & batch size configuration

In [None]:
# Size the images are loaded at; passed as `image_size` to image_dataset_from_directory.
IMG_SIZE = (150, 150)
# Usually 32. Kept small so that the validation batches can be split into a
# validation and a test set even though only few images are available.
BATCH_SIZE = 32  # usually 32, small batch size, to be able to split validation batches in test- and validation set, with only little images available

In [None]:
# Training subset of the images below DATA_PATH/train_img. The split fraction and
# seed must match the ones used for the validation subset so the two do not overlap.
train_ds = tf.keras.utils.image_dataset_from_directory(
  os.path.join(DATA_PATH, 'train_img'),
  validation_split=0.2,
  subset="training",
  seed=24,
  image_size=IMG_SIZE,
  batch_size=BATCH_SIZE)

# NOTE(review): this presumably wraps the iterator object itself rather than the
# batches it yields, and train_ds_np is not referenced by the other cells shown in
# this notebook -- confirm whether it is still needed.
train_ds_np = np.array(train_ds.as_numpy_iterator())

In [None]:
# Validation subset; split fraction and seed are the same as for the training subset.
val_ds = tf.keras.utils.image_dataset_from_directory(
    os.path.join(DATA_PATH, "train_img"),
    batch_size=BATCH_SIZE,
    image_size=IMG_SIZE,
    seed=24,
    subset="validation",
    validation_split=0.2,
)

# Hand the first half of the validation batches to the test set and keep the rest
# for validation (the split is batch-wise, not image-wise).
# NOTE(review): image_dataset_from_directory shuffles by default -- confirm that
# take()/skip() yield disjoint, stable subsets across iterations.
test_size = len(val_ds) // 2
test_ds, val_ds = val_ds.take(test_size), val_ds.skip(test_size)

# Number of batches reserved for each set
print('Batches for training -->', train_ds.cardinality())
print('Batches for validating -->', val_ds.cardinality())
print('Batches for testing -->', test_ds.cardinality())



In [None]:
# Class names are read here, before the dataset is mapped in a later cell.
class_names = train_ds.class_names

# Preview the first nine images of one training batch in a 3x3 grid.
plt.figure(figsize=(10, 10))
for batch_images, batch_labels in train_ds.take(1):
    for idx in range(9):
        plt.subplot(3, 3, idx + 1)
        pixels = batch_images[idx].numpy().astype("uint8")
        plt.imshow(pixels)
        plt.title(class_names[batch_labels[idx]], color="green")
        plt.axis("off")
plt.show()

Resizing the data

In [None]:
# Target size for every dataset; defined once and used by the helper below.
size = (150, 150)


def _resize_batch(images, labels):
    """Pad-resize a batch of images to `size` and pass the labels through unchanged."""
    return tf.image.resize_with_pad(images, size[0], size[1]), labels


train_ds = train_ds.map(_resize_batch)
# From here on the resized validation data is available as `validation_ds`.
validation_ds = val_ds.map(_resize_batch)
test_ds = test_ds.map(_resize_batch)


Conversion to greyscale

In [None]:
# Show nine images of one training batch using the "Greys" colormap argument.
# NOTE(review): matplotlib documents `cmap` as ignored for RGB(A) input, so this
# presumably still renders the images in colour -- confirm the intended conversion.
for batch_images, batch_labels in train_ds.take(1):
    for idx in range(9):
        plt.subplot(3, 3, idx + 1)
        pixels = batch_images[idx].numpy().astype("uint8")
        plt.imshow(pixels, cmap="Greys")
        plt.title(class_names[batch_labels[idx]], color="green")
        plt.axis("off")
plt.show()

Data augmentation

In [None]:
# Random augmentations that are applied to the images inside the model.
augmentation_layers = [
    layers.RandomFlip("horizontal_and_vertical"),
    layers.RandomRotation(0.2),
    layers.RandomContrast(factor=0.2, seed=24),
    # Images are usually exposure-compensated to middle grey (normalized exposure),
    # so random brightness variations are only sensible for very small values. *dani
    keras.layers.RandomBrightness(factor=0.2, seed=24),
]
data_augmentation = keras.Sequential(augmentation_layers)

# Preview nine random augmentations of the first image of one training batch.
for batch_images, batch_labels in train_ds.take(1):
    plt.figure(figsize=(10, 10))
    # The augmentation model is called on a batch, so add a leading batch axis once.
    sample_batch = tf.expand_dims(batch_images[0], 0)
    for idx in range(9):
        plt.subplot(3, 3, idx + 1)
        augmented = data_augmentation(sample_batch, training=True)
        plt.imshow(augmented[0].numpy().astype("int32"), cmap="Greys")
        plt.title(int(batch_labels[0]), color="green")
        plt.axis("off")

Building a model

In [None]:
def build_base_model():
    """Build the frozen Xception feature extractor with its input pipeline.

    The pipeline is: input -> data_augmentation -> rescaling -> Xception
    (ImageNet weights, no classifier head, all layers frozen).

    Returns:
        A tuple ``(features, inputs)``: the symbolic output of the backbone and
        the ``keras.Input`` tensor, so that callers can attach their own head
        and build a ``keras.Model(inputs, outputs)``.
    """
    # ImageNet weights without the ImageNet classifier on top.
    backbone = keras.applications.Xception(
        include_top=False,
        weights="imagenet",
        input_shape=(150, 150, 3),
    )
    backbone.trainable = False  # freeze the pre-trained layers

    inputs = keras.Input(shape=(150, 150, 3))
    augmented = data_augmentation(inputs)  # random data augmentation

    # The pre-trained Xception weights expect inputs scaled from (0, 255) to
    # (-1., +1.); Rescaling computes `(inputs * scale) + offset`.
    rescaled = keras.layers.Rescaling(scale=1 / 127.5, offset=-1)(augmented)

    # training=False keeps the backbone's batchnorm layers in inference mode,
    # also after the backbone is unfrozen for fine-tuning.
    features = backbone(rescaled, training=False)

    return features, inputs

# Call this function to compare the optimized model to trained stock model

def model_stock():
    """Return the stock reference model, loading it from disk when available.

    If a saved model exists below ``WORKING_DIR/xception/Saved_models/stock_model``
    it is loaded. Otherwise a frozen Xception backbone with a minimal
    classification head is built, compiled and trained for 20 epochs on
    ``train_ds`` (validated on ``validation_ds``). The trained model is not
    saved by this function.

    Returns:
        The loaded or freshly trained ``keras.Model``.
    """
    # Join the path from its components so it also resolves outside Windows;
    # the former raw string with backslashes only formed a path on Windows.
    stock_model_path = os.path.join(
        WORKING_DIR, "xception", "Saved_models", "stock_model"
    )

    if os.path.exists(stock_model_path):
        return keras.models.load_model(stock_model_path)

    base_model = keras.applications.Xception(
        weights="imagenet",  # Load weights pre-trained on ImageNet.
        input_shape=(150, 150, 3),
        include_top=False,
    )  # Do not include the ImageNet classifier at the top.

    # Freezing the base model layers
    base_model.trainable = False

    inputs = keras.Input(shape=(150, 150, 3))
    x = data_augmentation(inputs)  # Apply random data augmentation

    scale_layer = keras.layers.Rescaling(scale=1 / 127.5, offset=-1)
    x = scale_layer(x)

    # The base model contains batchnorm layers. training=False keeps them in
    # inference mode.
    x = base_model(x, training=False)
    x = keras.layers.GlobalAveragePooling2D()(x)
    x = keras.layers.Dropout(0.2)(x)  # Regularize with dropout
    x = keras.layers.Dense(4, activation="relu")(x)
    outputs = tf.keras.layers.Softmax()(x)

    stock_model = keras.Model(inputs, outputs)

    stock_model.compile(
        optimizer="Nadam",
        # The model ends in a Softmax layer, so the loss receives probabilities,
        # not logits.
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=["accuracy"],
    )

    epochs = 20
    stock_model.fit(train_ds, epochs=epochs, validation_data=validation_ds)

    return stock_model

Setup of top layer hyperparameter optimization
We assume that hyperparameter adjustments make the most sense on the top layer, since it is not part of the Xception architecture itself and is tailored to our problem. An extensive parameter search during fine-tuning would make the use of transfer learning somewhat redundant.

Training the top layer – change "compile_base" if you want to train the base model; otherwise a previously trained base model is loaded.

In [None]:
import keras_tuner


def model_top_grad_student_descent():
    """Build and compile the hand-tuned classifier on top of the frozen base model.

    Head: global average pooling -> dropout(0.2) -> Dense(128, relu) ->
    Dense(4, relu) -> Softmax.

    Returns:
        The compiled ``keras.Model`` (Nadam optimizer, sparse categorical
        cross-entropy, accuracy metric).
    """
    x, inputs = build_base_model()

    x = keras.layers.GlobalAveragePooling2D()(x)
    x = keras.layers.Dropout(0.2)(x)  # Regularize with dropout

    # Custom top layers
    x = keras.layers.Dense(128, activation="relu")(x)
    x = keras.layers.Dense(4, activation="relu")(x)
    outputs = tf.keras.layers.Softmax()(x)

    model = keras.Model(inputs, outputs)

    model.compile(
        optimizer=keras.optimizers.Nadam(),
        # The model ends in a Softmax layer, so the loss receives probabilities,
        # not logits.
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=["accuracy"],
    )

    return model


def model_top_simple_classifier():
    """Build and compile the minimal classifier on top of the frozen base model.

    Head: global average pooling -> dropout(0.2) -> Dense(4, relu) -> Softmax.

    Returns:
        The compiled ``keras.Model`` (Nadam optimizer, sparse categorical
        cross-entropy, accuracy metric).
    """
    x, inputs = build_base_model()

    x = keras.layers.GlobalAveragePooling2D()(x)
    x = keras.layers.Dropout(0.2)(x)  # Regularize with dropout

    # Custom top layer
    x = keras.layers.Dense(4, activation="relu")(x)
    outputs = tf.keras.layers.Softmax()(x)

    model = keras.Model(inputs, outputs)

    model.compile(
        optimizer=keras.optimizers.Nadam(),
        # The model ends in a Softmax layer, so the loss receives probabilities,
        # not logits.
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=["accuracy"],
    )

    return model


### Top Layer for hyperparameter search
def model_top_layer(hp):
    """Build and compile a classifier head whose shape is chosen by KerasTuner.

    Args:
        hp: a ``keras_tuner.HyperParameters`` object. It is queried for
            ``"layers"`` (0-3), per-block ``"units_<i>"``, ``"act_<i>"`` and
            ``"dropout_<i>"``, and the ``"activation"`` of the final Dense layer.

    Returns:
        The compiled ``keras.Model`` (Nadam optimizer, sparse categorical
        cross-entropy, accuracy metric).
    """
    x, inputs = build_base_model()
    x = keras.layers.GlobalAveragePooling2D()(x)
    x = keras.layers.Dropout(0.2)(x)  # Regularize with dropout

    # Block indices start at 1 (index 0 never produced a block), so a value of
    # n for "layers" yields n - 1 Dense/Dropout blocks, i.e. at most two.
    for i in range(1, hp.Int("layers", 0, 3)):
        x = keras.layers.Dense(
            units=hp.Int("units_" + str(i), 64, 961, step=64),
            activation=hp.Choice("act_" + str(i), ["relu", "selu"]),
        )(x)
        x = keras.layers.Dropout(
            rate=hp.Float(
                "dropout_" + str(i),
                min_value=0.2,
                max_value=0.5,
                default=0.20,
                step=0.10,
            )
        )(x)  # Regularize with dropout

    x = keras.layers.Dense(4, activation=hp.Choice("activation", ["relu", "selu"]))(x)

    outputs = tf.keras.layers.Softmax()(x)
    model = keras.Model(inputs, outputs)

    model.compile(
        optimizer=keras.optimizers.Nadam(),
        # The model ends in a Softmax layer, so the loss receives probabilities,
        # not logits.
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=["accuracy"],
    )

    return model


Sets the run-log IDs for hyperparameter search and fine-tuning

In [None]:
# All TensorBoard runs are collected below <current working directory>/logs.
LOG_DIR = str(Path.cwd() / "logs")
print(f"TensorBoard logs saved to follwing dir: {LOG_DIR}")


def get_run_logdir():
    """Create a timestamped run id and the matching log directory path.

    Returns:
        A tuple ``(run_directory, run_id)`` where ``run_id`` looks like
        ``run_YYYY_MM_DD-HH_MM_SS`` and ``run_directory`` is ``LOG_DIR/run_id``.
        The directory itself is not created here.
    """
    from time import strftime

    stamp = strftime("run_%Y_%m_%d-%H_%M_%S")
    return os.path.join(LOG_DIR, stamp), stamp


# One id per notebook session; it is also used for the hyperparameter logs.
run_logdir, run_id = get_run_logdir()
print(run_id)
print(os.path.join("hyperparam_logs", run_id))


Setup of top layer hyperparameter search


In [None]:
# Identifiers of the available search strategies.
tuner_typ_1, tuner_typ_2, tuner_typ_3 = "random", "hyper", "bayes"

# Strategy that is used when tuning is switched on.
tuner_select = tuner_typ_1

# Switch values for enabling/disabling the hyperparameter search.
tuner_switch_on, tuner_switch_off = "on", "off"

# Set to tuner_switch_on to run the search.
tuner_state = tuner_switch_off

In [None]:
# Select the model for top-layer training: either the hand-tuned head or the
# best model found by the selected KerasTuner search strategy.
if tuner_state == tuner_switch_off:
    print("No hyperparameter tuning")

    model = model_top_grad_student_descent()
    model.summary()
    best_model = model

else:
    # Show the architecture built from the default hyperparameter values.
    model = model_top_layer(keras_tuner.HyperParameters())
    model.summary()

    # TensorBoard logging for the search trials.
    hyperparam_tensorboard_cb = keras.callbacks.TensorBoard(
        os.path.join("hyperparam_logs", run_id)
    )

    if tuner_select == tuner_typ_1:
        tuner = keras_tuner.RandomSearch(
            hypermodel=model_top_layer,
            objective="val_loss",
            max_trials=10,
            executions_per_trial=3,
            overwrite=True,
            directory="hyperparam_logs",
            project_name=run_id,
        )

        tuner.search_space_summary()
        tuner.search(
            train_ds,
            epochs=3,
            validation_data=val_ds,
            callbacks=[hyperparam_tensorboard_cb],
        )

    elif tuner_select == tuner_typ_2:
        tuner = keras_tuner.Hyperband(
            model_top_layer,
            objective="val_loss",
            max_epochs=30,
            factor=3,
            directory="hyperparam_logs",
            project_name=run_id,
        )

        early_stop_hyperband = tf.keras.callbacks.EarlyStopping(
            monitor="val_loss", patience=3
        )

        tuner.search(
            train_ds,
            epochs=2,
            validation_data=val_ds,
            callbacks=[hyperparam_tensorboard_cb, early_stop_hyperband],
        )

    elif tuner_select == tuner_typ_3:
        tuner = keras_tuner.BayesianOptimization(
            model_top_layer,
            objective="val_loss",
            max_trials=10,
            num_initial_points=2,
            alpha=0.0001,
            beta=2.6,
            seed=24,
            directory="hyperparam_logs",
            project_name=run_id,
        )
        tuner.search(
            train_ds,
            epochs=3,
            validation_data=val_ds,
            callbacks=[hyperparam_tensorboard_cb],
        )
    else:
        # Fail here with a clear message; previously this was only printed and
        # the missing `best_model` surfaced later as a NameError.
        raise ValueError(f"Invalid Tuner selected: {tuner_select!r}")

    # The following cells train and evaluate `best_model` / `model`, so bind
    # both names to the best model found by the search.
    best_model = tuner.get_best_models(num_models=1)[0]
    model = best_model

In [None]:
# TensorBoard callback that writes into this session's run directory.
tensorboard_cb = keras.callbacks.TensorBoard(log_dir=run_logdir)

In [None]:
# Train the top layers of the selected model.
model_top = best_model

epochs = 20
# Stop when the training loss has not improved for 10 epochs and restore the best weights.
early_stop_callback_top = tf.keras.callbacks.EarlyStopping(
    monitor='loss',
    patience=10,
    restore_best_weights=True,
)
top_training_callbacks = [early_stop_callback_top, PlotLossesKeras(), tensorboard_cb]
history_top_training = model_top.fit(
    train_ds,
    validation_data=validation_ds,
    epochs=epochs,
    callbacks=top_training_callbacks,
)

In [None]:
# Show the training curves of the top-layer training inline ...
plot_cnn_performance_wo_lr(history_top_training)

# ... and store them as a PDF inside the run directory.
# NOTE(review): the plot helper is called a second time to obtain the figure for the PDF.
top_training_pdf_path = os.path.join(
    run_logdir, run_id + "history_top_tuning" + 'performance.pdf'
)
with PdfPages(top_training_pdf_path) as pdf:
    top_training_figure = plot_cnn_performance_wo_lr(history_top_training)
    pdf.savefig(top_training_figure, bbox_inches='tight')

# Releases allocated memory
keras.backend.clear_session()

In [None]:
# Persist the per-epoch metrics of the top-layer training. The history is a dict,
# so NumPy stores it as a pickled object (np.load needs allow_pickle=True).
top_history_path = os.path.join(run_logdir, run_id + "_history_top_training.npy")
np.save(top_history_path, history_top_training.history)

In [None]:
# TensorBoard can be used to monitor the training performance.
# NOTE(review): the value of --logdir has been removed from the command below;
# supply the log directory before running this cell.

%load_ext tensorboard
%tensorboard --reload_multifile True --logdir = #-removed-

In [None]:
# Get model summary as a string
def get_summary_str(model):
    """Return ``model.summary()`` as one string with every line indented by four spaces.

    Args:
        model: any object whose ``summary(print_fn=...)`` passes the summary
            text to ``print_fn`` piece by piece.

    Returns:
        The collected pieces joined by newlines. The four-space indent keeps
        TensorBoard from applying markdown formatting to the text.
    """
    captured = []
    model.summary(print_fn=captured.append)
    indent = " " * 4
    return indent + ("\n" + indent).join(captured)


# Write a string to TensorBoard (2.x)
def write_string_summary_v2(writer, s, run_id):
    """Write the text ``s`` as a TensorBoard text summary at step 0.

    Args:
        writer: summary writer; it is made the default writer while writing.
        s: the text to store.
        run_id: run identifier that becomes part of the summary name, together
            with the notebook-level ``tuner_select`` value.
    """
    summary_name = "Model configuration for " + run_id + "\n Tuner: " + tuner_select
    with writer.as_default():
        tf.summary.text(summary_name, s, step=0)


# Store the architecture of `model` as text next to this run's TensorBoard logs.
summary_log_dir = os.path.join("logs", run_id)
writer = tf.summary.create_file_writer(summary_log_dir)
model_summary_text = get_summary_str(model)
write_string_summary_v2(writer, model_summary_text, run_id)

Fancy progressbar, live plotting for each epoch and callbacks

In [None]:
from tqdm.keras import TqdmCallback

# Has to be installed manually when using a conda env (presumably refers to tqdm) *dani

# Learning rate for fine-tuning (default: 1e-5); for adaptive optimizers it
# limits the maximum learning rate.
ft_learning_rate = 1e-5  # learning rate for fine tuning default: 1e-5 limits for adaptive optimizers the maximum learning rate

# Learning-rate schedules for tf.keras.callbacks.LearningRateScheduler.
# Each schedule receives the epoch index and the current learning rate and
# returns the learning rate for that epoch as a plain Python float.
def _scale_lr_until(epoch, lr, last_scaled_epoch, log_factor):
    """Return ``lr * exp(log_factor)`` up to and including ``last_scaled_epoch``, else ``lr``."""
    import math

    if epoch > last_scaled_epoch:
        return lr
    return lr * math.exp(log_factor)


def scheduler_fine_tuning_ramp_up(epoch, lr):
    """Warm start: grow the rate by exp(0.01) per epoch for epochs 0-10, then hold it."""
    return _scale_lr_until(epoch, lr, last_scaled_epoch=10, log_factor=0.01)


def scheduler_fine_tuning_ramp_up_aggressive(epoch, lr):
    """Warm start: grow the rate by exp(0.1) per epoch for epochs 0-20, then hold it."""
    return _scale_lr_until(epoch, lr, last_scaled_epoch=20, log_factor=0.1)


def scheduler_fine_tuning_decay(epoch, lr):
    """Shrink the rate by exp(-0.01) per epoch for epochs 0-10, then hold it."""
    return _scale_lr_until(epoch, lr, last_scaled_epoch=10, log_factor=-0.01)


def scheduler_fine_tuning_decay_aggressive(epoch, lr):
    """Shrink the rate by exp(-0.1) per epoch for epochs 0-20, then hold it."""
    return _scale_lr_until(epoch, lr, last_scaled_epoch=20, log_factor=-0.1)


def scheduler_fine_tuning_monitoring(epoch, lr):
    """Leave the learning rate unchanged."""
    return lr


# Learning-rate schedule used during fine-tuning.
learning_rate_callback = tf.keras.callbacks.LearningRateScheduler(
    schedule=scheduler_fine_tuning_ramp_up
)

# Stops when the training loss has not improved for 10 epochs and restores the
# best weights.
early_stop_callback = tf.keras.callbacks.EarlyStopping(
    restore_best_weights=True,
    patience=10,
    monitor="loss",
)

In [None]:
# Unfreeze the model for fine-tuning. The base model keeps running in inference
# mode because it was called with `training=False`, so its batchnorm layers do
# not update their batch statistics and cannot undo the training done so far.

keras.backend.clear_session()  # releases allocated memory

best_model = model_top

# Set `trainable` BEFORE compiling: a change of `trainable` only takes effect
# once the model is compiled again, so unfreezing after compile() would leave
# the backbone frozen during fit().
best_model.trainable = True

best_model.compile(
    optimizer=keras.optimizers.Nadam(ft_learning_rate),  # Low learning rate
    # The model ends in a Softmax layer, so the loss receives probabilities,
    # not logits.
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=["accuracy"],
)

best_model.summary()

In [None]:
epochs = 20

# Active callback set: TensorBoard logging, progress bar, learning-rate schedule
# and live loss plots -- no early stopping. Add `early_stop_callback` to the list
# to stop early; drop `tensorboard_cb` to skip TensorBoard logging.
callbacks_ft = [
    tensorboard_cb,
    TqdmCallback(verbose=0),
    learning_rate_callback,
    PlotLossesKeras(),
]

history_fine_tuning = best_model.fit(
    train_ds,
    validation_data=validation_ds,
    epochs=epochs,
    callbacks=callbacks_ft,
)

In [None]:
# Show the fine-tuning curves inline, then store history and plot in the run directory.
plot_cnn_performance_wo_lr(history_fine_tuning)

# Every output file name starts with the run id.
fine_tuning_prefix = os.path.join(run_logdir, run_id)
np.save(fine_tuning_prefix + "_history_fine_training.npy", history_fine_tuning.history)

with PdfPages(fine_tuning_prefix + "history_fine_tuning" + 'performance.pdf') as pdf:
    fine_tuning_figure = plot_cnn_performance_wo_lr(history_fine_tuning)
    pdf.savefig(fine_tuning_figure, bbox_inches='tight')

Fancy plot resulting charts

In [None]:
import palettable
import matplotlib.font_manager as font_manager

# Serif, 12 pt: applied as matplotlib default and kept as a FontProperties object.
serif_font_settings = dict(family='serif', size=12)

plt.rc('font', **serif_font_settings)

font = font_manager.FontProperties(style='normal', **serif_font_settings)

def cm2inch(value):
    """Convert a length in centimetres to inches (divides by 2.54)."""
    centimetres_per_inch = 2.54
    return value / centimetres_per_inch

def plot_lr_loss():
    """Plot the fine-tuning loss against the learning rate on a logarithmic x-axis.

    Reads the ``'lr'`` and ``'loss'`` series from the notebook-level
    ``history_fine_tuning`` object.
    """
    figure_size = (cm2inch(40), cm2inch(10))
    fig2, axes2 = plt.subplots(nrows=1, ncols=1, figsize=figure_size)
    axes2.set_prop_cycle('color', palettable.matplotlib.Plasma_4.mpl_colors)

    fine_tuning_log = history_fine_tuning.history
    plt.semilogx(fine_tuning_log['lr'], fine_tuning_log['loss'], lw=1)

    plt.title('Learning rate vs. loss', size=16)
    plt.xlabel('Learning rate', size=14)
    plt.ylabel('Loss', size=14)

In [None]:
# Draw the learning-rate-vs-loss curve of the fine-tuning run.
plot_lr_loss()

Prep for confusion matrix


In [None]:
import numpy as np
from sklearn.metrics import (
    precision_score,
    accuracy_score,
    balanced_accuracy_score,
    recall_score,
    f1_score,
    cohen_kappa_score,
    precision_recall_curve,
    PrecisionRecallDisplay,
    average_precision_score,
)
from itertools import cycle

# Evaluate on a single batch drawn from the test set.
image_batch, label_batch = next(test_ds.as_numpy_iterator())

# Print only the shape; dumping the raw pixel values floods the cell output.
print(image_batch.shape)

# Reference model for the comparison with the fine-tuned model.
best_model_pre_ft = model_stock()

predictions_best_model = best_model.predict(image_batch)
predictions_initial_model = best_model_pre_ft.predict(image_batch)

# Predicted class = index of the highest class probability.
y_pred = np.argmax(predictions_best_model, axis=-1)

accuracy = accuracy_score(label_batch, y_pred)
# Balanced accuracy for the unbalanced test dataset
balance_accuracy = balanced_accuracy_score(label_batch, y_pred)
# Agreement between labels and predictions given their class distributions
kappa = cohen_kappa_score(label_batch, y_pred)
# Per-class scores (average=None returns one value per class)
recall = recall_score(label_batch, y_pred, average=None)  # true positive rate
# Ability of the classifier not to label a negative sample as positive
precision = precision_score(label_batch, y_pred, average=None)
# Harmonic mean of precision & recall for each class individually
f1 = f1_score(label_batch, y_pred, average=None)

# Layout: [accuracy, balanced accuracy, precision..., recall..., f1...]
array_met = np.concatenate((precision, recall, f1))
array_acc = np.array([accuracy, balance_accuracy])

statistical_metrics = np.concatenate((array_acc, array_met))

# The scores are fractions in [0, 1]; convert them so the "%" in the output is correct.
print("\n--- Performance evaluation on test --- \n")
print(f"The accuracy of the best model: {accuracy:.2%}")
print(f"The balanced accuracy of the best model: {balance_accuracy:.2%}")
print(f"The precision of the best model: {np.round(100 * precision, 2)}%")
print(f"The recall of the best model: {np.round(100 * recall, 2)}%")
print(f"The f1 score of the best model: {np.round(100 * f1, 2)}%")


In [None]:

def plot_confusion_comparative():
    """Plot the confusion matrices of the stock and the fine-tuned model side by side.

    Reads ``label_batch``, ``predictions_initial_model``,
    ``predictions_best_model`` and ``class_names`` from the notebook globals.
    Changes the global matplotlib font settings and resets seaborn's style
    changes at the end.
    """
    import seaborn as sns

    plt.rc("font", family="serif", size=16)
    plt.rcParams["axes.titlepad"] = 20

    confusion_mtx = tf.math.confusion_matrix(
        label_batch, np.argmax(predictions_best_model, axis=-1)
    )
    confusion_mtx_initial = tf.math.confusion_matrix(
        label_batch, np.argmax(predictions_initial_model, axis=-1)
    )

    fig3, axes3 = plt.subplots(
        nrows=1, ncols=2, figsize=(cm2inch(40), cm2inch(10)), sharey='row'
    )

    panels = (
        (axes3[0], confusion_mtx_initial, 'Stock Model'),
        (axes3[1], confusion_mtx, 'Custom Fine-Tuned Model'),
    )
    for ax, matrix, title in panels:
        sns.heatmap(
            matrix,
            ax=ax,
            xticklabels=class_names,
            yticklabels=class_names,
            annot=True,
            fmt='g',
        )
        ax.title.set_text(title)
        # tf.math.confusion_matrix puts the true labels on the rows and the
        # predictions on the columns; the heatmap draws rows along the y-axis.
        ax.set_ylabel('Ground Truth')
        ax.set_xlabel('Prediction')

    sns.reset_orig()


def plot_test_statistical_metrics(statistical_metrics):
    """Draw one bar per test-set metric of the fine-tuned model.

    Args:
        statistical_metrics: indexable sequence with at least 14 values in the
            order accuracy, balanced accuracy, four precision values, four
            recall values, four f1 values.
    """
    plt.rcParams["axes.titlepad"] = 20
    plt.rc("font", family="serif", size=16)
    fig3, axes3 = plt.subplots(nrows=1, ncols=1, figsize=(cm2inch(40), cm2inch(10)))

    axes3.grid(which="major")
    width = 0.7

    # One tick label per bar; the number of bars follows from this tuple.
    tick_labels = (
        r'accuracy', 'balanced \n accuracy',
        'p dent', 'p other', 'p rim', "p scratch",
        "r1", "r2", "r3", "r4",
        "f1_1", "f1_2", "f1_3", "f1_4",
    )

    axes3.set_prop_cycle('color', palettable.matplotlib.Plasma_12.mpl_colors)
    for i in range(len(tick_labels)):
        axes3.bar(i, height=statistical_metrics[i], width=width, ecolor='black', lw=3, capsize=5,
                  alpha=1, label='UR 10e', zorder=3)

    plt.title('Statistical Metrics on the Test Set \n Custom Fine-Tuned Model', size=16)
    plt.xlabel(r'p = precision, r = recall, f1 = f1-score', size=16)
    plt.ylabel(r'%', size=16)

    plt.xticks(np.arange(len(tick_labels)), tick_labels)

    # Caution: depending on the batch, the order of the labels may change?
    plt.setp(axes3.get_xticklabels(), rotation=45, horizontalalignment='right')

    axes3.set_ylim(0, 1)



In [None]:
from Project_Paperwork.Legacy_Code.xception import xception_benchmark_utility
eval_test = xception_benchmark_utility.run_test_eval_10(model,test_ds)
# Only the last expression of a cell is rendered automatically, so show the
# summary table explicitly.
display(eval_test.describe())
plot_test_statistical_metrics(statistical_metrics=xception_benchmark_utility.run_evaluation_on_test(model,test_ds))

In [None]:
def plot_test_eval_xception(eval_dataframe):
    """Bar-plot summary statistics of the test metrics, with error bars.

    Args:
        eval_dataframe: DataFrame with one column per metric; it must provide
            the 15 source columns listed in ``column_names`` below.

    Returns:
        The ``describe()`` table of ``eval_dataframe`` with the columns
        reordered and renamed for display.
    """
    plt.rcParams["axes.titlepad"] = 20
    plt.rc("font", family="serif", size=16)
    figure_size = (cm2inch(40), cm2inch(10))
    fig3, axes3 = plt.subplots(nrows=1, ncols=1, figsize=figure_size)

    axes3.grid(which="major")
    width = 0.5

    # (source column, display name): reorders the alphabetical class order
    # (1. dent, 2. other, 3. rim, 4. scratch) to scratch, dent, rim, other.
    column_names = [
        ("accuracy", "accuracy"),
        ("balanced accuracy", "balanced accuracy"),
        ("cohens kappa", "cohens kappa"),
        ("p scratch", "precision scratch"),
        ("p dent", "precision dent"),
        ("p rim", "precision rim"),
        ("p other", "precision other"),
        ("r4", "recall scratch"),
        ("r1", "recall dent"),
        ("r3", "recall rim"),
        ("r2", "recall other"),
        ("f1_4", "f1 scratch"),
        ("f1_1", "f1 dent"),
        ("f1_3", "f1 rim"),
        ("f1_2", "f1 other"),
    ]

    eval_dataframe_stats = eval_dataframe.describe().reindex(
        columns=[source for source, _ in column_names]
    )
    eval_dataframe_stats.columns = [shown for _, shown in column_names]

    axes3.set_prop_cycle("color", palettable.matplotlib.Plasma_12.mpl_colors)

    # Rows 1 and 2 of the describe() table (presumably mean and std -- confirm)
    # give the bar heights and the error bars.
    height_row = eval_dataframe_stats.iloc[1]
    error_row = eval_dataframe_stats.iloc[2]
    for position in range(15):
        axes3.bar(
            position,
            height=height_row.iloc[position],
            yerr=error_row.iloc[position],
            width=width,
            ecolor="black",
            lw=3,
            capsize=5,
            alpha=1,
            label="UR 10e",
            zorder=3,
        )

    plt.title("Statistical Metrics on the Test Set", size=16)
    plt.ylabel(r"%", size=16)

    plt.xticks(np.arange(0, 15, step=1), list(eval_dataframe_stats.columns.values))

    # Caution: depending on the batch, the order of the labels may change?
    plt.setp(axes3.get_xticklabels(), rotation=45, horizontalalignment="right")

    axes3.set_ylim(0)

    return eval_dataframe_stats

In [None]:
# Plot the aggregated test metrics and keep the reordered/renamed summary table.
eval_stats = plot_test_eval_xception(eval_test)

In [None]:
import sklearn

def concat_tensor_and_labels():
    """Collect predicted and true class indices for the whole ``test_ds`` in one go.

    Kept for testing purposes: evaluates the tf.data test dataset batch by
    batch with the notebook-level ``model``.

    Returns:
        A tuple ``(predictions, labels)`` of flat float64 arrays with one entry
        per test image; both are empty if ``test_ds`` yields no batches.
    """
    # Each list starts with an empty float array so that the concatenated
    # result keeps the float64 dtype, also for an empty dataset.
    prediction_chunks = [np.array([])]
    label_chunks = [np.array([])]
    for x, y in test_ds:
        prediction_chunks.append(np.argmax(model.predict(x), axis=-1))
        label_chunks.append(y.numpy())

    # Concatenate once instead of re-copying the growing arrays on every batch.
    return np.concatenate(prediction_chunks), np.concatenate(label_chunks)

y_pred, labels = concat_tensor_and_labels()

In [None]:
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize


def plot_roc_curve(y_test, y_pred):
    """Plot one-vs-rest ROC curves plus micro and macro averages for four classes.

    Args:
        y_test: 1-D array of true class indices (0-3).
        y_pred: 1-D array of predicted class indices (0-3). These are hard
            labels, so the binarized scores are only 0 or 1.
    """
    n_classes = 4
    class_ids = np.arange(n_classes)

    # roc_curve needs one binary column per class; indexing column i of the
    # former (n, 1) reshape raised an IndexError for every class but the first.
    y_test = label_binarize(y_test, classes=class_ids)
    y_pred = label_binarize(y_pred, classes=class_ids)

    # Compute ROC curve and ROC area for each class
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    thresholds = dict()
    for i in range(n_classes):
        fpr[i], tpr[i], thresholds[i] = roc_curve(
            y_test[:, i], y_pred[:, i], drop_intermediate=False
        )
        # Inside the loop, so that every class gets its own area.
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Compute micro-average ROC curve and ROC area
    fpr["micro"], tpr["micro"], _ = roc_curve(y_test.ravel(), y_pred.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    # First aggregate all false positive rates
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))

    # Then interpolate all ROC curves at these points
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(n_classes):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])

    # Finally average it and compute AUC
    mean_tpr /= n_classes

    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    # Plot all ROC curves
    plt.figure(dpi=600)
    lw = 2
    plt.plot(
        fpr["micro"],
        tpr["micro"],
        label="micro-average ROC curve (area = {0:0.2f})".format(roc_auc["micro"]),
        color="deeppink",
        linestyle=":",
        linewidth=4,
    )

    plt.plot(
        fpr["macro"],
        tpr["macro"],
        label="macro-average ROC curve (area = {0:0.2f})".format(roc_auc["macro"]),
        color="navy",
        linestyle=":",
        linewidth=4,
    )

    colors = cycle(["aqua", "darkorange", "darkgreen", "yellow"])
    for i, color in zip(range(n_classes), colors):
        plt.plot(
            fpr[i],
            tpr[i],
            color=color,
            lw=lw,
            label="ROC curve of class {0} (area = {1:0.2f})".format(i, roc_auc[i]),
        )

    plt.plot([0, 1], [0, 1], "k--", lw=lw)  # chance level
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("Receiver Operating Characteristic (ROC) curve")
    plt.legend()

In [None]:
# Print the array shapes as a sanity check, then plot the ROC curves.
print(labels.shape)
print(y_pred.shape)
print(labels.reshape((-1, 1)).shape)
plot_roc_curve(labels, y_pred)

In [None]:
def plot_roc_curve(y_test, y_pred):
    """Plot one-vs-rest ROC curves plus micro and macro averages.

    The number of classes is taken from the distinct values in ``y_test``, and
    the classes are assumed to be numbered ``0 .. n_classes - 1``.

    Args:
        y_test: 1-D array of true class indices.
        y_pred: 1-D array of predicted class indices. These are hard labels,
            so the binarized scores are only 0 or 1.
    """
    n_classes = len(np.unique(y_test))
    class_ids = np.arange(n_classes)
    # One binary column per class for the one-vs-rest curves.
    y_test = label_binarize(y_test, classes=class_ids)
    y_pred = label_binarize(y_pred, classes=class_ids)

    # Compute ROC curve and ROC area for each class
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_test[:, i], y_pred[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Compute micro-average ROC curve and ROC area
    fpr["micro"], tpr["micro"], _ = roc_curve(y_test.ravel(), y_pred.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    # First aggregate all false positive rates
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))

    # Then interpolate all ROC curves at these points
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(n_classes):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])

    # Finally average it and compute AUC
    mean_tpr /= n_classes

    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    # Plot all ROC curves in ONE figure. Two consecutive plt.figure() calls
    # left an empty figure behind and dropped the requested size.
    plt.figure(figsize=(cm2inch(50), cm2inch(15)), dpi=300)
    lw = 2
    plt.plot(
        fpr["micro"],
        tpr["micro"],
        label="micro-average ROC curve (area = {0:0.2f})".format(roc_auc["micro"]),
        color="deeppink",
        linestyle=":",
        linewidth=4,
    )

    plt.plot(
        fpr["macro"],
        tpr["macro"],
        label="macro-average ROC curve (area = {0:0.2f})".format(roc_auc["macro"]),
        color="navy",
        linestyle=":",
        linewidth=4,
    )

    colors = cycle(["aqua", "darkorange", "darkgreen", "yellow", "blue"])
    for i, color in zip(range(n_classes), colors):
        plt.plot(
            fpr[i],
            tpr[i],
            color=color,
            lw=lw,
            label="ROC curve of class {0} (area = {1:0.2f})".format(i, roc_auc[i]),
        )

    plt.plot([0, 1], [0, 1], "k--", lw=lw)  # chance level
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("Receiver Operating Characteristic (ROC) curve")
    plt.legend()

In [None]:
# Plot the ROC curves for the labels and predictions collected from the test set.
plot_roc_curve(labels,y_pred)