In [5]:
!pip install wfdb




[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [10]:
import os
import wfdb
import pandas as pd
import numpy as np
import ast
import requests

# Define data directory relative to the current working directory
data_dir = './data'
records_500hz_dir_relative = 'records500'
records_500hz_dir_full = os.path.join(data_dir, records_500hz_dir_relative)

# Get a list of all record names (without extensions) in the records500 directory
# Assuming record names are in the format 'XXXXX_hr'
record_names = [f.split('.')[0] for f in os.listdir(records_500hz_dir_full) if f.endswith('.hea')]
record_names = sorted(list(set(record_names))) # Remove duplicates and sort

# Function to load metadata from a .hea file
def load_metadata_from_header(record_name):
    try:
        header = wfdb.rdheader(record_name, pn_dir=records_500hz_dir_full)
        # Extract relevant information from the header
        # This will depend on what information you need (e.g., sampling frequency, number of leads)
        # For 'NORM' and 'MI' classification, we need the diagnostic codes.
        # The diagnostic codes are usually in the comments of the header file.
        # We'll need to parse the comments to find the 'scp_codes'.
        scp_codes = {}
        for comment in header.comments:
            if comment.startswith('SCP'):
                 # Assuming the format is 'SCP codes: {<code>: <value>}'
                 try:
                     scp_codes_str = comment.split('SCP codes: ')[1]
                     scp_codes = ast.literal_eval(scp_codes_str)
                 except:
                    pass # Handle potential parsing errors

        return {'record_name': record_name, 'scp_codes': scp_codes, 'sampling_frequency': header.fs, 'num_leads': header.n_sig}
    except Exception as e:
        print(f"Error loading header for {record_name}: {e}")
        return None

# Load metadata for all records
metadata_list = [load_metadata_from_header(record_name) for record_name in record_names]

# Convert the list of dictionaries to a pandas DataFrame
Y = pd.DataFrame([m for m in metadata_list if m is not None])

# Filter for 'NORM' and 'MI' classes
relevant_scp_codes = ['NORM', 'MI']
def contains_relevant_code(scp_codes_dict):
    if isinstance(scp_codes_dict, dict):
        for code in scp_codes_dict.keys():
            if code in relevant_scp_codes:
                return True
    return False

Y_filtered = Y[Y["scp_codes"].apply(contains_relevant_code)].copy()


# Display the first few rows of the filtered metadata
display(Y_filtered.head())

KeyError: 'scp_codes'

### Subtask:
Evaluate the trained model's performance on a separate validation set using appropriate metrics.

**Reasoning**:
Evaluating the model on a validation set provides an estimate of its performance on unseen data and helps to assess how well it generalizes. We will use metrics like accuracy and potentially others relevant to classification tasks.

In [None]:
import torch

# Set the model to evaluation mode
model.eval()

# Initialize variables to track correct predictions and total predictions
correct_predictions = 0
total_predictions = 0

# Disable gradient calculation for evaluation
with torch.no_grad():
    for inputs, labels in val_dataloader:
        # Move data to the appropriate device
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)

        # Get the predicted class
        _, predicted = torch.max(outputs.data, 1)

        # Update counts
        total_predictions += labels.size(0)
        correct_predictions += (predicted == labels).sum().item()

# Calculate accuracy
accuracy = correct_predictions / total_predictions

print(f"Accuracy on the validation set: {accuracy:.4f}")

# Note: This evaluation is based on the training that was performed on dummy data
# and was potentially interrupted. The results will not reflect the model's
# performance on the actual ECG dataset.

Accuracy on the validation set: 1.0000


### Subtask:
Implement the training loop for the 1D CNN model using PyTorch.

**Reasoning**:
The training loop is where the model learns from the data. It involves iterating over the training data in batches, calculating the loss, computing gradients, and updating the model's weights using an optimizer.

In [None]:
import torch.optim as optim
import torch.nn as nn

# Instantiate the model, loss function, and optimizer
model = ECGNet(num_classes=2) # Make sure ECGNet class is defined in a previous cell
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) # You can adjust the learning rate

# Set up device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Training loop
num_epochs = 10 # You can adjust the number of epochs
for epoch in range(num_epochs):
    model.train() # Set the model to training mode
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_dataloader):
        # Move data to the appropriate device
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Print epoch statistics
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_dataloader):.4f}")

print("Finished Training")

KeyboardInterrupt: 

### Subtask:
Define the architecture of the 1D CNN model using PyTorch.

**Reasoning**:
A 1D CNN is suitable for processing sequential data like ECG signals. The model will consist of convolutional layers to extract features, pooling layers to reduce dimensionality, and fully connected layers for classification.

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch # Import torch to use transpose

class ECGNet(nn.Module):
    def __init__(self, num_classes=2):
        super(ECGNet, self).__init__()
        # Define the convolutional layers
        self.conv1 = nn.Conv1d(in_channels=12, out_channels=32, kernel_size=5, stride=1, padding=2)
        self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=5, stride=1, padding=2)
        self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=5, stride=1, padding=2)
        self.pool3 = nn.MaxPool1d(kernel_size=2, stride=2)

        # Define fully connected layers
        # Calculate the output size of the convolutional layers to determine the input size of the first fully connected layer
        # This requires knowing the input signal length and how the pooling layers affect it.
        # For a signal length of 5000 and kernel_size=2, stride=2 for pooling:
        # After pool1: (5000 - 2)/2 + 1 = 2499
        # After pool2: (2499 - 2)/2 + 1 = 1249 (integer division might vary) -> let's double check or use a dummy forward pass
        # After pool3: (1249 - 2)/2 + 1 = 624 (integer division might vary) -> let's double check or use a dummy forward pass

        # A safer way is to calculate it dynamically or pass a dummy tensor through.
        # Assuming input shape (batch_size, num_leads, signal_length) -> (batch_size, 12, 5000)
        def _get_conv_output_size(length):
            size = (length + 2 * 2 - 5) // 1 + 1 # conv1
            size = (size - 2) // 2 + 1 # pool1
            size = (size + 2 * 2 - 5) // 1 + 1 # conv2
            size = (size - 2) // 2 + 1 # pool2
            size = (size + 2 * 2 - 5) // 1 + 1 # conv3
            size = (size - 2) // 2 + 1 # pool3
            return size

        fc_input_size = _get_conv_output_size(5000) * 128 # multiply by the number of output channels from the last conv layer

        self.fc1 = nn.Linear(fc_input_size, 256)
        self.fc2 = nn.Linear(256, num_classes)


    def forward(self, x):
        # Input shape: (batch_size, signal_length, num_leads) from DataLoader
        # Transpose to (batch_size, num_leads, signal_length) for Conv1d
        x = x.transpose(1, 2)

        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = F.relu(self.conv3(x))
        x = self.pool3(x)

        # Flatten the output for the fully connected layers
        x = x.view(x.size(0), -1)

        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Example of creating the model
# model = ECGNet(num_classes=2)
# print(model)

### Subtask:
Prepare the filtered ECG data and labels for use with PyTorch, creating custom `Dataset` and `DataLoader` classes.

**Reasoning**:
To train a PyTorch model, the data needs to be organized into `Dataset` and `DataLoader` objects. The `Dataset` will handle loading individual samples (ECG signals and their corresponding labels), and the `DataLoader` will provide batches of data for training and validation, handling shuffling and parallel loading.

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np # Import numpy for creating dummy data

class ECGDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Assuming data is a list/array of signals and labels is a pandas Series/numpy array
        signal = self.data.iloc[idx] # Adjust indexing based on your data structure
        label = self.labels.iloc[idx] # Adjust indexing based on your data structure

        # Convert numpy arrays to PyTorch tensors
        signal = torch.tensor(signal, dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.long) # Assuming labels are integers for classification

        return signal, label

# Assuming Y_filtered is your pandas DataFrame with 'scp_codes' column
# And you have already filtered for 'NORM' and 'MI'

# Map 'NORM' and 'MI' to numerical labels
label_mapping = {'NORM': 0, 'MI': 1}
Y_filtered['numeric_label'] = Y_filtered['scp_codes'].apply(lambda x: label_mapping.get(list(x.keys())[0], -1))

# Remove rows with no relevant label (if any were missed)
Y_filtered = Y_filtered[Y_filtered['numeric_label'] != -1].copy()

# --- Temporary signal data for testing ---
# Replace this section with the actual signal data loading,
# when you download the full dataset.
# Create dummy signal data: a list of numpy arrays.
# The signal size (e.g., 5000 points) and number of leads (e.g., 12)
# should match the real data.
dummy_signal_length = 5000 # Assuming a signal length of 500 Hz
dummy_num_leads = 12     # Assuming 12 leads
Y_filtered['signal'] = [np.random.randn(dummy_signal_length, dummy_num_leads) for _ in range(len(Y_filtered))]
# --- End of temporary signal data section ---


# Separate signals and labels
# Now 'signal' column exists with dummy data
signals = Y_filtered['signal']
labels = Y_filtered['numeric_label']

# Split data into training and validation sets (simple split for now)
# In a real project, you'd use train_test_split from sklearn
train_size = int(0.8 * len(Y_filtered))
val_size = len(Y_filtered) - train_size

train_signals = signals[:train_size]
train_labels = labels[:train_size]
val_signals = signals[train_size:]
val_labels = labels[train_size:]


# Create Dataset and DataLoader instances
train_dataset = ECGDataset(train_signals, train_labels)
val_dataset = ECGDataset(val_signals, val_labels)

batch_size = 32 # You can adjust this
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(val_dataset)}")
print(f"Number of training batches: {len(train_dataloader)}")
print(f"Number of validation batches: {len(val_dataloader)}")

# Example of accessing a batch
# train_features, train_labels = next(iter(train_dataloader))
# print(f"Feature batch shape: {train_features.size()}")
# print(f"Labels batch shape: {train_labels.size()}")

Number of training samples: 7611
Number of validation samples: 1903
Number of training batches: 238
Number of validation batches: 60


Now that the data is loaded and filtered, we need to preprocess the ECG signals. This involves:

- Extracting the signal data for the filtered records.
- Resampling the signals to a consistent length if necessary (although for this task at 500Hz, most signals might already be at a consistent length or close enough for a 1D CNN).
- Normalizing the signal data.
- Splitting the data into training and validation sets.

In [None]:
# Extract signal data for the filtered records
# Assuming the signal files are organized by record_path in the metadata
def load_signal(record_path):
    # Correct the file path to point to the 500Hz records
    file_path = os.path.join(data_dir, record_path).replace('records100', 'records500')
    try:
        # rdsamp returns a tuple: (signals, metadata)
        signals, meta = wfdb.rdsamp(file_path)
        return signals
    except Exception as e:
        print(f"Error loading signal {file_path}: {e}")
        return None

# Apply the loading function to the filtered metadata
Y_filtered['signal'] = Y_filtered['record_path'].apply(load_signal)

# Remove rows where signal loading failed
Y_filtered.dropna(subset=['signal'], inplace=True)

# Further preprocessing steps (normalization, consistent length, train/test split) would go here
# For now, let's just confirm the data structure
print(f"Number of filtered records with loaded signals: {len(Y_filtered)}")
print(f"Example signal shape: {Y_filtered['signal'].iloc[0].shape if len(Y_filtered) > 0 else 'No signals loaded'}")

NameError: name 'Y_filtered' is not defined

# Task
Build and train a 1D CNN using PyTorch to classify ECG signals from the PTB-XL dataset into 'Normal ECG' and 'Myocardial Infarction' classes, using a sampling rate of 500Hz.

## Load and preprocess data

### Subtask:
Load the PTB-XL dataset, focusing on the specified classes ('NORM' and 'MI'). Preprocess the ECG signals to a consistent format suitable for the CNN model.


**Reasoning**:
Load the metadata and signal data from the specified file paths and filter the metadata to include only records with diagnostic classes 'NORM' and 'MI'.

