#Import lib

In [1]:
!pip install cleverhans

Collecting cleverhans
  Downloading cleverhans-4.0.0-py3-none-any.whl.metadata (846 bytes)
Collecting nose (from cleverhans)
  Downloading nose-1.3.7-py3-none-any.whl.metadata (1.7 kB)
Collecting pycodestyle (from cleverhans)
  Downloading pycodestyle-2.14.0-py2.py3-none-any.whl.metadata (4.5 kB)
Collecting mnist (from cleverhans)
  Downloading mnist-0.2.2-py2.py3-none-any.whl.metadata (1.6 kB)
Downloading cleverhans-4.0.0-py3-none-any.whl (92 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.3/92.3 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading mnist-0.2.2-py2.py3-none-any.whl (3.5 kB)
Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pycodestyle-2.14.0-py2.py3-none-any.whl (31 kB)
Installing collected packages: nose, pycodestyle, mnist, cleverhans
Successfully installed cleverhans-4.0.0 mnist-0.2.2 nose-1.3.7 pyc

In [2]:
import math
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
from absl import app, flags
from easydict import EasyDict
from tensorflow.keras import Model
from tensorflow.keras import layers, models, backend as K
from tensorflow.keras.layers import MaxPooling2D, Dropout, Flatten, Dense, BatchNormalization, Conv2D, Input

from cleverhans.tf2.attacks.projected_gradient_descent import projected_gradient_descent
from cleverhans.tf2.attacks.fast_gradient_method import fast_gradient_method
import pandas as pd


import matplotlib.pyplot as plt


## LOAD CIFAR-10 DATASET

In [3]:
def ld_cifar10():
    """Load training and test data."""

    def convert_types(image, label):
        image = tf.cast(image, tf.float32)
        image /= 127.5
        image -= 1.0
        return image, label

    dataset, info = tfds.load("cifar10", with_info=True, as_supervised=True)

    def augment_mirror(x):
        return tf.image.random_flip_left_right(x)

    def augment_shift(x, w=4):
        y = tf.pad(x, [[w] * 2, [w] * 2, [0] * 2], mode="REFLECT")
        return tf.image.random_crop(y, tf.shape(x))

    cifar10_train, cifar10_test = dataset["train"], dataset["test"]
    # Augmentation helps a lot in CIFAR10
    cifar10_train = cifar10_train.map(
        lambda x, y: (augment_mirror(augment_shift(x)), y)
    )
    cifar10_train = cifar10_train.map(convert_types).shuffle(10000).batch(128)
    cifar10_test = cifar10_test.map(convert_types).batch(1000)

    return EasyDict(train=cifar10_train, test=cifar10_test)

In [None]:
# Variables
nb_epochs=50
#Les budgets sont en norme infini
adv_perturbation_budget=0.01
pgd_iterations=10
pgd_step_budget=0.001

adv_train=False

data = ld_cifar10()
input_shape=(32,32,3)
num_classes=10

# Model Under Test
def Resnet_block(x, filters, size, num_comb,add=False):

    # Shortcut : si le nombre de filtres change, on l’adapte
    if x.shape[-1] != filters:
        shortcut = layers.Conv2D(filters, (1,1), padding='same', use_bias=False)(x)
        shortcut = layers.BatchNormalization()(shortcut)
    else:
        shortcut = x

    if (add==True):
      for i in range(0, num_comb):
          size = size + 2*i
          x = layers.Conv2D(filters, (size, size), padding='same', use_bias=False)(x)
          x = layers.BatchNormalization()(x)
          x = layers.Activation('relu')(x)
    else :
        for i in range(0, num_comb):
          x = layers.Conv2D(filters, (size, size), padding='same', use_bias=False)(x)
          x = layers.BatchNormalization()(x)
          x = layers.Activation('relu')(x)


    # Ajout du shortcut
    x = layers.BatchNormalization()(x)
    x = layers.add([x, shortcut])
    x = layers.Activation('relu')(x)

    return x

def build_resnet1(input_shape=(32,32,3), num_classes=10):
    inputs = layers.Input(shape=input_shape)

    # Bloc initial
    x = layers.Conv2D(8, (3,3), padding='same', use_bias=False)(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    # --- Blocs ResNet ---
    x = Resnet_block(x, filters=8, size=3, num_comb=3)
    x = Resnet_block(x, filters=8, size=3, num_comb=3)
    x = layers.MaxPooling2D((2,2))(x)
    x = layers.Dropout(0.2)(x)

    x = Resnet_block(x, filters=16, size=3, num_comb=3)
    x = Resnet_block(x, filters=16, size=3, num_comb=3)
    x = layers.MaxPooling2D((2,2))(x)
    x = layers.Dropout(0.3)(x)

    x = layers.Conv2D(32, (3, 3), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Conv2D(32, (3, 3), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.MaxPooling2D((2,2))(x)
    x = layers.Dropout(0.3)(x)

    x = layers.Conv2D(64, (3, 3), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    # Classification finale
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.35)(x)
    outputs = layers.Dense(10, activation='softmax')(x)

    model = models.Model(inputs, outputs, name="Simple_ResNet")
    return model

model = build_resnet1()
# Load training and test data


#model = Model(inputs, x)
model.summary()

loss_object = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.optimizers.Adam(learning_rate=0.001)

# Metrics to track the different accuracies.
train_loss = tf.metrics.Mean(name="train_loss")
train_acc_clean = tf.metrics.SparseCategoricalAccuracy()
test_acc_clean = tf.metrics.SparseCategoricalAccuracy()
test_acc_fgsm = tf.metrics.SparseCategoricalAccuracy()
test_acc_pgd = tf.metrics.SparseCategoricalAccuracy()

@tf.function
def train_step(x, y):
  with tf.GradientTape() as tape:
    predictions = model(x)
    loss = loss_object(y, predictions)
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  train_loss(loss)
  train_acc_clean(y,predictions)

# Train model with adversarial training
for epoch in range(nb_epochs):
  # keras like pp of progress
  progress_bar_train = tf.keras.utils.Progbar(50000)
  for (x, y) in data.train:
    if adv_train:
      # Replace clean example with adversarial example for adversarial training
      x = projected_gradient_descent(model, x, adv_perturbation_budget, 0.01, 40, np.inf)
    train_step(x, y)
    progress_bar_train.add(x.shape[0], values=[("epoch",epoch),("Loss (train)", train_loss.result())])





Downloading and preparing dataset Unknown size (download: Unknown size, generated: Unknown size, total: Unknown size) to /root/tensorflow_datasets/cifar10/3.0.2...


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]

Generating splits...:   0%|          | 0/2 [00:00<?, ? splits/s]

Generating train examples...: 0 examples [00:00, ? examples/s]

Shuffling /root/tensorflow_datasets/cifar10/incomplete.4AJ6DV_3.0.2/cifar10-train.tfrecord*...:   0%|         …

Generating test examples...: 0 examples [00:00, ? examples/s]

Shuffling /root/tensorflow_datasets/cifar10/incomplete.4AJ6DV_3.0.2/cifar10-test.tfrecord*...:   0%|          …

Dataset cifar10 downloaded and prepared to /root/tensorflow_datasets/cifar10/3.0.2. Subsequent calls will reuse this data.


  output, from_logits = _get_logits(


[1m50000/50000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 712us/step - epoch: 0.0000e+00 - Loss (train): 1.9847
[1m50000/50000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 312us/step - epoch: 1.0000 - Loss (train): 1.7424
[1m50000/50000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 325us/step - epoch: 2.0000 - Loss (train): 1.6282
[1m50000/50000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 436us/step - epoch: 3.0000 - Loss (train): 1.5474
[1m50000/50000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 313us/step - epoch: 4.0000 - Loss (train): 1.4854
[1m50000/50000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 307us/step - epoch: 5.0000 - Loss (train): 1.4340
[1m50000/50000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 310us/step - epoch: 6.0000 - Loss (train): 1.3888
[1m50000/50000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 308us/step - epoch: 7.0000 - Loss (train): 1.3503
[1m50000/50000[0m [32m━━━

In [None]:
# Evaluate on clean and adversarial data
progress_bar_test = tf.keras.utils.Progbar(10000)

# Lists to store results for a few examples
true_labels = []
clean_preds = []
fgsm_preds = []
pgd_preds = []
clean_confidences = []
fgsm_confidences = []
pgd_confidences = []

sample_count = 0
max_samples_to_display = 100  # Display results for up to 100 samples

for x, y in data.test:
  #print(np.shape(x))
  y_pred = model(x)
  test_acc_clean(y, y_pred)

  #ATTACKS WITH FGSM
  x_fgm = fast_gradient_method(model, x, adv_perturbation_budget, np.inf)
  y_pred_fgm = model(x_fgm)
  test_acc_fgsm(y, y_pred_fgm)

  #ATTACKS WITH PGD (iterative attack)
  x_pgd = projected_gradient_descent(model, x, adv_perturbation_budget, pgd_step_budget, pgd_iterations, np.inf)
  y_pred_pgd = model(x_pgd)
  test_acc_pgd(y, y_pred_pgd)

  # Store results for display
  if sample_count < max_samples_to_display:
      true_labels.extend(tf.get_static_value(y[:max_samples_to_display-sample_count]))

      clean_pred_values = tf.get_static_value(y_pred[:max_samples_to_display-sample_count])
      clean_preds.extend(tf.get_static_value(tf.argmax(clean_pred_values, axis=1)))
      clean_confidences.extend(tf.get_static_value(tf.reduce_max(tf.nn.softmax(clean_pred_values, axis=1), axis=1)))

      fgsm_pred_values = tf.get_static_value(y_pred_fgm[:max_samples_to_display-sample_count])
      fgsm_preds.extend(tf.get_static_value(tf.argmax(fgsm_pred_values, axis=1)))
      fgsm_confidences.extend(tf.get_static_value(tf.reduce_max(tf.nn.softmax(fgsm_pred_values, axis=1), axis=1)))

      pgd_pred_values = tf.get_static_value(y_pred_pgd[:max_samples_to_display-sample_count])
      pgd_preds.extend(tf.get_static_value(tf.argmax(pgd_pred_values, axis=1)))
      pgd_confidences.extend(tf.get_static_value(tf.reduce_max(tf.nn.softmax(pgd_pred_values, axis=1), axis=1)))


      sample_count += x.shape[0]


  #Print the clean accuracy (no attack) and the adversarial accuracy (i.e., the accuracy computed on the adversarial images)
  #NB: CIFAR-10 has 10 labels, the "random guess level" is accuracy=1/10=0.1
  progress_bar_test.add(x.shape[0])

print("\n--- Evaluation Results ---")
print("test acc on clean examples (%): {:.3f}".format(test_acc_clean.result() * 100))
print("test acc on FGM adversarial examples (%): {:.3f}".format(test_acc_fgsm.result() * 100)) #Utilisation du gradient pour genere une images (un mask)
print("test acc on PGD adversarial examples (%): {:.3f}".format(test_acc_pgd.result() * 100)) #La meme mais en iterative

# Display comparative table for a few examples
import pandas as pd

results_df = pd.DataFrame({
    'True Label': true_labels,
    'Clean Prediction': clean_preds,
    'Clean Confidence': clean_confidences,
    'FGSM Prediction': fgsm_preds,
    'FGSM Confidence': fgsm_confidences,
    'PGD Prediction': pgd_preds,
    'PGD Confidence': pgd_confidences
})

print("\n--- Comparative Table (First {} samples) ---".format(max_samples_to_display))
#display(results_df) # Commented out the display

# Save the table to a text file
adv_budget_str = str(adv_perturbation_budget).replace('.', '')
pgd_step_str = str(pgd_step_budget).replace('.', '')
filename = f'comparative_model19_adv_{adv_budget_str}_pgdstep_{pgd_step_str}.txt'
results_df.to_csv(filename, sep='\t', index=False)
print(f"Comparative table saved to {filename}")


#SHOW THE ATTACK for a random sample
rand_sample_id=np.random.randint(np.shape(x)[0])

#We normalized our images into [0;1]. We make the inverse process to go into [0,255]
#NB: For the attacked images, the max perturbation is +/-0.05, so the images will be in [-0.05,1.05]
x_display = tf.cast(127.5*(x[rand_sample_id,:,:,:]+1.0),tf.uint8)
x_pgd_display = tf.cast(121.4*(x_pgd[rand_sample_id,:,:,:]+1.05),tf.uint8)
perturbation = x_pgd[rand_sample_id,:,:,:] - x[rand_sample_id,:,:,:]
print('Maximum perturbation: ',np.amax(perturbation))

#Display
fig, axs = plt.subplots(nrows=1, ncols=3, figsize=(6, 2), subplot_kw={'xticks': [], 'yticks': []})
axs.flat[0].imshow(tf.cast(x_display,tf.uint8))
axs.flat[0].set_title(str('clean, label=')+str(tf.get_static_value(y[rand_sample_id])))
axs.flat[1].imshow(tf.cast(x_pgd_display,tf.uint8))
axs.flat[1].set_title(str('pgd, label=')+str(np.argmax(tf.get_static_value(y_pred_pgd[rand_sample_id]))))
axs.flat[2].imshow(perturbation[:,:,0], cmap='gray')
axs.flat[2].set_title('Perturbation Mask')
plt.tight_layout()
plt.show()