In [1]:
import math
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import os

import keras
import tensorflow as tf

from IPython.display import clear_output

from keras.models import Model, Sequential
from keras.layers import *
from keras.optimizers import Adam, SGD
from keras import backend
from tensorflow.keras.mixed_precision import set_global_policy

import sys
import time
import warnings
warnings.simplefilter(action = 'ignore', category = FutureWarning)
warnings.simplefilter(action = 'ignore', category = Warning)

import seaborn as sns
from scipy.signal import savgol_filter
from scipy.linalg import sqrtm

sys.version
import session_info
session_info.show()

In [2]:
%%time
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except:
    strategy = tf.distribute.get_strategy()
    if tf.config.list_physical_devices('GPU'):
        print("GPU is available.")
        !nvidia-smi -L
    else:
        print("Training on CPU.")

print('Number of replicas:', strategy.num_replicas_in_sync)
#tf.config.run_functions_eagerly(True)
AUTOTUNE = tf.data.experimental.AUTOTUNE

GPU is available.
GPU 0: NVIDIA GeForce RTX 3070 Laptop GPU (UUID: GPU-22e7aaac-adad-9a2f-120f-e4e0a027d5a6)
Number of replicas: 1
CPU times: total: 15.6 ms
Wall time: 117 ms


In [3]:
#mixed precision to save resources use
tf.keras.mixed_precision.set_global_policy('mixed_float16')
#limits memory use. Kernel would crash if there is not enough avaliable memory. 
#Limiting it helps avoiding that at the cost of possible bottleneck.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
  try:
    tf.config.experimental.set_virtual_device_configuration(gpus[0], [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=7168)])
  except RuntimeError as e:
    print(e)

INFO:tensorflow:Mixed precision compatibility check (mixed_float16): OK
Your GPU will likely run quickly with dtype policy mixed_float16 as it has compute capability of at least 7.0. Your GPU: NVIDIA GeForce RTX 3070 Laptop GPU, compute capability 8.6


In [4]:
%%time
datestr = time.strftime("%Y%m%d")
ROOT_DIR = "./"
DATA_DIR = "%s/data" % ROOT_DIR
EVAL_DIR = "%s/evaluation" % ROOT_DIR
MODEL_DIR = ("%s/models/CGAN/"+ datestr) % ROOT_DIR
WEIGHTS_DIR = ("%s/weights/CGAN/"+ datestr) % ROOT_DIR
SAMPLES_DIR = ("%s/samples/CGAN/"+ datestr) % ROOT_DIR
GENERATED_DIR = "%s/generated" % ROOT_DIR

#Create missing directories, if they don't exist
if not os.path.exists(DATA_DIR):
    # Create a new directory because it does not exist
    os.makedirs(DATA_DIR)
    print("The data directory is created!")
if not os.path.exists(EVAL_DIR):
    # Create a new directory because it does not exist
    os.makedirs(EVAL_DIR)
    print("The evaluation directory is created!")
if not os.path.exists(MODEL_DIR):
    # Create a new directory because it does not exist
    os.makedirs(MODEL_DIR)
    print("The model directory is created!")
if not os.path.exists(WEIGHTS_DIR):
    # Create a new directory because it does not exist
    os.makedirs(WEIGHTS_DIR)
    print("The weights directory is created!")
if not os.path.exists(SAMPLES_DIR):
    # Create a new directory because it does not exist
    os.makedirs(SAMPLES_DIR)
    print("The samples directory is created!")
if not os.path.exists(GENERATED_DIR):
    # Create a new directory because it does not exist
    os.makedirs(GENERATED_DIR)
    print("The generated directory is created!")

The model directory is created!
The weights directory is created!
The samples directory is created!
CPU times: total: 0 ns
Wall time: 2.01 ms


In [53]:
def smooth_signal(row, window_length=11, polyorder=5):
    return savgol_filter(row, window_length=window_length, polyorder=polyorder)

def calculate_fid(df_real, df_generated):
    """
    Compute FID score between two pandas dataframes where:
    - Each row is a different signal
    - Each column is a data point of the signal

    Parameters:
        df_real (pd.DataFrame): DataFrame of real signals
        df_generated (pd.DataFrame): DataFrame of generated signals

    Returns:
        fid_score (float): Computed FID score
    """

    # Ensure inputs are either NumPy arrays or Pandas DataFrames
    if isinstance(df_real, np.ndarray):
        real_data = df_real
    elif hasattr(df_real, "to_numpy"):  # Handles Pandas DataFrame
        real_data = df_real.to_numpy()
    else:
        raise TypeError("df_real must be a NumPy array or Pandas DataFrame")

    if isinstance(df_generated, np.ndarray):
        gen_data = df_generated
    elif hasattr(df_generated, "to_numpy"):
        gen_data = df_generated.to_numpy()
    else:
        raise TypeError("df_generated must be a NumPy array or Pandas DataFrame")

    # Validate dimensions
    if real_data.shape[1] != gen_data.shape[1]:
        raise ValueError("Both datasets must have the same number of columns (features)")

    # Compute mean and covariance matrices
    mu_real, sigma_real = np.mean(real_data, axis=0), np.cov(real_data, rowvar=False)
    mu_gen, sigma_gen = np.mean(gen_data, axis=0), np.cov(gen_data, rowvar=False)

    # Compute square root of product of covariances
    cov_sqrt, _ = sqrtm(sigma_real @ sigma_gen, disp=False)

    # Check for numerical issues (complex numbers)
    if np.iscomplexobj(cov_sqrt):
        cov_sqrt = cov_sqrt.real

    # Compute FID score
    fid_score = np.sum((mu_real - mu_gen) ** 2) + np.trace(sigma_real + sigma_gen - 2 * cov_sqrt)
    
    return fid_score

load_file_path = '%s/' % GENERATED_DIR
df_generated = pd.read_csv(load_file_path+"20250201-150023_CGAN_generator_120000.csv", encoding='utf-8', index_col=0)
column_names = [f"Point {i+1}" for i in range(187)] + ["label"]
df_generated.columns = column_names

In [6]:
%%time
train_file_path = '%s/mitbih_train.csv' % DATA_DIR
test_file_path = '%s/mitbih_test.csv' % DATA_DIR
column_names = [f"Point {i+1}" for i in range(187)] + ["label"]
df_train = pd.read_csv(train_file_path, names=column_names)
df_test = pd.read_csv(test_file_path, names=column_names)
df_train["label"] = df_train["label"].astype(int)
df_test["label"] = df_test["label"].astype(int)

CPU times: total: 2.59 s
Wall time: 3.09 s


In [7]:
%%time
img_rows = 187
img_cols = 1
channels = 1

# Input image dimensions
img_shape = (img_rows, img_cols, channels)

# Size of the noise vector, used as input to the Generator
z_dim = 256

# Number of classes in the dataset
num_classes = 5

CPU times: total: 0 ns
Wall time: 0 ns


In [8]:
%%time
def build_generator(z_dim):

    model = Sequential()
    
    #Layer 1
    model.add(Dense(512*4, input_dim=z_dim))
    model.add(Reshape((4, 512)))
    
    #Layer 2
    model.add(Conv1DTranspose(256, kernel_size=4, strides=2, padding='same', use_bias=False))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.02))

    #Layer 3
    model.add(Conv1DTranspose(128, kernel_size=4, strides=2, padding='same', use_bias=False))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.02))
    
    #Layer 4
    model.add(Conv1DTranspose(64, kernel_size=4, strides=2, padding='same', use_bias=False))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.02))

    #Layer 5
    model.add(Conv1DTranspose(1, kernel_size=4, strides=2, padding='same', use_bias=False))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.02))

    # Output layer with tanh activation
    model.add(Flatten())
    model.add(Dense(187))
    model.add(Reshape((187, 1)))
    model.add(Activation('tanh'))

    return model

CPU times: total: 0 ns
Wall time: 0 ns


In [9]:
%%time
def build_cgan_generator(z_dim):

    # Random noise vector z
    z = Input(shape=(z_dim, ))

    # Conditioning label: integer 0-9 specifying the number G should generate
    label = Input(shape=(1, ), dtype='int32')

    # Label embedding:
    # ----------------
    # Turns labels into dense vectors of size z_dim
    # Produces 3D tensor with shape (batch_size, 1, z_dim)
    label_embedding = Embedding(num_classes, z_dim, input_length=1)(label)

    # Flatten the embedding 3D tensor into 2D tensor with shape (batch_size, z_dim)
    label_embedding = Flatten()(label_embedding)

    # Element-wise product of the vectors z and the label embeddings
    joined_representation = Multiply()([z, label_embedding])

    generator = build_generator(z_dim)

    # Generate image for the given label
    conditioned_img = generator(joined_representation)

    return Model([z, label], conditioned_img)

CPU times: total: 0 ns
Wall time: 0 ns


In [10]:
%%time
def build_discriminator(img_shape):

    model = Sequential()

    model.add(Input(shape = (img_shape[0], img_shape[1], img_shape[2] + 1)))
    
    #Layer 1
    model.add(
        Conv1D(32,
               kernel_size=4,
               strides=2,
               padding='same', use_bias=False))
    
    # Batch normalization
    #model.add(BatchNormalization())
    
    # Leaky ReLU activation
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.5))

    # Layer 2
    model.add(
        Conv1D(64,
               kernel_size=4,
               strides=2,
               padding='same', use_bias=False))

    # Batch normalization
    #model.add(BatchNormalization())

    # Leaky ReLU activation
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.5))

    # Layer 3
    model.add(
        Conv1D(128,
               kernel_size=4,
               strides=2,
               padding='same', use_bias=False))

    # Batch normalization
    #model.add(BatchNormalization())

    # Leaky ReLU
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.5))

    # Output layer with sigmoid activation
    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))

    return model

CPU times: total: 0 ns
Wall time: 0 ns


In [11]:
%%time
def build_cgan_discriminator(img_shape):

    # Input image
    img = Input(shape=img_shape)

    # Label for the input image
    label = Input(shape=(1, ), dtype='int32')

    #print("Shape of img:", img.shape)
    #print("Shape of label:", label.shape)

    # Label embedding:
    # ----------------
    # Turns labels into dense vectors of size z_dim
    # Produces 3D tensor with shape (batch_size, 1, 28*28*1)
    label_embedding = Embedding(num_classes,
                                np.prod(img_shape),
                                input_length=1)(label)

    # Flatten the embedding 3D tensor into 2D tensor with shape (batch_size, 28*28*1)
    label_embedding = Flatten()(label_embedding)

    # Reshape label embeddings to have same dimensions as input images
    label_embedding = Reshape(img_shape)(label_embedding)

    # Concatenate images with their label embeddings
    concatenated = Concatenate(axis=-1)([img, label_embedding])

    #print("Shape after concatenation:", concatenated.shape)

    discriminator = build_discriminator(img_shape)

    # Classify the image-label pair
    classification = discriminator(concatenated)

    return Model([img, label], classification)

CPU times: total: 0 ns
Wall time: 0 ns


In [12]:
%%time
# Build and compile the Discriminator
discriminator = build_cgan_discriminator(img_shape)
discriminator.compile(loss='binary_crossentropy',
                      optimizer=Adam(),
                      metrics=['accuracy'])

# Build the Generator
generator = build_cgan_generator(z_dim)

CPU times: total: 1.17 s
Wall time: 1.48 s


In [13]:
generator_file_path = "%s/../20250201/20250201-150023_CGAN_generator_120000.weights.h5" % WEIGHTS_DIR
generator.load_weights(generator_file_path, skip_mismatch=False, by_name=False, options=None)
#generator = tf.keras.models.load_model(generator_file_path)

In [14]:
discriminator_file_path = "%s/../20250201/20250201-150023_CGAN_discriminator_120000.weights.h5" % WEIGHTS_DIR
discriminator.load_weights(discriminator_file_path, skip_mismatch=False, by_name=False, options=None)
#discriminator = tf.keras.models.load_model(discriminator_file_path)

In [15]:
%%time
def sample_data(amount):

    # Sample random noise
    z = np.random.normal(0, 1, (amount * 5, z_dim))

    # Get image labels 0-4
    label = np.arange(0, 5)
    labels = np.tile(label, amount).reshape(-1, 1)

    # Generate images from random noise
    gen_data = generator.predict([z, labels], verbose=0)

    fake = np.ones((amount, 1)) * 0

    real_fake = discriminator.predict([gen_data, labels], verbose=0)

    # Rescale image pixel values to [0, 1]
    gen_data = 0.5 * gen_data + 0.5

    return gen_data, labels, real_fake

CPU times: total: 0 ns
Wall time: 0 ns


In [16]:
gen_data, labels, real_fake = sample_data(100)

In [55]:
z = np.random.normal(0, 1, (10 * 5, 256))
label = np.arange(0, 5)
labels = np.tile(label, 10).reshape(-1, 1)
y_train = labels.squeeze()
imgs = generator.predict([z, labels], verbose=0).reshape(50, 187)
gen_imgs = generator.predict([z, labels], verbose=0).reshape(50, 187)

In [56]:
y_train.shape

(50,)

In [57]:
# -------------------------
#  Compute FID for each label
# -------------------------
unique_labels = np.unique(y_train)
fid_scores = {}

for label in unique_labels:
    real_samples = imgs[y_train == label]  # Select only real samples for the label
    fake_samples = gen_imgs[y_train == label]  # Select only generated samples

    if len(real_samples) > 1 and len(fake_samples) > 1:  # Ensure enough samples exist
        fid_scores[label] = calculate_fid(real_samples, fake_samples)
print(f"Iteration: FID Scores {fid_scores}")

Iteration: FID Scores {0: -6.920403901701413e-06, 1: -7.573819849466504e-06, 2: -1.3341462374954777e-05, 3: -3.0417200394114094e-06, 4: -8.41450704349659e-06}


df_smooth = df_generated.drop(['label'], axis=1)
df_smooth = df_smooth.apply(smooth_signal, axis=1, result_type="expand")
df_smooth['label'] = df_generated['label']
df_smooth.columns = column_names

fid = calculate_fid(df_train.drop(columns=["label"]), df_smooth.drop(columns=["label"]))
fid

fid = calculate_fid(df_train.drop(columns=["label"]), df_smooth.drop(columns=["label"]))
fid

fid = calculate_fid(df_train.drop(columns=["label"]), df_smooth.drop(columns=["label"]))
fid