In [None]:
import os
import time
from datetime import datetime
import numpy as np
import pandas as pd
import h5py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor
from torchvision.utils import make_grid
from sklearn.datasets import make_circles
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from IPython.display import clear_output
from IPython import display
from scipy import fftpack
from matplotlib.colors import LogNorm


In [None]:
# Path to the HDF5 file read by h5_file_numpy below
file_name_1 = './data/set1WR14.h5'

def h5_file_numpy(filename):
    """
    Read the top-level 'mDoppler' entry of an HDF5 file into a NumPy array.

    Parameters:
    filename (str): Path to the .h5 file.

    Returns:
    np.ndarray: Contents of the 'mDoppler' entry.

    Raises:
    KeyError: If the file has no top-level 'mDoppler' entry.
    """
    with h5py.File(filename, 'r') as hdf_mDoppler_WR14:
        # List the top-level entries so the file layout is visible in the cell output
        base_items = list(hdf_mDoppler_WR14.items())
        print('Subgroups in mDoppler_WR14:', base_items)

        # .get() returns None for a missing key; np.array(None) would silently
        # produce a 0-d object array, so fail loudly instead.
        mDoppler_entry = hdf_mDoppler_WR14.get('mDoppler')
        if mDoppler_entry is None:
            raise KeyError(
                f"'mDoppler' not found in {filename!r}; "
                f"available entries: {list(hdf_mDoppler_WR14.keys())}"
            )

        # Materialize the data while the file is still open
        WR14_mDoppler_numpy = np.array(mDoppler_entry)

    # Debugging: show the extracted array
    print("Extracted micro-Doppler data:", WR14_mDoppler_numpy)

    return WR14_mDoppler_numpy

# Load the micro-Doppler array stored in file_name_1 (subject 000)
WR14_mDoppler_subject000_numpy = h5_file_numpy(file_name_1)


In [None]:
# Path to the timestamp CSV file
timestamp_location = './data/timestamp_speech.csv'

def miliseconds_past_func(timestamp_location):
    """
    Compute, for every row of the timestamp CSV, the milliseconds elapsed since the first row.

    The CSV must have a "timestamp" column formatted as '%Y-%m-%dT%H:%M:%S.%f'.
    Differences are taken between the parsed datetimes directly, so the result
    does not depend on the machine's local time zone or DST rules and carries
    no floating-point noise from large epoch values.

    Parameters:
    timestamp_location (str): Path to the CSV file containing timestamps.

    Returns:
    list: Milliseconds (float) elapsed since the first timestamp; the first entry is 0.0.

    Raises:
    ValueError: If the CSV contains no rows, or a timestamp does not match the format.
    """
    # Local import: the notebook's import cell only brings in `datetime` itself.
    from datetime import timedelta

    timestamp_format = '%Y-%m-%dT%H:%M:%S.%f'

    # Load the timestamp data from the CSV file
    df_timestamps = pd.read_csv(timestamp_location)

    parsed_times = [datetime.strptime(raw, timestamp_format) for raw in df_timestamps["timestamp"]]
    if not parsed_times:
        raise ValueError(f"No timestamps found in {timestamp_location!r}")

    # timedelta / timedelta is an exact ratio of microsecond counts
    first_time = parsed_times[0]
    one_millisecond = timedelta(milliseconds=1)
    milliseconds_past = [(current_time - first_time) / one_millisecond for current_time in parsed_times]

    print("Milliseconds elapsed since the first timestamp:", milliseconds_past)
    return milliseconds_past

# Elapsed milliseconds (relative to the first row) for every timestamp in the subject 000 CSV
milliseconds_past_subject000 = miliseconds_past_func(timestamp_location)

def frame_count(milliseconds_past, frame_ms=90, final_frames=41):
    """
    Convert elapsed-time marks into a number of frames per interval.

    Each interval is the gap between consecutive entries (the first gap is
    measured from 0) and is converted with floor division by `frame_ms`.
    One extra count of `final_frames` is appended once, after the last mark.

    Parameters:
    milliseconds_past (list): Milliseconds elapsed since the first timestamp.
    frame_ms (int | float): Duration of one frame in milliseconds (default 90).
    final_frames (int): Frame count assumed for the interval after the last
        mark (default 41, i.e. about 3.7 s at 90 ms per frame).

    Returns:
    list: Frame counts for each interval, followed by `final_frames`.
        An empty input yields an empty list.
    """
    counts = []
    previous_time = 0

    for current_time in milliseconds_past:
        counts.append(int((current_time - previous_time) // frame_ms))
        previous_time = current_time

    # Append the trailing interval exactly once. Doing it inside the loop on
    # "value equals the last value" would fire repeatedly when an earlier
    # entry happens to equal the last one.
    if counts:
        counts.append(final_frames)

    print("Frame counts for each interval:", counts)
    print("Total time covered (seconds):", sum(counts) * frame_ms / 1000)
    return counts

# Frames per timestamp interval for subject 000 (printed by frame_count as a sanity check)
frame_count_subject000 = frame_count(milliseconds_past_subject000)


In [None]:

def timestamps(TimestampLocation):
    """Read the timestamp CSV at `TimestampLocation` and return it as a DataFrame."""
    return pd.read_csv(TimestampLocation)

# Full timestamp table; data_prepare reads its "command" column for labels
df_timestamps = timestamps(timestamp_location)

# Cut the continuous micro-Doppler recording into 40-frame segments and label each one
def data_prepare(miliseconds_past, WR14_mDoppler_numpy, df_timestamps):
    """
    Slice a micro-Doppler recording into 40-frame segments paired with command labels.

    For every entry of `miliseconds_past`, the gap since the previous entry is
    converted to a frame count (gap // 90) and one or more 40-frame windows are
    taken from `WR14_mDoppler_numpy`, starting at the running frame offset:

    * more than 80 frames: three windows (first 40, next 40, last 40 of the gap)
    * 41 to 80 frames: two windows (first 40 and last 40, which may overlap)
    * exactly 40 frames: one window
    * fewer than 40 frames: the available frames, resized to (40, 128)

    One more 40-frame window is appended after the loop.

    Parameters:
        miliseconds_past (list): Elapsed milliseconds per timestamp row.
        WR14_mDoppler_numpy (np.ndarray): Recording indexed by frame along axis 0.
            # assumes shape (n_frames, 128) because of the (40, 128) resize — TODO confirm
        df_timestamps (pd.DataFrame): Table with a "command" column indexed 0..n-1.

    Returns:
        list: [np.ndarray of stacked segments, list of labels], same length.

    NOTE(review): every window of an interval ending at timestamp k is labelled
    command[k]; confirm this is the intended alignment between intervals and commands.
    """
    # Running state: time of the previous timestamp, frame offset into the recording,
    # and the row index used to look up the label. `remaining_ms` is never used.
    action_before_ms, remaining_ms, actions_frame_sum, index_counter = 0, 0, 0, 0
    dtype_list = []  # NOTE(review): filled below but never read or returned
    mDoppler_list_WR14 = []  # Collected segments
    mDoppler_labels_WR14 = []  # Label of each collected segment (parallel to the list above)

    # NOTE(review): never used after this assignment
    deneme_numpy_append = np.zeros(41)

    # One iteration per timestamp; `i` is the elapsed time of that timestamp in ms
    for i in miliseconds_past:
        remaining_action_frame_count = 0  # Frames left over beyond the first two windows

        # Number of 90 ms frames between the previous timestamp and this one
        action_frame_count = int((i - action_before_ms) // 90)
        print(f"Action frame count: {action_frame_count}")

        # Case 1: the interval holds more than 40 frames
        if action_frame_count > 40:
            # First window: the first 40 frames of the interval
            mDoppler_list_WR14.append(np.array([WR14_mDoppler_numpy[actions_frame_sum:actions_frame_sum + 40]]))
            dtype_list.append(np.array([WR14_mDoppler_numpy[actions_frame_sum + 40:actions_frame_sum + 80]]))
            mDoppler_labels_WR14.append(df_timestamps["command"][index_counter])

            # More than 80 frames: add the second 40-frame window and a final window
            if (action_frame_count - 40) > 40:
                remaining_action_frame_count = action_frame_count - 40
                mDoppler_list_WR14.append(np.array([WR14_mDoppler_numpy[actions_frame_sum + 40:actions_frame_sum + 80]]))
                dtype_list.append(np.array([WR14_mDoppler_numpy[actions_frame_sum + 40:actions_frame_sum + 80]]))
                mDoppler_labels_WR14.append(df_timestamps["command"][index_counter])

                # After this subtraction the offset is action_frame_count - 80, so the
                # slice below covers the last 40 frames of the interval
                remaining_action_frame_count -= 40
                mDoppler_list_WR14.append(np.array([WR14_mDoppler_numpy[actions_frame_sum + 40 + remaining_action_frame_count:actions_frame_sum + 80 + remaining_action_frame_count]]))
                dtype_list.append(np.array([WR14_mDoppler_numpy[actions_frame_sum + 40 + remaining_action_frame_count:actions_frame_sum + 80 + remaining_action_frame_count]]))
                mDoppler_labels_WR14.append(df_timestamps["command"][index_counter])

            else:
                # 41 to 80 frames: add the last 40 frames of the interval
                # (overlaps the first window when the interval is shorter than 80 frames)
                mDoppler_list_WR14.append(np.array([WR14_mDoppler_numpy[actions_frame_sum + (action_frame_count - 40):actions_frame_sum + 40 + (action_frame_count - 40)]]))
                dtype_list.append(np.array([WR14_mDoppler_numpy[actions_frame_sum + (action_frame_count - 40):actions_frame_sum + 40 + (action_frame_count - 40)]]))
                mDoppler_labels_WR14.append(df_timestamps["command"][index_counter])

        # Case 2: fewer than 40 frames are available
        elif action_frame_count < 40:
            empty_rows = 40 - action_frame_count  # Number of missing frames (not used afterwards)
            print(f"Action frame count < 40, action_frame_count: {action_frame_count}")
            print(f"Index: {index_counter}")

            # Take the frames that exist for this interval
            empty_rows_added = np.array(WR14_mDoppler_numpy[actions_frame_sum:(actions_frame_sum + action_frame_count)])
            print(f"Empty rows added: {empty_rows_added}")

            empty_rows_added_copy = empty_rows_added.copy()
            print(f"Shape before resize: {empty_rows_added_copy.shape}")

            # In-place ndarray.resize to 40 frames; added entries are zero-filled.
            # NOTE(review): resize reflows the flat buffer, so this presumably
            # relies on the input already having 128 columns — confirm.
            empty_rows_added_copy.resize((40, 128))
            print(f"Shape after resize: {empty_rows_added_copy.shape}")

            # Appended as a plain list wrapping the array (the other branches append arrays)
            mDoppler_list_WR14.append([empty_rows_added_copy])
            mDoppler_labels_WR14.append(df_timestamps["command"][index_counter])

        # Case 3: exactly 40 frames
        else:
            mDoppler_list_WR14.append(np.array([WR14_mDoppler_numpy[actions_frame_sum:actions_frame_sum + action_frame_count]]))
            mDoppler_labels_WR14.append(df_timestamps["command"][index_counter])

        # Advance the frame offset and move on to the next timestamp row
        actions_frame_sum += action_frame_count
        print(f"Action before sum: {action_before_ms}")
        index_counter += 1
        action_before_ms = i

    # Trailing window: 40 frames after the last processed interval.
    # NOTE(review): the label row index 11 is hard-coded; presumably the last
    # row of a 12-row CSV — confirm before reusing with other recordings.
    mDoppler_list_WR14.append(np.array([WR14_mDoppler_numpy[actions_frame_sum:actions_frame_sum + 40]]))
    mDoppler_labels_WR14.append(df_timestamps["command"][11])

    print(f"Total frames in mDoppler list: {len(mDoppler_list_WR14)}")

    # Stack all segments into one array
    WR14_mDoppler_numpy_deneme = np.array(mDoppler_list_WR14)
    
    return [WR14_mDoppler_numpy_deneme, mDoppler_labels_WR14]

# Build the [segments, labels] pair for subject 000 from the recording and its timestamps
WR14_mDoppler_numpy_subject000_wlabels = data_prepare(milliseconds_past_subject000, WR14_mDoppler_subject000_numpy, df_timestamps)


In [None]:
# data_prepare returns a two-item list: [stacked segments, labels].
# Split it into the signal array and the parallel label list.
WR14_mDoppler_numpy_subject000, WR14_mDoppler_labels_subject000 = WR14_mDoppler_numpy_subject000_wlabels


In [None]:
# Min-max normalize the segments into [0, 1] using the global minimum and maximum
# of the whole array (one scale shared by every sample).
data_min = np.min(WR14_mDoppler_numpy_subject000)
data_max = np.max(WR14_mDoppler_numpy_subject000)
data_range = data_max - data_min

# Constant data would divide by zero and fill the array with NaN; stop with a clear message.
if data_range == 0:
    raise ValueError("Cannot min-max normalize: all values are identical (max == min).")

normalizedData = (WR14_mDoppler_numpy_subject000 - data_min) / data_range

# Sanity check: show the normalized sample at index 1
print(normalizedData[1])


In [None]:
# Report the shape and container type of the sample at index 0.
print(WR14_mDoppler_numpy_subject000[0].shape)
print(f"Type of WR14_mDoppler_numpy_subject000[0]: {type(WR14_mDoppler_numpy_subject000[0])}")

# Render the sample at index 14 as an image. The array is transposed before
# plotting; 'extent' labels the x-axis 0..40 and the y-axis 0..128, and
# aspect='auto' lets matplotlib stretch the image to fill the axes.
plt.imshow(
    np.transpose(WR14_mDoppler_numpy_subject000[14]),
    aspect='auto',
    extent=[0, 40, 0, 128],
)


In [None]:
# Pick the compute device: GPU when CUDA is available, CPU otherwise.
if torch.cuda.is_available():
    dev = 'cuda'
    print("CUDA is available. Using GPU.")
else:
    dev = 'cpu'

# torch.device handle used by the later cells
device = torch.device(dev)


In [None]:
# Unique command labels present in the subject 000 data
mDoppler_labels_WR14_set = set(WR14_mDoppler_labels_subject000)

print(f"Unique labels in WR14_mDoppler_labels_subject000: {mDoppler_labels_WR14_set}")
print(f"Number of unique labels: {len(mDoppler_labels_WR14_set)}")

# Encode every sample label as the 1-based position of its value in the set's
# iteration order.
# NOTE(review): ids start at 1, whereas the decoding cell further down indexes
# list(mDoppler_labels_WR14_set) from 0 — confirm which convention is intended.
label_list = [
    class_id
    for sample_label in WR14_mDoppler_labels_subject000
    for class_id, class_name in enumerate(mDoppler_labels_WR14_set, start=1)
    if sample_label == class_name
]

print(f"Length of label_list: {len(label_list)}")
print(f"Contents of label_list: {label_list}")


In [None]:
# Float32 tensor copy of the (un-normalized) segment array.
# Note: tensor_x is reassigned from normalizedData when the dataset is built below.
tensor_x = torch.tensor(WR14_mDoppler_numpy_subject000, dtype=torch.float32)


In [None]:
from torch.utils.data import Dataset, TensorDataset, DataLoader

# Features: the min-max normalized segments
my_x = normalizedData

# Labels: integer ids from label_list as an (n_samples, 1) column
my_y = np.array(label_list).reshape((len(label_list), 1))

# Float tensors for features and labels, wrapped in a TensorDataset
tensor_x, tensor_y = torch.Tensor(my_x), torch.Tensor(my_y)
my_dataset = TensorDataset(tensor_x, tensor_y)

# Default DataLoader over the dataset (batch size 1, no shuffling)
my_dataloader = DataLoader(my_dataset)

# Indexing the dataset yields a (features, label) pair; despite their names,
# these two variables hold the features and label of sample 0.
n_samples_deneme, n_features_deneme = my_dataset[0]

# Sample 3 as (image, label), and sample 0 kept whole for inspection
image, label = my_dataset[3]
aa = my_dataset[0]

# Label of sample 3
print(label)


In [None]:
# Number of real samples per training batch
BATCH_SIZE = 4

# Shuffled, batched loader used for training.
# os.cpu_count() may return None when the core count cannot be determined;
# fall back to 0 (load in the main process) because DataLoader requires an int.
dataloader = DataLoader(my_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=os.cpu_count() or 0)


In [None]:
import matplotlib.pyplot as plt
import os
import torch
import torch.nn as nn
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor
from torchvision.utils import make_grid
from torch.utils.data import DataLoader
import time as time
import numpy as np
from IPython import display

In [None]:
# Fetch the sample at index 1 (features and label) from the dataset
image, label = my_dataset[1]

# Each sample is built as (1, 40, 128) by the preparation step, so drop the
# leading singleton axis before transposing. squeeze(1) was a no-op on that
# shape and left a 3-D tensor for `.T`, which PyTorch deprecates for non-2-D input.
plt.imshow(image.squeeze(0).T)
plt.show()

# Dataset type, number of samples, and the tensor size of the fetched sample
print(f"Dataset type: {type(my_dataset)}")
print(f"Dataset length: {len(my_dataset)}")
print(f"Image size: {image.size()}")
# Full (features, label) pair of the sample at index 1
print(f"First sample: {my_dataset[1]}")


In [None]:
# Module-level scratch lists; my_discriminator_results is appended to by Discriminator.forward
my_generator_images=[]
my_discriminator_results=[]
class Generator(nn.Module):
    """
    GAN generator mapping a noise vector to a single-channel 40x128 image.

    A fully connected stage expands the noise to 256*10*32 features, which are
    reshaped to (N, 256, 10, 32) and passed through three transposed
    convolutions: 10x32 -> 10x32 (stride 1) -> 20x64 -> 40x128. The final
    Sigmoid keeps the output in [0, 1].

    Args:
        input_size (int): Length of the input noise vector.
    """

    def __init__(self, input_size):
        super().__init__()

        self.input_size = input_size

        # Noise -> 256*10*32 features.
        # Softmax is given dim=1 explicitly: the implicit-dim form is deprecated
        # and warns on every forward pass; dim=1 is what it resolved to for the
        # 2-D input used here.
        # NOTE(review): softmax over 81920 features makes every activation tiny;
        # kept as-is to preserve the model, but worth revisiting.
        self.fc_net = nn.Sequential(
            nn.Linear(input_size, 256*10*32, bias=False),
            nn.BatchNorm1d(256*10*32),
            nn.Softmax(dim=1)
        )

        # Transposed convolutions; dim=1 again matches the implicit choice for 4-D input
        self.conv_model = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 5, bias=False, padding=2),  # 256 -> 128 channels, size unchanged (stride 1)
            nn.BatchNorm2d(128),
            nn.Softmax(dim=1),
            nn.ConvTranspose2d(128, 64, 5, stride=2, bias=False, padding=2, output_padding=1),  # 128 -> 64 channels, 2x upsample
            nn.BatchNorm2d(64),
            nn.Softmax(dim=1),
            nn.ConvTranspose2d(64, 1, 5, stride=2, bias=False, padding=2, output_padding=1),  # 64 -> 1 channel, 2x upsample
            nn.Sigmoid()  # Constrain output values to [0, 1]
        )

    def forward(self, x):
        """
        Generate images from noise.

        Args:
            x (Tensor): Noise of shape (N, input_size).

        Returns:
            Tensor: Generated images of shape (N, 1, 40, 128).
        """
        y = self.fc_net(x)

        # Reshape the flat features into a (256, 10, 32) feature map per sample
        y = y.reshape((-1, 256, 10, 32))

        y = self.conv_model(y)
        return y


In [None]:
def weights_init(m):
    """
    Re-initialize one layer in place, chosen by its class name.

    Intended for `module.apply(weights_init)`. Layers whose class name contains
    'Conv' or 'Linear' get Xavier-uniform weights; layers whose name contains
    'BatchNorm' get weights drawn from N(1.0, 0.2) and zero biases. Any other
    module is left untouched.

    Args:
        m (nn.Module): The layer to initialize.
    """
    layer_kind = type(m).__name__

    # Convolutional (including transposed) and fully connected layers share one scheme
    if 'Conv' in layer_kind or 'Linear' in layer_kind:
        nn.init.xavier_uniform_(m.weight.data)
        return

    if 'BatchNorm' in layer_kind:
        # Scale around 1 with a small spread, shift at exactly 0
        nn.init.normal_(m.weight.data, 1.0, 0.2)
        nn.init.constant_(m.bias.data, 0)


In [None]:
# Build a generator for 100-dimensional noise and apply the custom weight initialization
generator = Generator(100).apply(weights_init)

# Ten noise vectors drawn from a standard normal distribution
noise = torch.normal(0, 1, [10, 100])
print("Noise shape:", noise.shape)

# Smoke test: run the untrained generator; detach so no autograd graph is kept
generated_image = generator(noise).detach()
print("Generated image shape:", generated_image.shape)

# Show the first generated image (sample 0, channel 0) and its raw values
plt.imshow(generated_image[0, 0], cmap='gray')
plt.show()

print(generated_image[0, 0])


In [None]:
class Discriminator(nn.Module):
    """
    Discriminator for the GAN model.

    Three stride-2 convolutions (each followed by a channel-wise Softmax and
    Dropout) reduce a (N, 1, 40, 128) input to (N, 128, 5, 16); the flattened
    10240 features feed a linear layer that outputs 14 logits per sample.

    Attributes:
        model (nn.Sequential): The layers that make up the discriminator.
    """

    def __init__(self):
        """Build the convolutional stack and the final linear layer."""
        super().__init__()

        self.model = nn.Sequential(
            # 1 -> 64 channels, spatial size halved
            nn.Conv2d(1, 64, 5, stride=2, padding=2),
            nn.Softmax(dim=1),  # Softmax along the channel dimension
            nn.Dropout(0.4),

            # 64 -> 128 channels, spatial size halved
            nn.Conv2d(64, 128, 5, stride=2, padding=2),
            nn.Softmax(dim=1),
            nn.Dropout(0.4),

            # 128 -> 128 channels, spatial size halved
            nn.Conv2d(128, 128, 5, stride=2, padding=2),
            nn.Softmax(dim=1),
            nn.Dropout(0.4),

            # 128 * 5 * 16 = 10240 features for a 40x128 input
            nn.Flatten(),
            nn.Linear(10240, 14)  # 14 output logits per sample
        )

    def forward(self, x):
        """
        Run the discriminator on a batch of images.

        Args:
            x (Tensor): Input images of shape (N, 1, 40, 128).

        Returns:
            Tensor: Logits of shape (N, 14).
        """
        y = self.model(x)
        # Keep a copy for later inspection. Store it detached: appending the
        # graph-attached output would keep every forward pass's autograd graph
        # alive for the whole training run.
        my_discriminator_results.append(y.detach())
        return y


In [None]:
# Build the discriminator and apply the custom weight initialization
discriminator = Discriminator().apply(weights_init)

# Smoke test: score the images generated above and show the raw logits
decision = discriminator(generated_image)
print(decision)


In [None]:
# Shared criterion for both networks. Despite its name this is binary
# cross-entropy on logits, applied element-wise to every discriminator output.
cross_entropy = nn.BCEWithLogitsLoss()

# Per-step loss history, filled by discriminator_loss / generator_loss
Generator_loss_list = []
Discriminator_loss_list = []

def discriminator_loss(real_output, fake_output, device):
    """
    Compute the discriminator loss: real outputs are pushed towards 1, fake outputs towards 0.

    Args:
        real_output (Tensor): Discriminator logits for real images.
        fake_output (Tensor): Discriminator logits for generated images.
        device (torch.device): Device on which the target tensors are created.

    Returns:
        total_loss (Tensor): Sum of the real and fake losses (graph-attached, for backward()).

    Side effects:
        Prints the three loss values and appends detached copies
        [real_loss, fake_loss, total_loss] to Discriminator_loss_list.
    """
    # Real images should be classified as 1
    real_loss = cross_entropy(real_output, torch.ones_like(real_output, device=device))
    print("Discriminator Real Loss:", real_loss)

    # Generated images should be classified as 0
    fake_loss = cross_entropy(fake_output, torch.zeros_like(fake_output, device=device))
    print("Discriminator Fake Loss:", fake_loss)

    total_loss = real_loss + fake_loss
    print("Discriminator Total Loss:", total_loss)

    # Store detached copies: keeping the graph-attached tensors would retain
    # the autograd graph of every training step.
    Discriminator_loss_list.append([real_loss.detach(), fake_loss.detach(), total_loss.detach()])

    return total_loss


In [None]:
# Loss function for the Generator
def generator_loss(fake_output, device):
    """
    Compute the generator loss: discriminator outputs for generated images are pushed towards 1.

    Args:
        fake_output (Tensor): Discriminator logits for generated images.
        device (torch.device): Device on which the target tensor is created.

    Returns:
        gen_loss (Tensor): The generator loss (graph-attached, for backward()).

    Side effects:
        Prints the loss and appends a detached copy, wrapped in a one-item
        list, to Generator_loss_list.
    """
    # The generator succeeds when its images are classified as real (1)
    gen_loss = cross_entropy(fake_output, torch.ones_like(fake_output, device=device))
    print("Generator Loss:", gen_loss)

    # Store a detached copy so the history does not keep each step's autograd graph alive
    Generator_loss_list.append([gen_loss.detach()])

    return gen_loss


In [None]:
# One Adam optimizer per network, both with lr = 1e-4 and betas = (0.5, 0.999)
gen_opt = optim.Adam(generator.parameters(), lr=1e-4, betas=(0.5, 0.999))
dis_opt = optim.Adam(discriminator.parameters(), lr=1e-4, betas=(0.5, 0.999))


In [None]:


# Fixed noise batch reused for every preview, so generated images are
# comparable across epochs (this is a noise tensor, not an RNG seed).
# noise_dim and num_examples_to_generate are defined here because this cell
# runs before the training-configuration cell that also sets them; without
# these lines a fresh Restart & Run All stops here with a NameError.
noise_dim = 100  # Dimension of the generator's input noise vector
num_examples_to_generate = 32  # Number of preview images
seed = torch.randn([num_examples_to_generate, noise_dim], device=device)


In [None]:
# Per-step argmax of the discriminator logits for real and generated batches
argmax_real = []
argmax_fake = []

def train_step(images, generator, discriminator, BATCH_SIZE, noise_dim, device, dis_opt, gen_opt):
    """
    Run one GAN update: first the discriminator, then the generator.

    Args:
        images (Tensor): Batch of real images, already on `device`.
        generator (nn.Module): Generator network.
        discriminator (nn.Module): Discriminator network.
        BATCH_SIZE (int): Number of noise vectors (fake images) to generate.
        noise_dim (int): Length of each noise vector.
        device (torch.device): Device for the noise and loss targets.
        dis_opt, gen_opt: Optimizers for the discriminator and generator.

    Returns:
        tuple: (gen_loss, disc_loss) tensors for this step.
    """
    # Sample latent vectors and synthesize a batch of fake images
    latent = torch.randn([BATCH_SIZE, noise_dim], device=device)
    fakes = generator(latent)

    # Score real images, then fake images; the fakes are detached so the
    # discriminator update does not propagate into the generator
    real_logits = discriminator(images)
    fake_logits = discriminator(fakes.detach())

    # Record which output unit was strongest for each sample
    argmax_real.append(torch.argmax(real_logits, dim=1))
    argmax_fake.append(torch.argmax(fake_logits, dim=1))

    # Discriminator update
    disc_loss = discriminator_loss(real_logits, fake_logits, device)
    dis_opt.zero_grad()
    disc_loss.backward()
    dis_opt.step()

    # Generator update: re-score the (non-detached) fakes with the updated discriminator
    gen_loss = generator_loss(discriminator(fakes), device)
    gen_opt.zero_grad()
    gen_loss.backward()
    gen_opt.step()

    return gen_loss, disc_loss


In [None]:
def generate_and_save_images(model, epoch, test_input, boolean, discri):
    """
    Generate images from `test_input`, show them as a grid and save the grid to disk.

    Args:
        model (nn.Module): Generator; switched to eval mode for the call and
            always switched back to training mode before returning.
        epoch (int): Epoch number used in the output file name.
        test_input (Tensor): Noise batch fed to the generator.
        boolean (bool): When True, also run the discriminator on the first 32
            generated images and return its predictions.
        discri (nn.Module): Discriminator used when `boolean` is True.

    Returns:
        tuple | None: (list of argmax class indices, generated images scaled by
        250) when `boolean` is True, otherwise None.
    """
    model.eval()
    try:
        with torch.no_grad():
            # Generator output is in [0, 1]; scale to [0, 250] for display
            predictions = model(test_input).detach().cpu() * 250

        # Arrange the images in a grid (4 per row) and convert to H x W x C for imshow
        grid = make_grid(predictions, 4).numpy().squeeze().transpose(1, 2, 0)
        plt.imshow(grid.astype(np.uint16), interpolation='bilinear', aspect='auto')
        plt.axis('off')

        plt.savefig(f'image_at_epoch_{epoch:04d}.png')
        plt.show()

        if boolean:
            # NOTE(review): the discriminator receives the images scaled by 250,
            # not the [0, 1] range the generator emits — confirm this is intended.
            predictions32 = predictions[:32].to(device)
            with torch.no_grad():  # inference only; no autograd graph needed
                label_prediction = discri(predictions32)

            # Index of the strongest output unit for each image
            list_label_predictions = [torch.argmax(i) for i in label_prediction]

            return list_label_predictions, predictions

        return None
    finally:
        # Restore training mode on every path. Previously the early return
        # above skipped this and left the generator in eval mode.
        model.train()


In [None]:
def train(dataloader, epochs, generator, discriminator, BATCH_SIZE, noise_dim, device, dis_opt, gen_opt):
    """
    Train the GAN for `epochs` passes over `dataloader`.

    After every epoch the cell output is cleared and a preview grid is generated
    from the module-level `seed` noise and saved. A final preview is produced
    once more after the last epoch.

    Returns:
        tuple: (gloss, dloss) — per-epoch mean generator and discriminator losses.
    """
    gloss, dloss = [], []

    for epoch in range(epochs):
        start = time.time()
        gen_losses, disc_losses = [], []

        # One optimization step per batch; the dataset labels are ignored
        for image_batch, _ in dataloader:
            step_gen_loss, step_disc_loss = train_step(
                image_batch.to(device), generator, discriminator,
                BATCH_SIZE, noise_dim, device, dis_opt, gen_opt,
            )
            gen_losses.append(step_gen_loss.detach().cpu())
            disc_losses.append(step_disc_loss.detach().cpu())

        # Epoch averages
        gloss.append(np.mean(gen_losses))
        dloss.append(np.mean(disc_losses))

        # Replace the previous output with a fresh preview for this epoch
        display.clear_output(wait=True)
        generate_and_save_images(generator, epoch + 1, seed, False, discriminator)

        print(f'Time for epoch {epoch + 1} is {time.time() - start:.2f} sec')

    # Final preview after training completes
    display.clear_output(wait=True)
    generate_and_save_images(generator, epochs, seed, False, discriminator)

    return gloss, dloss


In [None]:
# Put both networks on the selected device (GPU or CPU)
generator.to(device)
discriminator.to(device)

# Training configuration
EPOCHS = 100  # Number of training epochs
noise_dim = 100  # Length of the generator's input noise vector
num_examples_to_generate = 32  # Number of preview images per epoch

# Run training; the per-epoch mean losses are kept for plotting
gloss, dloss = train(
    dataloader=dataloader,
    epochs=EPOCHS,
    generator=generator,
    discriminator=discriminator,
    BATCH_SIZE=BATCH_SIZE,
    noise_dim=noise_dim,
    device=device,
    dis_opt=dis_opt,
    gen_opt=gen_opt,
)


In [None]:
# Generate the preview images once more and, because boolean=True, also get
# the discriminator's argmax prediction for each of them.
label_predictions_generate_and_save, predictions_generate_and_save = generate_and_save_images(
    model=generator,
    epoch=EPOCHS,
    test_input=seed,
    boolean=True,
    discri=discriminator,
)

print(label_predictions_generate_and_save)


In [None]:
# Convert the set to a list so predicted class indices can be looked up by position
mDoppler_labels_list = list(mDoppler_labels_WR14_set)

# One label string per generated image, in the same order as the predictions
label_list_strings = []

# Iterate over the predictions actually returned, not a hard-coded count
for idx, predicted in enumerate(label_predictions_generate_and_save):
    predicted_label = predicted.detach().cpu().tolist()

    # The discriminator has 14 outputs, which may exceed the number of labels
    # present in the data. Use a placeholder in that case so this list stays
    # aligned with the images and the title lookup cannot raise IndexError.
    if 0 <= predicted_label < len(mDoppler_labels_list):
        label_name = mDoppler_labels_list[predicted_label]
    else:
        label_name = f'unknown class {predicted_label}'
    label_list_strings.append(label_name)

    # Plot the generated image; drop the channel axis first so `.T` acts on a
    # 2-D tensor (transposing tensors with more than 2 dimensions via `.T` is deprecated)
    plt.figure()
    plt.imshow(predictions_generate_and_save[idx].detach().cpu().squeeze(0).T, aspect='auto')

    # Title the image with its predicted label
    plt.title(label_name)

    # Save the labeled image and release the figure
    plt.savefig(f'generated_and_labeled_image_{idx}.png')
    plt.close()


In [None]:
# Sanity check: number of label strings collected for the generated images
print(len(label_list_strings))

In [None]:
# Running index of the image being processed by the filtering loop below
counter = 0

def plot_spectrum(im_fft):
    """
    Draw the magnitude of a 2-D Fourier transform on the current axes.

    Uses a logarithmic color scale with a lower bound of 5 and adds a colorbar.
    """
    magnitude = np.abs(im_fft)
    plt.imshow(magnitude, norm=LogNorm(vmin=5))
    plt.colorbar()

# Fraction of FFT coefficients kept at each end of both axes
keep_fraction = 0.2

# For every generated image: show its spectrum, zero the central band of
# coefficients, show the filtered spectrum, and save the reconstructed image.
for im in predictions_generate_and_save:
    # 2-D FFT of the image as a plain NumPy array
    im = im.detach().cpu().numpy().squeeze()
    im_fft = fftpack.fft2(im)

    plt.figure()
    plot_spectrum(im_fft)
    plt.title('Fourier Transform')

    # Work on a copy so the original spectrum stays intact
    im_fft_filtered = im_fft.copy()
    rows, cols = im_fft_filtered.shape

    # Index bands to blank out. In the unshifted fft2 layout the low
    # frequencies sit at the array edges, so the middle band removed here
    # holds the higher-frequency terms.
    row_band = slice(int(rows * keep_fraction), int(rows * (1 - keep_fraction)))
    col_band = slice(int(cols * keep_fraction), int(cols * (1 - keep_fraction)))
    im_fft_filtered[row_band] = 0
    im_fft_filtered[:, col_band] = 0

    plt.figure()
    plot_spectrum(im_fft_filtered)
    plt.title('Filtered Spectrum')

    # Back to image space; only the real part is kept
    im_new = fftpack.ifft2(im_fft_filtered).real

    # Reconstructed image, titled with the label predicted for this image
    plt.figure()
    plt.imshow(im_new.T, aspect='auto')
    plt.title(label_list_strings[counter])

    plt.savefig(f'generated_and_labeled_image_noise_filtered_{counter}.png')
    plt.show()

    # Move on to the next image / label
    counter += 1

In [None]:
# Per-epoch mean losses of both networks on one 10x10-inch figure
fig, ax = plt.subplots(figsize=(10, 10))

ax.plot(gloss, label="Generator Loss", color='blue')
ax.plot(dloss, label="Discriminator Loss", color='red')

# Legend distinguishes the two curves
ax.legend()

plt.show()
