In [4]:
import os, ast, random, time
import numpy as np
import pandas as pd
import wfdb
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score, accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import time
import ptbxl_utils

In [5]:
# Set random seed for reproducibility across Python, NumPy, and PyTorch
def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    print(f" Random seed set to {seed}\n")

set_seed(42)
SEED = 42  # Keep SEED defined for consistency (used later)

 Random seed set to 42



### Dataset Setup and Initialization
Load the PTB-XL metadata, select training, validation, and test folds, set the ECG sampling  
frequency (100Hz or 500Hz), and choose between filtered (AFIB vs NORM) or unfiltered   
(AFIB vs ALL other classes) dataset modes. The PTB-XL dataset is pre-divided into 10  
 stratified folds (1–10).One or multiple folds can be selected for each split — for example:    
rain_folds = [1, 2, 3]  
val_folds = [9]  
test_folds = [10]


In [14]:
data_path = "ptb-xl-dataset-1.0.3"
csv_path = os.path.join(data_path, "ptbxl_database.csv")
df = pd.read_csv(csv_path)

# Run dataset setup
train_data, val_data, test_data, freq = ptbxl_utils.setup_dataset(
    df,
    train_folds=[1,2],
    val_folds=[9],          
    test_folds=[10],        
    use_500Hz= False,        # True: use 500Hz ECG signals  |  False: 100Hz
    use_filtered=False       # True: Filterd data(only AFIB and Norm) | False: Full(AFIB vs all others)
)

Selected folds:
 Train: Fold [1, 2] (4356 samples)
 Validation: Fold [9] (2183 samples)
 Test: Fold [10] (2198 samples)
 Total: 8737 samples

Using ECG signal (100Hz)

Using full dataset (AFIB vs ALL — all non-AFIB treated as NORM)
Sampling rate: 100Hz
Train      | AFIB: 302    | Non-AFIB: 4054   | Total: 4356
Validation | AFIB: 151    | Non-AFIB: 2032   | Total: 2183
Test       | AFIB: 152    | Non-AFIB: 2046   | Total: 2198

Totals:
 AFIB: 605 | Non-AFIB: 8132 | Total: 8737



#### Create PyTorch Datasets class and DataLoaders

In [7]:
# Load an ECG record from PTB-XL using WFDB.
def load_ecg(record_path: str) -> np.ndarray:
    import os

    # Automatically prepend the dataset folder if missing
    base_dir = os.path.join(os.getcwd(), "ptb-xl-dataset-1.0.3")
    full_path = os.path.join(base_dir, record_path)

    if not os.path.exists(full_path + ".hea"):
        raise FileNotFoundError(f"ECG file not found at {full_path}.hea")

    record = wfdb.rdrecord(full_path)
    return record.p_signal.astype(np.float32)

# Quick test
sample_path = train_data.iloc[0]["filename"]
print("Testing path:", sample_path)
print("Loaded shape:", load_ecg(sample_path).shape)

Testing path: records100/00000/00002_lr
Loaded shape: (1000, 12)


In [8]:
# 7. PyTorch Dataset class
class PTBXL_Dataset(Dataset):
    """Custom dataset: loads ECG signals, normalizes per-lead, returns (tensor, label)."""
    def __init__(self, df_split, mean=None, std=None):
        self.paths = df_split["filename"].values
        self.labels = df_split["label"].astype(np.float32).values

        if mean is None or std is None:
            samp = df_split.sample(min(100, len(df_split)), random_state=SEED)["filename"]
            cat = np.concatenate([load_ecg(p) for p in samp], axis=0)
            self.mean = cat.mean(axis=0)
            self.std  = cat.std(axis=0) + 1e-7
        else:
            self.mean, self.std = mean, std

    def __len__(self): return len(self.paths)

    def __getitem__(self, idx):
        arr = load_ecg(self.paths[idx])
        arr = (arr - self.mean) / self.std
        arr = torch.tensor(arr.T, dtype=torch.float32)  # (12, L)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return arr, label

DataLoaders

In [9]:
use_cuda = torch.cuda.is_available()
train_set = PTBXL_Dataset(train_data)
val_set   = PTBXL_Dataset(val_data,  mean=train_set.mean, std=train_set.std)
test_set  = PTBXL_Dataset(test_data, mean=train_set.mean, std=train_set.std)

train_loader = DataLoader(train_set, batch_size=16, shuffle=True, num_workers=0, pin_memory=use_cuda)
val_loader   = DataLoader(val_set,   batch_size=16, shuffle=False, num_workers=0, pin_memory=use_cuda)
test_loader  = DataLoader(test_set,  batch_size=16, shuffle=False, num_workers=0, pin_memory=use_cuda)

print(f"DataLoaders ready (GPU = {use_cuda})\n")

DataLoaders ready (GPU = False)



### Define a simple 1D-CNN model(3 layers)

In [10]:
#CNN model (auto-adapts to input length)
class CNN1D(nn.Module):
    def __init__(self, input_length, in_channels=12):
        super().__init__()

        # Convolutional feature extractor
        self.conv_layers = nn.Sequential(
            nn.Conv1d(in_channels, 32, 5, padding=2), nn.ReLU(), nn.MaxPool1d(2),
            nn.Conv1d(32, 64, 5, padding=2), nn.ReLU(), nn.MaxPool1d(2),
            nn.Conv1d(64, 128, 5, padding=2), nn.ReLU(), nn.MaxPool1d(2),
        )

        # Dynamically compute flatten dimension
        with torch.no_grad():
            dummy = torch.zeros(1, in_channels, input_length)
            conv_out = self.conv_layers(dummy)
            flat_dim = conv_out.shape[1] * conv_out.shape[2]
        print(f"Detected flatten dimension: {flat_dim} for input length {input_length}")

        #Fully connected classification head
        self.fc_layers = nn.Sequential(
            nn.Linear(flat_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        return self.fc_layers(x)



# pick correct input length automatically
INPUT_LEN = 5000 if freq == 500 else 1000
model = CNN1D(input_length=INPUT_LEN)
device = torch.device("cuda" if use_cuda else "cpu")
model.to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
print(f"Model created for input length {INPUT_LEN}, running on {device}\n")


Detected flatten dimension: 16000 for input length 1000
Model created for input length 1000, running on cpu



##### Define training and evaluation function

In [11]:
# Train the model for one epoch.
def train_one_epoch(model, loader, criterion, optimizer, device):
    model.train(); total_loss = 0.0
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y.unsqueeze(1))  # binary case
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * x.size(0)
    avg_loss = total_loss / len(loader.dataset)
    return avg_loss

# Evaluate the model on validation/test data.
def evaluate_model(model, loader, criterion, device, threshold=0.5):
    model.eval(); total_loss = 0.0
    y_true, y_pred = [], []
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            out = model(x)
            loss = criterion(out, y.unsqueeze(1))
            total_loss += loss.item() * x.size(0)
            probs = torch.sigmoid(out)
            preds = (probs > threshold).squeeze(1).long()
            y_true.extend(y.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())

    # Compute metrics
    f1 = f1_score(y_true, y_pred)
    acc = accuracy_score(y_true, y_pred)

    avg_loss = total_loss / len(loader.dataset)
    return avg_loss, f1, acc, np.array(y_true), np.array(y_pred)


#### Implement the training process

In [12]:
# 11. Training loop
start_time = time.time()
NUM_EPOCHS = 10
best_f1 = 0.0; best_state = None

for epoch in range(NUM_EPOCHS):
    print(f"Epoch {epoch+1}/{NUM_EPOCHS}")
    tr_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_f1, val_acc, y_true, y_pred = evaluate_model(model, val_loader, criterion, device)
    print(f"  Train Loss {tr_loss:.4f} | Val Loss {val_loss:.4f} | Val F1 {val_f1:.4f} | Val Acc {val_acc:.4f}")
    if val_f1 > best_f1:
        best_f1 = val_f1; best_state = model.state_dict()
        print("  New model saved\n")

if best_state:
    model.load_state_dict(best_state)
    print("Best model (highest Val F1) re-loaded.\n")

Epoch 1/10
  Train Loss 0.2471 | Val Loss 0.1881 | Val F1 0.0000 | Val Acc 0.9308
Epoch 2/10
  Train Loss 0.1644 | Val Loss 0.1314 | Val F1 0.0000 | Val Acc 0.9308
Epoch 3/10
  Train Loss 0.1107 | Val Loss 0.1218 | Val F1 0.0000 | Val Acc 0.9295
Epoch 4/10
  Train Loss 0.0844 | Val Loss 0.1256 | Val F1 0.5809 | Val Acc 0.9537
  New model saved

Epoch 5/10
  Train Loss 0.0595 | Val Loss 0.1324 | Val F1 0.7081 | Val Acc 0.9569
  New model saved

Epoch 6/10
  Train Loss 0.0489 | Val Loss 0.1258 | Val F1 0.6710 | Val Acc 0.9533
Epoch 7/10
  Train Loss 0.0442 | Val Loss 0.1637 | Val F1 0.6769 | Val Acc 0.9615
Epoch 8/10
  Train Loss 0.0176 | Val Loss 0.1855 | Val F1 0.7039 | Val Acc 0.9588
Epoch 9/10
  Train Loss 0.0213 | Val Loss 0.3012 | Val F1 0.6295 | Val Acc 0.9574
Epoch 10/10
  Train Loss 0.0229 | Val Loss 0.2068 | Val F1 0.6622 | Val Acc 0.9537
Best model (highest Val F1) re-loaded.



#### Evaluate the final model

In [13]:
# Evaluate on the Test Set
test_loss, test_f1, test_acc, y_true, y_pred = evaluate_model(model, test_loader, criterion, device)

print("\nTest Metrics Summary")
print(f"  Test Loss: {test_loss:.4f}")
print(f"  Test F1-score: {test_f1:.4f}")
print(f"  Test Accuracy: {test_acc:.4f}")
print(f"  Sampling frequency: {freq}Hz")
print(f"  Device used: {'GPU' if device.type == 'cuda' else 'CPU'}")


# Switch model to evaluation mode for consistent results
model.eval()

# Label mapping for clarity
print("\nLabel mapping:")
print("0: Normal Sinus Rhythm (NORM)")
print("1: Atrial Fibrillation (AFIB)")

# Detailed test report
print("\nClassification Report:")
print(classification_report(y_true, y_pred, target_names=["Normal", "AFIB"]))

print("Confusion Matrix:")
cm = confusion_matrix(y_true, y_pred)
print(cm)

tn, fp, fn, tp = cm.ravel()
total_normal = cm[0].sum()
total_afib = cm[1].sum()

print(f"{tn} Normal ECGs correctly classified ({tn/total_normal*100:.1f}%)")
print(f"{fp} Normal ECGs wrongly predicted as AFIB ({fp/total_normal*100:.1f}%) [False Positives]")
print(f"{tp} AFIB ECGs correctly classified ({tp/total_afib*100:.1f}%)")
print(f"{fn} AFIB ECGs wrongly predicted as Normal ({fn/total_afib*100:.1f}%) [False Negatives]")

# Dataset Summary
print("\nDataset Sizes:")
print(f"Training records:   {len(train_data)}")
print(f"Validation records: {len(val_data)}")
print(f"Test records:       {len(test_data)}")

# Runtime Summary
end_time = time.time()
total_time = end_time - start_time
hours, rem = divmod(total_time, 3600)
minutes, seconds = divmod(rem, 60)
print(f"\nRuntime: {int(hours):02d}h {int(minutes):02d}m {int(seconds):02d}s")




Test Metrics Summary
  Test Loss: 0.2278
  Test F1-score: 0.6148
  Test Accuracy: 0.9527
  Sampling frequency: 100Hz
  Device used: CPU

Label mapping:
0: Normal Sinus Rhythm (NORM)
1: Atrial Fibrillation (AFIB)

Classification Report:
              precision    recall  f1-score   support

      Normal       0.97      0.98      0.97      2046
        AFIB       0.70      0.55      0.61       152

    accuracy                           0.95      2198
   macro avg       0.84      0.76      0.79      2198
weighted avg       0.95      0.95      0.95      2198

Confusion Matrix:
[[2011   35]
 [  69   83]]
2011 Normal ECGs correctly classified (98.3%)
35 Normal ECGs wrongly predicted as AFIB (1.7%) [False Positives]
83 AFIB ECGs correctly classified (54.6%)
69 AFIB ECGs wrongly predicted as Normal (45.4%) [False Negatives]

Dataset Sizes:
Training records:   4356
Validation records: 2183
Test records:       2198

Runtime: 00h 04m 58s
