In [59]:
import pickle
import numpy as np

path_db = "../../ECG_DATASET/dataset_ekg.pkl"

with open(path_db, "rb") as f:
    dataset = pickle.load(f)

dataset

{'NSR': array([[-0.06451476, -0.05951476, -0.02951476, ...,  0.13548524,
          0.13048524,  0.11548524],
        [ 0.24548524,  0.25048524,  0.24048524, ...,  0.34048524,
          0.33548524,  0.31548524],
        [ 0.13048524,  0.12548524,  0.11048524, ..., -0.20451476,
         -0.20451476, -0.20451476],
        ...,
        [-0.46451476, -0.46451476, -0.46451476, ..., -0.59451476,
         -0.58951476, -0.58951476],
        [-0.53451476, -0.51451476, -0.53451476, ..., -0.51451476,
         -0.44451476, -0.40451476],
        [ 0.51548524,  0.50548524,  0.50548524, ...,  0.21048524,
          0.20048524,  0.18548524]]),
 'VT': array([[ 0.10787069,  0.08787069,  0.08787069, ...,  0.01287069,
         -0.00212931,  0.01787069],
        [-0.31212931, -0.29712931, -0.27212931, ...,  0.82787069,
          0.84787069,  0.86787069],
        [ 0.01287069,  0.00787069,  0.00287069, ...,  0.04787069,
          0.06787069,  0.10287069],
        ...,
        [ 0.17287069,  0.16787069,  0.162

In [60]:
X_dataset = dataset['NSR'][:,: dataset["NSR"].shape[1]//2]

X_dataset.shape

(283, 1800)

In [61]:
N = X_dataset.shape[1]

fs = 360
ts = 1/fs
t = np.arange(N)*ts

In [62]:
# Normalizar al rango [-1, 1] para coincidir con Tanh
X_min = X_dataset.min()
X_max = X_dataset.max()
X_dataset_normalized = 2 * (X_dataset - X_min) / (X_max - X_min) - 1

print(f"Original range: [{X_min:.3f}, {X_max:.3f}]")
print(f"Normalized range: [{X_dataset_normalized.min():.3f}, {X_dataset_normalized.max():.3f}]")

Original range: [-2.155, 3.040]
Normalized range: [-1.000, 1.000]


In [63]:
import torch
import torch.nn as nn
from torch.optim import Adam, AdamW
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
seed = 2025
torch.manual_seed(seed)
np.random.seed(seed)

class SineWaveDataset:
    def __init__(self, data):
        self.data = torch.tensor(data, dtype=torch.float32)

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, idx):
        return self.data[idx]
    

def get_dataloader(data, batch_size=32, shuffle=True):
    dataset = SineWaveDataset(data)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
    return dataloader

In [64]:
# ========== IMPROVED GAN ARCHITECTURE FOR ECG SIGNALS ==========
# Using 1D Convolutional layers for better temporal pattern learning

def weights_init(m):
    """Weight initialization for convolutional and linear layers"""
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('Linear') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
        if m.bias is not None:
            nn.init.constant_(m.bias.data, 0)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

# ========== GENERATOR: 1D CNN ARCHITECTURE ==========
class Generator(nn.Module):
    """Generator using 1D transposed convolutions for temporal signal generation"""
    def __init__(self, noise_dim=100, output_length=3600):
        super().__init__()
        self.noise_dim = noise_dim
        self.output_length = output_length
        
        # Calculate initial feature map size
        # We'll upsample: 225 -> 450 -> 900 -> 1800 -> 3600
        self.init_size = output_length // 16  # 3600 // 16 = 225
        
        # Project noise to initial feature map
        self.fc = nn.Sequential(
            nn.Linear(noise_dim, 256 * self.init_size),
            nn.BatchNorm1d(256 * self.init_size),
            nn.ReLU(True)
        )
        
        # Transposed convolutions for upsampling
        self.conv_blocks = nn.Sequential(
            # 225 -> 450
            nn.ConvTranspose1d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(True),
            
            # 450 -> 900
            nn.ConvTranspose1d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(True),
            
            # 900 -> 1800
            nn.ConvTranspose1d(64, 32, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(True),
            
            # 1800 -> 3600
            nn.ConvTranspose1d(32, 16, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(16),
            nn.ReLU(True),
            
            # Final refinement layer
            nn.Conv1d(16, 1, kernel_size=7, stride=1, padding=3),
            nn.Tanh()
        )
        
    def forward(self, z):
        # Project and reshape
        x = self.fc(z)
        x = x.view(-1, 256, self.init_size)
        # Generate signal
        x = self.conv_blocks(x)
        return x.squeeze(1)  # Remove channel dimension

# ========== DISCRIMINATOR: 1D CNN ARCHITECTURE ==========
class Discriminator(nn.Module):
    """Discriminator using 1D convolutions for temporal pattern recognition"""
    def __init__(self, input_length=3600):
        super().__init__()
        
        self.conv_blocks = nn.Sequential(
            # 3600 -> 1800
            nn.Conv1d(1, 32, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, True),
            nn.Dropout(0.25),
            
            # 1800 -> 900
            nn.Conv1d(32, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(64),
            nn.LeakyReLU(0.2, True),
            nn.Dropout(0.25),
            
            # 900 -> 450
            nn.Conv1d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(128),
            nn.LeakyReLU(0.2, True),
            nn.Dropout(0.25),
            
            # 450 -> 225
            nn.Conv1d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.2, True),
        )
        
        # Calculate flattened size: 256 channels * 225 length
        flattened_size = 256 * (input_length // 16)
        self.fc = nn.Sequential(
            nn.Linear(flattened_size, 1)
        )
        
    def forward(self, x):
        # Add channel dimension if needed
        if x.dim() == 2:
            x = x.unsqueeze(1)
        x = self.conv_blocks(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

# ========== SPECTRAL NORMALIZATION DISCRIMINATOR (Alternative) ==========
class SpectralNormDiscriminator(nn.Module):
    """Discriminator with Spectral Normalization for stability"""
    def __init__(self, input_length=3600):
        super().__init__()
        
        self.conv_blocks = nn.Sequential(
            # 3600 -> 1800
            nn.utils.spectral_norm(nn.Conv1d(1, 64, kernel_size=4, stride=2, padding=1)),
            nn.LeakyReLU(0.2, True),
            
            # 1800 -> 900
            nn.utils.spectral_norm(nn.Conv1d(64, 128, kernel_size=4, stride=2, padding=1)),
            nn.LeakyReLU(0.2, True),
            
            # 900 -> 450
            nn.utils.spectral_norm(nn.Conv1d(128, 256, kernel_size=4, stride=2, padding=1)),
            nn.LeakyReLU(0.2, True),
            
            # 450 -> 225
            nn.utils.spectral_norm(nn.Conv1d(256, 512, kernel_size=4, stride=2, padding=1)),
            nn.LeakyReLU(0.2, True),
        )
        
        # Calculate flattened size: 512 channels * 225 length
        flattened_size = 512 * (input_length // 16)
        self.fc = nn.Sequential(
            nn.utils.spectral_norm(nn.Linear(flattened_size, 1))
        )
        
    def forward(self, x):
        if x.dim() == 2:
            x = x.unsqueeze(1)
        x = self.conv_blocks(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

In [73]:
# ========== WGAN-GP TRAINER ==========
def compute_gradient_penalty(discriminator, real_samples, fake_samples, device):
    """Calculates the gradient penalty for WGAN-GP"""
    batch_size = real_samples.size(0)
    alpha = torch.rand(batch_size, 1).to(device)
    
    # Interpolated samples
    interpolates = (alpha * real_samples + (1 - alpha) * fake_samples).requires_grad_(True)
    
    d_interpolates = discriminator(interpolates)
    
    fake = torch.ones(batch_size, 1).to(device)
    
    # Get gradients
    gradients = torch.autograd.grad(
        outputs=d_interpolates,
        inputs=interpolates,
        grad_outputs=fake,
        create_graph=True,
        retain_graph=True,
        only_inputs=True
    )[0]
    
    gradients = gradients.view(batch_size, -1)
    gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
    return gradient_penalty

class WGANGPTrainer:
    """Wasserstein GAN with Gradient Penalty trainer"""
    def __init__(self, generator, discriminator, g_optimizer, d_optimizer, device, 
                 lambda_gp=10, n_critic=5):
        self.generator = generator
        self.discriminator = discriminator
        self.g_optimizer = g_optimizer
        self.d_optimizer = d_optimizer
        self.device = device
        self.lambda_gp = lambda_gp
        self.n_critic = n_critic
        self.d_steps = 0

    def train_step(self, real_data):
        batch_size = real_data.size(0)
        real_data = real_data.to(self.device)

        # ===== Train Discriminator =====
        self.d_optimizer.zero_grad()

        # Real samples
        d_real = self.discriminator(real_data).mean()

        # Fake samples
        noise = torch.randn(batch_size, noise_dim).to(self.device)
        fake_data = self.generator(noise)
        d_fake = self.discriminator(fake_data.detach()).mean()

        # Gradient penalty
        gp = compute_gradient_penalty(self.discriminator, real_data, fake_data, self.device)

        # Wasserstein loss with gradient penalty
        d_loss = d_fake - d_real + self.lambda_gp * gp
        d_loss.backward()
        self.d_optimizer.step()

        self.d_steps += 1

        # ===== Train Generator (every n_critic steps) =====
        g_loss = torch.tensor(0.0)
        if self.d_steps % self.n_critic == 0:
            self.g_optimizer.zero_grad()
            noise = torch.randn(batch_size, noise_dim).to(self.device)
            fake_data = self.generator(noise)
            g_loss = -self.discriminator(fake_data).mean()  # Maximize D(G(z))
            g_loss.backward()
            self.g_optimizer.step()

        # Return Wasserstein distance and generator loss
        wasserstein_d = (d_real - d_fake).item()
        return d_loss.item(), g_loss.item(), wasserstein_d, gp.item()

# ========== HYPERPARAMETERS ==========
noise_dim = 100
batch_size = 64
output_dim = X_dataset_normalized.shape[1]
discriminator_type = 'spectral'  # Options: 'spectral' or 'standard'

# ========== MODEL INITIALIZATION ==========
print(f"Device: {device}")
print(f"Training samples: {X_dataset_normalized.shape[0]}")
print(f"Signal length: {output_dim}")

# Adjust output_dim to be compatible with the generator architecture
# The generator uses 4 ConvTranspose1d layers with stride=2, which multiplies by 16
# Starting from initial_length = 112, we get 112 * 2^4 = 1792
# We need to pad or adjust to get 1800
adjusted_output_dim = 1792  # This is what the current architecture produces

# Initialize generator
# Note: The Generator class should produce adjusted_output_dim length signals
# If Generator doesn't accept output_dim, it should be designed to produce the correct length
generator = Generator(noise_dim=noise_dim).to(device)

# Calculate the correct flattened size after conv layers
# After 4 Conv1d layers with stride=2: 1792 -> 896 -> 448 -> 224 -> 112
# With 512 channels: 112 * 512 = 57344
if discriminator_type == 'spectral':
    # Recreate discriminator with correct flattened size
    discriminator = SpectralNormDiscriminator(input_length=adjusted_output_dim).to(device)
    # Force recreation of FC layer with correct input size
    test_input = torch.randn(1, 1, adjusted_output_dim).to(device)
    with torch.no_grad():
        conv_output = discriminator.conv_blocks(test_input)
        actual_flattened_size = conv_output.view(1, -1).shape[1]
    # Recreate FC layer with correct size
    discriminator.fc = torch.nn.Sequential(
        torch.nn.utils.spectral_norm(torch.nn.Linear(actual_flattened_size, 1))
    ).to(device)
    print("Using Spectral Normalization Discriminator")
    print(f"Input length to discriminator: {adjusted_output_dim}")
    print(f"Actual flattened size: {actual_flattened_size}")
    print(f"Note: Data will be interpolated from {output_dim} to {adjusted_output_dim}")
else:
    discriminator = Discriminator(input_length=adjusted_output_dim).to(device)
    # Fix FC layer for standard discriminator too
    test_input = torch.randn(1, 1, adjusted_output_dim).to(device)
    with torch.no_grad():
        conv_output = discriminator.conv_blocks(test_input)
        actual_flattened_size = conv_output.view(1, -1).shape[1]
    discriminator.fc = torch.nn.Sequential(
        torch.nn.Linear(actual_flattened_size, 1)
    ).to(device)
    print("Using Standard Discriminator")
    print(f"Input length to discriminator: {adjusted_output_dim}")
    print(f"Actual flattened size: {actual_flattened_size}")

# Initialize weights
generator.apply(weights_init)
discriminator.apply(weights_init)

# ========== OPTIMIZERS ==========
# WGAN-GP typically uses Adam with specific hyperparameters
g_optimizer = Adam(generator.parameters(), lr=0.0001, betas=(0.0, 0.9))
d_optimizer = Adam(discriminator.parameters(), lr=0.0001, betas=(0.0, 0.9))

# ========== TRAINER SETUP ==========
trainer = WGANGPTrainer(
    generator, discriminator, 
    g_optimizer, d_optimizer, 
    device, 
    lambda_gp=10,  # Gradient penalty coefficient
    n_critic=5     # Train discriminator 5 times per generator update
)

# ========== TRAINING LOOP ==========
num_epochs = 2000
d_losses = []
g_losses = []
w_distances = []
gp_values = []

# Interpolate data to match generator output size
import torch.nn.functional as F
X_dataset_adjusted = F.interpolate(
    torch.tensor(X_dataset_normalized).unsqueeze(1), 
    size=adjusted_output_dim, 
    mode='linear', 
    align_corners=False
).squeeze(1).numpy()

# Use adjusted data
dataloader_normalized = get_dataloader(X_dataset_adjusted, batch_size=batch_size, shuffle=True)

EPOCHS_TO_PLOT = 200
LOSS_MAX_VIEW = batch_size * EPOCHS_TO_PLOT

for epoch in range(num_epochs):
    epoch_d_loss = []
    epoch_g_loss = []
    epoch_wd = []
    epoch_gp = []
    
    for real_data in dataloader_normalized:
        # Ensure real_data has correct shape (batch, 1, adjusted_output_dim)
        if real_data.dim() == 2:
            real_data = real_data.unsqueeze(1)
        elif real_data.dim() == 3 and real_data.size(1) != 1:
            # If shape is (batch, seq_len, channels), transpose to (batch, channels, seq_len)
            real_data = real_data.transpose(1, 2)
        # Always ensure the size matches adjusted_output_dim
        if real_data.size(-1) != adjusted_output_dim:
            real_data = F.interpolate(real_data, size=adjusted_output_dim, mode='linear', align_corners=False)
        d_loss, g_loss, w_d, gp = trainer.train_step(real_data)
        epoch_d_loss.append(d_loss)
        epoch_g_loss.append(g_loss if isinstance(g_loss, float) else g_loss.item())
        epoch_wd.append(w_d)
        epoch_gp.append(gp)
    
    # Store average losses per epoch
    d_losses.append(np.mean(epoch_d_loss))
    g_losses.append(np.mean(epoch_g_loss))
    w_distances.append(np.mean(epoch_wd))
    gp_values.append(np.mean(epoch_gp))

    # Print progress
    if (epoch+1) % 50 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}]')
        print(f'  D Loss: {d_losses[-1]:.4f} | G Loss: {g_losses[-1]:.4f}')
        print(f'  Wasserstein Distance: {w_distances[-1]:.4f} | GP: {gp_values[-1]:.4f}')

    # Plot progress
    if (epoch+1) % EPOCHS_TO_PLOT == 0:
        plt.figure(figsize=(16, 10))
        generator.eval()
        with torch.no_grad():
            # Generate multiple samples
            noise = torch.randn(3, noise_dim).to(device)
            generated_samples = generator(noise).cpu().numpy()
        # Plot real sample for comparison (interpolate t to match adjusted length)
        plt.subplot(4, 2, 4)
        t_adjusted = np.linspace(0, t[-1], adjusted_output_dim)
        # Denormalize generated samples
        generated_samples_denorm = generated_samples * (X_max - X_min) + X_min
        
        # Plot generated samples
        for i in range(3):
            plt.subplot(4, 2, i+1)
            t_adjusted = np.linspace(0, t[-1], adjusted_output_dim)
            plt.plot(t_adjusted, generated_samples_denorm[i], color='blue', alpha=0.7, linewidth=0.8)
            plt.xlabel('Time [s]')
            plt.ylabel('Amplitude')
            plt.title(f'Generated Sample {i+1} - Epoch {epoch+1}')
            plt.grid(alpha=0.3)
        
        # Plot real sample for comparison (interpolate t to match adjusted length)
        plt.subplot(4, 2, 4)
        t_adjusted = np.linspace(0, t[-1], adjusted_output_dim)
        plt.plot(t_adjusted, X_dataset_adjusted[np.random.randint(0, X_dataset_adjusted.shape[0])], color='green', alpha=0.7, linewidth=0.8)
        plt.xlabel('Time [s]')
        plt.ylabel('Amplitude')
        plt.title('Real ECG Sample (Interpolated)')
        plt.grid(alpha=0.3)
        plt.plot(d_losses, label='D Loss', alpha=0.7)
        plt.plot(g_losses, label='G Loss', alpha=0.7)
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training Losses')
        plt.legend()
        plt.grid(alpha=0.3)

        # Plot Wasserstein distance
        plt.subplot(4, 2, 6)
        # Frequency domain comparison
        plt.subplot(4, 2, 8)
        from scipy.fft import fft, fftfreq
        # Plot gradient penalty
        plt.subplot(4, 2, 7)
        plt.plot(gp_values, color='red', alpha=0.7)
        plt.xlabel('Epoch')
        plt.ylabel('Gradient Penalty')
        plt.title('Gradient Penalty')
        plt.grid(alpha=0.3)

        # Frequency domain comparison
        plt.subplot(4, 2, 8)
        from scipy.fft import fft, fftfreq
        # Use adjusted data for FFT comparison
        fft_real = np.abs(fft(X_dataset_adjusted[0]))[:adjusted_output_dim//2]
        fft_gen = np.abs(fft(generated_samples_denorm[0]))[:adjusted_output_dim//2]
        freqs = fftfreq(adjusted_output_dim, ts)[:adjusted_output_dim//2]
        plt.ylabel('Magnitude')
        plt.title('Frequency Domain Comparison')
        plt.xlim([0, 50])  # Focus on relevant ECG frequencies
        plt.legend()
        plt.grid(alpha=0.3)
        
        plt.tight_layout()
        plt.show()

print("\n" + "="*50)
print("Training Finished!")
print("="*50)

Device: cuda
Training samples: 283
Signal length: 1800
Using Spectral Normalization Discriminator
Input length to discriminator: 1792
Actual flattened size: 57344
Note: Data will be interpolated from 1800 to 1792


RuntimeError: mat1 and mat2 shapes cannot be multiplied (64x115200 and 57344x1)

In [None]:
# ========== EVALUATION METRICS ==========
from scipy.stats import pearsonr, wasserstein_distance as wd
from scipy.signal import correlate

def calculate_metrics(real_data, generated_data):
    """Calculate various metrics to evaluate ECG quality"""
    metrics = {}
    
    # 1. Mean Squared Error
    mse = np.mean((real_data - generated_data) ** 2)
    metrics['MSE'] = mse
    
    # 2. Pearson Correlation
    corr, _ = pearsonr(real_data.flatten(), generated_data.flatten())
    metrics['Pearson_Correlation'] = corr
    
    # 3. Mean Absolute Error
    mae = np.mean(np.abs(real_data - generated_data))
    metrics['MAE'] = mae
    
    # 4. Dynamic Time Warping distance (simplified)
    from scipy.spatial.distance import euclidean
    dtw_dist = euclidean(real_data.flatten(), generated_data.flatten())
    metrics['DTW_Distance'] = dtw_dist
    
    # 5. Frequency domain similarity
    fft_real = np.abs(fft(real_data.flatten()))
    fft_gen = np.abs(fft(generated_data.flatten()))
    freq_similarity = pearsonr(fft_real, fft_gen)[0]
    metrics['Frequency_Similarity'] = freq_similarity
    
    return metrics

# Generate samples for evaluation
print("\n" + "="*50)
print("EVALUATION METRICS")
print("="*50 + "\n")

generator.eval()
num_samples = 100
with torch.no_grad():
    noise = torch.randn(num_samples, noise_dim).to(device)
    generated_samples = generator(noise).cpu().numpy()
    # Denormalize
    generated_samples = (generated_samples + 1) / 2 * (X_max - X_min) + X_min

# Calculate metrics
real_samples = X_dataset[:num_samples]
metrics = calculate_metrics(real_samples, generated_samples)

print("Quality Metrics:")
for key, value in metrics.items():
    print(f"  {key}: {value:.6f}")

# Statistical comparison
print("\nStatistical Comparison:")
print(f"  Real data - Mean: {real_samples.mean():.4f}, Std: {real_samples.std():.4f}")
print(f"  Generated - Mean: {generated_samples.mean():.4f}, Std: {generated_samples.std():.4f}")

# Visualize multiple generated samples
plt.figure(figsize=(15, 8))
for i in range(5):
    plt.subplot(2, 3, i+1)
    plt.plot(t, generated_samples[i], label='Generated', alpha=0.7, linewidth=0.8)
    plt.plot(t, real_samples[i], label='Real', alpha=0.5, linewidth=0.8)
    plt.xlabel('Time [s]')
    plt.ylabel('Amplitude')
    plt.title(f'Sample {i+1}')
    plt.legend()
    plt.grid(alpha=0.3)

plt.subplot(2, 3, 6)
plt.hist(real_samples.flatten(), bins=50, alpha=0.5, label='Real', density=True)
plt.hist(generated_samples.flatten(), bins=50, alpha=0.5, label='Generated', density=True)
plt.xlabel('Amplitude')
plt.ylabel('Density')
plt.title('Distribution Comparison')
plt.legend()
plt.grid(alpha=0.3)

plt.tight_layout()
plt.show()

# Save models
print("\n" + "="*50)
print("Saving models...")
torch.save({
    'generator': generator.state_dict(),
    'discriminator': discriminator.state_dict(),
    'g_optimizer': g_optimizer.state_dict(),
    'd_optimizer': d_optimizer.state_dict(),
    'epoch': num_epochs,
    'metrics': metrics
}, 'ecg_wgan_gp_model.pth')
print("Models saved to: ecg_wgan_gp_model.pth")
print("="*50)


EVALUATION METRICS



ValueError: operands could not be broadcast together with shapes (100,1800) (100,1792) 