In [1]:
import torch
import numpy as np
from biosppy.signals import ecg
from torch.utils.data import DataLoader
from load_ptbdata import load_data
from torch.utils.data import random_split

In [2]:
sampling_rate = 100
X, Y = load_data("data/ptb-xl/",
                 sampling_rate=sampling_rate,
                 limit=None,  # The maximum number of signals to load
                 only_precordial_leads=True,  # Use only the precordial leads
                 only_normal=False  # Whether to use only "normal" (healthy) heart beats
                 )
X.shape

Loading and converting annotation data...
Loading diagnostic aggregation data...
Applying diagnostic superclass...
Loading raw signal data...


(21799, 1000, 6)

In [7]:
all_templates = []
valid_indices = []

for sig_idx, ecg_signal in enumerate(X):

    lead_templates = []
    skip_signal = False

    for lead in range(ecg_signal.shape[1]):

        signal = ecg_signal[:, lead]
        
        try:
            # Compute the ECG templates
            out = ecg.ecg(signal=signal, sampling_rate=sampling_rate, show=False)
        except:
            skip_signal = True
            break

        templates = out['templates']
        # mean_template = np.mean(templates, axis=0)
        median_template = np.median(templates, axis=0)

        lead_templates.append(median_template)
        # lead_templates.append(mean_template)
    
    if not skip_signal:
        # Only keep signals for which all leads could be processed successfully
        # Sometimes certain recordings (usually the unhealthy hearbeats) break 
        # the biosppy ecg function and the templates cannot be computed, so we
        # skip them (until I find a better way of dealing with this).
        valid_indices.append(sig_idx)
        all_templates.append(lead_templates)

all_templates = np.array(all_templates)

# Remove skipped signals from X and Y
X_original = X
X = X[valid_indices]
Y = Y.iloc[valid_indices]

print(f"Could not process {len(X_original)-len(valid_indices)} signals")
print(f"Remaining signals: {X.shape[0]}") 

Could not process 1 signals
Remaining signals: 9513


In [10]:
all_templates.shape

(9513, 6, 60)

In [11]:
X_train = all_templates[np.where(Y.strat_fold < 9)]
X_val = all_templates[np.where(Y.strat_fold == 9)]
X_test = all_templates[np.where(Y.strat_fold == 10)]
print(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")

Train: (7595, 6, 60), Val: (955, 6, 60), Test: (963, 6, 60)


In [3]:
# Use first 8 folds for training, fold 9 for validation, and fold 10 for testing
X_train = X[np.where(Y.strat_fold < 9)]
X_val = X[np.where(Y.strat_fold == 9)]
X_test = X[np.where(Y.strat_fold == 10)]
print(f"Training set: {X_train.shape[0]}")
print(f"Validation set: {X_val.shape[0]}")
print(f"Test set: {X_test.shape[0]}")

Training set: 17418
Validation set: 2183
Test set: 2198


In [4]:
# Convert to torch tensors
X_train = torch.from_numpy(X_train).float()
X_val = torch.from_numpy(X_val).float()
X_test = torch.from_numpy(X_test).float()

print(f"X_train shape: {X_train.shape}")
print(f"X_val shape: {X_val.shape}")
print(f"X_test shape: {X_test.shape}")

# Reshape
X_train = X_train.permute(0, 2, 1)
X_val = X_val.permute(0, 2, 1)
X_test = X_test.permute(0, 2, 1)

print(f"X_train shape: {X_train.shape}")
print(f"X_val shape: {X_val.shape}")
print(f"X_test shape: {X_test.shape}")

X_train shape: torch.Size([17418, 1000, 6])
X_val shape: torch.Size([2183, 1000, 6])
X_test shape: torch.Size([2198, 1000, 6])
X_train shape: torch.Size([17418, 6, 1000])
X_val shape: torch.Size([2183, 6, 1000])
X_test shape: torch.Size([2198, 6, 1000])


In [5]:
torch.save(X_train, "data/ptb-xl/ptbxl_ecg_100_all_train.pt")
torch.save(X_val, "data/ptb-xl/ptbxl_ecg_100_all_val.pt")
torch.save(X_test, "data/ptb-xl/ptbxl_ecg_100_all_test.pt")

In [None]:
# Load the data
X_train = torch.load("data/ptb-xl/ptbxl_ecg_100_all_templates_train.pt")
X_val = torch.load("data/ptb-xl/ptbxl_ecg_100_all_templates_val.pt")
X_test = torch.load("data/ptb-xl/ptbxl_ecg_100_all_templates_test.pt")


In [10]:
# Convert to PyTorch tensor
X = torch.tensor(X, dtype=torch.float32)
# Reshape to (N, 6, T)
X = X.reshape(X.shape[0], 6, -1)
print(X.shape)
# Save to file
torch.save(X, "data/ptb-xl/ptbxl_ecg_100_all.pt")

  X = torch.tensor(X, dtype=torch.float32)


torch.Size([21799, 6, 1000])


In [11]:
# Load from file
filename = "data/ptb-xl/ptbxl_ecg_100_all.pt"
ecg_tensor = torch.load(filename)

# Check the shape of the loaded tensor
print(ecg_tensor.shape)  # Should be (N, 6, T)

torch.Size([21799, 6, 1000])


In [12]:
# Split into test set
test_size = int(0.2 * len(ecg_tensor))  # 20% for testing
train_size = len(ecg_tensor) - test_size  # Remaining for training

train_dataset, test_dataset = random_split(ecg_tensor, [train_size, test_size])
print(f"Train dataset size: {len(train_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")

Train dataset size: 17440
Test dataset size: 4359


In [13]:
# Convert Subsets to Tensors
train_dataset = torch.stack([train_dataset[i] for i in range(len(train_dataset))])
test_dataset = torch.stack([test_dataset[i] for i in range(len(test_dataset))])

# Save the split datasets
torch.save(train_dataset, "data/ptb-xl/ptbxl_ecg_100_all_train.pt")
torch.save(test_dataset, "data/ptb-xl/ptbxl_ecg_100_all_test.pt")