In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [None]:
# Seed PyTorch's RNGs (CPU and, when present, CUDA) so that weight
# initialisation, mini-batch sampling and noise draws are repeatable across
# "Restart Kernel -> Run All" runs.
SEED = 42
torch.manual_seed(SEED)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
# Load Heart Rate Data
file_path = '/home/Gurshan.R/Documents/GitHub/SYSC4907_Capstone/GAN_Heart_v2/infant_2_8h_heart_rate_outlierRem.csv'  # Replace with your actual file path
df = pd.read_csv(file_path)

# Ensure the heart rate column exists
columns = ['heart_rate']  # Replace with the correct column name for heart rate
if columns[0] not in df.columns:
    raise ValueError(f"Column '{columns[0]}' not found in the dataset.")

# Extract the heart rate column as an (N, 1) array
data = df[columns].to_numpy().reshape(-1, 1)

# Fail early on unusable input: an empty column cannot be sampled from, and a
# single missing value would turn the printed range and the training losses
# into NaN.
if data.size == 0:
    raise ValueError(f"Column '{columns[0]}' contains no samples.")
if pd.isna(data).any():
    raise ValueError(f"Column '{columns[0]}' contains missing values; clean or drop them before training.")

# Confirm data range
min_data_value = data.min()
max_data_value = data.max()
print(f"Data range before normalization: {min_data_value} to {max_data_value}")

# Normalize the data with fixed bounds (not the observed min/max)
min_val, max_val = 100, 150  # Known neonate heart rate range

# Samples outside [min_val, max_val] map outside [0, 1], which the generator's
# Sigmoid output can never reproduce, so report them instead of staying silent.
out_of_range = int(((data < min_val) | (data > max_val)).sum())
if out_of_range:
    print(f"WARNING: {out_of_range} of {data.size} samples fall outside [{min_val}, {max_val}] "
          f"and will normalize outside [0, 1].")

data_normalized = (data - min_val) / (max_val - min_val)  # [min_val, max_val] -> [0, 1]
data_normalized = torch.tensor(data_normalized, dtype=torch.float32)

# Report the normalized range so it can be compared against [0, 1]
print(f"Data range after normalization: {data_normalized.min().item()} to {data_normalized.max().item()}")


In [None]:
# Define Generator
class Generator(nn.Module):
    """Fully connected generator that maps a noise vector to one normalized value.

    Args:
        noise_dim: Number of features in the noise vector fed to the first layer.
    """

    def __init__(self, noise_dim):
        super().__init__()
        # The layer order is fixed: the positions inside the Sequential
        # determine the state_dict keys (model.0.*, model.2.*, ...).
        layers = [
            nn.Linear(noise_dim, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid(),  # keeps the output in [0, 1], like the normalized data
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, noise):
        """Run `noise` (batch, noise_dim) through the network; returns (batch, 1)."""
        generated = self.model(noise)
        return generated

In [None]:
# Define Discriminator
class Discriminator(nn.Module):
    """Fully connected classifier that scores a single value as real or generated."""

    def __init__(self):
        super().__init__()
        # The layer order is fixed: the positions inside the Sequential
        # determine the state_dict keys (model.0.*, model.2.*, model.4.*).
        layers = [
            nn.Linear(1, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
            nn.Sigmoid(),  # probability that the input sample is real
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the real-vs-fake probability, shape (batch, 1), for `x` of shape (batch, 1)."""
        probability = self.model(x)
        return probability

In [None]:
# Build both networks and place them on the selected device
noise_dim = 10  # length of the noise vector fed to the generator

generator = Generator(noise_dim)
generator = generator.to(device)

discriminator = Discriminator()
discriminator = discriminator.to(device)

In [None]:
# Binary cross-entropy on the discriminator's probability output
criterion = nn.BCELoss()

# One Adam optimizer per network, both with the same learning rate
optimizer_G = optim.Adam(params=generator.parameters(), lr=2e-4)
optimizer_D = optim.Adam(params=discriminator.parameters(), lr=2e-4)

In [None]:
# Training hyperparameters
epochs = 5000     # number of update steps (one mini-batch each)
batch_size = 16

# Constant BCE targets, built once and reused at every step:
# 1 marks a sample as real, 0 marks it as generated.
label_shape = (batch_size, 1)
real_labels = torch.ones(label_shape).to(device)
fake_labels = torch.zeros(label_shape).to(device)

In [None]:
for epoch in range(epochs):
    # ---------------- Discriminator update ----------------
    discriminator.zero_grad()

    # Real half: a random mini-batch drawn with replacement from the dataset
    idx = torch.randint(0, data_normalized.size(0), (batch_size,))
    real_data = data_normalized[idx].to(device)
    real_scores = discriminator(real_data)
    real_loss = criterion(real_scores, real_labels)

    # Fake half: detach() stops this loss from back-propagating into the generator
    noise = torch.randn(batch_size, noise_dim).to(device)
    fake_data = generator(noise).detach()
    fake_scores = discriminator(fake_data)
    fake_loss = criterion(fake_scores, fake_labels)

    d_loss = real_loss + fake_loss
    d_loss.backward()
    optimizer_D.step()

    # ------------------ Generator update ------------------
    generator.zero_grad()
    noise = torch.randn(batch_size, noise_dim).to(device)
    fake_data = generator(noise)
    # The generator is rewarded when the discriminator labels its samples as real
    fooled_scores = discriminator(fake_data)
    g_loss = criterion(fooled_scores, real_labels)
    g_loss.backward()
    optimizer_G.step()

    # Report both losses on every 500th step, starting with step 0
    if not epoch % 500:
        print(f"Epoch [{epoch}/{epochs}] D Loss: {d_loss.item():.4f}, G Loss: {g_loss.item():.4f}")

In [None]:
# Generate Synthetic Data
# The generator contains a BatchNorm1d layer. In training mode that layer
# normalizes with the statistics of the current batch (so every sample would
# depend on the other 95) and also updates its running statistics. Sample in
# eval mode instead, then restore whatever mode the generator was in so that
# re-running the training cell afterwards behaves as before.
was_training = generator.training
generator.eval()
try:
    with torch.no_grad():
        noise = torch.randn(96, noise_dim).to(device)
        synthetic_data = generator(noise).cpu().numpy()  # (96, 1) values in [0, 1]
finally:
    generator.train(was_training)

In [None]:
# Map normalized values back to BPM by inverting (x - min_val) / (max_val - min_val)
real_data_normalized_np = data_normalized.numpy()
real_data_denormalized = real_data_normalized_np * (max_val - min_val) + min_val

# synthetic_data is used as-is here, with the same inverse mapping
synthetic_data_denormalized = synthetic_data * (max_val - min_val) + min_val

In [None]:
# Print the min/max of both denormalized arrays
real_lo, real_hi = real_data_denormalized.min(), real_data_denormalized.max()
syn_lo, syn_hi = synthetic_data_denormalized.min(), synthetic_data_denormalized.max()
print(f"Real Data range after denormalization: {real_lo} to {real_hi}")
print(f"Synthetic Data range after denormalization: {syn_lo} to {syn_hi}")

In [None]:
# Overlay the real series and the synthetic samples on one set of axes
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(real_data_denormalized, label="Real Data", color="blue", alpha=0.6)
ax.plot(synthetic_data_denormalized, label="Synthetic Data", color="orange", linestyle="--", alpha=0.8)
ax.set_xlabel("Time Steps")
ax.set_ylabel("Heart Rate (BPM)")
ax.set_title("Comparison of Real and Synthetic Neonate Heart Rate Data")
ax.legend()
plt.show()

In [None]:
# Persist only the generator's parameters and buffers (its state dict), not the module object
generator_state = generator.state_dict()
torch.save(generator_state, 'trained_generator_HeartRate_again.pth')

# EVALUATION CODE

In [None]:
# Quantitative Evaluation Metrics
import numpy as np
from scipy.stats import ks_2samp, wasserstein_distance, skew, kurtosis

# Collapse both denormalized arrays to 1-D copies for the statistics below
real_flat = real_data_denormalized.flatten()
syn_flat = synthetic_data_denormalized.flatten()

# 1. Two-sample Kolmogorov–Smirnov test between real and synthetic values
ks_result = ks_2samp(real_flat, syn_flat)
ks_stat, ks_p_value = ks_result
print(f"KS Test Statistic: {ks_stat:.4f}, p-value: {ks_p_value:.4f}")

# 2. Wasserstein (Earth Mover's) distance between the two samples
w_distance = wasserstein_distance(real_flat, syn_flat)
print(f"Wasserstein Distance: {w_distance:.4f}")

# 3. Mean and standard deviation of each sample (np.std default, i.e. ddof=0)
real_mean, real_std = np.mean(real_flat), np.std(real_flat)
syn_mean, syn_std = np.mean(syn_flat), np.std(syn_flat)
print(f"Real Data Mean: {real_mean:.4f}, STD: {real_std:.4f}")
print(f"Synthetic Data Mean: {syn_mean:.4f}, STD: {syn_std:.4f}")

# 4. Skewness and kurtosis (scipy defaults) of each sample
real_skew, syn_skew = (skew(sample) for sample in (real_flat, syn_flat))
real_kurt, syn_kurt = (kurtosis(sample) for sample in (real_flat, syn_flat))
print(f"Real Data Skewness: {real_skew:.4f}, Kurtosis: {real_kurt:.4f}")
print(f"Synthetic Data Skewness: {syn_skew:.4f}, Kurtosis: {syn_kurt:.4f}")
