In [None]:
import numpy as np
# import pandas as pd
import keras
import skimage.io
import skimage.segmentation
import copy
import sklearn
import sklearn.metrics
from sklearn.linear_model import LinearRegression
import warnings
import pickle
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras import layers, models, Sequential
from tensorflow.keras.losses import CategoricalCrossentropy
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Conv2D, Activation, MaxPooling2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model
print('Notebook running: Keras', keras.__version__) # Zeigt die Keras Version
# print('Notebook running: TF', tensorflow.__version__)
from skimage.segmentation import slic

2025-01-09 18:40:58.075693: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-01-09 18:41:10.728440: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-01-09 18:41:10.777371: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Notebook running: Keras 2.13.1


In [None]:
# Loadthe dataset
training_file = "train.p"
validation_file = "valid.p"
testing_file = "test.p"

with open(training_file, 'rb') as f:
    train_data = pickle.load(f)
with open(validation_file, 'rb') as f:
    valid_data = pickle.load(f)
with open(testing_file, 'rb') as f:
    test_data = pickle.load(f)

X_train, y_train = train_data['features'], train_data['labels']
X_valid, y_valid = valid_data['features'], valid_data['labels']
X_test, y_test = test_data['features'], test_data['labels']

# Normalize
X_train = X_train.astype('float32') / 255
X_valid = X_valid.astype('float32') / 255
X_test = X_test.astype('float32') / 255

# One-hot encode labels
n_classes = len(set(y_train))
y_train = to_categorical(y_train, n_classes)
y_valid = to_categorical(y_valid, n_classes)
y_test = to_categorical(y_test, n_classes)

# Load label names
label_names = open('signnames.csv').read().strip().split("\n")[1:]
label_names = [line.split(',') for line in label_names]

In [None]:
# Define the CNN model
def create_cnn_model():
    model = Sequential([
        layers.Input(shape=(32, 32, 3)),
        layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu'),
        layers.MaxPooling2D((2, 2), padding='same'),
        layers.Conv2D(64, (3, 3), padding='same', activation='relu'),
        layers.MaxPooling2D(pool_size=(2, 2), padding='same'),
        layers.Conv2D(128, (3, 3), padding='same', activation='relu'),
        layers.MaxPooling2D(pool_size=(2, 2), padding='same'),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dense(n_classes, activation='softmax')
    ])
    model.compile(optimizer='adam', loss=CategoricalCrossentropy(), metrics=['accuracy'])
    return model


In [None]:

# Save initial weights
model_no_training = create_cnn_model()
model_no_training.save_weights('init_weights_2501_20estop_02.h5')


In [None]:
# Function to generate adversarial examples using FGSM on superpixels
def generate_fgsm_superpixel(model, image, label, epsilon, segments):
    """
    Generate FGSM adversarial examples targeting superpixels.
    Args:
    - model: Pretrained model.
    - image: Input image.
    - label: True label (one-hot encoded).
    - epsilon: Perturbation magnitude.
    - segments: Superpixel segmentation of the input image.

    Returns:
    - adversarial_image: Generated adversarial image.
    """
    image = tf.convert_to_tensor(image[np.newaxis, ...], dtype=tf.float32)
    label = tf.convert_to_tensor(label[np.newaxis, ...], dtype=tf.float32)

    with tf.GradientTape() as tape:
        tape.watch(image)
        predictions = model(image)
        loss = CategoricalCrossentropy()(label, predictions)

    gradient = tape.gradient(loss, image).numpy()[0]

    # Create adversarial image by perturbing each superpixel
    adversarial_image = image.numpy()[0]
    unique_superpixels = np.unique(segments)
    for superpixel_id in unique_superpixels:
        mask = (segments == superpixel_id)
        avg_gradient = np.sum(gradient[mask], axis=0)  # Average gradient
        adversarial_image[mask] += epsilon * np.sign(avg_gradient)  # Perturb the superpixel

    # Clip pixel values to valid range [0, 1]
    adversarial_image = np.clip(adversarial_image, 0, 1)
    return adversarial_image

In [None]:
# Generate adversarial dataset for training
def generate_adversarial_dataset_superpixel(model, X_data, y_data, epsilon, superpixel_segments):
    """
    Generate adversarial dataset using FGSM with superpixels.
    Args:
    - model: Pretrained model.
    - X_data: Dataset of images.
    - y_data: Dataset of labels (one-hot encoded).
    - epsilon: Perturbation magnitude.
    - superpixel_segments: Precomputed superpixel segmentations for the dataset.

    Returns:
    - X_adversarial: Array of adversarial images.
    """
    X_adversarial = []
    for i in range(len(X_data)):
        adversarial_image = generate_fgsm_superpixel(model, X_data[i], y_data[i], epsilon, superpixel_segments[i])
        X_adversarial.append(adversarial_image)
    return np.array(X_adversarial)

In [None]:

# Precompute superpixels for the dataset
def generate_superpixels(X_data, n_segments=50, compactness=5, sigma=1):
    """
    Generate superpixel segmentations for a dataset.
    Args:
    - X_data: Dataset of images.
    - n_segments: Number of superpixels.
    - compactness: Compactness parameter for SLIC.
    - sigma: Gaussian smoothing for SLIC.

    Returns:
    - List of segmentations for each image.
    """
    return [slic(X_data[i], n_segments=n_segments, compactness=compactness, sigma=sigma) for i in range(len(X_data))]

# Generate superpixels for training and validation datasets
train_segments = generate_superpixels(X_train)
valid_segments = generate_superpixels(X_valid)


In [None]:


# Perform adversarial training
def adversarial_training(model, X_train, y_train, X_valid, y_valid, train_segments, epochs, batch_size, alpha, epsilon, patience):
    best_val_loss = float('inf')
    patience_counter = 0

    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs}")

        indices = np.arange(len(X_train))
        np.random.shuffle(indices)
        X_train, y_train = X_train[indices], y_train[indices]
        train_segments = [train_segments[i] for i in indices]

        for batch_start in range(0, len(X_train), batch_size):
            batch_end = min(batch_start + batch_size, len(X_train))
            x_batch = X_train[batch_start:batch_end]
            y_batch = y_train[batch_start:batch_end]
            segments_batch = train_segments[batch_start:batch_end]

            x_adv_batch = generate_adversarial_dataset_superpixel(model, x_batch, y_batch, epsilon, segments_batch)

            # Training step
            x_batch = tf.convert_to_tensor(x_batch, dtype=tf.float32)
            y_batch = tf.convert_to_tensor(y_batch, dtype=tf.float32)

            with tf.GradientTape() as tape:
                predictions_clean = model(x_batch)
                loss_clean = CategoricalCrossentropy()(y_batch, predictions_clean)

                predictions_adv = model(x_adv_batch)
                loss_adv = CategoricalCrossentropy()(y_batch, predictions_adv)

                combined_loss = alpha * loss_clean + (1 - alpha) * loss_adv

            gradients = tape.gradient(combined_loss, model.trainable_variables)
            model.optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        val_loss, val_acc = model.evaluate(X_valid, y_valid, verbose=0)
        print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered.")
                break

    return model



In [None]:
# Define training parameters
epochs = 200
batch_size = 64
alpha = 0.5
epsilon = 0.01
patience = 20  # Early stopping patience


In [None]:
# Train adversarial model
model_adv_training = create_cnn_model()
model_adv_training = adversarial_training(model_adv_training, X_train, y_train, X_valid, y_valid, train_segments, epochs, batch_size, alpha, epsilon, patience)

# # Save the adversarially trained model
model_adv_training.save("CNN_adv_training_paper_superpixel_estop20_2501_02.h5")

Epoch 1/200
Validation Loss: 0.7488, Validation Accuracy: 0.8070
Epoch 2/200
Validation Loss: 0.4697, Validation Accuracy: 0.8921
Epoch 3/200
Validation Loss: 0.4753, Validation Accuracy: 0.8950
Epoch 4/200
Validation Loss: 0.3594, Validation Accuracy: 0.9234
Epoch 5/200
Validation Loss: 0.3871, Validation Accuracy: 0.9202
Epoch 6/200
Validation Loss: 0.5277, Validation Accuracy: 0.9075
Epoch 7/200
Validation Loss: 0.4586, Validation Accuracy: 0.9209
Epoch 8/200
Validation Loss: 0.3254, Validation Accuracy: 0.9356
Epoch 9/200
Validation Loss: 0.3382, Validation Accuracy: 0.9420
Epoch 10/200
Validation Loss: 0.5286, Validation Accuracy: 0.9272
Epoch 11/200
Validation Loss: 0.4425, Validation Accuracy: 0.9474
Epoch 12/200
Validation Loss: 0.3748, Validation Accuracy: 0.9388
Epoch 13/200
Validation Loss: 0.2601, Validation Accuracy: 0.9537
Epoch 14/200
Validation Loss: 0.2969, Validation Accuracy: 0.9483
Epoch 15/200
Validation Loss: 0.3607, Validation Accuracy: 0.9422
Epoch 16/200
Valida

  saving_api.save_model(


In [None]:
# Evaluate the model on clean and adversarial test data
test_segments = generate_superpixels(X_test)
X_test_adv = generate_adversarial_dataset_superpixel(model_adv_training, X_test, y_test, epsilon, test_segments)

test_loss, test_acc = model_adv_training.evaluate(X_test, y_test)
adv_test_loss, adv_test_acc = model_adv_training.evaluate(X_test_adv, y_test)



