In [1]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import average_precision_score
import matplotlib.pyplot as plt

In [2]:
# Optional cell: can be skipped if MAPCallback is already defined earlier in the session.
# MAPCallback definition (same as before; needed for loading the x.keras model)
class MAPCallback(tf.keras.callbacks.Callback):
    """Keras callback that logs macro-averaged average precision after each epoch.

    The score is computed with ``average_precision_score(..., average="macro")``
    on both datasets handed to the constructor and written to ``logs`` under
    the keys ``"mAP"`` and ``"val_mAP"``.
    """

    def __init__(self, train_data, val_data):
        """Keep references to the training and validation datasets."""
        super().__init__()
        self.tr_ds, self.val_ds = train_data, val_data

    def _compute_map(self, ds):
        """Return the macro-averaged average precision of ``self.model`` on ``ds``.

        ``ds`` is iterated as ``(inputs, targets)`` pairs. Returns ``0.0`` (after
        printing a warning) when the dataset yields nothing.
        """
        target_batches = []
        score_batches = []
        for batch_inputs, batch_targets in ds:
            # The batch inputs are forwarded to model.predict unchanged.
            score_batches.append(self.model.predict(batch_inputs, verbose=0))
            # TensorFlow tensors are converted to NumPy; anything else is stored as-is.
            target_batches.append(
                batch_targets.numpy() if isinstance(batch_targets, tf.Tensor) else batch_targets
            )

        # Nothing was collected, so there is nothing to stack or score.
        if not (target_batches and score_batches):
            print("Warning: No data to compute mAP for the given dataset.")
            return 0.0

        stacked_targets = np.vstack(target_batches)
        stacked_scores = np.vstack(score_batches)
        return average_precision_score(stacked_targets, stacked_scores, average="macro")

    def on_epoch_end(self, epoch, logs=None):
        """Compute mAP on both datasets, store it in ``logs`` and print it.

        Note: this runs a full prediction pass over both datasets at the end of
        every epoch, which can be slow on large datasets.
        """
        logs = logs or {}
        for log_key, dataset in (("mAP", self.tr_ds), ("val_mAP", self.val_ds)):
            logs[log_key] = self._compute_map(dataset)
        print(f"\ntrain mAP: {logs['mAP']:.4f}  "
              f"val mAP: {logs['val_mAP']:.4f}")

In [3]:
# Optional cell: loading the model from disk is not needed in production.
# Loads the pre-trained classifier that the adversarial generator will attack.
print("Loading pre-trained classifier model 'x.keras'...")
try:
    classifier = tf.keras.models.load_model(
        # '/kaggle/input/x/keras/default/1/x.keras',
        'x.keras',
        custom_objects={'MAPCallback': MAPCallback, 'BinaryFocalCrossentropy': tf.keras.losses.BinaryFocalCrossentropy}
    )
    # Set the classifier to non-trainable as it's our fixed target
    classifier.trainable = False
    print("Classifier loaded successfully and set to non-trainable.")
except Exception as e:
    print(f"Error loading classifier model 'x.keras': {e}")
    # The rest of the notebook needs `classifier`. Re-raise instead of calling
    # exit(): in a notebook exit() shuts the kernel down and hides the traceback.
    raise

Loading pre-trained classifier model 'x.keras'...
Classifier loaded successfully and set to non-trainable.


In [None]:
# --- Configuration (Remains mostly the same, maybe structured as a dict/object later) ---
# Optional, defined earlier or in other method
labels = [
    "aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat",
    "chair", "cow", "diningtable", "dog", "horse", "motorbike", "person",
    "pottedplant", "sheep", "sofa", "train", "tvmonitor"
]
# train_dir = '/kaggle/input/kul-computer-vision-ga-2-2025/train/'
train_dir = 'kul-computer-vision-ga-2-2025/train/'
image_dir = train_dir + 'img/'
answer_fpath = train_dir + 'train_set.csv'

# Optional, load true labels
print(f"Loading true labels from {answer_fpath}...")
try:
    # Index by image Id and keep only the label columns, in `labels` order.
    # Chained (no inplace mutation) so the cell is safe to re-run.
    true_labels_df = pd.read_csv(answer_fpath).set_index('Id')[labels]
    print(f"Loaded true labels for {len(true_labels_df)} images.")
except FileNotFoundError:
    print(f"Error: True labels file not found at {answer_fpath}")
    # Re-raise instead of exit(): exit() shuts down the notebook kernel.
    raise
except Exception as e:
    print(f"Error loading true labels CSV: {e}")
    raise

# fixed parameters
TARGET_HEIGHT = 300
TARGET_WIDTH = 300
TARGET_CHANNELS = 3 # 3 for RGB, 1 for gray-scale


# Adversarial specific config
PREDICTION_THRESHOLD = 0.5
EPSILON = 0.04      # Maximum perturbation magnitude (L-infinity norm)
GENERATOR_LEARNING_RATE = 1e-4
ADVERSARIAL_TRAINING_EPOCHS = 20 # Number of epochs for training the generator
ADVERSARIAL_BATCH_SIZE = 16    # Batch size for training the generator
DATASET_SHUFFLE_BUFFER = 1000 # A reasonable shuffle buffer size

Loading true labels from /kaggle/input/kul-computer-vision-ga-2-2025/train/train_set.csv...
Error: True labels file not found at /kaggle/input/kul-computer-vision-ga-2-2025/train/train_set.csv


: 

In [None]:
# --- Configuration (Remains mostly the same, maybe structured as a dict/object later) ---
# Optional, defined earlier or in other method
# NOTE: this cell repeats the configuration with the Kaggle input path; running it
# overrides any `train_dir`, `image_dir`, `answer_fpath` and `true_labels_df` set before.
labels = [
    "aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat",
    "chair", "cow", "diningtable", "dog", "horse", "motorbike", "person",
    "pottedplant", "sheep", "sofa", "train", "tvmonitor"
]
train_dir = '/kaggle/input/kul-computer-vision-ga-2-2025/train/'
image_dir = train_dir + 'img/'
answer_fpath = train_dir + 'train_set.csv'

# Optional, load true labels
print(f"Loading true labels from {answer_fpath}...")
try:
    # Index by image Id and keep only the label columns, in `labels` order.
    # Chained (no inplace mutation) so the cell is safe to re-run.
    true_labels_df = pd.read_csv(answer_fpath).set_index('Id')[labels]
    print(f"Loaded true labels for {len(true_labels_df)} images.")
except FileNotFoundError:
    print(f"Error: True labels file not found at {answer_fpath}")
    # Re-raise instead of exit(): exit() shuts down the notebook kernel.
    raise
except Exception as e:
    print(f"Error loading true labels CSV: {e}")
    raise

# fixed parameters
TARGET_HEIGHT = 300
TARGET_WIDTH = 300
TARGET_CHANNELS = 3 # 3 for RGB, 1 for gray-scale


# Adversarial specific config
PREDICTION_THRESHOLD = 0.5
EPSILON = 0.04      # Maximum perturbation magnitude (L-infinity norm)
GENERATOR_LEARNING_RATE = 1e-4
ADVERSARIAL_TRAINING_EPOCHS = 20 # Number of epochs for training the generator
ADVERSARIAL_BATCH_SIZE = 16    # Batch size for training the generator
DATASET_SHUFFLE_BUFFER = 1000 # A reasonable shuffle buffer size

In [None]:
class AdversarialGenerator(tf.keras.Model):
    """Small convolutional network that maps an image batch to a bounded perturbation.

    Two 3x3 ReLU convolutions (64 filters each) feed a 3x3 ``tanh`` convolution
    with ``num_channels_output`` filters. The ``tanh`` output is multiplied by
    ``epsilon``, so each element of the returned perturbation has magnitude
    at most ``epsilon``. All convolutions use ``padding='same'``.

    Args:
        input_shape: Per-sample input shape without the batch axis, e.g.
            ``(height, width, channels)``. Any sequence (tuple or list) works.
        num_channels_output: Number of channels of the produced perturbation.
        epsilon: Scale applied to the ``tanh`` output.
    """

    def __init__(self, input_shape, num_channels_output, epsilon):
        super().__init__()
        self.epsilon = epsilon
        self.conv1 = tf.keras.layers.Conv2D(64, (3, 3), padding='same', activation='relu')
        self.conv2 = tf.keras.layers.Conv2D(64, (3, 3), padding='same', activation='relu')
        self.conv_output = tf.keras.layers.Conv2D(num_channels_output, (3, 3), padding='same', activation='tanh')
        # Lambda layer to scale the tanh output by epsilon
        # This layer scales the perturbation base [-1, 1] to [-epsilon, epsilon]
        self.epsilon_scaler = tf.keras.layers.Lambda(lambda t: t * self.epsilon)

        # tuple(...) lets callers pass the shape as a list as well; plain
        # `(None,) + input_shape` raises TypeError for non-tuple sequences.
        self.build((None,) + tuple(input_shape))

    @tf.function
    def call(self, inputs, training=False):
        """Return the epsilon-scaled perturbation for ``inputs``.

        ``training`` is accepted for Keras compatibility; none of the layers
        called here receive it.
        """
        x = self.conv1(inputs)
        x = self.conv2(x)
        perturbation_base = self.conv_output(x)
        # Apply the epsilon scaling
        perturbation = self.epsilon_scaler(perturbation_base)
        return perturbation

In [None]:
# Optional, preprocessor
class ImageDataProcessor:
    """Finds, loads and preprocesses ``train_<id>.npy`` images that have true labels.

    Args:
        image_dir: Directory containing the ``train_<id>.npy`` files.
        true_labels_df: DataFrame indexed by integer image id whose columns
            include every name in ``labels``.
        labels: Ordered list of label column names.
        target_height: Height the images are resized to.
        target_width: Width the images are resized to.
        target_channels: Required channel count (1 <-> 3 conversions are supported).
    """

    def __init__(self, image_dir, true_labels_df, labels, target_height, target_width, target_channels):
        self.image_dir = image_dir
        self.true_labels_df = true_labels_df
        self.labels = labels
        self.target_height = target_height
        self.target_width = target_width
        self.target_channels = target_channels

        # Caches, filled lazily by the methods below.
        self._processed_images = None
        self._processed_labels = None
        self._processed_ids = None
        self._relevant_files_info = None  # list of (id, path, label_array) tuples

    def _find_relevant_files(self):
        """Return ``(image_id, file_path, label_array)`` for every labelled image file.

        Files are visited in sorted filename order. Only names of the form
        ``train_<int>.npy`` whose id is present in ``true_labels_df.index`` are
        kept. ``label_array`` is a float32 array in ``self.labels`` order.
        The result is cached, so later calls return the same list object.
        """
        if self._relevant_files_info is not None:
            return self._relevant_files_info  # Return cached info if available

        prefix, suffix = "train_", ".npy"
        relevant_files_info = []
        print(f"Searching for relevant image files in {self.image_dir}...")
        for filename in sorted(os.listdir(self.image_dir)):
            if not (filename.startswith(prefix) and filename.endswith(suffix)):
                continue
            image_id_str = filename[len(prefix):-len(suffix)]
            try:
                image_id_int = int(image_id_str)
            except ValueError:
                # Skip if ID is not a valid integer
                continue
            if image_id_int in self.true_labels_df.index:
                file_path = os.path.join(self.image_dir, filename)
                label_val = self.true_labels_df.loc[image_id_int, self.labels].values.astype(np.float32)
                relevant_files_info.append((image_id_int, file_path, label_val))

        if not relevant_files_info:
            print("Warning: No relevant image files with matching labels found.")
        else:
            print(f"Found {len(relevant_files_info)} relevant image files.")

        self._relevant_files_info = relevant_files_info  # Cache the result
        return relevant_files_info

    def _load_and_preprocess_single_image(self, file_path):
        """Load one ``.npy`` image and return it as a float32 tensor in [0, 1].

        Steps: drop a leading batch axis of size 1, resize to the target size
        (``method='area'``), convert between 1 and 3 channels when needed,
        divide by 255 when the values look like a 0-255 image, then clip to
        [0, 1].

        Returns:
            The processed ``(target_height, target_width, target_channels)``
            tensor, or ``None`` (after printing a message) when the file cannot
            be loaded, has an unsupported rank, or has an unsupported channel count.
        """
        try:
            img_data = np.load(file_path)
        except Exception as e:
            print(f"Error loading .npy file {file_path}: {e}")
            return None

        # Handle potential extra batch dimension (shape [1, H, W, C])
        if img_data.ndim == 4 and img_data.shape[0] == 1:
            img_data = img_data[0]
        elif img_data.ndim != 3:
            print(f"Skipping {file_path}: Unexpected data dimension {img_data.ndim}. Expected 3 (H,W,C) or 4 (1,H,W,C).")
            return None

        img_data_resized = tf.image.resize(img_data, [self.target_height, self.target_width], method='area')

        # Handle channel mismatch between the file and the requested target.
        current_channels = img_data_resized.shape[-1]
        if current_channels != self.target_channels:
            if current_channels == 1 and self.target_channels == 3:
                img_data_resized = tf.image.grayscale_to_rgb(img_data_resized)
            elif self.target_channels == 1 and current_channels == 3:
                img_data_resized = tf.image.rgb_to_grayscale(img_data_resized)
            else:
                print(f"Skipping {file_path}: Channel mismatch. Processed {current_channels} vs Target {self.target_channels}.")
                return None

        # Cast to float32 type for consistency with model input
        img_data_resized = tf.cast(img_data_resized, tf.float32)

        # Heuristic: values above 1 within [0, 255] are treated as 8-bit pixel
        # intensities and rescaled. Other ranges are left as-is and only clipped below.
        max_val = tf.reduce_max(img_data_resized)
        if max_val > 1.0 + 1e-6:
            min_val = tf.reduce_min(img_data_resized)
            if min_val >= 0 and max_val <= 255.0 + 1e-6:
                img_data_resized = img_data_resized / 255.0

        # Clip values to the [0.0, 1.0] range after normalization
        img_data_resized = tf.clip_by_value(img_data_resized, 0.0, 1.0)

        return img_data_resized

    def process_all_relevant_data(self):
        """Load and preprocess every relevant image, caching the result.

        Returns:
            ``(images, labels, ids)``. On success ``images`` and ``labels`` are
            NumPy arrays and ``ids`` is a list of image ids. When nothing could
            be processed, three empty lists are returned (and cached).
        """
        if self._processed_images is not None:
            print("Using cached processed data.")
            return self._processed_images, self._processed_labels, self._processed_ids

        print("Starting to process all relevant images...")
        relevant_files = self._find_relevant_files()

        expected_shape = (self.target_height, self.target_width, self.target_channels)
        processed_images = []
        processed_labels = []
        processed_ids = []

        for image_id, file_path, label_val in relevant_files:
            img = self._load_and_preprocess_single_image(file_path)
            if img is None:
                continue
            if img.shape[-3:] == expected_shape:
                # Convert each tensor to NumPy right away; building an array from a
                # list of tensors afterwards goes through the slow sequence protocol.
                processed_images.append(np.asarray(img))
                processed_labels.append(label_val)
                processed_ids.append(image_id)
            else:
                print(f"Skipping image {file_path} due to shape mismatch after processing: {img.shape} vs {(self.target_height, self.target_width, self.target_channels)}")

        if not processed_images:
            print("Error: No images could be successfully processed.")
            self._processed_images, self._processed_labels, self._processed_ids = [], [], []
            return [], [], []  # Return empty lists

        print(f"Successfully processed {len(processed_images)} images.")
        self._processed_images = np.stack(processed_images)
        self._processed_labels = np.array(processed_labels)
        self._processed_ids = processed_ids  # Keep IDs as a list

        return self._processed_images, self._processed_labels, self._processed_ids

    def get_training_dataset(self, batch_size, shuffle_buffer_size=None):
        """Build a batched, prefetched ``tf.data.Dataset`` of ``(image, label)`` pairs.

        Args:
            batch_size: Batch size of the returned dataset.
            shuffle_buffer_size: When not ``None``, the dataset is shuffled with
                this buffer size before batching.

        Returns:
            The dataset, or ``None`` (after printing a message) when no image
            could be processed.
        """
        images, labels, _ = self.process_all_relevant_data()

        # `images` is a plain list when nothing was processed, so it has no
        # `.size`; len() works for both the list and the ndarray case.
        if len(images) == 0:
            print("Cannot create dataset: No processed data available.")
            return None

        dataset = tf.data.Dataset.from_tensor_slices((images, labels))

        if shuffle_buffer_size is not None:
            dataset = dataset.shuffle(buffer_size=shuffle_buffer_size)

        dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

        return dataset

In [None]:
class AdversarialAttackExecutor:
    """Trains a perturbation generator against a frozen classifier and evaluates it.

    Args:
        classifier_model: Callable model under attack; its ``trainable`` flag is
            set to ``False`` by the constructor.
        generator_model: Callable model producing a perturbation for an image batch.
        optimizer_g: Optimizer used to update the generator's trainable variables.
        loss_fn: Loss called as ``loss_fn(true_labels, predictions)``.
        epsilon: Stored for reference only; this class never reads it.
        prediction_threshold: Probability above which a label counts as predicted.
    """

    def __init__(self, classifier_model, generator_model, optimizer_g, loss_fn, epsilon, prediction_threshold):
        self.classifier = classifier_model
        self.generator = generator_model
        self.optimizer_g = optimizer_g
        self.loss_fn = loss_fn
        self.epsilon = epsilon
        self.prediction_threshold = prediction_threshold

        # Ensure classifier is not trainable
        self.classifier.trainable = False

    def train_generator(self, train_dataset, epochs):
        """Train the generator to maximise the classifier's loss.

        Args:
            train_dataset: Iterable of ``(images, true_labels)`` batches. When
                ``None``, an error is printed and nothing is trained.
            epochs: Number of passes over ``train_dataset``.
        """
        if train_dataset is None:
            print("Error: Training dataset is not available. Cannot train generator.")
            return

        print("\nStarting adversarial training for the generator...")
        for epoch in range(epochs):
            print(f"Epoch {epoch+1}/{epochs}")
            epoch_g_loss = 0.0
            num_batches = 0

            for original_images_batch, true_labels_batch in train_dataset:
                with tf.GradientTape() as tape:
                    perturbations = self.generator(original_images_batch, training=True)

                    # Add the perturbation and keep pixels inside [0.0, 1.0].
                    adversarial_images = original_images_batch + perturbations
                    adversarial_images = tf.clip_by_value(adversarial_images, 0.0, 1.0)

                    # training=False because the classifier is not being trained
                    predictions_adversarial = self.classifier(adversarial_images, training=False)

                    # The generator wants to MAXIMIZE the classifier's loss, so its
                    # own loss is the negative of it: minimising one maximises the other.
                    adv_loss = -self.loss_fn(true_labels_batch, predictions_adversarial)

                gradients_g = tape.gradient(adv_loss, self.generator.trainable_variables)
                self.optimizer_g.apply_gradients(zip(gradients_g, self.generator.trainable_variables))

                # Relies on adv_loss being a scalar tensor (loss_fn reduces over the batch).
                epoch_g_loss += adv_loss.numpy()
                num_batches += 1

            avg_epoch_g_loss = epoch_g_loss / num_batches if num_batches > 0 else 0
            print(f"  Generator loss: {avg_epoch_g_loss:.4f}")

        print("Finished adversarial training for the generator.")

    def generate_adversarial_predictions(self, relevant_files_info, data_processor_instance):
        """Run the classifier on adversarial versions of the given image files.

        Args:
            relevant_files_info: Iterable of ``(image_id, file_path, label)``
                tuples; only the id and the path are used.
            data_processor_instance: Object providing
                ``_load_and_preprocess_single_image(path)``. If it has a
                ``labels`` attribute, its length is the expected classifier
                output width; otherwise the width check is skipped.

        Returns:
            ``(predictions, processed_ids)`` where ``predictions`` maps image id
            to a binary integer array (probability > ``prediction_threshold``)
            and ``processed_ids`` lists the ids in processing order. Files that
            fail to load, raise, or yield an unexpected output width are skipped.
        """
        print("\nGenerating adversarial examples and evaluating the original classifier on them...")
        predictions_dict_adversarial = {}
        processed_ids_adversarial = []

        # Take the expected label count from the processor rather than from a
        # module-level `labels` global, so the method does not depend on notebook state.
        label_names = getattr(data_processor_instance, "labels", None)
        expected_label_count = len(label_names) if label_names is not None else None

        for image_id_int, file_path, _ in relevant_files_info:
            try:
                original_img_processed = data_processor_instance._load_and_preprocess_single_image(file_path)
                if original_img_processed is None:
                    continue

                # (H, W, C) -> (1, H, W, C), as float32
                original_img_batch = tf.expand_dims(original_img_processed, axis=0)
                original_img_batch = tf.cast(original_img_batch, tf.float32)

                perturbation_batch = self.generator(original_img_batch, training=False)

                adversarial_image_batch = original_img_batch + perturbation_batch
                adversarial_image_batch = tf.clip_by_value(adversarial_image_batch, 0.0, 1.0)

                # Call the model directly: Model.predict is meant for large inputs and
                # adds per-call overhead when invoked once per image inside a loop.
                predictions_adv = np.asarray(self.classifier(adversarial_image_batch, training=False))

                if expected_label_count is None or predictions_adv.shape[-1] == expected_label_count:
                    predicted_binary_adv = (predictions_adv[0] > self.prediction_threshold).astype(int)
                    predictions_dict_adversarial[image_id_int] = predicted_binary_adv
                    processed_ids_adversarial.append(image_id_int)
                else:
                    print(f"Skipping {file_path}: Classifier output shape {predictions_adv.shape} doesn't match expected label count.")

            except Exception as e:
                # Best effort: report the failure and continue with the next file.
                print(f"Error processing {file_path} for adversarial evaluation: {e}")

        print(f"Finished generating adversarial predictions for {len(processed_ids_adversarial)} files.")
        return predictions_dict_adversarial, processed_ids_adversarial

In [None]:
print("\n--- Setting up Adversarial Attack ---")

# 1. Instantiate the Generator Model
# Pass input shape and epsilon
generator = AdversarialGenerator(
    input_shape=(TARGET_HEIGHT, TARGET_WIDTH, TARGET_CHANNELS),
    num_channels_output=TARGET_CHANNELS,
    epsilon=EPSILON
)

# 2. Define Optimizer and Loss for Generator Training
optimizer_g = tf.keras.optimizers.Adam(learning_rate=GENERATOR_LEARNING_RATE)
# The generator is trained against BinaryFocalCrossentropy, the loss class
# registered in custom_objects when the classifier is loaded.
# from_logits=False means the classifier is expected to output probabilities.
# "sum_over_batch_size" sums the per-item losses and divides by the batch size.
# The string form is used because it is the value of
# tf.keras.losses.Reduction.SUM_OVER_BATCH_SIZE and also works on Keras
# versions that no longer expose that enum.
classifier_loss_fn = tf.keras.losses.BinaryFocalCrossentropy(
    from_logits=False,
    reduction="sum_over_batch_size"
)


# 3. Instantiate the Data Processor
data_processor = ImageDataProcessor(
    image_dir=image_dir,
    true_labels_df=true_labels_df, # Pass the loaded dataframe
    labels=labels,                 # Pass the labels list
    target_height=TARGET_HEIGHT,
    target_width=TARGET_WIDTH,
    target_channels=TARGET_CHANNELS
)

# 4. Prepare the Training Dataset
# This calls internal methods to find files, preprocess, and create the dataset
train_dataset_gen = data_processor.get_training_dataset(
    batch_size=ADVERSARIAL_BATCH_SIZE,
    shuffle_buffer_size=DATASET_SHUFFLE_BUFFER # Using defined buffer size
)

# Check if dataset creation was successful
if train_dataset_gen is None:
    print("Failed to prepare training dataset. Exiting.")
    # Raise instead of exit(): exit() shuts down the notebook kernel, whereas an
    # exception stops this cell (and a Run All) with a visible traceback.
    raise RuntimeError("Failed to prepare training dataset.")

# 5. Instantiate the Adversarial Attack Executor
attack_executor = AdversarialAttackExecutor(
    classifier_model=classifier,
    generator_model=generator,
    optimizer_g=optimizer_g,
    loss_fn=classifier_loss_fn, # Pass the classifier's loss function
    epsilon=EPSILON, # Pass epsilon (mostly for reference)
    prediction_threshold=PREDICTION_THRESHOLD # Pass the threshold for later evaluation
)

# 6. Train the Generator
attack_executor.train_generator(
    train_dataset=train_dataset_gen,
    epochs=ADVERSARIAL_TRAINING_EPOCHS
)

# 7. Generate Adversarial Predictions
# Get the list of relevant files (IDs and paths) from the data processor
relevant_files_for_pred = data_processor._find_relevant_files() # Use the internal list

# Generate predictions on adversarial examples
predictions_dict_adversarial, processed_ids_adversarial = attack_executor.generate_adversarial_predictions(
    relevant_files_info=relevant_files_for_pred,
    data_processor_instance=data_processor # Pass the processor instance for loading/preprocessing
)

# 8. Evaluate the Attack (Calculate accuracy on adversarial examples)
# This part remains outside the executor class as it's a separate evaluation step
# using the results generated by the executor and the original true labels.
valid_processed_ids_adv = [
    id_val for id_val in processed_ids_adversarial
    if id_val in predictions_dict_adversarial and id_val in true_labels_df.index
]

if not valid_processed_ids_adv:
    print("\nNo valid adversarial predictions with matching true labels stored. Cannot calculate accuracy on adversarial examples.")
else:
    print(f"\nCalculating accuracy on adversarial examples for {len(valid_processed_ids_adv)} images...")
    valid_processed_ids_adv.sort() # Sort IDs for consistent order

    # Extract true labels and predicted labels for the valid IDs
    true_matrices_list_adv = [true_labels_df.loc[id_val, labels].values for id_val in valid_processed_ids_adv]
    predicted_matrices_list_adv = [predictions_dict_adversarial[id_val] for id_val in valid_processed_ids_adv]

    true_matrix_adv = np.array(true_matrices_list_adv)
    predicted_matrix_adv = np.array(predicted_matrices_list_adv)

    # Perform shape and emptiness checks before calculating accuracy
    if true_matrix_adv.shape != predicted_matrix_adv.shape:
        print(f"Error: Shape mismatch for adversarial data. True: {true_matrix_adv.shape}, Pred: {predicted_matrix_adv.shape}")
    elif true_matrix_adv.size == 0:
        print("Error: Adversarial true/predicted matrices are empty after filtering.")
    else:
        num_images_adv, num_labels_adv = true_matrix_adv.shape
        print(f"Comparing true and predicted (adversarial) labels for {num_images_adv} images and {num_labels_adv} classes.")

        print("\n--- Per-Class Accuracy (Adversarial) ---")
        # Per-class accuracy is (TP + TN) / number of images for that label column.
        for i, label_name in enumerate(labels):
            true_col_adv = true_matrix_adv[:, i]
            predicted_col_adv = predicted_matrix_adv[:, i]
            correct_predictions_for_class_adv = (true_col_adv == predicted_col_adv).sum()
            accuracy_adv = correct_predictions_for_class_adv / num_images_adv if num_images_adv > 0 else 0
            print(f"Accuracy for '{label_name}' (adversarial): {accuracy_adv:.4f}")

        print("\n--- Overall (Micro) Accuracy (Adversarial) ---")
        # Micro accuracy: fraction of correct entries over all (image, label) pairs.
        total_correct_predictions_adv = (true_matrix_adv == predicted_matrix_adv).sum()
        total_possible_predictions_adv = float(true_matrix_adv.size)
        overall_accuracy_adv = total_correct_predictions_adv / total_possible_predictions_adv if total_possible_predictions_adv > 0 else 0
        print(f"Overall (Micro) Accuracy (Adversarial): {overall_accuracy_adv:.4f}")

print("\n--- Adversarial Attack Block Finished ---")

In [None]:
# Show a few randomly chosen images next to their adversarial versions, with the
# true labels and the classifier's adversarial predictions in the figure title.
num_visualizations = 5

# Ensure there are valid processed IDs to visualize
if not valid_processed_ids_adv:
    print("No valid adversarial examples generated to visualize.")
else:
    # Sample without replacement; min(...) handles lists shorter than num_visualizations.
    # The selection is not seeded, so it differs between runs.
    ids_to_visualize = np.random.choice(valid_processed_ids_adv,
                                        min(num_visualizations, len(valid_processed_ids_adv)),
                                        replace=False)

    print(f"Visualizing {len(ids_to_visualize)} random examples...")

    # Get the list of relevant files info from the data processor (it's cached after processing)
    relevant_files_info = data_processor._find_relevant_files()
    # Create a dictionary mapping ID to (path, label) for quick lookup
    relevant_files_dict = {id_: (path, label) for id_, path, label in relevant_files_info}

    for image_id in ids_to_visualize:
        print(f"\nVisualizing Image ID: {image_id}")

        # Look the entry up first and unpack only when it exists; unpacking the
        # None returned by .get() for a missing id would raise TypeError.
        file_info = relevant_files_dict.get(image_id)
        if file_info is None:
            print(f"Could not find file info for ID {image_id}, skipping.")
            continue
        file_path, true_label_array = file_info

        # Get the adversarial prediction for this ID
        adv_prediction_array = predictions_dict_adversarial.get(image_id)
        if adv_prediction_array is None:
            print(f"Could not find adversarial prediction for ID {image_id}, skipping.")
            continue

        try:
            # Load and preprocess the original image using the processor instance
            original_img_processed = data_processor._load_and_preprocess_single_image(file_path)

            if original_img_processed is None:
                print(f"Preprocessing failed for ID {image_id}, skipping visualization.")
                continue

            # Ensure the image is a float32 tensor
            if not isinstance(original_img_processed, tf.Tensor) or original_img_processed.dtype != tf.float32:
                original_img_processed = tf.cast(original_img_processed, tf.float32)

            # Add batch dimension (H, W, C) -> (1, H, W, C)
            original_img_batch = tf.expand_dims(original_img_processed, axis=0)

            # Use the trained generator from the attack executor
            perturbation_batch = attack_executor.generator(original_img_batch, training=False)

            # Create adversarial image and clip values back to [0.0, 1.0]
            adversarial_image_batch = original_img_batch + perturbation_batch
            adversarial_image_batch = tf.clip_by_value(adversarial_image_batch, 0.0, 1.0)

            # Convert Tensors to NumPy arrays for plotting and remove batch dimension
            original_img_np = tf.squeeze(original_img_batch, axis=0).numpy()
            adversarial_img_np = tf.squeeze(adversarial_image_batch, axis=0).numpy()

            # --- Plotting ---
            fig, axes = plt.subplots(1, 2, figsize=(12, 6)) # 1 row, 2 columns

            # Plot Original Image
            axes[0].imshow(original_img_np)
            axes[0].set_title("Original Image")
            axes[0].axis('off') # Hide axes ticks and labels

            # Plot Adversarial Image
            axes[1].imshow(adversarial_img_np)
            axes[1].set_title("Adversarial Image")
            axes[1].axis('off') # Hide axes ticks and labels

            # --- Add Title with Labels/Predictions ---
            # Names of the labels that are set in the truth / in the adversarial prediction
            true_labels_str = [labels[i] for i, val in enumerate(true_label_array) if val == 1]
            adv_pred_str = [labels[i] for i, val in enumerate(adv_prediction_array) if val == 1]

            # Per-label comparison; correct negatives (0/0) are intentionally omitted.
            comparison_str = []
            for i, label_name in enumerate(labels):
                true_val = true_label_array[i]
                pred_val = adv_prediction_array[i]
                if true_val == 1 and pred_val == 1:
                    comparison_str.append(f"✅{label_name}") # Correctly predicted positive
                elif true_val == 1 and pred_val == 0:
                    comparison_str.append(f"❌{label_name}") # Missed positive
                elif true_val == 0 and pred_val == 1:
                    comparison_str.append(f"🚨{label_name}") # False positive (attack success for this label)

            main_title = f"ID: {image_id} | True: [{', '.join(true_labels_str)}] | Adv Pred (Threshold={PREDICTION_THRESHOLD}): [{', '.join(adv_pred_str)}]\nComparison: [{', '.join(comparison_str)}]"
            fig.suptitle(main_title, y=1.02) # Add a super title slightly above the plots

            plt.tight_layout(rect=[0, 0, 1, 0.98]) # Adjust layout to prevent title overlap
            plt.show() # Display the figure
            # Release the figure so repeated iterations do not accumulate open figures.
            plt.close(fig)

        except Exception as e:
            print(f"An error occurred during visualization for ID {image_id}: {e}")
            # Continue to the next image even if one fails