
# cGAN for FAS

- **Author:** Matheus Ferreira Silva 
- **Email:** matheus.ferreira@get.inatel.br
- **Date:** September 2024

## Imports

In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import keras
from keras import layers
import tensorflow as tf
import numpy as np
import scipy.io as sio
from tensorflow.keras.optimizers import AdamW
from tensorflow.keras.callbacks import EarlyStopping, Callback

2024-09-10 08:19:36.140668: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-10 08:19:36.182810: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-10 08:19:36.195673: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


## GPU

In [2]:
# Specify which GPU to use (e.g., GPU 0)
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

tf.config.optimizer.set_experimental_options({'layout_optimizer': False})

# Check if TensorFlow is using the correct GPU
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f"TensorFlow is using the following GPU(s): {[gpu.name for gpu in gpus]}")
    print(f"Currently using GPU: {tf.test.gpu_device_name()}")
else:
    print("No GPU found. Running on CPU.")

TensorFlow is using the following GPU(s): ['/physical_device:GPU:0']
Currently using GPU: /device:GPU:0


I0000 00:00:1725967185.520417 4077036 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1725967185.590212 4077036 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1725967185.594947 4077036 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1725967185.601284 4077036 cuda_executor.cc:1015] successful NUMA node read from SysFS ha

## Normalize data

In [3]:

#! Change the normalization method?
#! Normalize by mean and unit variance??

def normalize_mat_df(file_path, var, output_file_path, new_var_name):
    """
    Normalizes a complex dataset from a .mat file by applying normalization
    to each generation separately using the maximum absolute value for that generation.
    Saves the normalized dataset to a new .mat file with a different variable name.

    Args:
        file_path (str): Path to the .mat file.
        var (str): Variable name in the .mat file.
        output_file_path (str): Path where the normalized .mat file will be saved.
        new_var_name (str): New variable name for the normalized dataset in the saved .mat file.

    Returns:
        None
    """
    data = sio.loadmat(file_path)
    dataset = data[var]

    # Normalize each generation separately
    normalized_dataset = dataset / np.max(np.abs(dataset), axis=(0, 1), keepdims=True)

    sio.savemat(output_file_path, {new_var_name: normalized_dataset})
    print(f"Normalized data saved to {output_file_path} with variable name '{new_var_name}'")


# Normalize and save the datasets
normalize_mat_df(
    file_path="data/dataset_1_frame_20k_20dB/received_samples.mat",
    var="received_samples",
    output_file_path="data/dataset_1_frame_20k_20dB/received_samples_normalized.mat",
    new_var_name="received_samples_normalized",
)

normalize_mat_df(
    file_path="data/dataset_1_frame_20k_20dB/channel_history.mat",
    var="channel_history",
    output_file_path="data/dataset_1_frame_20k_20dB/channel_history_normalized.mat",
    new_var_name="channel_history_normalized",
)

normalize_mat_df(
    file_path="data/dataset_1_frame_20k_20dB/interpolated_channel.mat",
    var="interpolated_channel",
    output_file_path="data/dataset_1_frame_20k_20dB/interpolated_channel_normalized.mat",
    new_var_name="interpolated_channel_normalized",
)

normalize_mat_df(
    file_path="data/dataset_1_frame_20k_20dB/pilot_channel_estimative.mat",
    var="pilot_channel_estimative",
    output_file_path="data/dataset_1_frame_20k_20dB/pilot_channel_estimative_normalized.mat",
    new_var_name="pilot_channel_estimative_normalized",
)

FileNotFoundError: [Errno 2] No such file or directory: 'data/dataset_1_frame_20k_20dB/received_samples.mat'

## Load and Preprocess Data

In [141]:
def load_mat_df(file_path, var):
    """
    Loads a complex dataset from a .mat file, separates it into real and imaginary components,
    and returns a 4D matrix with dimensions N_Ports x Frame_size x 2 x Generations.

    Args:
        file_path (str): Path to the .mat file.
        var (str): Variable name in the .mat file.

    Returns:
        np.ndarray: A 4D array with real and imaginary parts separated along the third dimension.
    """
    data = sio.loadmat(file_path)[var]  # Load the data directly from the .mat file
    real_part = np.real(data)  # Extract the real part
    imag_part = np.imag(data)  # Extract the imaginary part
    return np.stack((real_part, imag_part), axis=2)  # Combine into a 4D array


# ------------------- Load the datasets with normalization ------------------- #
# received_samples = load_mat_df(
#     "data/dataset_1_frame_20k_20dB/received_samples_normalized.mat", "received_samples_normalized"
# )
# pilot_channel_estimative = load_mat_df(
#     "data/dataset_1_frame_20k_20dB/pilot_channel_estimative_normalized.mat", "pilot_channel_estimative_normalized"
# ) # This represents only x ports that were estimated using pilot symbols
# perfect_channel = load_mat_df(
#     "data/dataset_1_frame_20k_20dB/channel_history_normalized.mat", "channel_history_normalized"
# ) # This is the real channel
# interpolated_channel = load_mat_df(
#     "data/dataset_1_frame_20k_20dB/interpolated_channel_normalized.mat", "interpolated_channel_normalized"
# ) # The interpolated channel is used for comparison purposes

# ------------------ Load the datasets without normalization ----------------- #
received_samples = load_mat_df("data/dataset_1_frame_20k_20dB/received_samples.mat", "received_samples")
pilot_channel_estimative = load_mat_df(
    "data/dataset_1_frame_20k_20dB/pilot_channel_estimative.mat", "pilot_channel_estimative"
)  # This represents only x ports that were estimated using pilot symbols
perfect_channel = load_mat_df(
    "data/dataset_1_frame_20k_20dB/channel_history.mat", "channel_history"
)  # This is the real channel
interpolated_channel = load_mat_df(
    "data/dataset_1_frame_20k_20dB/interpolated_channel.mat", "interpolated_channel"
)  # The interpolated channel is used for comparison purposes

# Print the shapes of the datasets, type <class 'numpy.ndarray'>
print(f"Received samples shape: {received_samples.shape}")
print(f"Pilot estimation shape: {pilot_channel_estimative.shape}")
print(f"Perfect channel shape: {perfect_channel.shape}")
print(f"Interpolated channel shape: {interpolated_channel.shape}")

Received samples shape: (100, 1, 2, 20000)
Pilot estimation shape: (100, 1, 2, 20000)
Perfect channel shape: (100, 1, 2, 20000)
Interpolated channel shape: (100, 1, 2, 20000)


## Preparing Input for Model Training

In [142]:
batch_size = 64

# Transpose the data to fit the input structure (generations, ports, frame size, dimensions)
pilot_channel_estimative = np.transpose(pilot_channel_estimative, (3, 0, 1, 2))
perfect_channel = np.transpose(perfect_channel, (3, 0, 1, 2))

print(f"Pilot estimation shape: {pilot_channel_estimative.shape}")
print(f"Perfect channel shape: {perfect_channel.shape}")

# Create tf.data.Dataset
dataset = tf.data.Dataset.from_tensor_slices((pilot_channel_estimative, perfect_channel))
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

Pilot estimation shape: (20000, 100, 1, 2)
Perfect channel shape: (20000, 100, 1, 2)


## cGAN Training

#### Training Parameters

In [143]:
epochs = 50 # Number of epochs for the training
patience = 10 # Patience for the early stopping
num_ports = 100  # Modify this based on your data
frame_size = 1  # Modify this based on your data
latent_dim = 128
dimensions = 2  # real and imaginary parts

#### GAN Model

In [144]:
generator = keras.Sequential(
    [
        keras.layers.InputLayer((latent_dim,)),
        layers.Dense(100 * 1 * 64),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Reshape((100, 1, 64)),  # Corrected this line to use a tuple (100, 1, 64)
        layers.Conv2DTranspose(64, kernel_size=(3, 1), strides=(1, 1), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Conv2DTranspose(32, kernel_size=(3, 1), strides=(1, 1), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Conv2D(2, (3, 1), padding="same", activation="sigmoid"),  # Output shape (100, 1, 2)
    ],
    name="generator",
)

discriminator = keras.Sequential(
    [
        keras.layers.InputLayer((100, 1, 2)),
        layers.Conv2D(64, kernel_size=(3, 1), strides=(1, 1), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Conv2D(128, kernel_size=(3, 1), strides=(1, 1), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

In [145]:
class GAN(keras.Model):
    """
    GAN (Generative Adversarial Network) class that inherits from keras.Model.

    Parameters:
    - discriminator: The discriminator model (a neural network that classifies real vs. fake images)
    - generator: The generator model (a neural network that generates fake images from latent space)
    - latent_dim: Dimensionality of the latent space (input noise vector for the generator)
    """

    def __init__(self, discriminator, generator, latent_dim):
        """
        Initialize the GAN model by setting the discriminator, generator, and latent dimension.
        Also initializes loss trackers for both generator and discriminator.
        """
        super().__init__()  # Call the parent class (keras.Model) initializer
        self.discriminator = discriminator  # Discriminator model for distinguishing real/fake images
        self.generator = generator  # Generator model to generate fake images
        self.latent_dim = latent_dim  # Latent dimension used to sample random points for generator

        # Trackers for generator and discriminator loss
        self.gen_loss_tracker = keras.metrics.Mean(name="generator_loss")  # Mean metric for generator loss
        self.disc_loss_tracker = keras.metrics.Mean(
            name="discriminator_loss"
        )  # Mean metric for discriminator loss

    @property
    def metrics(self):
        """
        Return the list of metrics being tracked (generator and discriminator loss).
        This is required by keras to monitor these metrics during training.
        """
        return [self.gen_loss_tracker, self.disc_loss_tracker]

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        """
        Configure the GAN model for training by setting the optimizers and loss function.

        Parameters:
        - d_optimizer: Optimizer for the discriminator (e.g., Adam)
        - g_optimizer: Optimizer for the generator (e.g., Adam)
        - loss_fn: Loss function to compute the loss for both generator and discriminator (e.g., binary crossentropy)
        """
        super().compile()  # Call the parent class (keras.Model) compile method
        self.d_optimizer = d_optimizer  # Optimizer for discriminator
        self.g_optimizer = g_optimizer  # Optimizer for generator
        self.loss_fn = loss_fn  # Loss function to compute both generator and discriminator losses

    def call(self, inputs):
        """
        Define the forward pass of the GAN model.

        Parameters:
        - inputs: Input data (typically noise vector for the generator).

        Returns:
        - The output of the generator (fake images).
        """
        latent_vector = inputs  # Inputs should be the latent vector
        return self.generator(latent_vector)  # Forward pass just returns the generator's output

    def train_step(self, data):
        """
        Perform a single training step of the GAN model, updating both the discriminator and generator.

        Parameters:
        - data: A tuple of real images and their labels (labels are not used here).

        Returns:
        - A dictionary containing the current generator loss (g_loss) and discriminator loss (d_loss).
        """
        real_images, _ = data  # Unpack real images (we discard the labels since they aren't needed)

        # Ensure real images are in the correct data type (float32)
        real_images = tf.cast(real_images, tf.float32)

        batch_size = tf.shape(real_images)[0]  # Get the batch size

        # Sample random points in the latent space for generator input (latent_dim = 128)
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Generate fake images from random latent vectors
        generated_images = self.generator(random_latent_vectors)

        # Combine real and fake images into a single batch
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Create labels for real (1) and fake (0) images
        labels = tf.concat([tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0)

        # Add random noise to the labels to introduce label smoothing (makes training more stable)
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            # Get predictions from the discriminator for the combined images (real and fake)
            predictions = self.discriminator(combined_images)
            # Calculate discriminator loss
            d_loss = self.loss_fn(labels, predictions)
        # Compute gradients for the discriminator
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        # Update the discriminator weights using the optimizer
        self.d_optimizer.apply_gradients(zip(grads, self.discriminator.trainable_weights))

        # Re-sample random points in the latent space for training the generator
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Create "misleading" labels (all ones) to trick the discriminator into thinking all generated images are real
        misleading_labels = tf.ones((batch_size, 1))

        # Train the generator
        with tf.GradientTape() as tape:
            # Generate new fake images
            generated_images = self.generator(random_latent_vectors)
            # Get predictions from the discriminator for these fake images
            predictions = self.discriminator(generated_images)
            # Calculate generator loss (want discriminator to think the generated images are real)
            g_loss = self.loss_fn(misleading_labels, predictions)

        # Compute gradients for the generator
        grads = tape.gradient(g_loss, self.generator.trainable_weights)

        # Update the generator weights using the optimizer
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Update the loss trackers
        self.gen_loss_tracker.update_state(g_loss)
        self.disc_loss_tracker.update_state(d_loss)

        # Return the current losses for generator and discriminator
        return {"g_loss": self.gen_loss_tracker.result(), "d_loss": self.disc_loss_tracker.result()}

#### Train the Model

In [146]:
# Compile the GAN model with AdamW optimizer
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=AdamW(),
    g_optimizer=AdamW(),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# Train the GAN model
gan.fit(
    dataset,
    epochs=epochs,
    validation_data=dataset.take(20)  # Assuming 20 batches for validation
)

"""## Generate and save sample images"""
def generate_images(generator, latent_dim, num_images=10):
    random_latent_vectors = tf.random.normal(shape=(num_images, latent_dim))
    generated_images = generator(random_latent_vectors)
    return generated_images.numpy()

# Generate images after training
fake_images = generate_images(gan.generator, latent_dim)
print("Generated fake images shape:", fake_images.shape)

Epoch 1/50


[1m312/313[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 47ms/step - d_loss: -0.8166 - g_loss: 0.0941

ValueError: Exception encountered when calling Sequential.call().

[1mInvalid input shape for input Tensor("Cast:0", shape=(None, 100, 1, 2), dtype=float32). Expected shape (None, 128), but input has incompatible shape (None, 100, 1, 2)[0m

Arguments received by Sequential.call():
  • inputs=tf.Tensor(shape=(None, 100, 1, 2), dtype=float32)
  • training=None
  • mask=None