In [1]:
%pip install wfdb

Note: you may need to restart the kernel to use updated packages.


In [3]:
import os
import numpy as np
import wfdb
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split

# ---------------------------
# Data Loading & Preprocessing
# ---------------------------
def load_ecg_data(base_dir, fixed_length=1000):
    """Load single-lead ECG recordings stored in per-patient folders.

    Every ``.dat`` file found one directory level below ``base_dir`` is read
    with ``wfdb.rdsamp``. The first channel is min-max scaled to [0, 1] and
    then truncated, or zero-padded at the end, to exactly ``fixed_length``
    samples.

    Args:
        base_dir: Directory whose sub-directories each hold one patient's
            record files.
        fixed_length: Number of samples kept per recording.

    Returns:
        Tuple ``(X, y)``. ``X`` has shape ``(n_records, fixed_length, 1)``;
        ``y`` has shape ``(n_records,)`` and holds 1 when the file name
        contains ``'lre'`` and 0 otherwise.
    """
    X, y = [], []

    # Sorted traversal: os.listdir() returns entries in arbitrary order, and a
    # seeded split of the result is only reproducible for a fixed sample order.
    for patient in sorted(os.listdir(base_dir)):
        patient_path = os.path.join(base_dir, patient)
        if not os.path.isdir(patient_path):
            continue

        # Process each .dat file in the patient folder
        for file in sorted(os.listdir(patient_path)):
            if not file.endswith('.dat'):
                continue

            # WFDB addresses a record by its path without the extension;
            # splitext keeps any additional dots that are part of the name.
            file_path = os.path.join(patient_path, os.path.splitext(file)[0])

            # Read ECG signal using WFDB
            try:
                signals, fields = wfdb.rdsamp(file_path)
            except Exception as exc:
                # `Exception` (not a bare except) so Ctrl-C still interrupts.
                print(f"Skipping corrupt/invalid file: {file_path} ({exc})")
                continue

            # Use the first lead (modify if multi-lead needed)
            ecg_signal = signals[:, 0]
            if ecg_signal.size == 0:
                print(f"Skipping corrupt/invalid file: {file_path} (no samples)")
                continue

            # Normalize signal to [0, 1]. A flat (or non-finite) range would
            # make the division produce NaNs, so map such signals to zeros.
            signal_min = np.min(ecg_signal)
            signal_range = np.max(ecg_signal) - signal_min
            if signal_range > 0:
                ecg_normalized = (ecg_signal - signal_min) / signal_range
            else:
                ecg_normalized = np.zeros_like(ecg_signal, dtype=float)

            # Pad/Truncate to fixed length
            if len(ecg_normalized) > fixed_length:
                ecg_processed = ecg_normalized[:fixed_length]
            else:
                ecg_processed = np.pad(
                    ecg_normalized,
                    (0, fixed_length - len(ecg_normalized)),
                    mode='constant',
                )

            X.append(ecg_processed)
            y.append(1 if 'lre' in file else 0)  # Label based on filename

    # Convert to numpy arrays and reshape for LSTM: (records, timesteps, 1)
    X = np.array(X).reshape(-1, fixed_length, 1)
    y = np.array(y)
    return X, y

# Load data
# Raw string: in a normal literal "\M" and "\p" are invalid escape sequences
# that only work by accident (SyntaxWarning on recent Python versions).
BASE_DIR = r'D:\Machine-Learning\ptb-diagnostic-ecg-database-1.0.0'  # Update this path
X_ecg, y_ecg = load_ecg_data(BASE_DIR)

# Split data into train/test sets (80/20, fixed seed for repeatability)
X_train, X_test, y_train, y_test = train_test_split(
    X_ecg, y_ecg, test_size=0.2, random_state=42
)

# ---------------------------
# LSTM Model Architecture
# ---------------------------
# Two stacked LSTM layers with dropout, ending in a single sigmoid unit for
# binary classification. The first LSTM returns the full sequence so that the
# second LSTM receives one vector per timestep.
model = Sequential()
model.add(LSTM(64, return_sequences=True, input_shape=X_train.shape[1:]))
model.add(Dropout(0.5))
model.add(LSTM(32))
model.add(Dropout(0.5))
model.add(Dense(1, activation='sigmoid'))

model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)
model.summary()

# ---------------------------
# Training & Evaluation
# ---------------------------
# Stop once validation loss has not improved for 5 epochs and roll the model
# back to the weights of the best epoch.
early_stop = EarlyStopping(
    restore_best_weights=True,
    patience=5,
    monitor='val_loss',
)

# The last 20% of the training data is held out by Keras as validation data.
history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=32,
    epochs=50,
    validation_split=0.2,
    callbacks=[early_stop],
)

# Report performance on the held-out test set.
loss, accuracy = model.evaluate(x=X_test, y=y_test)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

  super().__init__(**kwargs)


Epoch 1/50
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 654ms/step - accuracy: 0.6081 - loss: 0.6828 - val_accuracy: 0.6364 - val_loss: 0.6591
Epoch 2/50
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 555ms/step - accuracy: 0.6409 - loss: 0.6666 - val_accuracy: 0.6364 - val_loss: 0.6561
Epoch 3/50
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 553ms/step - accuracy: 0.6656 - loss: 0.6440 - val_accuracy: 0.6364 - val_loss: 0.6591
Epoch 4/50
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 544ms/step - accuracy: 0.6649 - loss: 0.6483 - val_accuracy: 0.6364 - val_loss: 0.6555
Epoch 5/50
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 558ms/step - accuracy: 0.6769 - loss: 0.6445 - val_accuracy: 0.6364 - val_loss: 0.6578
Epoch 6/50
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 547ms/step - accuracy: 0.6774 - loss: 0.6382 - val_accuracy: 0.6364 - val_loss: 0.6587
Epoch 7/50
[1m11/11[0m [

LSTM - Torch

In [8]:
# %% [1] Install Required Libraries
%pip install wfdb torch torchvision torchaudio


Collecting torch
  Using cached torch-2.6.0-cp310-cp310-win_amd64.whl.metadata (28 kB)
Collecting torchvision
  Using cached torchvision-0.21.0-cp310-cp310-win_amd64.whl.metadata (6.3 kB)
Collecting torchaudio
  Using cached torchaudio-2.6.0-cp310-cp310-win_amd64.whl.metadata (6.7 kB)
Downloading torch-2.6.0-cp310-cp310-win_amd64.whl (204.2 MB)
   ---------------------------------------- 0.0/204.2 MB ? eta -:--:--
   ---------------------------------------- 0.0/204.2 MB ? eta -:--:--
   ---------------------------------------- 1.0/204.2 MB 4.6 MB/s eta 0:00:45
   ---------------------------------------- 2.1/204.2 MB 5.3 MB/s eta 0:00:38
    --------------------------------------- 3.1/204.2 MB 4.9 MB/s eta 0:00:42
    --------------------------------------- 3.9/204.2 MB 4.6 MB/s eta 0:00:44
    --------------------------------------- 4.7/204.2 MB 4.7 MB/s eta 0:00:43
   - -------------------------------------- 5.8/204.2 MB 4.6 MB/s eta 0:00:44
   - -------------------------------------

In [None]:


# %% [2] Imports
import os
import numpy as np
import pandas as pd
import wfdb
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report

# %% [3] Configuration
# Central hyper-parameters and paths for the PyTorch pipeline.
config = {
    # Raw string: "\M" and "\p" are invalid escape sequences in a normal
    # literal and only work by accident (SyntaxWarning on recent Pythons).
    "data_dir": r"D:\Machine-Learning\ptb-diagnostic-ecg-database-1.0.0",
    "batch_size": 32,
    # Samples kept per record AFTER the 4x decimation done while loading,
    # i.e. 20 s at 250 Hz (assuming the recordings are sampled at 1000 Hz).
    "max_seq_length": 5000,
    "hidden_size": 128,
    "num_layers": 2,
    "learning_rate": 0.001,
    "num_epochs": 15,
    "num_classes": 15  # Will be updated automatically
}

# %% [4] Data Loading and Preprocessing
def load_ecg_data(data_dir):
    """Load every WFDB record found under ``data_dir`` with its diagnosis label.

    Records are located by their ``.hea`` header files. The directory is
    searched recursively, so records stored in per-patient sub-folders are
    found as well as records placed directly in ``data_dir``.

    Args:
        data_dir: Root directory of the dataset.

    Returns:
        Tuple ``(signals, labels)`` of equally long lists. Each signal is a
        2-D array holding the first 15 channels, keeping every 4th sample.
        Each label is the text following ``Diagnosis: `` in the header
        comments, or ``'Unknown'`` when no such comment has a value.
    """
    # Collect record paths (without extension, as WFDB expects) and sort them
    # so the sample order does not depend on directory listing order.
    record_paths = sorted(
        os.path.join(folder, os.path.splitext(name)[0])
        for folder, _subdirs, files in os.walk(data_dir)
        for name in files
        if name.endswith('.hea')
    )

    signals = []
    labels = []

    for record_path in record_paths:
        # Load signal
        signal, _ = wfdb.rdsamp(record_path)
        # Load header information
        header = wfdb.rdheader(record_path)

        # Preprocess signal
        signal = signal[:, :15]  # Select first 15 channels
        # Keep every 4th sample (1000 Hz -> 250 Hz, assuming a 1000 Hz source).
        # NOTE(review): plain decimation with no anti-aliasing filter.
        signal = signal[::4, :]
        signals.append(signal)

        # Extract label from comments. Accept the comment with or without a
        # leading '#', and tolerate a field that has no value instead of
        # raising IndexError.
        diagnosis = 'Unknown'
        for comment in (header.comments or []):
            text = comment.lstrip('# \t')
            if not text.startswith('Diagnosis'):
                continue
            _, separator, value = text.partition(': ')
            if separator and value.strip():
                diagnosis = value.strip()
                break
        labels.append(diagnosis)

    return signals, labels

# Read every record (signal + diagnosis text) from the configured directory
signals, labels = load_ecg_data(config["data_dir"])

# %% [5] Label Encoding
# Map diagnosis strings to integer class ids and record how many classes exist
le = LabelEncoder()
encoded_labels = le.fit_transform(labels)
config.update(num_classes=len(le.classes_))
print(f"Class mapping: {dict(zip(le.classes_, le.transform(le.classes_)))}")

# %% [6] Dataset Class
class ECGDataset(Dataset):
    """Map-style dataset yielding fixed-length signal tensors with their labels.

    Each item is a ``(signal, label)`` pair: the signal is cut or zero-padded
    along its first axis to ``max_len`` rows and returned as a float tensor;
    the label is returned as a one-element long tensor.
    """

    def __init__(self, signals, labels, max_len):
        self.max_len = max_len
        self.signals = signals
        self.labels = labels

    def __len__(self):
        """Number of stored signals."""
        return len(self.signals)

    def __getitem__(self, idx):
        """Return the ``idx``-th ``(signal, label)`` pair as tensors."""
        signal, label = self.signals[idx], self.labels[idx]
        n_samples = signal.shape[0]

        if n_samples <= self.max_len:
            # Short (or exactly fitting) record: append zero rows at the end.
            missing = self.max_len - n_samples
            signal = np.pad(signal, ((0, missing), (0, 0)), 'constant')
        else:
            # Long record: keep only the leading max_len rows.
            signal = signal[:self.max_len, :]

        return torch.FloatTensor(signal), torch.LongTensor([label])

# %% [7] Data Splitting and Loaders
# Hold out 20% of the records for testing, keeping class proportions equal in
# both parts (stratified) and the split repeatable (fixed seed).
X_train, X_test, y_train, y_test = train_test_split(
    signals, encoded_labels, 
    test_size=0.2, 
    stratify=encoded_labels,
    random_state=42
)

train_dataset = ECGDataset(X_train, y_train, config["max_seq_length"])
test_dataset = ECGDataset(X_test, y_test, config["max_seq_length"])

# Load batches in the main process (num_workers=0). Worker processes must be
# able to import the dataset class; a class defined inside a notebook cannot
# be imported by spawned workers (the default start method on Windows), which
# makes the loader fail or hang. The signals are already in memory, so worker
# processes would add nothing here anyway.
train_loader = DataLoader(
    train_dataset,
    batch_size=config["batch_size"],
    shuffle=True,
    num_workers=0
)

test_loader = DataLoader(
    test_dataset,
    batch_size=config["batch_size"],
    shuffle=False,
    num_workers=0
)

# %% [8] LSTM Model Architecture
class ECG_LSTM(nn.Module):
    """Bidirectional multi-layer LSTM encoder with an MLP classification head.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Hidden units per LSTM direction.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True
        )
        # Input width is hidden_size*2: forward and backward states concatenated.
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size*2, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, num_classes)
        )

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, input_size)``.

        Returns:
            Logits of shape ``(batch_size, num_classes)``.
        """
        # h_n is ordered (layer, direction): the last two entries are the
        # final forward and final backward states of the top layer.
        _, (h_n, _) = self.lstm(x)
        # Use the final state of BOTH directions. Slicing the output at the
        # last timestep would pair the forward summary with a backward state
        # that has processed only a single timestep.
        summary = torch.cat((h_n[-2], h_n[-1]), dim=1)  # (batch, hidden_size*2)
        return self.classifier(summary)

# %% [9] Initialize Model
# Prefer the GPU when one is visible to PyTorch, otherwise stay on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# 15 input features per timestep; remaining sizes come from the configuration
model = ECG_LSTM(
    15,
    config["hidden_size"],
    config["num_layers"],
    config["num_classes"],
)
model = model.to(device)

# Multi-class objective on raw logits, optimised with Adam
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=config["learning_rate"])

# %% [10] Training Loop
def train_model(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Module mapping an input batch to per-class scores.
        dataloader: Iterable of ``(inputs, labels)`` batches; labels may be
            shaped ``(batch,)`` or ``(batch, 1)``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(loss, accuracy)``: the batch losses averaged with batch-size
        weights (a per-sample mean when ``criterion`` returns the batch mean)
        and the fraction of correctly predicted samples.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        # Flatten (batch, 1) targets to (batch,). Unlike squeeze(), this keeps
        # the batch dimension when a batch contains a single sample.
        labels = labels.reshape(-1).to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("dataloader produced no samples")
    return running_loss/total, correct/total

# %% [11] Evaluation Loop
def evaluate_model(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without updating its weights.

    Args:
        model: Module mapping an input batch to per-class scores.
        dataloader: Iterable of ``(inputs, labels)`` batches; labels may be
            shaped ``(batch,)`` or ``(batch, 1)``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(loss, accuracy, all_preds, all_labels)``: the batch losses
        averaged with batch-size weights, the fraction of correct
        predictions, and two lists with the predicted and the true class id
        of every sample in iteration order.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            # Flatten (batch, 1) targets to (batch,). Unlike squeeze(), this
            # keeps the batch dimension when a batch contains one sample.
            labels = labels.reshape(-1).to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    if total == 0:
        raise ValueError("dataloader produced no samples")
    return running_loss/total, correct/total, all_preds, all_labels

# %% [12] Training Execution
# Start below any reachable accuracy so the first epoch always writes a
# checkpoint. With 0.0 and a strict ">" comparison, a run whose accuracy stays
# at 0 would never save, and the later load would fail or pick up a stale
# file from an earlier run.
best_acc = float("-inf")
for epoch in range(config["num_epochs"]):
    train_loss, train_acc = train_model(model, train_loader, criterion, optimizer, device)
    test_loss, test_acc, _, _ = evaluate_model(model, test_loader, criterion, device)
    
    print(f"Epoch {epoch+1}/{config['num_epochs']}")
    print(f"Train Loss: {train_loss:.4f} | Acc: {train_acc:.4f}")
    print(f"Test Loss: {test_loss:.4f} | Acc: {test_acc:.4f}")
    print("-" * 50)
    
    # Keep the weights of the best-scoring epoch so far.
    # NOTE(review): the checkpoint is selected on the test loader, so the
    # final test score is optimistically biased; a separate validation split
    # would be needed for an unbiased estimate.
    if test_acc > best_acc:
        best_acc = test_acc
        torch.save(model.state_dict(), "best_model.pth")

# %% [13] Final Evaluation
# Restore the best checkpoint onto the active device (map_location makes the
# file loadable even if it was written on a different device).
model.load_state_dict(torch.load("best_model.pth", map_location=device))
_, test_acc, preds, labels = evaluate_model(model, test_loader, criterion, device)

# Pass the full list of class ids explicitly: when a class has no sample in
# the test split, the report would otherwise raise because target_names no
# longer matches the classes found. zero_division=0 silences the undefined
# metric warnings for such classes.
print(classification_report(
    labels,
    preds,
    labels=np.arange(len(le.classes_)),
    target_names=le.classes_,
    zero_division=0,
))

Collecting torch
  Downloading torch-2.6.0-cp310-cp310-win_amd64.whl.metadata (28 kB)
Collecting torchvision
  Downloading torchvision-0.21.0-cp310-cp310-win_amd64.whl.metadata (6.3 kB)
Collecting torchaudio
  Downloading torchaudio-2.6.0-cp310-cp310-win_amd64.whl.metadata (6.7 kB)
Collecting sklearn
  Downloading sklearn-0.0.post12.tar.gz (2.6 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'error'
Note: you may need to restart the kernel to use updated packages.


  error: subprocess-exited-with-error
  
  × python setup.py egg_info did not run successfully.
  │ exit code: 1
  ╰─> [15 lines of output]
      The 'sklearn' PyPI package is deprecated, use 'scikit-learn'
      rather than 'sklearn' for pip commands.
      
      Here is how to fix this error in the main use cases:
      - use 'pip install scikit-learn' rather than 'pip install sklearn'
      - replace 'sklearn' by 'scikit-learn' in your pip requirements files
        (requirements.txt, setup.py, setup.cfg, Pipfile, etc ...)
      - if the 'sklearn' package is used by one of your dependencies,
        it would be great if you take some time to track which package uses
        'sklearn' instead of 'scikit-learn' and report it to their issue tracker
      - as a last resort, set the environment variable
        SKLEARN_ALLOW_DEPRECATED_SKLEARN_PACKAGE_INSTALL=True to avoid this error
      
      More information is available at
      https://github.com/scikit-learn/sklearn-pypi-packag

ModuleNotFoundError: No module named 'torch'