In [1]:
# Load the data
import numpy as np
# Locations of the two saved NumPy arrays (inputs and their labels)
spectrograms_file = "dataSpectogram/spectrograms.npy"
targets_file = "dataSpectogram/targets.npy"

# Read both arrays into memory, spectrograms first and targets second
spectrograms, targets = (
    np.load(npy_path) for npy_path in (spectrograms_file, targets_file)
)

# Sanity check: the leading dimension of both arrays is the sample count
print(f"Spectrograms shape: {spectrograms.shape}")  # expected (num_samples, 227, fixed_time_dim)
print(f"Targets shape: {targets.shape}")  # expected (num_samples,)


Spectrograms shape: (30000, 227, 169)
Targets shape: (30000,)


In [2]:
# Shuffle the data
# One shared permutation is applied to both arrays so every spectrogram keeps
# its own label. The generator is seeded so that a fresh "Restart & Run All"
# reproduces the same ordering (and therefore the same downstream splits).
# Note: re-running only this cell permutes the already-shuffled arrays again.
SHUFFLE_SEED = 42

assert len(spectrograms) == len(targets), "spectrograms and targets must have the same number of samples"
indices = np.random.default_rng(SHUFFLE_SEED).permutation(len(spectrograms))

spectrograms = spectrograms[indices]
targets = targets[indices]


In [3]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Size of the "usable" portion: the first 60% of the samples
num_samples = len(spectrograms)
num_training_samples = int(0.6 * num_samples)

# The remaining 40% is set aside for adversarial-attack experiments
# (no channel axis is added to this part)
adversarial_spectrograms = spectrograms[num_training_samples:]
adversarial_targets = targets[num_training_samples:]

# The usable 60%; a trailing channel axis is appended so each sample is
# (227, 169, 1), matching the AlexNet input format
usable_targets = targets[:num_training_samples]
usable_spectrograms = np.expand_dims(spectrograms[:num_training_samples], axis=-1)

# Report the size of both partitions
print(f"Usable data: {len(usable_spectrograms)} samples")
print(f"Adversarial data: {len(adversarial_spectrograms)} samples")

# One-hot encode the usable labels; the class count comes from the full label array
num_classes = np.unique(targets).size
targets_one_hot = to_categorical(usable_targets, num_classes=num_classes)

# Hold out 20% of the usable data as a validation set (fixed random_state)
X_train, X_val, y_train, y_val = train_test_split(
    usable_spectrograms,
    targets_one_hot,
    test_size=0.2,
    random_state=42,
)

print(f"Training data: {len(X_train)} samples")
print(f"Testing data: {len(X_val)} samples")

Usable data: 18000 samples
Adversarial data: 12000 samples
Training data: 14400 samples
Testing data: 3600 samples


In [4]:
import tensorflow as tf

# Restore the saved Keras model from its .h5 file
model = tf.keras.models.load_model(filepath="alexnet_spectrogram_model.h5")

# Print the layer-by-layer summary to confirm the expected architecture was loaded
model.summary()

Model: "AlexNet"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 data (InputLayer)           [(None, 227, 169, 1)]     0         
                                                                 
 conv1 (Conv2D)              (None, 55, 40, 96)        11712     
                                                                 
 relu1 (ReLU)                (None, 55, 40, 96)        0         
                                                                 
 pool1 (MaxPooling2D)        (None, 27, 19, 96)        0         
                                                                 
 conv2 (Conv2D)              (None, 27, 19, 256)       307456    
                                                                 
 relu2 (ReLU)                (None, 27, 19, 256)       0         
                                                                 
 pool2 (MaxPooling2D)        (None, 13, 9, 256)        0   

In [5]:
# Score the loaded model on the unperturbed X_val / y_val split.
# NOTE(review): the model comes from disk, so this notebook cannot show whether
# these samples were unseen during its original training -- confirm.
val_loss, val_accuracy = model.evaluate(x=X_val, y=y_val, verbose=1)

# Report both values to four decimal places
print(f"Validation Loss: {val_loss:.4f}")
print(f"Validation Accuracy: {val_accuracy:.4f}")

Validation Loss: 0.0814
Validation Accuracy: 0.9781


In [6]:
import tensorflow as tf

def pgd_attack(model, spectrogram, label, epsilon=0.1, alpha=0.01, num_iter=40,
               clip_min=0.0, clip_max=1.0):
    """Craft an adversarial input with iterative signed-gradient ascent (PGD).

    Starting from a uniformly random point within `epsilon` of `spectrogram`,
    the input is repeatedly moved by `alpha` in the direction of the sign of
    the loss gradient, then projected back so that every element stays within
    `epsilon` of the original and inside `[clip_min, clip_max]`.

    Args:
        model: Callable mapping a batched input tensor to class probabilities.
        spectrogram: Batched input (tensor or array) accepted by `model`.
        label: One-hot target(s) with a batch axis, in the form expected by
            `tf.keras.losses.categorical_crossentropy`.
        epsilon: Maximum absolute per-element perturbation.
        alpha: Step size of each iteration.
        num_iter: Number of gradient steps. With 0 the randomly initialised
            (and clipped) starting point is returned.
        clip_min: Lower bound of the valid input range (default 0.0).
        clip_max: Upper bound of the valid input range (default 1.0).
            NOTE(review): the defaults assume inputs scaled to [0, 1]; pass the
            real data range if the spectrograms are not normalised -- confirm.

    Returns:
        A float32 tensor with the same shape as `spectrogram`.
    """
    # Convert once to float32; this untouched copy is the centre of the epsilon-ball
    original = tf.cast(tf.convert_to_tensor(spectrogram), tf.float32)

    # Random start inside the epsilon-ball, kept within the valid input range
    random_start = tf.random.uniform(tf.shape(original), minval=-epsilon, maxval=epsilon, dtype=tf.float32)
    perturbed_spectrogram = tf.clip_by_value(original + random_start, clip_min, clip_max)

    for _ in range(num_iter):
        with tf.GradientTape() as tape:
            # The input is a plain tensor, not a variable, so it must be watched explicitly
            tape.watch(perturbed_spectrogram)
            predictions = model(perturbed_spectrogram)
            loss = tf.keras.losses.categorical_crossentropy(label, predictions)

        # Ascend the loss: step along the sign of its gradient w.r.t. the input
        gradient = tape.gradient(loss, perturbed_spectrogram)
        perturbed_spectrogram = perturbed_spectrogram + alpha * tf.sign(gradient)

        # Project back into the epsilon-ball around the original, then into the valid range
        perturbation = tf.clip_by_value(perturbed_spectrogram - original, -epsilon, epsilon)
        perturbed_spectrogram = tf.clip_by_value(original + perturbation, clip_min, clip_max)

    return perturbed_spectrogram


In [7]:
# Example spectrogram and label
index = 150  # Choose an index from the validation set
sample_spectrogram = X_val[index]
label = y_val[index]

# Convert to TensorFlow tensors
sample_spectrogram = tf.convert_to_tensor(sample_spectrogram, dtype=tf.float32)
label = tf.convert_to_tensor(label, dtype=tf.float32)  # Ensure label is float32

# X_val samples already carry the trailing channel axis (it was added before the
# train/validation split), so only a batch axis is needed here. Adding a second
# channel axis would produce a rank-5 tensor that no longer matches the model's
# (None, 227, 169, 1) input.
sample_spectrogram = tf.expand_dims(sample_spectrogram, axis=0)  # -> (1, 227, 169, 1)
label = tf.expand_dims(label, axis=0)  # Add batch dimension to match predictions

# Perform PGD attack
adversarial_example = pgd_attack(model, sample_spectrogram, label, epsilon=0.1, alpha=0.01, num_iter=40)

# Get predictions for the adversarial example
predictions = model(adversarial_example)
predicted_class = tf.argmax(predictions, axis=1)

print("Predicted class for adversarial example:", predicted_class.numpy()[0])
print('True class:', np.argmax(label))


Predicted class for adversarial example: 7
True class: 4


In [8]:
#check how well the model is doing on validation data with adversarial attack
def evaluate_pgd_attack(model, X_val, y_val, epsilon=0.1, alpha=0.01, num_iter=40, max_samples=1000):
    """Measure classification accuracy on PGD-perturbed samples.

    Each of the first `max_samples` samples is attacked individually with
    `pgd_attack`, and the model's prediction on the perturbed input is
    compared with the true class.

    Args:
        model: Model under attack; called on batched input tensors.
        X_val: Indexable collection of samples, each already shaped like one
            model input without the batch axis.
        y_val: One-hot labels aligned with `X_val`.
        epsilon: Maximum per-element perturbation passed to `pgd_attack`.
        alpha: PGD step size passed to `pgd_attack`.
        num_iter: Number of PGD iterations passed to `pgd_attack`.
        max_samples: Upper bound on how many leading samples are evaluated
            (default 1000, to keep run time manageable). Use None to evaluate
            every sample. The bound is clamped to the number available.

    Returns:
        Fraction (0.0 to 1.0) of evaluated samples still classified correctly.

    Raises:
        ValueError: If there are no samples to evaluate.
    """
    available = int(X_val.shape[0])
    total_samples = available if max_samples is None else min(available, max_samples)
    if total_samples <= 0:
        raise ValueError("evaluate_pgd_attack needs at least one sample to evaluate")

    correct_predictions = 0

    for i in range(total_samples):
        # Length-1 slices keep a leading batch axis; the samples already
        # include their channel axis, so no further reshaping is needed
        sample_spectrogram = tf.convert_to_tensor(X_val[i:i + 1], dtype=tf.float32)
        label = tf.convert_to_tensor(y_val[i:i + 1], dtype=tf.float32)

        # Generate adversarial example using PGD
        adversarial_example = pgd_attack(
            model, sample_spectrogram, label,
            epsilon=epsilon, alpha=alpha, num_iter=num_iter
        )

        # Compare the prediction on the adversarial example with the true class
        predictions = model(adversarial_example)
        predicted_class = tf.argmax(predictions, axis=1).numpy()[0]
        true_class = tf.argmax(label, axis=1).numpy()[0]
        correct_predictions += int(predicted_class == true_class)

    # Adversarial accuracy over the evaluated subset
    return correct_predictions / total_samples


# Baseline robustness: attack the loaded model with PGD on the validation split
adversarial_accuracy = evaluate_pgd_attack(
    model=model,
    X_val=X_val,
    y_val=y_val,
    epsilon=0.1,
    alpha=0.01,
    num_iter=40,
)

print(f"Accuracy of normal model under PGD Attack: {adversarial_accuracy * 100:.2f}%")


Accuracy of normal model under PGD Attack: 9.50%


In [9]:
def generate_adversarial_examples(model, x_batch, y_batch, epsilon, clip_min=0.0, clip_max=1.0):
    """Perturb a batch with one signed-gradient step on the input.

    Every element of `x_batch` is moved by `epsilon` in the direction that
    increases the sparse categorical cross-entropy loss, and the result is
    clipped to `[clip_min, clip_max]`.

    Args:
        model: Callable mapping a batched input tensor to class probabilities.
        x_batch: Batch of inputs (tensor or array); converted to float32.
        y_batch: Integer class indices, one per sample; reshaped to a flat
            vector whose length equals the batch size.
        epsilon: Size of the perturbation step.
        clip_min: Lower bound of the valid input range (default 0.0).
        clip_max: Upper bound of the valid input range (default 1.0).
            NOTE(review): the defaults assume inputs scaled to [0, 1]; pass the
            real data range if the spectrograms are not normalised -- confirm.

    Returns:
        A float32 tensor of perturbed inputs with the same shape as `x_batch`.
    """
    x_batch = tf.convert_to_tensor(x_batch, dtype=tf.float32)

    # One class index per sample. No gradient flows through the labels, so the
    # reshape is done before the tape starts recording.
    y_batch = tf.reshape(y_batch, (tf.shape(x_batch)[0],))

    with tf.GradientTape() as tape:
        # The input is a plain tensor, not a variable, so it must be watched explicitly
        tape.watch(x_batch)
        predictions = model(x_batch)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_batch, predictions)

    # Gradient of the loss with respect to the input
    gradients = tape.gradient(loss, x_batch)

    # Single step along the gradient sign, then back into the valid input range
    adv_x = x_batch + epsilon * tf.sign(gradients)
    return tf.clip_by_value(adv_x, clip_min, clip_max)


In [10]:
def _labels_to_class_indices(labels):
    """Return `labels` as an int64 tensor of class indices.

    Inputs whose second axis is longer than one are treated as one-hot (or
    per-class score) rows and reduced with argmax; anything else is only cast.
    """
    labels = tf.convert_to_tensor(labels)
    if len(labels.shape) > 1 and labels.shape[1] > 1:
        # argmax runs on the original dtype, before an integer cast could truncate values
        labels = tf.argmax(labels, axis=1)
    return tf.cast(labels, tf.int64)


def adversarial_training(model, x_train, y_train, x_val, y_val, epochs=5, batch_size=32, epsilon=0.1):
    """Train `model` on adversarially perturbed batches, then report validation metrics.

    The model is recompiled with a fresh Adam optimizer and sparse categorical
    cross-entropy loss, replacing whatever compile configuration it had. In
    every epoch the training data is walked in its stored order (no shuffling)
    in chunks of `batch_size`; each chunk is perturbed with
    `generate_adversarial_examples` and the model takes one training step on
    the perturbed chunk only (the unperturbed chunk is not trained on). After
    the last epoch the model is evaluated on the unperturbed validation data
    and the loss and accuracy are printed.

    Args:
        model: Keras model to train; it is modified in place.
        x_train: Training inputs, sliceable along the first axis.
        y_train: Training labels, either one-hot rows or class indices.
        x_val: Validation inputs.
        y_val: Validation labels, either one-hot rows or class indices.
        epochs: Number of passes over the training data.
        batch_size: Number of samples per training step.
        epsilon: Perturbation size passed to `generate_adversarial_examples`.

    Returns:
        None. Progress and final validation metrics are printed.
    """
    model.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=['accuracy']
    )

    # Label conversion does not depend on the batch, so it is done once up front
    y_train_indices = _labels_to_class_indices(y_train)

    for epoch in range(epochs):
        print(f"Epoch {epoch+1}/{epochs}")
        for start in range(0, len(x_train), batch_size):
            stop = start + batch_size

            # Inputs are converted chunk by chunk rather than as one large tensor
            x_batch = tf.convert_to_tensor(x_train[start:stop], dtype=tf.float32)
            y_batch = y_train_indices[start:stop]

            # Perturb the chunk against the current weights, then train on the result
            adv_x_batch = generate_adversarial_examples(model, x_batch, y_batch, epsilon)
            model.train_on_batch(adv_x_batch, y_batch)

    # Evaluate on the unperturbed validation set using class-index labels
    y_val_indices = _labels_to_class_indices(y_val)
    val_loss, val_acc = model.evaluate(x_val, y_val_indices, verbose=0)
    print(f"Validation Loss: {val_loss:.4f}, Accuracy: {val_acc:.4f}")


In [11]:
# Ask TensorFlow which physical GPUs it can see
gpus = tf.config.list_physical_devices(device_type='GPU')
if gpus:
    # Expose only the first physical GPU to TensorFlow
    try:
        tf.config.set_visible_devices(gpus[0], device_type='GPU')
        logical_gpus = tf.config.list_logical_devices(device_type='GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
    except RuntimeError as err:
        # Visible devices can only be changed before the GPUs are initialized;
        # show the message instead of stopping the notebook
        print(err)

1 Physical GPUs, 1 Logical GPU


In [12]:
# Adversarial Training
# Runs with the function's default epochs, batch_size and epsilon. The call
# recompiles and trains the shared `model` object in place, so every later cell
# that uses `model` sees the retrained weights.
# NOTE(review): in the saved run the printed validation accuracy is about 0.10,
# far below the accuracy reported for the loaded model earlier -- investigate
# before relying on the retrained model.
adversarial_training(model, X_train, y_train, X_val, y_val)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Validation Loss: 96.1315, Accuracy: 0.1006


In [13]:
# Repeat the PGD evaluation with the same attack settings; `model` is the same
# object that adversarial_training updated in place. The result overwrites the
# earlier value stored in `adversarial_accuracy`.
adversarial_accuracy = evaluate_pgd_attack(
    model=model,
    X_val=X_val,
    y_val=y_val,
    epsilon=0.1,
    alpha=0.01,
    num_iter=40,
)
print(f"Accuracy of advesarially trained model under PGD Attack: {adversarial_accuracy * 100:.2f}%")

Accuracy of advesarially trained model under PGD Attack: 6.70%
