In [None]:
"""
EVALUIERUNG VON XAI-METHODEN MITTELS DER "POINTING GAME"-ANALYSE

Dieses Skript implementiert die "Pointing Game"-Analyse, eine Methode zur Bewertung der
Lokalisierungsgenauigkeit von XAI-Methoden. Das Ziel ist es zu überprüfen, ob der von einer XAI-Methode als am
relevantesten identifizierte Bildpunkt tatsächlich auf dem zu klassifizierenden Objekt liegt.

Der Prozess umfasst folgende Schritte:
1.  **Laden von Modell und Daten:** Ein vortrainiertes VGG16-Modell sowie ein Testdatensatz
    (z.B. ImageNette) werden geladen.
2.  **Generierung von Erklärungen:** Für jedes Bild im Testdatensatz werden mithilfe der
    "innvestigate"-Bibliothek Relevanz-Heatmaps (Erklärungen) für eine Auswahl an XAI-Methoden
    (z.B. Gradient, LRP, Guided Backprop) berechnet.
3.  **Laden von Segmentierungsmasken:** Für jedes Testbild wird die zugehörige Ground-Truth-
    Segmentierungsmaske geladen, die die exakte Position des Objekts im Bild markiert.
4.  **Durchführung des "Pointing Game":** Für jede generierte Heatmap wird der Punkt mit der höchsten
    Relevanz ermittelt (argmax). Anschließend wird überprüft, ob die Koordinaten dieses Punktes
    innerhalb der Segmentierungsmaske des Objekts liegen.
5.  **Berechnung des Scores:** Ein Treffer ("Hit") wird gezählt, wenn der relevanteste Punkt auf dem Objekt
    liegt. Der finale "Pointing Game Score" für jede XAI-Methode ist der prozentuale Anteil der
    Treffer an der Gesamtzahl der Testbilder.
6.  **Ergebnisauswertung:** Die Scores werden tabellarisch ausgegeben, um die XAI-Methoden hinsichtlich
    ihrer Fähigkeit, das korrekte Objekt zu lokalisieren, quantitativ zu vergleichen.

"""

In [None]:
import imp
import os
import sys
import cv2

from types import SimpleNamespace
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow.keras as keras
from tensorflow.keras import models
import tensorflow.keras.backend
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.optimizers import Adam

import tensorflow.compat.v1 as tf1
tf1.reset_default_graph()
tf.compat.v1.disable_eager_execution() #must have für innvestigate weil für tf1 gebaut.

#laden der lib lokal für eigene anpassungen, sonst kann library von pip install genutzt.
example_utils_path = "/mnt/data/environments/innvestigate/examples/"
example_utils_path = os.path.abspath(example_utils_path)

if example_utils_path not in sys.path:
    sys.path.append(example_utils_path)

dev_path = "/mnt/data/environments/innvestigate/src/"

if dev_path not in sys.path:
    sys.path.insert(0, dev_path) # Fügt Ihren Entwicklungspfad an den Anfang der Liste ein

#innvestigate imports
import innvestigate
import innvestigate.tools.perturbate
import innvestigate.utils as iutils
#examples utils1 help funcitons
import utils1 as eutils
import utils1.imagenet as imagenetutils
from innvestigate.tools.perturbate import Perturbation, PerturbationAnalysis

#VRAM Mem growth
physical_gpus = tf.config.list_physical_devices('GPU')
if physical_gpus:
    tf.config.experimental.set_memory_growth(physical_gpus[0], True)

print(f"sys.path[0]: {sys.path[0]}") # Sollte jetzt Ihr dev_path sein
print(f"Geladenes innvestigate Paket: {innvestigate.__file__}")
print(f"Geladene perturbate.py: {innvestigate.tools.perturbate.__file__}")


In [None]:
MODEL_PATH = './models/VGG16 MODELS/vgg16_imagenet_a96_va63_l150_vl278.h5'
#MODEL_PATH = './models/vgg16_cifar10_a87_va85_l378_vl448.h5'
#MODEL_PATH = './models/VGG16 MODELS/vgg16_oxfordpets_a69_va60_l1310_vl1920.h5'
#MODEL_PATH = './models/VGG16 MODELS/vgg16_caltech101_a68_va56_l1277_vl2056.h5'
#MODEL_PATH = './models/vgg16_sun397_a79_va58_l761_vl1768.h5'

model = tf.compat.v1.keras.models.load_model(MODEL_PATH, compile=False)

model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])#learning rate should be set to 0.0001 as models are trained that way


In [None]:
# Load dataset
(ds_test, ds_train), ds_info = tfds.load(
    'imagenette/full-size-v2',
    split=['validation', 'train'],
    shuffle_files=False,
    as_supervised=True,
    with_info=True,
    data_dir='/mnt/data/datasets'
)

def preprocess_vgg(image, label):
    # Resize to VGG16's expected input size
    image = tf.image.resize(image, [224, 224])
    # Apply VGG16 preprocessing (convert to float32 + mean subtraction)
    image = preprocess_input(image)
    return image, label

def one_hot_encode_labels(image, label):
    # Apply one-hot encoding to the label
    label = tf.one_hot(label, 10)
    return image, label

BATCH_SIZE = 256
#ds_train = ds_train.take(1000)
ds_test = ds_test.take(800)


# Apply preprocessing to both datasets
ds_train = ds_train.map(preprocess_vgg)
ds_train = ds_train.shuffle(buffer_size=1000)
ds_train = ds_train.batch(BATCH_SIZE)
ds_train = ds_train.prefetch(buffer_size=tf.data.AUTOTUNE)

ds_test = ds_test.map(preprocess_vgg,deterministic=True)
ds_test = ds_test.batch(BATCH_SIZE)
ds_test = ds_test.prefetch(buffer_size=tf.data.AUTOTUNE)

# Apply one-hot encoding within the data pipeline
ds_train = ds_train.map(one_hot_encode_labels)
ds_test = ds_test.map(one_hot_encode_labels)

# Convert TF Dataset to NumPy arrays by iterating and concatenating batches
train_images_list = []
train_labels_list = []

# Iterate through the dataset and append batches to the lists, hier übergebe ich ob train oder validation data für die validierung/perturbationsanalyse benutzt werden soll.
# if ds_train, dann subset zuerst davon erstellen.
for images, labels in tfds.as_numpy(ds_test):
    train_images_list.append(images)
    train_labels_list.append(labels)

# Concatenate the batches into single NumPy arrays
train_images = np.concatenate(train_images_list, axis=0)
train_labels = np.concatenate(train_labels_list, axis=0)

#test_sample für perturbationsanalyse
test_sample = np.copy(train_images[0:1])
print("Test sample shape: {}".format(test_sample.shape))
print("generator variables data type:",(test_sample.dtype))

#check ob tensor alle richtig.
print("Train images shape:", train_images.shape)  # Should be (N, 224, 224, 3)
print("Train images dtype:", train_images.dtype)  # Should be float32
print("Train labels shape:", train_labels.shape)  # Should be (N,)
print("Train labels dtype:", train_labels.dtype)  # Should be float32


In [None]:
generator = iutils.sequence.BatchSequence([train_images, train_labels], batch_size=16)

perturbation_function = "uniform"
region_shape = (56, 56)
steps = 16
regions_per_step = 1  # Perturbate 1 region per step

# set postprocessing parameter
channels_first = keras.backend.image_data_format() == "channels_first"
color_conversion = "BGRtoRGB"  # keras.applications use BGR format

# VGG16 preprocess_input erwartet Eingaben im [0, 255] Bereich (RGB),
# konvertiert intern zu BGR und subtrahiert Mittelwerte [103.939, 116.779, 123.68] (BGR)
VGG_MEANS_BGR = np.array([103.939, 116.779, 123.68]) # B, G, R

def revert_preprocessing_vgg(X_preprocessed_bgr):
    X_bgr = X_preprocessed_bgr + VGG_MEANS_BGR
    X_rgb = X_bgr[..., ::-1] # Kanalreihenfolge umkehren (BGR zu RGB)
    return X_rgb

#vgg revert postprocessing scale to
def input_postprocessing_vgg(X_vgg_preprocessed):
    X_reverted_to_0_255_rgb = revert_preprocessing_vgg(X_vgg_preprocessed)
    X_reverted_to_0_255_rgb = np.clip(X_reverted_to_0_255_rgb, 0, 255)
    return X_reverted_to_0_255_rgb / 255.0

input_range = (-128, 128)  # format used by keras.applications
noise_scale = (input_range[1] - input_range[0]) * 0.1

ri = input_range[0]  # reference input

# Configure analysis methods and properties
# fmt: off
methods = [
    # NAME                    OPT.PARAMS                POSTPROC FXN            TITLE
    # Show input
    ("input",               {},                         input_postprocessing_vgg,   "Input"),
    # Random
    ("random",              {},                         imagenetutils.graymap,     "Random"),
    # Function
    ("gradient",            {"postprocess": "abs"},     imagenetutils.graymap,     "Gradient"),
    ("smoothgrad",
                            {"noise_scale": noise_scale,
                             "postprocess": "square"},  imagenetutils.graymap,     "SmoothGrad"),
    # Signal
    ("deconvnet",           {},                         imagenetutils.bk_proj,     "Deconvnet"),
    ("guided_backprop",     {},                         imagenetutils.bk_proj,     "Guided Backprop"),
    # Interaction
    ("deep_taylor.bounded", {"low": input_range[0],
                             "high": input_range[1]},   imagenetutils.heatmap,     "DeepTaylor"),
    ("input_t_gradient",    {},                         imagenetutils.heatmap,     "Input * Gradient"),
    ("integrated_gradients",{"reference_inputs": ri},   imagenetutils.heatmap,     "Integrated Gradients"),
    ("lrp.z",               {},                         imagenetutils.heatmap,     "LRP-Z"),
    ("lrp.epsilon",         {"epsilon": 1},             imagenetutils.heatmap,     "LRP-Epsilon"),
]
# fmt: on

# Select methods of your choice
selected_methods_indices = [2,3,4,5,6,7,8,9,10]
selected_methods = [methods[i] for i in selected_methods_indices]
print(f'Using method(s) "{[method[0] for method in selected_methods]}".')

In [None]:
# Create model without softmax
model_wo_softmax = innvestigate.model_wo_softmax(model)

analyzers = []
for method in selected_methods:
    try:
        analyzer = innvestigate.create_analyzer(
            method[0],          # analysis method identifier
            model_wo_softmax,   # model without softmax output
            **method[1]
        )                     # optional analysis parameters
    except innvestigate.NotAnalyzeableModelException:
        # Not all methods work with all models.
        analyzer = None
    analyzers.append(analyzer)

print("Initial analyzers list created.")

# wenn random ausgewählt ist, find the index of the 'random' method in your list
method_names = [m[0] for m in selected_methods]
try:
    random_method_index = method_names.index("random")

    # Create a very simple object that just has an .analyze() method
    correct_random_analyzer = SimpleNamespace(
        analyze=lambda x: np.random.rand(*x.shape)
    )

    # It replaces the broken analyzer in your list with the correct one.
    print("Applying simple fix for 'random' analyzer...")
    analyzers[random_method_index] = correct_random_analyzer

except ValueError:
    # This will run if 'random' is not in your selected_methods list
    print("'random' method not found, no fix applied.")


In [None]:
all_explanations = {method[0]: [] for method in selected_methods}
for method, analyzer in zip(selected_methods, analyzers):
    print(f"Generiere 800 Erklärungen für die Methode: {method[0]}...")

    # Gehe durch jedes der 800 Testbilder
    for image in train_images:
        # Das Modell erwartet einen "Batch" von Bildern, auch wenn es nur eines ist.
        # Wir fügen daher eine Dimension hinzu: (224, 224, 3) -> (1, 224, 224, 3)
        image_with_batch_dim = image[None, :, :, :]

        # Berechne die Erklärung für das einzelne Bild
        explanation = analyzer.analyze(image_with_batch_dim)

        # Entferne die Batch-Dimension wieder und speichere das Array
        # (1, 224, 224, 3) -> (224, 224, 3)
        all_explanations[method[0]].append(explanation.squeeze())

    # Optional: Wandle die Liste von Arrays in ein einziges großes NumPy-Array um
    all_explanations[method[0]] = np.array(all_explanations[method[0]])

In [None]:
mask_filename = '/mnt/data/masken/imagenette_test.npy'
final_masks_array = None
try:
    original_masks = np.load(mask_filename, allow_pickle=True)
    print(f" Original-Masken-Datei '{mask_filename}' geladen.")

    target_size = (224, 224)
    resized_masks = [cv2.resize(mask.astype(np.uint8), target_size, interpolation=cv2.INTER_NEAREST) for mask in original_masks]
    final_masks_array = np.array(resized_masks, dtype=np.uint8)
    print(f"Skalierung abgeschlossen. Neues Masken-Array hat die Form: {final_masks_array.shape}")
except Exception as e:
    print(f"FEHLER beim Laden der Masken: {e}")

if final_masks_array is not None and 'all_explanations' in locals():
    pointing_game_scores = {}
    print("\n--- Starte Pointing Game Auswertung ---")

    for method_name, explanations in all_explanations.items():
        hit_counter = 0
        total_images = len(explanations)
        print(f"\n--- Methode: {method_name} ---")

        for i in range(total_images):
            explanation_map_color = explanations[i]
            segmentation_mask = final_masks_array[i]

            if explanation_map_color.ndim == 3 and explanation_map_color.shape[-1] > 1:
                explanation_map_gray = np.mean(explanation_map_color, axis=-1)
            else:
                explanation_map_gray = np.squeeze(explanation_map_color)

            argmax_flat = np.argmax(explanation_map_gray)
            y, x = np.unravel_index(argmax_flat, explanation_map_gray.shape)

            is_hit = segmentation_mask[y, x] == 1
            if is_hit:
                hit_counter += 1

        pg_score = (hit_counter / total_images) * 100
        pointing_game_scores[method_name] = (pg_score, hit_counter, total_images)
        print(f"Ergebnis für '{method_name}': {hit_counter} von {total_images} Treffern => PG-Score: {pg_score:.2f}%")


    # Sortiere nach dem Score, der an Position 0 des Wert-Tupels ist
    sorted_scores = sorted(pointing_game_scores.items(), key=lambda item: item[1][0], reverse=True)

    # Gib eine Kopfzeile aus (wird nicht mitkopiert)
    print("Methode;Hits;Total;Score (%)")
    print("-" * 40) # Visuelle Trennlinie

    # Gib die Datenzeilen im CSV-Format (mit Semikolon) aus
    for method, (score, hits, total) in sorted_scores:
        # Formatiere den Score mit Komma als Dezimaltrennzeichen
        score_german_format = f"{score:.2f}".replace('.', ',')

        # Gib die Zeile mit Semikolon als Trenner aus
        print(f"{method};{hits};{total};{score_german_format}")

elif 'all_explanations' not in locals():
     print("\nVariable 'all_explanations' nicht gefunden.")
else:
     print("\nMasken konnten nicht geladen werden, Auswertung wird übersprungen.")

In [None]:
#Hilfsfunktion zum überprüfen ob die richtigen Masken geladen wurden.
try:
    ds_plot = ds_test.take(10)
    # Deine funktionierende Methode zum Extrahieren der Daten
    for images, labels in tfds.as_numpy(ds_plot):
        original_images_for_plotting = images
        break

    print(f"{len(original_images_for_plotting)} Originalbilder nach deiner Methode geladen und einfach verarbeitet.")

    #masken laden aus npy datei
    mask_filename = '/mnt/data/masken/caltech101_test.npy'
    original_masks = np.load(mask_filename, allow_pickle=True)

    target_size = (224, 224)
    resized_masks_for_plotting = []

    for mask in original_masks[:10]:
        mask_uint8 = mask.astype(np.uint8)
        resized_mask = cv2.resize(mask_uint8, target_size, interpolation=cv2.INTER_NEAREST)
        resized_masks_for_plotting.append(resized_mask)

    final_10_masks = np.array(resized_masks_for_plotting)
    print(f"{len(final_10_masks)} Masken geladen und skaliert.")

    print("Erstelle den Plot...")
    num_examples = 10
    fig, axes = plt.subplots(num_examples, 2, figsize=(8, 40))
    fig.suptitle("Finaler Abgleich: Originalbild vs. Maske", fontsize=16, y=1.0)

    for i in range(num_examples):
        ax1 = axes[i, 0]
        ax1.imshow(original_images_for_plotting[i])
        ax1.set_title(f"Originalbild #{i}")
        ax1.axis('off')

        ax2 = axes[i, 1]
        ax2.imshow(final_10_masks[i], cmap='gray')
        ax2.set_title(f"Skalierte Maske #{i}")
        ax2.axis('off')

    plt.tight_layout(rect=[0, 0, 1, 0.99])
    output_filename = 'final_image_mask_comparison.png'
    plt.savefig(output_filename)
    plt.close()

    print(f"\nPlot wurde erfolgreich als '{output_filename}' gespeichert.")

except Exception as e:
    print(f"\nEin Fehler ist aufgetreten: {e}")