# ECG data analysis

Using the PTB-XL data, this notebook:
1. Loads the data
2. Explores the metadata,
3. Visualizes the signal data in interactive figures
4. Summarized a theory and literature review.
5. Extracts wave peaks from the 1D signals and visualizes these
6. explores the useage of a codebase for multilabel diagnostic classification, to expand on in future instance.

**Note:** on the first run, the second call will make the runtime crash to downgrade fastai to an older version.

In [1]:
# Import torchvision
import torch
import torchvision
from torchvision import datasets
from torchvision import transforms
from torchvision.transforms import ToTensor

# Check versions
print(torch.__version__)
print(torchvision.__version__)

2.0.0+cu117
0.15.1+cu117


In [2]:
## In this cell, set 'show_all_figs = False'
## if you want to save RAM for model training:
show_all_figs = True

In [3]:
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('darkgrid')

In [4]:
## Import the libraries we will use in our initial exploration:
from pathlib import Path
import os
import requests
import zipfile
# from google.colab import drive
import pandas as pd
import numpy as np
# import plotly.graph_objects as go
# import plotly.express as px
import wfdb
import ast

## Allow "hot-reloading" of modules
%load_ext autoreload
%autoreload 2

## 1. Loading the data

In [36]:
data_filepath = 'D:/Test Jupyter/ECG-Classfier-main/data/ecg_featurizer/'

Y_train = np.load(data_filepath + 'X_train.npy')
Z_train = np.load(data_filepath + 'y_train.npy', allow_pickle=True)
Y_test = np.load(data_filepath + 'X_test.npy')
Z_test = np.load(data_filepath + 'y_test.npy', allow_pickle=True)

In [6]:
print(Y_train.shape, Z_train.shape)
print(Y_test.shape,  Z_test.shape)

(17441, 26) (17441, 5)
(4396, 26) (4396, 5)


In [37]:
Y_train[0]

array([ 9.40000000e-01,  1.49071198e-02,  5.96700000e-01,  5.69632338e-02,
        9.44444444e-01,  5.47947911e-02,  6.74054292e-02,  3.12024046e-02,
        9.40000000e-01,  9.20144916e-02,  2.20656609e-02,  7.62759502e-02,
        9.37500000e-01,  1.39194109e-02, -4.70345236e-02,  1.70020277e-02,
        9.26250000e-01,  5.65547301e-02, -5.86236777e-02,  8.38237741e-03,
        1.17777800e+01,  4.26296000e+00,             nan,             nan,
        1.30000000e+01,  2.64575000e+00])

## Handle Nan values and standardize feature variables

In [38]:
from sklearn.impute import SimpleImputer

# Instantiate the imputer
imputer = SimpleImputer(strategy='mean')  # You can choose 'median' or 'most_frequent' as well

# Fit the imputer on your training data and transform it
Y_train = imputer.fit_transform(Y_train)

# Apply the same imputer to your test data
Y_test = imputer.transform(Y_test)

In [39]:
Y_train[0]

array([ 9.40000000e-01,  1.49071198e-02,  5.96700000e-01,  5.69632338e-02,
        9.44444444e-01,  5.47947911e-02,  6.74054292e-02,  3.12024046e-02,
        9.40000000e-01,  9.20144916e-02,  2.20656609e-02,  7.62759502e-02,
        9.37500000e-01,  1.39194109e-02, -4.70345236e-02,  1.70020277e-02,
        9.26250000e-01,  5.65547301e-02, -5.86236777e-02,  8.38237741e-03,
        1.17777800e+01,  4.26296000e+00,  3.48918572e+01,  1.37843482e+01,
        1.30000000e+01,  2.64575000e+00])

In [40]:
from sklearn.preprocessing import StandardScaler

# Initialize the StandardScaler
scaler = StandardScaler()

# Fit the scaler on the training data and transform both training and test data
Y_train = scaler.fit_transform(Y_train)
Y_test = scaler.transform(Y_test)


In [41]:
Y_train[0]

array([ 5.68295978e-01, -4.80387092e-01, -2.92598160e-01, -4.17777843e-01,
        5.82545553e-01, -1.51812181e-01,  1.07468846e+00, -1.71371722e-02,
        1.66127909e-01, -1.58598559e-01, -6.81270856e-01,  5.38051595e-01,
        3.61249090e-01, -4.76503536e-01,  3.93540206e-01, -4.01501546e-01,
        4.75191114e-01, -9.24219300e-02,  7.31744955e-01, -5.58001923e-01,
       -9.34994491e-02,  1.65922693e-01, -2.66692127e-13, -1.19894673e-13,
       -6.76955342e-01,  2.27724702e-01])

In [42]:
# Convert your data to PyTorch tensors
# Convert NumPy arrays to PyTorch tensors with the appropriate data types
Y_train = torch.FloatTensor(Y_train)
# Converting DataFrame to a NumPy array and then to Pytorch tensor
# Z_train = Z_train.to_numpy()  # Convert Z_train DataFrame to a NumPy array
# Convert the NumPy array to a PyTorch tensor
Z_train = torch.FloatTensor(Z_train)

# Convert NumPy arrays to PyTorch tensors with the appropriate data types
Y_test = torch.FloatTensor(Y_test)
# Converting DataFrame to a NumPy array and then to Pytorch tensor
# Z_test = Z_test.to_numpy()  # Convert Z_test DataFrame to a NumPy array
# Convert the NumPy array to a PyTorch tensor
Z_test = torch.FloatTensor(Z_test)


In [43]:
Y_train.shape

torch.Size([17441, 26])

In [44]:
Y_test.shape

torch.Size([4396, 26])

In [45]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

## 2. Create Dataloaders

In [46]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Define a custom dataset class
class CustomDataset(Dataset):
    def __init__(self, Y, Z):
        self.Y = torch.Tensor(Y)  # Input data
        self.Z = torch.Tensor(Z)  # Multilabel class labels

    def __len__(self):
        return len(self.Y)

    def __getitem__(self, idx):
        return self.Y[idx], self.Z[idx]

# Create custom datasets and dataloaders
train_dataset = CustomDataset(Y_train, Z_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = CustomDataset(Y_test, Z_test)
test_loader = DataLoader(test_dataset, batch_size=32)

In [47]:
for train_images, train_labels in train_loader:
    sample_image = train_images[0]    # Reshape them according to your needs.
    sample_label = train_labels[0]

# sample_image
sample_label

tensor([0., 1., 1., 0., 0.])

In [48]:
BATCH_SIZE = 32
# Let's check out what we have created
print(f"DataLoaders: {train_loader, test_loader}")
print(f"Length of the train_loader: {len(train_loader)} batches of {BATCH_SIZE}...")
print(f"Length of the test_loader: {len(test_loader)} batches of {BATCH_SIZE}...")

DataLoaders: (<torch.utils.data.dataloader.DataLoader object at 0x00000249D470AB60>, <torch.utils.data.dataloader.DataLoader object at 0x00000249D4709330>)
Length of the train_loader: 546 batches of 32...
Length of the test_loader: 138 batches of 32...


In [49]:
# Check out what's inside training data loader
train_features_batch, train_labels_batch = next(iter(train_loader))
train_features_batch.shape, train_labels_batch.shape
# test_features_batch, test_labels_batch = next(iter(test_loader))


(torch.Size([32, 26]), torch.Size([32, 5]))

## 3. Start creating models

In [50]:
import torch
import torch.nn as nn

# Define the neural network model
class MultilabelClassifier(nn.Module):
    def __init__(self, input_size, output_size):
        super(MultilabelClassifier, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_size, 128),  # Input layer
            nn.ReLU(),
            nn.Linear(128, 64),  # Hidden layer
            nn.ReLU(),
            nn.Linear(64, output_size),  # Output layer
            nn.Sigmoid()  # Sigmoid activation for multilabel classification
        )

    def forward(self, x):
        return self.fc(x)

# Instantiate the model
input_size = 26  # Number of input features
output_size = 5  # Number of output classes (labels)
model = MultilabelClassifier(input_size, output_size).to(device)


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [51]:
model


MultilabelClassifier(
  (fc): Sequential(
    (0): Linear(in_features=26, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=64, bias=True)
    (3): ReLU()
    (4): Linear(in_features=64, out_features=5, bias=True)
    (5): Sigmoid()
  )
)

In [23]:
# Pass a random image through model
# rand_image_tensor = torch.randn(size=(1,12,500))
rand_image_tensor = torch.randn(size=(1,26))
rand_image_tensor.shape

torch.Size([1, 26])

In [24]:
#Pass image through model
model(rand_image_tensor.unsqueeze(0).to(device))

tensor([[[0.5297, 0.4630, 0.4784, 0.5201, 0.4288]]], device='cuda:0',
       grad_fn=<SigmoidBackward0>)

In [25]:
from timeit import default_timer as timer
def print_train_time(start:float,
                     end:float,
                     device:torch.device = None):
  """Prints differnce between start and end time"""
  total_time = end - start
  print(f"Train time on {device}: {total_time: .3f} seconds")
  return total_time

## 4. Define train and test steps

In [26]:
def train_step(model, data_loader, loss_fn, optimizer, device):
    model.train()
    train_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    for inputs, labels in data_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == labels.argmax(dim=1)).sum().item()
        total_samples += labels.size(0)

    train_accuracy = (correct_predictions / total_samples) * 100
    train_loss /= len(data_loader)

    # print(f"Train Loss: {train_loss:.4f} | Train Accuracy: {train_accuracy:.2f}%")

    return train_loss, train_accuracy


In [27]:
def test_step(model: torch.nn.Module,
              data_loader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              device: torch.device = device):
    """Performs a testing loop step on model going over data_loader"""
    test_loss, correct_predictions, total_samples = 0, 0, 0

    # Put the model in eval mode
    model.eval()

    # Turn off gradients and inference mode context manager
    with torch.no_grad():
        for X, y in data_loader:
            # Send the data to the target device
            X, y = X.to(device), y.to(device)

            # 1. Forward pass
            test_pred = model(X)

            # 2. Calculate loss (accumulatively)
            test_loss += loss_fn(test_pred, y).item()

            # 3. Calculate accuracy (accumulatively)
            _, predicted = torch.max(test_pred, 1)
            correct_predictions += (predicted == y.argmax(dim=1)).sum().item()
            # correct_predictions += (predicted == y).sum().item()
            total_samples += y.size(0)

        # Calculate test loss and accuracy
        test_loss /= len(data_loader)
        test_accuracy = (correct_predictions / total_samples) * 100

        # print(f"Test Loss: {test_loss:.5f} | Test Accuracy: {test_accuracy:.2f}%\n")

        return test_loss, test_accuracy


In [28]:
def evaluate_model(model, data_loader, loss_fn, device, threshold=0.5):
    model.eval()
    test_loss = 0.0
    # correct_predictions = 0
    correct_samples = 0
    total_samples = 0
    all_true_labels = []
    all_pred_scores = []

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            test_loss += loss.item()

            predicted_labels = (outputs >= threshold).float()  # Apply threshold (e.g., 0.5)

            # Calculate correct predictions for the batch
            # batch_correct_predictions = (predicted_labels == labels).sum().item()
            batch_correct_samples = (predicted_labels == labels).all(dim=1).sum().item()


            # Count the total number of labels in the batch
            batch_total_samples = labels.sum().item()

            # correct_predictions += batch_correct_predictions
            correct_samples += batch_correct_samples
            # total_samples += inputs.size(0)
            total_samples += batch_total_samples


            all_true_labels.extend(labels.cpu().numpy())
            all_pred_scores.extend(predicted_labels.cpu().numpy())

        test_loss /= len(data_loader)
        # test_accuracy = (correct_predictions / total_samples) * 100
        test_accuracy = (correct_samples / total_samples) * 100

    return test_loss, test_accuracy, all_true_labels, all_pred_scores

In [29]:
def evaluate_model2(model, data_loader, loss_fn, device, threshold=0.5):
    model.eval()
    test_loss = 0.0
    correct_samples = 0
    total_samples = 0
    all_true_labels = []
    all_pred_scores = []

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            test_loss += loss.item()

            predicted_labels = (outputs >= threshold).float()  # Apply threshold (e.g., 0.5)

            # Check if all labels for each sample are predicted correctly
            batch_correct_samples = (predicted_labels == labels).all(dim=1).sum().item()

            correct_samples += batch_correct_samples
            total_samples += inputs.size(0)

            all_true_labels.extend(labels.cpu().numpy())
            all_pred_scores.extend(predicted_labels.cpu().numpy())

        test_loss /= len(data_loader)
        test_accuracy = (correct_samples / total_samples) * 100

    return test_loss, test_accuracy, all_true_labels, all_pred_scores

## 5. Call model and print evaluation metrics

In [30]:
# loss_fn = nn.BCEWithLogitsLoss()
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(params=model.parameters(),
                            lr = 0.001)#optimize all bias and weight, i.e. everything in model_2.state_dict()

In [35]:


import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, average_precision_score, multilabel_confusion_matrix, classification_report, precision_recall_fscore_support
import numpy as np
# Import tqdm for progress bar
from tqdm.auto import tqdm

class_names = ['NORM', 'MI', 'STTC', 'CD', 'HYP']

def plot_metrics(train_losses, test_losses, train_accuracies, test_accuracies):
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss', marker='o')
    plt.plot(test_losses, label='Test Loss', marker='o')
    plt.title('Loss Over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Train Accuracy', marker='o')
    plt.plot(test_accuracies, label='Test Accuracy', marker='o')
    plt.title('Accuracy Over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()

    plt.tight_layout()
    plt.show()

# Initialize empty lists to store training and testing metrics
train_losses = []
test_losses = []
train_accuracies = []
test_accuracies = []
all_true_labels = []
all_pred_scores = []

# Training and testing loop
epochs = 25
for epoch in tqdm(range(epochs)):
    print(f"Epoch:{epoch}\n-----------")
    train_loss, train_accuracy = train_step(model=model,
                                            data_loader=train_loader,
                                            loss_fn=loss_fn,
                                            optimizer=optimizer,
                                            device=device)
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)
    print(f"Train Loss: {train_loss:.4f} | Train Accuracy: {train_accuracy:.2f}%")

    test_loss, test_accuracy, true_labels, pred_scores = evaluate_model2(model=model,
                                                                       data_loader=test_loader,
                                                                       loss_fn=loss_fn,
                                                                       device=device)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)
    print(f"Test Loss: {test_loss:.4f} | Test Accuracy: {test_accuracy:.2f}%")

    all_true_labels.extend(true_labels)
    all_pred_scores.extend(pred_scores)

# Plot training and testing metrics
plot_metrics(train_losses, test_losses, train_accuracies, test_accuracies)

# Convert predicted scores to binary predictions based on a threshold (e.g., 0.5)
threshold = 0.5
all_pred_labels = (np.array(all_pred_scores) >= threshold).astype(int)

# Calculate and print precision, recall, F1-score, confusion matrix
precision, recall, f1_score, _ = precision_recall_fscore_support(all_true_labels, all_pred_labels, average='weighted')

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")

# Print classification report
class_report = classification_report(all_true_labels, all_pred_labels, target_names=class_names)
print("Classification Report:")
print(class_report)


  0%|          | 0/25 [00:00<?, ?it/s]

Epoch:0
-----------


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
