In [None]:
# !pip install tensorflow numpy

In [None]:
!pip list

In [None]:
import os

# The oneDNN switch has to be exported BEFORE TensorFlow is imported:
# setting it after `import tensorflow` (as this cell used to do) comes too late
# to influence the already-loaded library.
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"

import numpy as np
import tensorflow as tf
import keras as K
# import tensorflow_datasets as tfds
# import matplotlib.pyplot as plt

### Saving and Loading Functions

In [None]:
def save(gan, generator, discriminator, model_folder, prefix="ACGAN"):
    """
    Save the generator, the discriminator and the combined AC-GAN model.

    Every model is written with ``K.models.save_model`` below
    ``{model_folder}/{prefix}``:
    ``.../generator``, ``.../discriminator`` and ``.../model``.

    Args:
    - gan: Combined AC-GAN model (saved while the discriminator is frozen)
    - generator: Generator model
    - discriminator: Discriminator model (saved on its own while trainable)
    - model_folder: Root folder that holds the saved models
    - prefix: Sub-folder of `model_folder` used for this set of models

    The discriminator's `trainable` flag is toggled while saving and restored
    to the value it had on entry before the function returns.
    """
    # Create the full target directory (root + prefix). `exist_ok=True` removes
    # the check-then-create race and makes repeated saves safe.
    os.makedirs(f"{model_folder}/{prefix}", exist_ok=True)

    # Remember the caller's setting so saving has no lasting side effect.
    was_trainable = discriminator.trainable
    try:
        # Save generator and discriminator as stand-alone, trainable models
        discriminator.trainable = True
        K.models.save_model(generator, f"{model_folder}/{prefix}/generator")
        K.models.save_model(discriminator, f"{model_folder}/{prefix}/discriminator")

        # The combined model is saved with the discriminator frozen
        discriminator.trainable = False
        print("Saving AC-GAN")
        K.models.save_model(gan, f"{model_folder}/{prefix}/model")
    finally:
        discriminator.trainable = was_trainable


def load(model_folder, prefix="ACGAN"):
    """
    Load the generator, the discriminator and the combined AC-GAN model.

    The models are read with ``K.models.load_model`` from
    ``{model_folder}/{prefix}/generator``, ``.../discriminator`` and
    ``.../model``; afterwards the summary of each model is printed.

    Args:
    - model_folder: Root folder that holds the saved models
    - prefix: Sub-folder of `model_folder` the models were saved under

    Returns:
    - generator, discriminator, gan: The three loaded models, in this order
    """
    base_dir = f"{model_folder}/{prefix}"

    # Load in the fixed order generator -> discriminator -> combined model
    generator, discriminator, gan = (
        K.models.load_model(f"{base_dir}/{part}")
        for part in ("generator", "discriminator", "model")
    )

    # Print an overview of every loaded network
    for loaded in (generator, discriminator, gan):
        loaded.summary()

    return generator, discriminator, gan

# AC-GAN Model

## AC-GAN Generator

In [None]:
class Generator(K.Model):
    """
    Generator component of AC-GAN for MNIST dataset

    Turns a latent noise vector plus an integer class label into a 28x28x1
    image whose values lie in [-1, 1] (tanh output layer).

    Args:
    - latent_dim: Dimension of the latent space (generated as noise)
    - n_classes: Number of classes(labels) in the dataset (default=10)

    Adapted from https://github.com/kochlisGit/Generative-Adversarial-Networks/blob/main/mnist-digits-acgan/digits-acgan.py

    """
    def __init__(self, latent_dim, n_classes=10):
        super(Generator, self).__init__()
        self.latent_dim = latent_dim
        self.n_classes = n_classes

        # Layers for Latent Inputs
        self.dense1 = K.layers.Dense(units=7 * 7 * 256, use_bias=False)
        self.bn1 = K.layers.BatchNormalization()
        self.reshape1 = K.layers.Reshape(target_shape=(7, 7, 256))

        # Layers for Label Inputs
        self.embedding = K.layers.Embedding(input_dim=n_classes, output_dim=64)
        self.dense2 = K.layers.Dense(units=7 * 7, use_bias=False)
        self.bn2 = K.layers.BatchNormalization()
        self.reshape2 = K.layers.Reshape(target_shape=(7, 7, 1))

        # Layers for Merging Inputs (Combining Latent and Label Inputs)
        self.conv1 = K.layers.Conv2DTranspose(filters=128, kernel_size=5, strides=1, padding='same', use_bias=False)
        self.bn3 = K.layers.BatchNormalization()
        self.dropout1 = K.layers.Dropout(rate=0.4)
        self.conv2 = K.layers.Conv2DTranspose(filters=64, kernel_size=5, strides=2, padding='same', use_bias=False)
        self.bn4 = K.layers.BatchNormalization()
        self.dropout2 = K.layers.Dropout(rate=0.4)
        self.conv3 = K.layers.Conv2DTranspose(filters=1, kernel_size=5, strides=2, padding='same', activation='tanh')

        # Weight-free helper layers. They are created once here (and reused)
        # instead of being instantiated again on every forward pass.
        self.leaky_relu = K.layers.LeakyReLU()
        self.concat = K.layers.Concatenate()

    @tf.function
    def call(self, inputs, training=True):
        """
        Forward pass of the generator

        Args:
        - inputs: Pair ``[latent_inputs, label_inputs]``
            - latent_inputs: Random noise from the latent space, used for generating images
            - label_inputs: Integer labels of the images to be generated
        - training: Boolean flag for whether training or testing. Note that the
          default is True, so batch-norm/dropout run in training mode unless
          the caller passes ``training=False``.

        Returns:
        - Generated images (7x7 feature maps upsampled to 14x14 and then 28x28, one channel)
        """
        latent_inputs, label_inputs = inputs

        # Latent branch: Dense -> BatchNorm -> LeakyReLU -> Reshape to 7x7x256
        x1 = self.dense1(latent_inputs)
        x1 = self.bn1(x1, training=training)
        x1 = self.leaky_relu(x1)
        x1 = self.reshape1(x1)

        # Label branch: Embedding -> Dense -> BatchNorm -> LeakyReLU -> Reshape to 7x7x1
        x2 = self.embedding(label_inputs)
        x2 = self.dense2(x2)
        x2 = self.bn2(x2, training=training)
        x2 = self.leaky_relu(x2)
        x2 = self.reshape2(x2)

        # Stack the label map onto the latent feature maps (channel axis)
        merged_inputs = self.concat([x1, x2])

        # Upsampling stack
        x = self.conv1(merged_inputs)
        x = self.bn3(x, training=training)
        x = self.leaky_relu(x)
        x = self.dropout1(x, training=training)
        x = self.conv2(x)
        x = self.bn4(x, training=training)
        x = self.leaky_relu(x)
        x = self.dropout2(x, training=training)
        x = self.conv3(x)

        return x


## AC-GAN Discriminator

In [None]:
class Discriminator(K.Model):
    """
    Discriminator component of AC-GAN for MNIST dataset

    For every input image it outputs two predictions: a validity score
    (sigmoid, one unit) and a class distribution (softmax over `n_classes`).

    Args:
    - n_classes: Number of classes(labels) in the dataset (default=10) which predicted (discriminated) by the Discriminator
    """
    def __init__(self, n_classes=10):
        super(Discriminator, self).__init__()
        self.n_classes = n_classes

        # Feature extractor
        self.gaussian_noise = K.layers.GaussianNoise(stddev=0.2)
        self.conv1 = K.layers.Conv2D(filters=64, kernel_size=5, strides=2, padding='same', use_bias=False)
        self.bn1 = K.layers.BatchNormalization()
        self.dropout1 = K.layers.Dropout(rate=0.4)
        self.conv2 = K.layers.Conv2D(filters=128, kernel_size=5, strides=2, padding='same', use_bias=False)
        self.bn2 = K.layers.BatchNormalization()
        self.dropout2 = K.layers.Dropout(rate=0.4)

        # Weight-free activation, created once here and reused instead of being
        # instantiated again on every forward pass.
        self.leaky_relu = K.layers.LeakyReLU()

        # flatten layer
        self.flatten = K.layers.Flatten()

        # Output layers: 2 Dense Layer for validity and label prediction
        self.dense1 = K.layers.Dense(units=1, activation='sigmoid') # validity of the image
        self.dense2 = K.layers.Dense(units=n_classes, activation='softmax') # class of the image

    @tf.function
    def call(self, inputs, training=True):
        """
        Forward pass of the discriminator

        Args:
        - inputs: Input images to be discriminated (real images or images produced by the Generator)
        - training: Boolean flag for whether training or testing. The default
          is True, so noise/batch-norm/dropout run in training mode unless the
          caller passes ``training=False``.

        Returns:
        - validity: Validity of the input image that the discriminator predicts
        - label: Label of the input image that the discriminator predicts
        """
        # Pass `training` explicitly, like for every other training-sensitive layer
        x = self.gaussian_noise(inputs, training=training)
        x = self.conv1(x)
        x = self.bn1(x, training=training)
        x = self.leaky_relu(x)
        x = self.dropout1(x, training=training)
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = self.leaky_relu(x)
        x = self.dropout2(x, training=training)

        x = self.flatten(x)

        # Output layers
        validity = self.dense1(x)
        label = self.dense2(x)

        return validity, label


## AC-GAN Architecture

In [None]:
class ACGAN(K.Model):
    """
    Auxiliary-Classifier GAN that couples a generator with a discriminator.

    Args:
    - generator: Model mapping ``[latent_inputs, label_inputs]`` to images
    - discriminator: Model mapping images to ``(validity, label)`` predictions
    - latent_dim: Dimension of the latent noise vector
    - n_classes: Number of classes(labels) in the dataset (default=10)
    """
    def __init__(self, generator, discriminator, latent_dim, n_classes=10):
        super(ACGAN, self).__init__()
        self.generator = generator
        self.discriminator = discriminator
        self.latent_dim = latent_dim
        self.n_classes = n_classes
        self.generator_optimizer = K.optimizers.Adam(learning_rate=0.0002, beta_1=0.5, beta_2=0.999)
        self.discriminator_optimizer = K.optimizers.Adam(learning_rate=0.0002, beta_1=0.5, beta_2=0.999)

        # Validity loss uses label smoothing; the class loss works on integer labels
        self.binary_loss = K.losses.BinaryCrossentropy(label_smoothing=0.25)
        self.sparse_categorical_loss = K.losses.SparseCategoricalCrossentropy()

    def compile(self):
        """Compile the model; optimizers and losses are owned by this class, so no arguments are taken."""
        super(ACGAN, self).compile()

        # Set the discriminator to not trainable initially
        self.discriminator.trainable = False

    def call(self, inputs, training=False):
        """
        Forward pass of the ACGAN model.

        Args:
        - inputs: A list containing [latent_inputs, label_inputs] which refers to the random noise
          and the labels of the images to generate
        - training: Boolean flag for whether training or testing

        Returns:
        - discriminated_validity: Validity of the generated image that the discriminator predicts
        - discriminated_label: Label of the generated image that the discriminator predicts
        """
        latent_inputs, label_inputs = inputs
        generated_images = self.generator([latent_inputs, label_inputs], training=training)
        discriminated_validity, discriminated_label = self.discriminator(generated_images, training=training)
        return discriminated_validity, discriminated_label

    def train_step(self, data):
        """
        Training step for the ACGAN model: one discriminator update followed by
        one generator update.

        Args:
        - data: A batch ``(images, labels)`` of real samples from the dataset (i.e. MNIST);
          labels are integer class ids (not one-hot)

        Returns:
        - dict with the total discriminator loss ("d_loss") and the total generator loss ("g_loss")
        """
        x_batch, y_batch = data
        batch_size = tf.shape(x_batch)[0]
        # Real labels and sampled labels are concatenated below, so both must share one dtype
        y_batch = tf.cast(y_batch, tf.int32)

        # =========================== Ground Truth labels =======================================
        real_labels = tf.ones((batch_size, 1))
        fake_labels = tf.zeros((batch_size, 1))
        mixed_labels = tf.concat([real_labels, fake_labels], axis=0)

        # ====================== Generate the inputs for the Discriminator ======================
        random_latent_noise = tf.random.normal(shape=[batch_size, self.latent_dim])
        # Sample integer class ids with TensorFlow and from `self.n_classes`:
        # this does not depend on a notebook-level `n_classes` variable and also
        # works when `batch_size` is a symbolic tensor.
        random_labels = tf.random.uniform(
            shape=[batch_size], minval=0, maxval=self.n_classes, dtype=tf.int32
        )

        # Generate images from random noise and labels by Generator
        generated_images = self.generator([random_latent_noise, random_labels], training=True)

        # Real samples first, generated samples second (same order as `mixed_labels`)
        mixed_images = tf.concat([x_batch, generated_images], axis=0)
        mixed_generated_labels = tf.concat([y_batch, random_labels], axis=0)

        # =========================== Train the Discriminator ====================================
        self.discriminator.trainable = True # Set the discriminator to trainable

        with tf.GradientTape() as tape:
            discriminated_validity, discriminated_label = self.discriminator(mixed_images, training=True)

            d_validity_loss = self.binary_loss(mixed_labels, discriminated_validity)
            d_label_loss = self.sparse_categorical_loss(mixed_generated_labels, discriminated_label)
            total_discriminator_loss = tf.reduce_mean(d_validity_loss) + tf.reduce_mean(d_label_loss)

        gradients_D = tape.gradient(total_discriminator_loss, self.discriminator.trainable_variables)
        self.discriminator_optimizer.apply_gradients(zip(gradients_D, self.discriminator.trainable_variables))

        # =========================== Train the Generator ========================================
        self.discriminator.trainable = False # Set the discriminator to not trainable

        with tf.GradientTape() as tape:
            generated_images = self.generator([random_latent_noise, random_labels], training=True)
            discriminated_validity, discriminated_label = self.discriminator(generated_images, training=False)

            # The generator wants its images judged as real and classified as the requested label
            g_validity_loss = self.binary_loss(real_labels, discriminated_validity)
            g_label_loss = self.sparse_categorical_loss(random_labels, discriminated_label)
            total_generator_loss = tf.reduce_mean(g_validity_loss) + tf.reduce_mean(g_label_loss)

        # Only generator variables are updated in this phase
        gradients_G = tape.gradient(total_generator_loss, self.generator.trainable_variables)
        self.generator_optimizer.apply_gradients(zip(gradients_G, self.generator.trainable_variables))

        return {
            "d_loss": total_discriminator_loss,
            "g_loss": total_generator_loss
        }


    def generate_images(self, latent_space, labels):
        """
        Generate images from the latent space and labels. Using Generator only.

        Args:
        - latent_space: Random noise from the latent space
        - labels: Labels for the images to be generated

        Returns:
        - Images produced by the generator in inference mode (training=False)
        """
        return self.generator([latent_space, labels], training=False)


# =================================================================================================

## Training AC-GAN

### Load the dataset

In [None]:
# Load MNIST through Keras (cached locally under the name "mnist.npz")
# Alternative via tensorflow_datasets:
# mnist_train, mnist_test = tfds.load('mnist', split=['train', 'test'],data_dir='~/tensorflow_datasets',  as_supervised=True)

(x_train, y_train), (x_test, y_test) = K.datasets.mnist.load_data(path="mnist.npz")

# Report the array shapes of both splits
print(f"Training set: {x_train.shape}")
print(f"Training label: {y_train.shape}")
print(f"Test set: {x_test.shape}")
print(f"Test label: {y_test.shape}")

In [None]:

print("Normalizing and Reshaping the data...")

# Add the channel axis and scale pixels from [0, 255] to [-1, 1],
# the same range as the generator's tanh output.
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1).astype(np.float32)
x_train = (x_train - 127.5) / 127.5

x_test = x_test.reshape(x_test.shape[0], 28, 28, 1).astype(np.float32)
# Fixed: the result used to be assigned to the misspelled name `x_text`,
# which left `x_test` unnormalized.
x_test = (x_test - 127.5) / 127.5

# NOT USED: Convert labels to one-hot encoding ---------------------------
# BECAUSE IT MIXED WITH SHAPE OF RANDOM NEIGHBOR, WHICH IS [32] 
# USE ONE HOT LABEL MAKE SHAPE BECOME: [32,10]
# y_train_one_hot = K.utils.to_categorical(y_train, num_classes=10)
# y_test_one_hot = K.utils.to_categorical(y_test, num_classes=10)
# y_train_one_hot = tf.cast(y_train_one_hot, dtype=tf.uint8)
# y_test_one_hot = tf.cast(y_test_one_hot, dtype=tf.uint8) #cast to uint8
# print("Train label one hot", y_train_one_hot.shape)
# print("Test label one hot", y_test_one_hot.shape)
#--------------------------------------------------------------------------

print("Train label", y_train.shape)
print("Test label", y_test.shape)

print("Completed preprocessing the data!!")

### Hyperparameters

In [None]:
# Hyperparameters used by the cells below
latent_dim, n_classes = 128, 10  # size of the noise vector, number of label classes
batch_size, epochs = 32, 50      # samples per training batch, AC-GAN training epochs

### Create Generator and Discriminator

In [None]:
# Build both sub-networks, then wire them into the combined AC-GAN model
generator = Generator(latent_dim=latent_dim, n_classes=n_classes)
discriminator = Discriminator(n_classes=n_classes)

acgan = ACGAN(
    generator,
    discriminator,
    latent_dim=latent_dim,
    n_classes=n_classes,
)

# ACGAN.compile() takes no arguments; it also freezes the discriminator
acgan.compile()



### Training

In [None]:
# Training
# Input pipeline: shuffle over the whole training set, cut into full batches, prefetch
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=x_train.shape[0])
inputs = train_dataset.batch(batch_size=batch_size, drop_remainder=True)
inputs = inputs.prefetch(buffer_size=tf.data.AUTOTUNE)

batches_per_epoch = x_train.shape[0] // batch_size

for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")

    for i, (x_batch, y_batch) in enumerate(inputs):
        # One discriminator update + one generator update
        losses = acgan.train_step([x_batch, y_batch])

        # Only report every 200th batch
        if i % 200 != 0:
            continue
        print(f"Batch {i}/{batches_per_epoch}, Discriminator Loss: {losses['d_loss']}, Generator Loss: {losses['g_loss']}")

    # Losses of the last batch of this epoch
    print(f"\nEpoch ({epoch+1}/{epochs}): \n Discriminator Loss: {losses['d_loss']}, Generator Loss: {losses['g_loss']}\n")

print("Training complete!")

# Persist generator, discriminator and the combined model
print("Saving the model")
model_folder = "models"
save(acgan, generator, discriminator, model_folder, prefix="ACGAN")
        

### Test the Model

In [None]:
# def evaluate_acgan(acgan, x_test, y_test, batch_size=32):
#     """Evaluates the AC-GAN using the discriminator's auxiliary classifier."""
#     _, aux_output = acgan.discriminator.predict(x_test, batch_size=batch_size)
#     predicted_labels = np.argmax(aux_output, axis=1)
#     accuracy = np.mean(predicted_labels == y_test)
#     # print(f"AC-GAN Test Accuracy: {accuracy * 100:.2f}%")
#     return accuracy

# accuracy = evaluate_acgan(acgan, x_test, y_test)
# print(f"AC-GAN Test Accuracy: {accuracy * 100:.2f}%")

### Test Generation

In [None]:
# digits_per_class = 3
# random_noise = tf.random.normal(shape=[digits_per_class * n_classes, latent_dim])
# digit_targets = np.array([target for target in range(n_classes) for _ in range(digits_per_class)])
# generated_digits = generator.predict([random_noise, digit_targets])

# rows = 5
# cols = 6
# fig, axes = plt.subplots(nrows=rows, ncols=cols, figsize=(10, 10))
# for i, digit in enumerate(generated_digits):
#     digit = np.reshape(digit * 127.5 + 127.5, (28, 28))
#     ax = axes[i // cols, i % cols]
#     ax.imshow(digit, cmap='gray')
# plt.tight_layout()
# plt.show()

# AT-GAN

## Target Classifier

In [None]:
# AT-GAN MODELS: Extended from ACGAN for Adversarial Attack
class TargetClassifier(K.Model):
    """
    Target Classifier for the AT-GAN model.
    This simply acts as the classifier for the input images (MNIST) of either real or generated images.
    Using as the target for the attack.

    Args:
    - num_classes: Number of output classes of the softmax layer (default=10)
    """
    def __init__(self, num_classes=10):
        super(TargetClassifier, self).__init__()

        # Every layer comes from the same Keras package as the `K.Model` base
        # class; mixing `K.layers` with `tf.keras.layers` breaks when the two
        # packages resolve to different Keras versions.
        # (`input_shape` is not passed: a subclassed model gets its input
        # shape from the first call.)
        self.conv1 = K.layers.Conv2D(32, (3, 3), activation='relu', padding='same')
        self.pool1 = K.layers.MaxPooling2D((2, 2))
        self.conv2 = K.layers.Conv2D(64, (3, 3), activation='relu', padding='same')
        self.pool2 = K.layers.MaxPooling2D((2, 2))

        self.flatten = K.layers.Flatten()
        self.fc1 = K.layers.Dense(128, activation='relu')
        self.fc2 = K.layers.Dense(num_classes, activation='softmax')

    @tf.function
    def call(self, x, training=False):
        """
        Classify a batch of images.

        Args:
        - x: Input images
        - training: Accepted for Keras API compatibility; no layer of this model uses it

        Returns:
        - Class probabilities (softmax over `num_classes`)
        """
        x = self.conv1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.fc2(x)

        return x

## Attack Generator (`G_attack`)

In [None]:
class Attack_Generator(K.Model):
    """
    G_attack: the generator used for the adversarial attack, whose output is
    fed to the Target Classifier.

    This class is a thin wrapper: it keeps a reference to the generator it is
    given and forwards every call to it. It does not copy the generator itself.

    Args:
    - generator: Generator model that produces the images
    """
    def __init__(self, generator):
        super().__init__()
        self.generator = generator

    def call(self, inputs, training=False):
        """Delegate to the wrapped generator, passing `training` through unchanged."""
        outputs = self.generator(inputs, training=training)
        return outputs


## AT-GAN

In [None]:
class ATGAN:
    """
    Trainer for the attack generator of AT-GAN.

    Args:
    - G_original: Original generator, used as the reference for the distance loss
    - G_attack: Adversarial generator whose variables are optimized
    - f_target: Target classifier that is attacked
    - noise_size: Dimension of the latent noise vector
    - lambda_adv_at: Weight of the adversarial loss (default=2.0)
    - lambda_dist: Weight of the distance loss (default=1.0)
    """
    def __init__(self, G_original, G_attack, f_target, noise_size, lambda_adv_at=2.0, lambda_dist=1.0):
        # Networks
        self.G_original = G_original
        self.G_attack = G_attack
        self.f_target = f_target

        # Latent space size
        self.noise_size = noise_size

        # Loss weights
        self.lambda_adv_at = lambda_adv_at
        self.lambda_dist = lambda_dist

        # Optimizer for G_attack and the classification loss used for La
        self.optimizer_G_attack = K.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
        self.sparse_categorical_loss = K.losses.SparseCategoricalCrossentropy()

    @tf.function
    def train_step_atgan(self, images, target_labels):
        """
        Run one optimization step of G_attack.

        Args:
        - images: Batch of images; only its first dimension (the batch size) is used
        - target_labels: Integer labels the target classifier should predict

        Returns:
        - g_attack_loss: Weighted sum of both losses
        - la_loss: Adversarial loss (classifier loss w.r.t. `target_labels`)
        - ld_loss: Distance loss (mean squared difference to the original generator's images)
        """
        n_samples = tf.shape(images)[0]

        with tf.GradientTape() as tape:
            latent = tf.random.normal([n_samples, self.noise_size])

            # Adversarial images and the target classifier's opinion on them
            adversarial = self.G_attack([latent, target_labels], training=True)
            target_pred = self.f_target(adversarial, training=False)

            # 1. Adversarial loss (La)
            la_loss = tf.reduce_mean(self.sparse_categorical_loss(target_labels, target_pred))

            # 2. Distance loss (Ld): noisy adversarial images vs. images of G_original
            perturbation = tf.random.normal(shape=tf.shape(adversarial), mean=0.0, stddev=0.1)
            reference = self.G_original([latent, target_labels], training=False)
            ld_loss = tf.reduce_mean(tf.square(reference - (adversarial + perturbation)))

            # Total loss for G_attack
            g_attack_loss = self.lambda_adv_at * la_loss + self.lambda_dist * ld_loss

        # Update G_attack only
        attack_variables = self.G_attack.trainable_variables
        attack_gradients = tape.gradient(g_attack_loss, attack_variables)
        self.optimizer_G_attack.apply_gradients(zip(attack_gradients, attack_variables))

        return g_attack_loss, la_loss, ld_loss


### Train AT-GAN

In [None]:
# Hyperparameters
# Number of passes over the training batches in the AT-GAN training loop
epochs_atgan = 50

In [None]:
# Create and train target classifier
# The labels are integer class ids (the one-hot conversion in the preprocessing
# cell is commented out, so `y_train_one_hot` / `y_test_one_hot` do not exist);
# the sparse variant of the loss consumes integer labels directly.
f_target = TargetClassifier()
f_target.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
f_target.fit(x_train, y_train, epochs=5, batch_size=batch_size, validation_data=(x_test, y_test))

In [None]:
# Create G_attack and AT-GAN
# G_attack has to start as an independent COPY of the trained generator.
# Wrapping `generator` itself would make G_attack and G_original share their
# weights: the distance loss would compare the network with itself and every
# G_attack update would also change G_original.
attack_generator = Generator(latent_dim, n_classes)
# A subclassed model creates its weights on the first call, so run one dummy
# sample through it before copying the weights over.
attack_generator([tf.zeros([1, latent_dim]), tf.zeros([1], dtype=tf.int32)], training=False)
attack_generator.set_weights(generator.get_weights())

G_attack_instance = Attack_Generator(attack_generator)
atgan = ATGAN(generator, G_attack_instance, f_target, latent_dim)

In [None]:
# Train AT-GAN
for epoch in range(epochs_atgan):
    print('\nTraining AT-GAN on epoch', epoch + 1)

    for i, (x_batch, _) in enumerate(inputs):
        # Fresh random attack targets for every batch; the real labels are not needed
        target_labels = np.random.randint(0, n_classes, size=[batch_size])
        step_losses = atgan.train_step_atgan(x_batch, target_labels)
        g_attack_loss, la_loss, ld_loss = step_losses

        # Only report every 200th batch
        should_report = i % 200 == 0
        if should_report:
            print(f'Batch {i}, G_attack Loss: {g_attack_loss}, La Loss: {la_loss}, Ld Loss: {ld_loss}')

    # Losses of the last batch of this epoch
    print(f'\nEpoch ({epoch + 1}/{epochs_atgan}):\n G_attack Loss: {g_attack_loss}, La Loss: {la_loss}, Ld Loss: {ld_loss}\n')



### Evaluate AT-GAN

In [None]:

def evaluate_atgan(atgan, f_target, x_test, y_test, noise_size=128, n_classes=10, num_batches=100, batch_size=32):
    """
    Evaluates the AT-GAN by generating adversarial examples and testing the target classifier.

    For every batch, test labels are sampled, each one is shifted to a
    different target class, adversarial images are generated for these targets
    and the attack counts as successful when the classifier predicts the target.

    Args:
        atgan: The trained ATGAN model (its `G_attack` generator is used).
        f_target: The target classifier to be attacked.
        x_test: Test dataset images (only its length is used, to sample indices).
        y_test: True labels for the test dataset (integer class ids).
        noise_size: The dimension of the random noise vector.
        n_classes: Number of classes in the dataset.
        num_batches: Number of batches to use for evaluation.
        batch_size: Number of adversarial examples per batch. This is now an
            explicit parameter instead of a notebook-level global.

    Returns:
        The attack success rate in percent (also printed).
    """
    y_test = np.asarray(y_test)
    target_classifier_fooled = 0

    for _ in range(num_batches):
        batch_indices = np.random.choice(len(x_test), size=batch_size)
        y_batch = y_test[batch_indices]

        z = tf.random.normal([batch_size, noise_size])

        # Adding an offset in [1, n_classes - 1] modulo n_classes guarantees a
        # target label that differs from the true label
        target_labels = (y_batch + np.random.randint(1, n_classes, size=batch_size)) % n_classes

        # Generate adversarial examples
        adv_examples = atgan.G_attack([z, target_labels], training=False)

        # Classify them; verbose=0 avoids one progress bar per batch
        predictions = f_target.predict(adv_examples, verbose=0)
        predicted_labels = np.argmax(predictions, axis=1)

        # Count how many times the target classifier predicted the target label
        target_classifier_fooled += np.sum(predicted_labels == target_labels)

    # Calculate the success rate of the attack
    fooling_rate = (target_classifier_fooled / (num_batches * batch_size)) * 100
    print(f"AT-GAN Attack Success Rate: {fooling_rate:.2f}%")
    return fooling_rate

# evaluate_atgan() already prints the success rate itself, so the value is only
# stored here (printing it again would duplicate the output line).
attack_success_rate = evaluate_atgan(atgan, f_target, x_test, y_test)

# Generate Adversarial Examples

In [None]:
def generate_and_save_adv_examples(atgan, f_target, noise_size, n_classes, num_examples_per_class, save_dir):
    """
    Generates and saves adversarial examples using the AT-GAN.

    For each class a batch of images is generated with `atgan.G_attack` and
    every image is written to ``save_dir`` as ``x_adv_{class}_{index}.png``.

    Args:
        atgan: ATGAN object whose `G_attack` generator creates the images.
        f_target: Target classifier (not used by this function).
        noise_size: The dimension of the random noise vector.
        n_classes: Number of classes to generate examples for.
        num_examples_per_class: Number of images generated per class.
        save_dir: Output folder; created when it does not exist yet.
    """
    output_missing = not os.path.exists(save_dir)
    if output_missing:
        os.makedirs(save_dir)

    for digit in range(n_classes):
        latent_batch = tf.random.normal([num_examples_per_class, noise_size])
        labels = np.full((num_examples_per_class,), digit)

        raw_batch = atgan.G_attack([latent_batch, labels], training=False)
        # Map the generator output from [-1, 1] to 0-255 pixel values
        pixel_batch = ((raw_batch + 1) * 127.5).numpy().astype(np.uint8)

        for index in range(len(pixel_batch)):
            file_path = os.path.join(save_dir, f"x_adv_{digit}_{index}.png")
            picture = K.preprocessing.image.array_to_img(pixel_batch[index].reshape(28, 28, 1))
            picture.save(file_path)

# Example usage: write 10 adversarial images per class into "adversarial_examples"
num_examples_per_class = 10
save_dir = "adversarial_examples"
generate_and_save_adv_examples(
    atgan=atgan,
    f_target=f_target,
    noise_size=latent_dim,
    n_classes=n_classes,
    num_examples_per_class=num_examples_per_class,
    save_dir=save_dir,
)