In [16]:
import tensorflow as tf
print(tf.__version__)

2.18.0


In [17]:
import tensorflow as tf
from tensorflow.compat.v1 import InteractiveSession

# Close any existing sessions
tf.keras.backend.clear_session()

# Configure new session
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)

ERROR:tensorflow:An interactive session is already active. This can cause out-of-memory errors or some other unexpected errors (due to the unpredictable timing of garbage collection) in some cases. You must explicitly call `InteractiveSession.close()` to release resources held by the other session(s). Please use `tf.Session()` if you intend to productionize.


In [18]:
'''GAN model builder and util functions

[1] Radford, Alec, Luke Metz, and Soumith Chintala.
"Unsupervised representation learning with deep convolutional
generative adversarial networks." arXiv preprint arXiv:1511.06434 (2015).

'''
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import layers
from tensorflow.keras.layers import Activation, Dense, Input
from tensorflow.keras.layers import Conv2D, Flatten
from tensorflow.keras.layers import Reshape, Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer


import numpy as np
import math
import matplotlib.pyplot as plt
import os


def generator(inputs,
              image_size,
              activation='sigmoid',
              labels=None,
              codes=None):
    """Build a Generator Model

    Stack of BN-ReLU-Conv2DTranpose to generate fake images.
    Output activation is sigmoid instead of tanh in [1].
    Sigmoid converges easily.

    Arguments:
        inputs (Layer): Input layer of the generator (the z-vector)
        image_size (int): Target size of one side
            (assuming square image)
        activation (string): Name of output activation layer
        labels (tensor): Input labels
        codes (list): 2-dim disentangled codes for InfoGAN

    Returns:
        Model: Generator Model
    """

    inputs_tensor = []
    image_resize = image_size//2
    # network parameters
    kernel_size = 5
    layer_filters = [256,128, 64, 32, 1]

    if labels is not None:
        if codes is None:
            # ACGAN labels
            # concatenate z noise vector and one-hot labels
            inputs_tensor = [inputs, labels]
        else:
            # infoGAN codes
            # concatenate z noise vector,
            # one-hot labels and codes 1 & 2
            inputs_tensor = [inputs, labels] + codes
        x = concatenate(inputs_tensor, axis=1)
    elif codes is not None:
        # generator 0 of StackedGAN
        inputs_tensor = [inputs, codes]
        x = concatenate(inputs_tensor, axis=1)
    else:
        # default input is just 100-dim noise (z-code)
        x = inputs

    x = Dense(image_resize * image_resize * layer_filters[0])(x)
    x = Reshape((image_resize, image_resize, layer_filters[0]))(x)

    for filters in layer_filters:
        # first two convolution layers use strides = 2
        # the last two use strides = 1
        if filters > layer_filters[-4]:
            strides = 2
        else:
            strides = 1
        x = tf.keras.layers.BatchNormalization()(x)
        x = Activation('relu')(x)
        x = Conv2DTranspose(filters=filters,
                            kernel_size=kernel_size,
                            strides=strides,
                            padding='same')(x)
    import tensorflow.python.keras.backend as K



    if activation is not None:
        x = Activation(activation)(x)


    # generator output is the synthesized image x
    return Model(inputs_tensor if len(inputs_tensor) > 1 else inputs, x, name='generator')

# class MyLeakyReLU(Layer):
#     def __init__(self, alpha=0.2, **kwargs):
#         super(MyLeakyReLU, self).__init__(**kwargs)
#         self.alpha = alpha

#     def call(self, inputs):
#         return tf.keras.backend.relu(inputs, alpha=self.alpha)


def discriminator(inputs,
                  activation='sigmoid',
                  num_labels=None,
                  num_codes=None):
    """Build a Discriminator Model

    Stack of LeakyReLU-Conv2D to discriminate real from fake
    The network does not converge with BN so it is not used here
    unlike in [1]

    Arguments:
        inputs (Layer): Input layer of the discriminator (the image)
        activation (string): Name of output activation layer
        num_labels (int): Dimension of one-hot labels for ACGAN & InfoGAN
        num_codes (int): num_codes-dim Q network as output
                    if StackedGAN or 2 Q networks if InfoGAN


    Returns:
        Model: Discriminator Model
    """
    kernel_size = 5
    layer_filters = [32, 64, 128, 256]

    x = inputs
    for filters in layer_filters:
        # first 3 convolution layers use strides = 2
        # last one uses strides = 1
        if filters == layer_filters[-1]:
            strides = 1
        else:
            strides = 2
        x = LeakyReLU(alpha=0.2)(x)
        x = Conv2D(filters=filters,
                   kernel_size=kernel_size,
                   strides=strides,
                   padding='same')(x)

    x = Flatten()(x)
    # default output is probability that the image is real
    outputs = Dense(1)(x)
    if activation is not None:
        print(activation)
        outputs = Activation(activation)(outputs)

    if num_labels:
        # ACGAN and InfoGAN have 2nd output
        # 2nd output is 10-dim one-hot vector of label
        layer = Dense(layer_filters[-2])(x)
        labels = Dense(num_labels)(layer)
        labels = Activation('softmax', name='label')(labels)
        if num_codes is None:
            outputs = [outputs, labels]
        else:
            # InfoGAN have 3rd and 4th outputs
            # 3rd output is 1-dim continous Q of 1st c given x
            code1 = Dense(1)(layer)
            code1 = Activation('sigmoid', name='code1')(code1)

            # 4th output is 1-dim continuous Q of 2nd c given x
            code2 = Dense(1)(layer)
            code2 = Activation('sigmoid', name='code2')(code2)

            outputs = [outputs, labels, code1, code2]
    elif num_codes is not None:
        # StackedGAN Q0 output
        # z0_recon is reconstruction of z0 normal distribution
        z0_recon = Dense(num_codes)(x)
        z0_recon = Activation('tanh', name='z0')(z0_recon)
        outputs = [outputs, z0_recon]

    return Model(inputs, outputs, name='discriminator')


def train(models, x_train, params):
    """Train the Discriminator and Adversarial Networks

    Alternately train Discriminator and Adversarial networks by batch.
    Discriminator is trained first with properly real and fake images.
    Adversarial is trained next with fake images pretending to be real
    Generate sample images per save_interval.

    # Arguments
        models (list): Generator, Discriminator, Adversarial models
        x_train (tensor): Train images
        params (list) : Networks parameters

    """
    # the GAN models
    generator, discriminator, adversarial = models
    # network parameters
    batch_size, latent_size, train_steps, model_name = params
    # the generator image is saved every 500 steps
    save_interval = 500
    # noise vector to see how the generator output
    # evolves during training
    noise_input = np.random.uniform(-1.0, 1.0, size=[16, latent_size])
    # number of elements in train dataset
    train_size = x_train.shape[0]
    for i in range(train_steps):
        # train the discriminator for 1 batch
        # 1 batch of real (label=1.0) and fake images (label=0.0)
        # randomly pick real images from dataset
        rand_indexes = np.random.randint(0,
                                         train_size,
                                         size=batch_size)
        real_images = x_train[rand_indexes]
        # generate fake images from noise using generator
        # generate noise using uniform distribution
        noise = np.random.uniform(-1.0,
                                  1.0,
                                  size=[batch_size, latent_size])
        # generate fake images
        fake_images = generator.predict(noise)
        # real + fake images = 1 batch of train data
        x = np.concatenate((real_images, fake_images))
        # label real and fake images
        # real images label is 1.0
        y = np.ones([2 * batch_size, 1])
        # fake images label is 0.0
        y[batch_size:, :] = 0.0
        # train discriminator network, log the loss and accuracy
        loss, acc = discriminator.train_on_batch(x, y)
        log = "%d: [discriminator loss: %f, acc: %f]" % (i, loss, acc)

        # train the adversarial network for 1 batch
        # 1 batch of fake images with label=1.0
        # since the discriminator weights are frozen
        # in adversarial network only the generator is trained
        # generate noise using uniform distribution
        noise = np.random.uniform(-1.0,
                                  1.0,
                                  size=[batch_size, latent_size])
        # label fake images as real or 1.0
        y = np.ones([batch_size, 1])
        # train the adversarial network
        # note that unlike in discriminator training,
        # we do not save the fake images in a variable
        # the fake images go to the discriminator input
        # of the adversarial for classification
        # log the loss and accuracy
        loss, acc = adversarial.train_on_batch(noise, y)
        log = "%s [adversarial loss: %f, acc: %f]" % (log, loss, acc)
        print(log)
        if (i + 1) % save_interval == 0:
            # plot generator images on a periodic basis
            plot_images(generator,
                        noise_input=noise_input,
                        show=False,
                        step=(i + 1),
                        model_name=model_name)

    # save the model after training the generator
    # the trained generator can be reloaded
    # for future MNIST digit generation
    generator.save(model_name + ".h5")


def plot_images(generator,
                noise_input,
                noise_label=None,
                noise_codes=None,
                show=False,
                step=0,
                model_name="gan"):
    """Generate fake images and plot them

    For visualization purposes, generate fake images
    then plot them in a square grid

    # Arguments
        generator (Model): The Generator Model for
            fake images generation
        noise_input (ndarray): Array of z-vectors
        show (bool): Whether to show plot or not
        step (int): Appended to filename of the save images
        model_name (string): Model name

    """
    os.makedirs(model_name, exist_ok=True)
    filename = os.path.join(model_name, "%05d.png" % step)
    print(filename)
    rows = int(math.sqrt(noise_input.shape[0]))
    if noise_label is not None:
        noise_input = [noise_input, noise_label]
        if noise_codes is not None:
            noise_input += noise_codes

    images = generator.predict(noise_input)
    plt.figure(figsize=(2.2, 2.2))
    num_images = images.shape[0]
    image_size = images.shape[1]
    for i in range(num_images):
        plt.subplot(rows, rows, i + 1)
        image = np.reshape(images[i], [image_size, image_size])
        plt.imshow(image, cmap='gray')
        plt.axis('off')
    plt.savefig(filename)
    if show:
        plt.show()
    else:
        plt.close('all')


def test_generator(generator):
    noise_input = np.random.uniform(-1.0, 1.0, size=[16, 100])
    plot_images(generator,
                noise_input=noise_input,
                show=True,
                model_name="test_outputs")

In [23]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Dropout
from tensorflow.keras.layers import BatchNormalization, Activation, LeakyReLU
from tensorflow.keras.layers import Conv2D, Conv2DTranspose
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical
import numpy as np
import os
import pickle

def build_generator(latent_size, image_size, num_labels):
    """
    Build generator model with correct output dimensions.
    For 10x10 output:
    - Start with 3x3
    - First upsample to 5x5
    - Second upsample to 10x10
    """
    # Image generator input
    inputs = Input(shape=(latent_size,), name='z_input')
    label_input = Input(shape=(num_labels,), name='label_input')
    
    # Concatenate noise and label inputs
    x = tf.keras.layers.concatenate([inputs, label_input])
    
    # First dense layer
    # For 10x10 target, start with 3x3
    init_size = 3
    filters = 256
    
    x = Dense(filters * init_size * init_size)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(negative_slope=0.2)(x)
    x = Reshape((init_size, init_size, filters))(x)

    # First upsampling: 3x3 -> 5x5
    x = Conv2DTranspose(128, 3, strides=1, padding='valid')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(negative_slope=0.2)(x)

    # Second upsampling: 5x5 -> 10x10
    x = Conv2DTranspose(64, 3, strides=2, padding='same')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(negative_slope=0.2)(x)

    # Final convolution
    x = Conv2D(1, 3, padding='same', activation='tanh')(x)
    
    # Verify output shape
    assert x.shape[1:3] == (image_size, image_size), f"Generator output shape mismatch: expected {(image_size, image_size)}, got {x.shape[1:3]}"
    
    generator = Model([inputs, label_input], x, name='generator')
    return generator

def build_discriminator(input_shape, num_labels):
    """
    Build discriminator model suitable for 10x10 input images.
    """
    inputs = Input(shape=input_shape, name='discriminator_input')
    
    x = Conv2D(64, 3, strides=1, padding='same')(inputs)
    x = LeakyReLU(negative_slope=0.2)(x)
    x = Dropout(0.3)(x)
    
    x = Conv2D(128, 3, strides=2, padding='same')(x)
    x = LeakyReLU(negative_slope=0.2)(x)
    x = Dropout(0.3)(x)
    
    x = Flatten()(x)
    
    # Dense layers
    x = Dense(256)(x)
    x = LeakyReLU(negative_slope=0.2)(x)
    x = Dropout(0.3)(x)
    
    # Source output (real/fake)
    source_output = Dense(1, activation='sigmoid', name='source')(x)
    
    # Label output
    label_output = Dense(num_labels, activation='softmax', name='label')(x)
    
    discriminator = Model(inputs, [source_output, label_output], name='discriminator')
    return discriminator

def load_data(params):
    """Load and preprocess the dataset."""
    try:
        base_dir = params["dir"]
        method = 'MI' if params['mutual_info'] else 'Mean'
        
        data_filename = os.path.join(
            base_dir, 
            f'train_{params["Max_A_Size"]}x{params["Max_B_Size"]}_{method}.pickle'
        )
        label_filename = os.path.join(base_dir, 'YTrain.pickle')
        
        print(f"Loading data from: {data_filename}")
        print(f"Loading labels from: {label_filename}")
        
        with open(data_filename, 'rb') as f:
            x_train = pickle.load(f)
        with open(label_filename, 'rb') as f:
            y_train = pickle.load(f)
            
        return np.asarray(x_train), np.asarray(y_train)
        
    except FileNotFoundError as e:
        print(f"Error: Could not find data files in {base_dir}")
        print(f"Expected files: \n- {data_filename}\n- {label_filename}")
        raise e

def train(models, data, params):
    """Train the ACGAN model."""
    generator, discriminator, adversarial = models
    x_train, y_train = data
    batch_size, latent_size, train_steps, num_labels = params
    
    # Training loop
    for step in range(train_steps):
        # Train discriminator
        idx = np.random.randint(0, x_train.shape[0], batch_size)
        real_images = x_train[idx]
        real_labels = y_train[idx]
        
        # Generate fake images
        noise = np.random.uniform(-1.0, 1.0, (batch_size, latent_size))
        fake_labels = np.eye(num_labels)[np.random.choice(num_labels, batch_size)]
        fake_images = generator.predict([noise, fake_labels])
        
        # Prepare discriminator training data
        x = np.concatenate((real_images, fake_images))
        labels = np.concatenate((real_labels, fake_labels))
        y = np.ones((2 * batch_size, 1))
        y[batch_size:, :] = 0
        
        # Train discriminator
        d_loss = discriminator.train_on_batch(x, [y, labels])
        
        # Train generator through adversarial model
        noise = np.random.uniform(-1.0, 1.0, (batch_size, latent_size))
        fake_labels = np.eye(num_labels)[np.random.choice(num_labels, batch_size)]
        y = np.ones((batch_size, 1))
        
        a_loss = adversarial.train_on_batch([noise, fake_labels], [y, fake_labels])
        
        if step % 100 == 0:
            print(f'Step {step}: [D loss: {d_loss[0]:.4f}, acc: {d_loss[3]:.4f}] [A loss: {a_loss[0]:.4f}]')

def build_and_train_models(params):
    """Main function to build and train the ACGAN."""
    session = configure_session()
    
    try:
        # Load and preprocess data
        x_train, y_train = load_data(params)
        image_size = x_train.shape[1]
        print(f"Input image size: {image_size}x{image_size}")
        
        # Reshape and normalize data
        x_train = np.reshape(x_train, [-1, image_size, image_size, 1])
        x_train = x_train.astype('float32') / 255 * 2 - 1  # Scale to [-1, 1]
        
        # Convert labels to categorical
        num_labels = len(np.unique(y_train))
        y_train = to_categorical(y_train)
        print(f"Number of classes: {num_labels}")
        
        # Network parameters
        latent_size = 100
        batch_size = 32  # Reduced batch size for smaller images
        train_steps = 40000
        lr = 1e-4  # Reduced learning rate for stability
        decay = 6e-8
        
        # Build models
        generator = build_generator(latent_size, image_size, num_labels)
        discriminator = build_discriminator((image_size, image_size, 1), num_labels)
        
        # Print model summaries
        print("\nGenerator Summary:")
        generator.summary()
        print("\nDiscriminator Summary:")
        discriminator.summary()
        
        # Build adversarial model
        discriminator.trainable = False
        noise_input = Input(shape=(latent_size,))
        label_input = Input(shape=(num_labels,))
        gen_output = generator([noise_input, label_input])
        adversarial_output = discriminator(gen_output)
        adversarial = Model([noise_input, label_input], adversarial_output)
        
        # Compile models
        optimizer = RMSprop(learning_rate=lr, decay=decay)
        loss = ['binary_crossentropy', 'categorical_crossentropy']
        
        discriminator.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
        adversarial.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
        
        # Train models
        models = (generator, discriminator, adversarial)
        data = (x_train, y_train)
        params = (batch_size, latent_size, train_steps, num_labels)
        
        train(models, data, params)
        
        return generator
        
    finally:
        session.close()

if __name__ == '__main__':
    params = {
        "Max_A_Size": 10,
        "Max_B_Size": 10,
        "Dynamic_Size": False,
        "Metod": "tSNE",
        "ValidRatio": 0.1,
        "seed": 180,
        "dir": "D:/IDS/Datasets/AAGM",
        "Mode": "CNN2",
        "LoadFromJson": False,
        "mutual_info": True,
        "hyper_opt_evals": 20,
        "epoch": 150,
        "No_0_MI": False,
        "autoencoder": False,
        "cut": None
    }
    
    try:
        generator = build_and_train_models(params)
        generator.save('acgan_generator.h5')
    except Exception as e:
        print(f"Error during training: {str(e)}")

Loading data from: D:/IDS/Datasets/AAGM\train_10x10_MI.pickle
Loading labels from: D:/IDS/Datasets/AAGM\YTrain.pickle
Input image size: 10x10
Number of classes: 2

Generator Summary:



Discriminator Summary:




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 342ms/step
Error during training: For a model with multiple outputs, when providing the `metrics` argument as a list, it should have as many entries as the model has outputs. Received:
metrics=['accuracy']
of length 1 whereas the model has 2 outputs.


