# ResNet Model Evaluation

In [1]:
import os
import sys
# TF_CPP_MIN_LOG_LEVEL controls TensorFlow's native (C++) log output; it is set
# here, before `import tensorflow`, so the value is in place when the library loads.
# 0 = all logs, 1 = filter INFO, 2 = filter INFO & WARNING, 3 = filter INFO, WARNING & ERROR
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"  # use "3" to hide even ERROR logs

import tensorflow as tf

# Silence TensorFlow's Python logger as well
tf.get_logger().setLevel("ERROR")

# Silence absl logs that TF uses
import absl.logging
absl.logging.set_verbosity(absl.logging.ERROR)


# Best-effort: also lower the verbosity of the TF1-compat logger. Any failure is
# deliberately ignored (presumably because this API is not present in every
# TensorFlow build -- TODO confirm).
try:
    tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
except Exception:
    pass

# Data loading, model definition and plotting dependencies.
import tensorflow_datasets as tfds
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.applications import ResNet50
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
# Constants
IMG_SIZE = 224  # Side length (pixels) of the square model input
BATCH_SIZE = 200
AUTOTUNE = tf.data.AUTOTUNE
EPOCHS = 5 # Changed to a lower number for demonstration if retraining is needed
GPU_MEMORY_LIMIT_MB = 56320  # 55 GB cap on the memory TensorFlow may take on the first GPU
tf.random.set_seed(5)
dataset_dir = "../datasets"

# On Colab the datasets live on a mounted Google Drive instead of a local folder.
if 'google.colab' in sys.modules:
    from google.colab import drive

    drive.mount('/content/drive')
    dataset_dir = "/content/drive/Othercomputers/Big Mac/datasets"


physical_gpus = tf.config.list_physical_devices('GPU')
print("Available GPUs:", physical_gpus)

try:
    # Keep computations in float32 (mixed precision explicitly disabled).
    tf.keras.mixed_precision.set_global_policy('float32')
    if physical_gpus:  # Only configure a memory cap when a GPU is actually present
        # Stable replacement for the deprecated
        # tf.config.experimental.set_virtual_device_configuration alias.
        tf.config.set_logical_device_configuration(
            physical_gpus[0],
            [tf.config.LogicalDeviceConfiguration(memory_limit=GPU_MEMORY_LIMIT_MB)]
        )
        print(f"Using GPU with {GPU_MEMORY_LIMIT_MB // 1024}GB of memory")
except Exception as e:
    # Best-effort configuration: report why it was skipped and carry on with
    # TensorFlow's defaults rather than aborting the notebook.
    print(f"Could not apply precision/GPU configuration: {e}")

Mounted at /content/drive
Available GPUs: []


In [None]:
# Load ImageNet data
def prepare_input_data(input):
    """Convert one dataset example into an `(image, label)` pair for ResNet50.

    Args:
        input: Mapping that provides an 'image' entry and a 'label' entry.

    Returns:
        Tuple `(image, label)`. The image is cast to float32, resized to
        `IMG_SIZE` x `IMG_SIZE` and passed through the ResNet50
        `preprocess_input`; the label is returned unchanged.
    """
    pixels = tf.cast(input['image'], tf.float32)
    resized = tf.image.resize(pixels, (IMG_SIZE, IMG_SIZE))
    return preprocess_input(resized), input['label']

# ImageNet-1k through TFDS. (For a quick 10-class run, the 'imagenette' builder
# can be loaded here instead.)
dataset, info = tfds.load(
    'imagenet2012',
    data_dir=dataset_dir,
    with_info=True,
    shuffle_files=False,
)

train_count = info.splits["train"].num_examples
validation_count = info.splits["validation"].num_examples
print(f'Train image count: {train_count}')
print(f'Test image count: {validation_count}')


def _as_batched_pipeline(split):
    """Preprocess, batch and prefetch one raw dataset split."""
    return (
        split
        .map(prepare_input_data, num_parallel_calls=AUTOTUNE)
        .batch(BATCH_SIZE)
        .prefetch(AUTOTUNE)
    )


# The TFDS 'validation' split serves as the test set throughout the notebook.
train_dataset = _as_batched_pipeline(dataset['train'])
test_dataset = _as_batched_pipeline(dataset['validation'])

# Baseline classifier: ResNet50 with pretrained ImageNet weights and the stock
# 1000-way softmax head. (For a from-scratch 10-class run, use weights=None and
# classes=10 instead.)
base_model = ResNet50(
    weights='imagenet',
    include_top=True,
    classes=1000,
    classifier_activation='softmax',
    pooling=None,
    input_shape=(IMG_SIZE, IMG_SIZE, 3),
)

# Functions for adversarial data generation and loading
def _bytes_feature(value):
    """Wrap a byte string in a `tf.train.Feature` holding a one-element bytes list.

    Values of the same type as `tf.constant(0)` (eager tensors) are first
    unwrapped with `.numpy()`; anything else is stored as given.
    """
    eager_tensor_type = type(tf.constant(0))
    raw = value.numpy() if isinstance(value, eager_tensor_type) else value
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[raw]))

def _int64_feature(value):
    """Wrap a single integer in a `tf.train.Feature` holding a one-element int64 list."""
    int_list = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=int_list)

def _create_adversary_with_pgd(model, images, labels, eps, eps_iter, nb_iter,
                               clip_min=None, clip_max=None):
    """Craft L-infinity bounded adversarial examples with iterated signed-gradient steps.

    Starting from the clean batch (no random initialisation), every iteration
    moves the current examples by `eps_iter` along the sign of the loss
    gradient and then projects them back into the `eps`-ball around `images`.

    Args:
        model: Callable model; invoked as `model(x, training=False)`. The loss is
            built with `from_logits=False`, so the model is assumed to output
            class probabilities (e.g. a softmax head).
        images: Batch of clean inputs the attack starts from.
        labels: Integer class labels for `images`.
        eps: Maximum absolute perturbation per element, in the same units as
            `images`.
        eps_iter: Step size of a single iteration, in the same units as `images`.
        nb_iter: Number of iterations; `0` returns a copy of `images`.
        clip_min: Optional lower bound of the valid input range (scalar or a
            tensor broadcastable to `images`, e.g. per-channel bounds). `None`
            (default) applies no lower bound, matching the previous behavior.
        clip_max: Optional upper bound of the valid input range, same
            conventions as `clip_min`.

    Returns:
        Tensor shaped like `images` holding the adversarial examples.
    """
    x_adv = tf.identity(images)
    # Use from_logits=False because classifier_activation='softmax' means model outputs probabilities
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

    for _ in range(nb_iter):
        with tf.GradientTape() as tape:
            # x_adv is a plain tensor, not a variable, so it must be watched explicitly.
            tape.watch(x_adv)
            prediction = model(x_adv, training=False)
            loss = loss_object(labels, prediction)

        gradients = tape.gradient(loss, x_adv)
        # Ascend the loss, then project back into the eps-ball around the clean batch.
        x_adv = x_adv + eps_iter * tf.sign(gradients)
        perturbation = tf.clip_by_value(x_adv - images, -eps, eps)
        x_adv = images + perturbation

        # Optionally keep the examples inside the valid input range. When the
        # clean images already lie inside that range, this clipping cannot push
        # an example outside the eps-ball.
        if clip_min is not None:
            x_adv = tf.maximum(x_adv, clip_min)
        if clip_max is not None:
            x_adv = tf.minimum(x_adv, clip_max)

    return x_adv

def generate_adversarial_dataset(filename, dataset, model, eps, pgd_steps, pgd_step_size):
    """Attack every batch of `dataset` and store the results in a GZIP TFRecord file.

    Each record is a `tf.train.Example` with two features:
      * 'image': the adversarial image, cast to float16 and serialized with
        `tf.io.serialize_tensor`;
      * 'label': the original integer label.

    Args:
        filename: Destination path of the TFRecord file. Missing parent
            directories are created.
        dataset: Iterable yielding `(images, labels)` batches.
        model: Model under attack, forwarded to `_create_adversary_with_pgd`.
        eps: Maximum perturbation per element (same units as the images).
        pgd_steps: Number of PGD iterations.
        pgd_step_size: Step size of each PGD iteration.
    """
    # The writer does not create missing directories, so make sure the target
    # folder exists before opening the file.
    output_dir = os.path.dirname(filename)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    options = tf.io.TFRecordOptions(compression_type="GZIP")
    num = 0
    with tf.io.TFRecordWriter(filename, options=options) as writer:
        for batch_index, (images, labels) in enumerate(dataset, start=1):
            print(f"Batch {batch_index}")
            adv_images = _create_adversary_with_pgd(
                model=model,
                images=images,
                labels=labels,
                eps=eps,
                eps_iter=pgd_step_size,
                nb_iter=pgd_steps
            )

            # Convert the whole batch once instead of once per image.
            adv_images_f16 = tf.cast(adv_images, tf.float16)
            label_values = labels.numpy()

            # A distinct index name keeps the batch counter above intact.
            for sample_index in range(len(label_values)):
                image_bytes = tf.io.serialize_tensor(adv_images_f16[sample_index])
                feature = {
                    'image': _bytes_feature(image_bytes),
                    'label': _int64_feature(int(label_values[sample_index]))
                }
                example = tf.train.Example(features=tf.train.Features(feature=feature))
                writer.write(example.SerializeToString())
                num += 1
    print(f"Processed and saved: {num} images")

def _parse_function(proto):
    """Decode one serialized `tf.train.Example` into an `(image, label)` pair.

    The record is expected to hold a serialized float16 tensor under 'image'
    and a scalar int64 under 'label'.

    Returns:
        Tuple `(image, label)` where the image is float32 with static shape
        `[IMG_SIZE, IMG_SIZE, 3]`.
    """
    schema = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    example = tf.io.parse_single_example(proto, schema)
    decoded = tf.io.parse_tensor(example['image'], out_type=tf.float16)
    image = tf.cast(decoded, tf.float32)
    # Pin the static shape to the expected image dimensions.
    image.set_shape([IMG_SIZE, IMG_SIZE, 3])
    return image, example['label']

# Compile so that `evaluate` reports loss, top-1 accuracy and top-5 accuracy
# (in that order) for integer labels.
top5_metric = tf.keras.metrics.SparseTopKCategoricalAccuracy(k=5, name='top_5_accuracy')
base_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy', top5_metric],
)


In [None]:
# The base model is created with pretrained ImageNet weights, so training it on
# the clean data is opt-in. Flip the flag to fine-tune before evaluation.
RETRAIN_BASE_MODEL = False

if RETRAIN_BASE_MODEL:
    print("Training base model...\n")
    # batch_size is intentionally not passed: train_dataset is already batched.
    base_model.fit(train_dataset, verbose=1, epochs=4)
else:
    print("Skipping base model training (using pretrained ImageNet weights).\n")


In [None]:
# Create the adversarial test and train datasets.
#
# Units matter here: the input pipeline runs images through the ResNet50
# `preprocess_input`, which mean-centres each channel but does NOT rescale
# pixels, so the model sees values on the 0-255 intensity scale. The attack
# budget must use the same scale: the customary 8/255 (defined for [0, 1]
# images) corresponds to 8.0 here. A budget of 8/255 ~= 0.03 on this scale is
# negligible and would also fall below float16 resolution (the storage format
# of the records) for pixel magnitudes above ~32.
EPSILON = 8.0  # Maximum allowed change per pixel value (L-infinity), 0-255 scale.
PGD_STEPS = 5  # Number of PGD iterations.
PGD_STEP_SIZE = 2.0  # Change applied at each iteration, 0-255 scale.

adversarial_test_file = f'{dataset_dir}/adversaries/test_dataset.tfrec'
adversarial_train_file = f'{dataset_dir}/adversaries/train_dataset.tfrec'

# Test split first, then the (much larger) train split.
for split_name, target_file, clean_split in (
    ("test", adversarial_test_file, test_dataset),
    ("train", adversarial_train_file, train_dataset),
):
    print(f"Generate adversarial {split_name} data")
    generate_adversarial_dataset(
        filename=target_file,
        dataset=clean_split,
        model=base_model,
        eps=EPSILON,
        pgd_steps=PGD_STEPS,
        pgd_step_size=PGD_STEP_SIZE
    )


In [None]:

# Read the stored adversarial test records back and batch them like the clean data.
loaded_test_dataset = tf.data.TFRecordDataset(adversarial_test_file, compression_type='GZIP')
parsed_test_dataset = (
    loaded_test_dataset
    .map(_parse_function)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

print("Computing baseline metrics...\n")

# Evaluated in order: clean train, clean test, adversarial test.
train_metrics, test_metrics, noisy_metrics = [
    base_model.evaluate(split, verbose=1)
    for split in (train_dataset, test_dataset, parsed_test_dataset)
]


In [None]:

# Extract metrics
# Order of the values returned by `evaluate`: the loss first, then the metrics
# in the order they were passed to `compile`.
metric_names = ['loss', 'accuracy', 'top_5_accuracy']

try:
    metrics_dict = {
        'train': dict(zip(metric_names, train_metrics)),
        'test': dict(zip(metric_names, test_metrics)),
        'noisy': dict(zip(metric_names, noisy_metrics))
    }

    train_loss, train_acc, train_top5 = metrics_dict['train'].values()
    test_loss, test_acc, test_top5 = metrics_dict['test'].values()
    noisy_loss, noisy_acc, noisy_top5 = metrics_dict['noisy'].values()

except Exception as e:
    print(f"Error extracting metrics: {e}")
    print("Available metrics:", base_model.metrics_names)
    # Re-raise: the report below cannot be produced without these values, and
    # continuing would only surface an unrelated NameError.
    raise

print("## Baseline Performance Metrics 📊")
print(f"Clean Data Accuracy: `{test_acc*100:.2f}%`")
print(f"Adversarial Data Accuracy: `{noisy_acc*100:.2f}%`")
print(f"* Robustness Gap: `{(test_acc-noisy_acc)*100:.2f}%`")

print("\n" + "---")
print(f"Top-1 Accuracy: `{test_acc*100:.2f}%`")
print(f"Top-5 Accuracy: `{test_top5*100:.2f}%`")
print(f"Loss: `{test_loss:.3f}`")


In [None]:
# Adversarial training
# Stream the stored adversarial training records, decoding them in parallel
# like the clean pipeline (element order is unchanged).
loaded_train_dataset = tf.data.TFRecordDataset(adversarial_train_file, compression_type='GZIP')
parsed_train_dataset = (
    loaded_train_dataset
    .map(_parse_function, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

print("Adversarial Training robust model...\n")

# Same architecture and pretrained starting point as the base model. (For a
# from-scratch 10-class run, use weights=None and classes=10 instead.)
robust_model = ResNet50(
    include_top=True,
    weights='imagenet',
    input_shape=(IMG_SIZE, IMG_SIZE, 3),
    pooling=None,
    classes=1000,
    classifier_activation='softmax'
)

robust_model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=[
        'accuracy',
        tf.keras.metrics.SparseTopKCategoricalAccuracy(k=5, name='top_5_accuracy'),
    ]
)
# batch_size is not passed to fit: the dataset is already batched above, and
# Keras asks that batch_size be left unset for tf.data.Dataset inputs.
robust_model.fit(parsed_train_dataset, verbose=1, epochs=4)


In [None]:
# Adversarial evaluation
# 'train' is measured on the adversarial training records, 'test' on the clean
# test data and 'noisy' on the adversarial test records.
train_metrics_adv = robust_model.evaluate(parsed_train_dataset, verbose=1)
test_metrics_adv = robust_model.evaluate(test_dataset, verbose=1)
noisy_metrics_adv = robust_model.evaluate(parsed_test_dataset, verbose=1)

try:
    metrics_dict_adv = {
        'train': dict(zip(metric_names, train_metrics_adv)),
        'test': dict(zip(metric_names, test_metrics_adv)),
        'noisy': dict(zip(metric_names, noisy_metrics_adv))
    }

    train_loss_adv, train_acc_adv, train_top5_adv = metrics_dict_adv['train'].values()
    test_loss_adv, test_acc_adv, test_top5_adv = metrics_dict_adv['test'].values()
    noisy_loss_adv, noisy_acc_adv, noisy_top5_adv = metrics_dict_adv['noisy'].values()

except Exception as e:
    print(f"Error extracting metrics after adversarial training: {e}")
    # Report the metrics of the model that was actually evaluated here.
    print("Available metrics:", robust_model.metrics_names)
    raise

# Three side-by-side panels summarising the adversarially trained model.
plt.style.use('seaborn-v0_8-darkgrid')
fig, (ax_accuracy, ax_loss, ax_gap) = plt.subplots(1, 3, figsize=(18, 6))

# Panel 1: clean vs. adversarial accuracy, grouped by metric.
metrics = ['Top-1 Acc', 'Top-5 Acc']
clean_scores_post_adv = [test_acc_adv*100, test_top5_adv*100]
noisy_scores_post_adv = [noisy_acc_adv*100, noisy_top5_adv*100]

x = range(len(metrics))
width = 0.35
clean_positions = [pos - width/2 for pos in x]
noisy_positions = [pos + width/2 for pos in x]

ax_accuracy.bar(clean_positions, clean_scores_post_adv, width, label='Clean Data', color='mediumseagreen')
ax_accuracy.bar(noisy_positions, noisy_scores_post_adv, width, label='Adversarial Data', color='salmon')
ax_accuracy.set_ylabel('Percentage (%)')
ax_accuracy.set_title('Accuracy Comparison (Post-Adversarial Training)')
ax_accuracy.set_xticks(list(x))
ax_accuracy.set_xticklabels(metrics)
ax_accuracy.legend()
ax_accuracy.set_ylim(0, 100)  # Fixed scale so runs are comparable

# Panel 2: loss on each evaluated dataset.
ax_loss.bar(
    ['Training', 'Testing', 'Adversarial'],
    [train_loss_adv, test_loss_adv, noisy_loss_adv],
    color=['steelblue', 'mediumseagreen', 'salmon'],
)
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Loss Comparison Across Datasets (Post-Adversarial Training)')

# Panel 3: accuracy gaps (train - test, and clean test - adversarial test).
ax_gap.bar(
    ['Generalization Gap', 'Robustness Gap'],
    [(train_acc_adv - test_acc_adv)*100, (test_acc_adv - noisy_acc_adv)*100],
    color=['steelblue', 'salmon'],
)
ax_gap.set_ylabel('Gap Percentage (%)')
ax_gap.set_title('Model Gaps Analysis (Post-Adversarial Training)')

plt.tight_layout()
plt.show()

# Text report for the adversarially trained model. Percentages are computed
# once up front and reused by the lines below.
clean_acc_pct = test_acc_adv*100
adversarial_acc_pct = noisy_acc_adv*100
train_acc_pct = train_acc_adv*100
top5_pct = test_top5_adv*100
robustness_gap_pct = (test_acc_adv-noisy_acc_adv)*100
generalization_gap_pct = (train_acc_adv-test_acc_adv)*100

print("## Adversarial Performance Metrics 📊")
print(f"Clean Data Accuracy: `{clean_acc_pct:.2f}%`")
print(f"Adversarial Data Accuracy: `{adversarial_acc_pct:.2f}%`")
print(f"Robustness Gap: `{robustness_gap_pct:.2f}%`")

print(f"Top-1 Accuracy: `{clean_acc_pct:.2f}%`")
print(f"Top-5 Accuracy: `{top5_pct:.2f}%`")
print(f"Loss: `{test_loss_adv:.3f}`")

print("\n## Generalization Analysis 🧠")
print(f"Training Accuracy: `{train_acc_pct:.2f}%`")
print(f"Test Accuracy: `{clean_acc_pct:.2f}%`")
print(f"Generalization Gap: `{generalization_gap_pct:.2f}%`")
print(f"Training Loss: `{train_loss_adv:.3f}`")
print(f"Test Loss: `{test_loss_adv:.3f}`")
print("> *A smaller gap indicates better generalization*")



Available GPUs: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
Using GPU with 55GB of memory
[1mDownloading and preparing dataset Unknown size (download: Unknown size, generated: Unknown size, total: Unknown size) to ../datasets/imagenet2012/5.1.0...[0m


                                                                         

[1mDataset imagenet2012 downloaded and prepared to ../datasets/imagenet2012/5.1.0. Subsequent calls will reuse this data.[0m
Train image count: 1281167
Test image count: 50000
Training baseline model...

Epoch 1/4
[1m6406/6406[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12606s[0m 2s/step - accuracy: 0.5888 - loss: 1.6944 - top_5_accuracy: 0.8293
Epoch 2/4
[1m6406/6406[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12525s[0m 2s/step - accuracy: 0.6959 - loss: 1.1874 - top_5_accuracy: 0.8979
Epoch 3/4
[1m6406/6406[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12640s[0m 2s/step - accuracy: 0.7627 - loss: 0.8899 - top_5_accuracy: 0.9337
Epoch 4/4
[1m 570/6406[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m3:10:27[0m 2s/step - accuracy: 0.7918 - loss: 0.7675 - top_5_accuracy: 0.9478

KeyboardInterrupt: 