# <center> **Generating Synthetic Data** </center>

In [1]:
from src import *

import os
import time
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt

# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

2025-01-15 17:06:29.324935: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Define parameters

In [2]:
# Experiment grid iterated by every generation cell below:
#   data_percentage -> the "P" part of each result folder name (percent)
#   class_size      -> number of classes of the dataset variant (the "C" part)
#   target_size     -> number of synthetic samples requested per run
data_percentage = list(range(10, 51, 10))
class_size = [4, 10]
target_size = 5000

# Hyperparameters of the one-shot generative models (passed to generate.Vicinal)
patch_dims = [5, 7, 9]
min_height = 15

# Hyperparameters of the deep generative models (CGAN / CVAE)
num_epochs, batch_size, latent_dim = 2000, 64, 50

## Load subsampled and test data

In [3]:
# Index the .npy files under ./data/. The generation cells read an entry as
# paths_dict[(n_classes, percent)][fold][0] for the X file and [...][1] for the Y file.
paths_dict = utils.get_sorted_paths_dict(
    data_directory='./data/',
    substring_to_replace="train_subsampled",
)

# Same lookup structure for the held-out test split.
paths_dict_test = utils.get_sorted_paths_dict(
    data_directory='./data/',
    substring_to_replace="test",
)

## Generate by one-shot generative model (GPDM)

In [None]:
# Fix the seed once, before the whole GPDM sweep starts.
utils.set_seeds(seed_value=0)

for patch_dim in patch_dims:
    # One generator per patch size, reused for every class count, percentage and fold.
    g = generate.Vicinal(algorithm='gpdm', patch_dim=patch_dim, min_height=min_height)

    for cs in class_size:
        for dp in data_percentage:
            for fold in range(5):
                print(cs, ' Classes, ', dp, ' Percent, ', 'Fold ', fold)

                # Entry layout: index 0 -> features file, index 1 -> labels file.
                fold_paths = paths_dict[(cs, dp)][fold]
                X_train_subsampled = np.load(fold_paths[0])
                Y_train_subsampled = np.load(fold_paths[1])

                X_train_synthetic, Y_train_synthetic = g.generate(
                    X_train_subsampled,
                    Y_train_subsampled,
                    target_size,
                )

                # Result folder is named <algorithm><patch size>/C<classes>P<percent>K<1-based fold>.
                folder_path = f'./results/gpdm{patch_dim}/C{cs}P{dp:02d}K{fold + 1}/'
                os.makedirs(folder_path, exist_ok=True)

                # Persist features first, then the squeezed labels.
                x_file = os.path.join(folder_path, 'X_train_synthetic.npy')
                y_file = os.path.join(folder_path, 'Y_train_synthetic.npy')
                np.save(x_file, X_train_synthetic)
                np.save(y_file, np.squeeze(Y_train_synthetic))


## Generate by one-shot generative model (GPNN)

In [None]:
# Fix the seed once, before the whole GPNN sweep starts.
utils.set_seeds(seed_value=0)

for patch_dim in patch_dims:
    # One generator per patch size, reused for every class count, percentage and fold.
    g = generate.Vicinal(algorithm='gpnn', patch_dim=patch_dim, min_height=min_height)

    for cs in class_size:
        for dp in data_percentage:
            for fold in range(5):
                print(cs, ' Classes, ', dp, ' Percent, ', 'Fold ', fold)

                # Entry layout: index 0 -> features file, index 1 -> labels file.
                fold_paths = paths_dict[(cs, dp)][fold]
                X_train_subsampled = np.load(fold_paths[0])
                Y_train_subsampled = np.load(fold_paths[1])

                X_train_synthetic, Y_train_synthetic = g.generate(
                    X_train_subsampled,
                    Y_train_subsampled,
                    target_size,
                )

                # Result folder is named <algorithm><patch size>/C<classes>P<percent>K<1-based fold>.
                folder_path = f'./results/gpnn{patch_dim}/C{cs}P{dp:02d}K{fold + 1}/'
                os.makedirs(folder_path, exist_ok=True)

                # Persist features first, then the squeezed labels.
                x_file = os.path.join(folder_path, 'X_train_synthetic.npy')
                y_file = os.path.join(folder_path, 'Y_train_synthetic.npy')
                np.save(x_file, X_train_synthetic)
                np.save(y_file, np.squeeze(Y_train_synthetic))

## Generate by CGAN

In [None]:
# Train one CGAN per (class count, data percentage, fold), sample `target_size`
# synthetic points from it, save them, and write two diagnostic PDFs per run:
# the training-loss figure and a per-class comparison of subsampled / synthetic /
# test samples.
lr = 5e-4

utils.set_seeds(seed_value=0)

# Every diagnostic figure of the sweep is collected in a single folder.
plots_path = './results/cgan/plots'
os.makedirs(plots_path, exist_ok=True)

for cs in class_size:
    for dp in data_percentage:
        for fold in range(5):
            # Shared suffix of all artefacts of this run: C<classes>P<percent>K<1-based fold>.
            run_tag = f'C{cs}P{dp:02d}K{fold + 1}'

            X_test = np.load(paths_dict_test[(cs, dp)][fold][0])
            Y_test = np.load(paths_dict_test[(cs, dp)][fold][1])
            X_train_subsampled = np.load(paths_dict[(cs, dp)][fold][0])
            Y_train_subsampled = np.load(paths_dict[(cs, dp)][fold][1])

            ## Uncomment to clean subsampled data prior to training
            #outlier_remover = utils.OutlierRemover(n_mad=3)
            #X_train_subsampled, Y_train_subsampled = outlier_remover.remove_outliers(X_train_subsampled,Y_train_subsampled)

            # Normalise per class, convert to tensors with one-hot labels, then
            # enlarge the training set to `target_size` samples with mixup.
            cn = utils.ClasswiseNormalizer(symmetry=True)
            X_train_subsampled_normalized = cn.normalize(X_train_subsampled, Y_train_subsampled)
            X_train_subsampled_normalized, Y_train_subsampled_onehot = utils.np2tensor_onehot(
                X_train_subsampled_normalized,
                Y_train_subsampled,
                num_classes=cs)
            augmented_X, augmented_Y = utils.generate_mixup_samples(
                X_train_subsampled_normalized,
                Y_train_subsampled_onehot,
                target_size,
                alpha=0.2)
            dataset = TensorDataset(augmented_X, augmented_Y)
            data_loader = DataLoader(dataset, batch_size, shuffle=True, drop_last=False)

            # Initialize CGAN instance
            generative_model_instance = generate.Conditional('CGAN', X_train_subsampled.shape[1], cs, latent_dim)
            generative_model_instance = generative_model_instance.to(device)
            optimizer_G = torch.optim.Adam(generative_model_instance.model.generator.parameters(), lr=lr)
            optimizer_D = torch.optim.Adam(generative_model_instance.model.discriminator.parameters(), lr=lr)

            # Train CGAN
            generative_model_instance.model.train_model(data_loader, optimizer_G, optimizer_D, num_epochs)

            # Generate synthetic data using CGAN and map it back to the original scale
            X_train_synthetic_normalized, Y_train_synthetic = generative_model_instance.generate(num_samples=target_size)
            X_train_synthetic = cn.denormalize(X_train_synthetic_normalized, Y_train_synthetic)

            # Persist the synthetic data before any plotting, so that a plotting
            # error cannot discard the result of a full training run.
            folder_path = f'./results/cgan/{run_tag}/'
            os.makedirs(folder_path, exist_ok=True)
            np.save(os.path.join(folder_path, 'X_train_synthetic.npy'), X_train_synthetic)
            np.save(os.path.join(folder_path, 'Y_train_synthetic.npy'), np.squeeze(Y_train_synthetic))

            # NOTE(review): this assumes train_model leaves its loss figure as the
            # current pyplot figure; if it does not, gcf() creates an empty figure
            # and an empty PDF is written -- confirm against train_model.
            loss_plot = plt.gcf()
            loss_plot.savefig(os.path.join(plots_path, f'loss_{run_tag}.pdf'))

            num_features = X_train_subsampled.shape[1]
            feature_axis = range(num_features)

            print(cs, ' Classes, ', dp, ' Percent, ', 'Fold ', fold)
            # Grid with one row per class and three columns (subsampled, synthetic,
            # test). squeeze=False keeps `axes` two-dimensional even for one class.
            fig, axes = plt.subplots(cs, 3, figsize=(10, 4 * cs), squeeze=False)

            # Iterate over each class and plot data
            for cls in range(cs):
                # Subsampled data
                mask_subsampled = Y_train_subsampled == cls
                axes[cls][0].plot(feature_axis, X_train_subsampled[mask_subsampled].T, alpha=0.25)
                axes[cls][0].set_title(f"Class {cls} Subsampled")

                # Synthetic data
                mask_synthetic = Y_train_synthetic == cls
                axes[cls][1].plot(feature_axis, X_train_synthetic[mask_synthetic].T, alpha=0.25)
                axes[cls][1].set_title(f"Class {cls} Synthetic")

                # Test data
                mask_test = Y_test == cls
                axes[cls][2].plot(feature_axis, X_test[mask_test].T, alpha=0.25)
                axes[cls][2].set_title(f"Class {cls} Test")

            fig.tight_layout()
            # Write the PDF before showing: the figure is complete at this point and
            # saving no longer depends on what the backend does in show().
            fig.savefig(os.path.join(plots_path, f'sample_{run_tag}.pdf'))
            plt.show()

            # Release both figures; otherwise one pair per run stays registered with
            # pyplot for the whole sweep on backends that do not close on show().
            plt.close(loss_plot)
            plt.close(fig)
        print('\n')

## Generate by CVAE

In [None]:
# Train one CVAE per (class count, data percentage, fold), sample `target_size`
# synthetic points from it, save them, and write two diagnostic PDFs per run:
# the training-loss figure and a per-class comparison of subsampled / synthetic /
# test samples.
lr = 1e-4

utils.set_seeds(seed_value=0)

# Every diagnostic figure of the sweep is collected in a single folder.
plots_path = './results/cvae/plots'
os.makedirs(plots_path, exist_ok=True)

for cs in class_size:
    for dp in data_percentage:
        for fold in range(5):
            # Shared suffix of all artefacts of this run: C<classes>P<percent>K<1-based fold>.
            run_tag = f'C{cs}P{dp:02d}K{fold + 1}'

            # Load the test split and the subsampled training split of this run
            X_test = np.load(paths_dict_test[(cs, dp)][fold][0])
            Y_test = np.load(paths_dict_test[(cs, dp)][fold][1])
            X_train_subsampled = np.load(paths_dict[(cs, dp)][fold][0])
            Y_train_subsampled = np.load(paths_dict[(cs, dp)][fold][1])

            ## Uncomment to clean subsampled data prior to training
            #outlier_remover = utils.OutlierRemover(n_mad=3)
            #X_train_subsampled, Y_train_subsampled = outlier_remover.remove_outliers(X_train_subsampled,Y_train_subsampled)

            # Normalise per class, convert to tensors with one-hot labels, then
            # enlarge the training set to `target_size` samples with mixup.
            cn = utils.ClasswiseNormalizer(symmetry=True)
            X_train_subsampled_normalized = cn.normalize(X_train_subsampled, Y_train_subsampled)
            X_train_subsampled_normalized, Y_train_subsampled_onehot = utils.np2tensor_onehot(
                X_train_subsampled_normalized,
                Y_train_subsampled,
                num_classes=cs)
            augmented_X, augmented_Y = utils.generate_mixup_samples(
                X_train_subsampled_normalized,
                Y_train_subsampled_onehot,
                target_size,
                alpha=0.2)
            dataset = TensorDataset(augmented_X, augmented_Y)
            data_loader = DataLoader(dataset, batch_size, shuffle=True, drop_last=False)

            # Initialize CVAE instance
            generative_model_instance = generate.Conditional('CVAE', X_train_subsampled.shape[1], cs, latent_dim)
            generative_model_instance = generative_model_instance.to(device)
            optimizer = torch.optim.Adam(generative_model_instance.model.parameters(), lr)

            # Train CVAE
            generative_model_instance.model.train_model(data_loader, optimizer, num_epochs)

            # Generate synthetic data using CVAE and map it back to the original scale
            X_train_synthetic_normalized, Y_train_synthetic = generative_model_instance.generate(target_size)
            X_train_synthetic = cn.denormalize(X_train_synthetic_normalized, Y_train_synthetic)

            # Persist the synthetic data before any plotting, so that a plotting
            # error cannot discard the result of a full training run.
            folder_path = f'./results/cvae/{run_tag}/'
            os.makedirs(folder_path, exist_ok=True)
            np.save(os.path.join(folder_path, 'X_train_synthetic.npy'), X_train_synthetic)
            np.save(os.path.join(folder_path, 'Y_train_synthetic.npy'), np.squeeze(Y_train_synthetic))

            # NOTE(review): this assumes train_model leaves its loss figure as the
            # current pyplot figure; if it does not, gcf() creates an empty figure
            # and an empty PDF is written -- confirm against train_model.
            loss_plot = plt.gcf()
            loss_plot.savefig(os.path.join(plots_path, f'loss_{run_tag}.pdf'))

            num_features = X_train_subsampled.shape[1]
            feature_axis = range(num_features)

            print(cs, ' Classes, ', dp, ' Percent, ', 'Fold ', fold)
            # Grid with one row per class and three columns (subsampled, synthetic,
            # test). squeeze=False keeps `axes` two-dimensional even for one class.
            fig, axes = plt.subplots(cs, 3, figsize=(10, 4 * cs), squeeze=False)

            # Iterate over each class and plot data
            for cls in range(cs):
                # Subsampled data
                mask_subsampled = Y_train_subsampled == cls
                axes[cls][0].plot(feature_axis, X_train_subsampled[mask_subsampled].T, alpha=0.25)
                axes[cls][0].set_title(f"Class {cls} Subsampled")

                # Synthetic data
                mask_synthetic = Y_train_synthetic == cls
                axes[cls][1].plot(feature_axis, X_train_synthetic[mask_synthetic].T, alpha=0.25)
                axes[cls][1].set_title(f"Class {cls} Synthetic")

                # Test data
                mask_test = Y_test == cls
                axes[cls][2].plot(feature_axis, X_test[mask_test].T, alpha=0.25)
                axes[cls][2].set_title(f"Class {cls} Test")

            fig.tight_layout()
            # Write the PDF before showing: the figure is complete at this point and
            # saving no longer depends on what the backend does in show().
            fig.savefig(os.path.join(plots_path, f'sample_{run_tag}.pdf'))
            plt.show()

            # Release both figures; otherwise one pair per run stays registered with
            # pyplot for the whole sweep on backends that do not close on show().
            plt.close(loss_plot)
            plt.close(fig)