In [1]:
import torch
import os
import pandas as pd
import matplotlib.pyplot as plt
import wfdb
import torch.nn as nn 
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import numpy as np

In [2]:


def extract_label(header):
    snomed_code_map = {
        '426177001': 'SB',
        '426783006': 'SR',
        '164889003': 'AFIB',
        '427084000': 'ST',
        '164890007': 'AF',
        '427393009': 'SA',
        '426761007': 'SVT',
        '713422000': 'AT',
        '233896004': 'AVNRT',
        '233897008': 'AVRT'
    }
    
    for line in header:
        if line.startswith('#Dx:'):
            codes = line.strip().split(':')[1].strip().split(',')
            for code in codes:
                if code in snomed_code_map:
                    return snomed_code_map[code]
    return "Unknown"
def load_ecg_sample_by_filename(data_dir, file_name):
    file_path = os.path.join(data_dir, file_name)

    #print(f"Checking file: {file_path}.mat")  # Add this line for debugging

    if not os.path.exists(file_path + '.mat'):
        raise ValueError(f"No data found for the specified filename: {file_name}")


    # Read ECG signal from the .mat file
    record = wfdb.rdrecord(file_path)
    signal = record.p_signal[:, 0]

    # Read the header file
    header_content = read_header_file(file_path + '.hea')

    # Extract label from the header file
    label = extract_label(header_content)

    return signal, label

def read_header_file(file_path):
    with open(file_path, 'r') as file:
        content = file.readlines()
    return content

def load_ecg_data(data_dir, rhythm_types=None):
    signals = []
    labels = []

    for root, _, files in os.walk(data_dir):
        for file in files:
            if file.endswith('.mat'):
                file_name = os.path.splitext(file)[0]
                try:
                    signal, label = load_ecg_sample_by_filename(root, file_name)
                    if rhythm_types is None or label in rhythm_types:
                        signals.append(signal)
                        labels.append(label)
                except Exception as e:
                    print(f"Error loading file {file}: {e}")

    return signals, labels

data_dir = '/Users/aman/Downloads/ecg/WFDBRecords/'
rhythm_types = ['SB', 'SR', 'AFIB', 'ST', 'AF', 'SA', 'SVT', 'AT', 'AVNRT', 'AVRT']

signals, labels = load_ecg_data(data_dir, rhythm_types)

Error loading file JS01052.mat: time data '/' does not match format '%d/%m/%Y'
Error loading file JS23074.mat: list index out of range


In [3]:
import numpy as np
import scipy.signal as signal

def preprocess_ecg_signal(ecg_signal, lowcut=0.5, highcut=50.0, fs=500.0):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = signal.butter(1, [low, high], btype='band')
    filtered_signal = signal.lfilter(b, a, ecg_signal)
    normalized_signal = (filtered_signal - np.mean(filtered_signal)) / np.std(filtered_signal)
    return normalized_signal

preprocessed_signals = [preprocess_ecg_signal(signal) for signal in signals]

In [4]:
#Create Patches
def segment_and_save(preprocessed_signals, labels, patch_size=250):
    segmented_signals = []
    segmented_labels = []

    for signal, label in zip(preprocessed_signals, labels):
        patches = [signal[i:i+patch_size] for i in range(0, len(signal), patch_size)]
        patches = [patch for patch in patches if len(patch) == patch_size]
        segmented_signals.extend(patches)
        segmented_labels.extend([label] * len(patches))

    segmented_signals = np.array(segmented_signals)
    segmented_labels = np.array(segmented_labels)

    np.save('segmented_signals.npy', segmented_signals)
    np.save('segmented_labels.npy', segmented_labels)

from sklearn import preprocessing

# Initialize label encoder
le = preprocessing.LabelEncoder()

# Fit label encoder and transform labels into numbers
labels_encoded = le.fit_transform(labels)

# Assume 'preprocessed_signals' and 'labels' are your preprocessed signals and labels
preprocessed_signals = preprocessed_signals
labels = labels_encoded

# Call the function
segment_and_save(preprocessed_signals, labels)

In [5]:
def mask_and_save(input_file='segmented_signals.npy', output_file='masked_signals.npy', patch_size=250, mask_fraction=0.30):
    # Load the segmented signals
    data = np.load(input_file)

    # Create a copy of the data to avoid modifying the original data
    data_masked = data.copy()

    # Calculate the number of patches in each ECG segment
    num_patches = data.shape[1] // patch_size

    # Calculate the number of patches to mask in each ECG segment
    num_mask = int(mask_fraction * num_patches)

    for i in range(data.shape[0]):
        # Choose random patches to mask
        mask_patches = np.random.choice(num_patches, num_mask, replace=False)

        for p in mask_patches:
            # Mask the data in these patches
            start_idx = p * patch_size
            end_idx = start_idx + patch_size
            data_masked[i, start_idx:end_idx] = 0

    # Save the masked signals to disk
    np.save(output_file, data_masked)


In [6]:
# Assume you have segmented signals saved in 'segmented_signals.npy'
mask_and_save('segmented_signals.npy', 'masked_signals.npy', patch_size=250, mask_fraction=0.3)


In [7]:
from sklearn.model_selection import train_test_split

class ECGDataset(Dataset):
    def __init__(self, masked_signals, original_signals):
        self.masked_signals = masked_signals
        self.original_signals = original_signals

    def __len__(self):
        return len(self.masked_signals)

    def __getitem__(self, idx):
        return self.masked_signals[idx], self.original_signals[idx]

# Load the data
masked_signals = np.load('masked_signals.npy')
original_signals = np.load('segmented_signals.npy')

# Split the data into training and validation sets
masked_train, masked_val, original_train, original_val = train_test_split(masked_signals, original_signals, test_size=0.2, random_state=42)

# Create datasets
train_dataset = ECGDataset(masked_train, original_train)
val_dataset = ECGDataset(masked_val, original_val)

# Create dataloaders
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)


In [8]:

import torch
import torch.nn as nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer
import torch.nn.functional as F
import torch.nn.init as init

class SingleLeadECGModel(nn.Module):
    def __init__(self):
        super(SingleLeadECGModel, self).__init__()

        self.class_token = nn.Parameter(torch.zeros(1, 1, 128))

        # Patch Embedding Module for Encoder
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=15, stride=1)
        init.xavier_uniform_(self.conv1.weight)
        
        self.bn1 = nn.BatchNorm1d(32)
        
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=7, stride=1)
        init.xavier_uniform_(self.conv2.weight)
        
        self.bn2 = nn.BatchNorm1d(64)
        
        self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=60, stride=125, dilation=2)
        init.xavier_uniform_(self.conv3.weight)
        
        self.ln = nn.LayerNorm(128)

        # Transformer Blocks for Encoder
        encoder_layers = TransformerEncoderLayer(d_model=128, nhead=8, dim_feedforward=128*3)
        self.transformer_encoder = TransformerEncoder(encoder_layers, num_layers=6)

        # Patch Embedding Module for Decoder
        self.decoder_linear = nn.Linear(128, 64)
        init.xavier_uniform_(self.decoder_linear.weight)

        # Transformer Blocks for Decoder
        decoder_layers = TransformerEncoderLayer(d_model=64, nhead=8, dim_feedforward=64*3)
        self.transformer_decoder = TransformerEncoder(decoder_layers, num_layers=3)

        # Prediction Layer
        self.pred_linear = nn.Linear(64, 250)
        init.xavier_uniform_(self.pred_linear.weight)

    def forward(self, x):
        # Patch Embedding Module for Encoder
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.ln(self.conv3(x).squeeze(-1))

        # Adding class token to patches
        class_token = self.class_token.repeat(x.size(0), 1, 1)
        x=x.unsqueeze(1)
        x = x + class_token

        # Transformer Blocks for Encoder
        x = self.transformer_encoder(x)

        # Patch Embedding Module for Decoder
        x = self.decoder_linear(x)

        # Transformer Blocks for Decoder
        x = self.transformer_decoder(x)

        # Prediction Layer
        x = self.pred_linear(x)

        # Reshaping output to match original input
        x = x.view(-1, 1, 250)

        return x

In [9]:
import torch
from torch.optim import AdamW
from torch.nn import L1Loss

# Initialize the model and optimizer
model = SingleLeadECGModel().float()
optimizer = AdamW(model.parameters(), lr=1e-3, weight_decay=0.05)

# Move model to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Loss function
criterion = L1Loss()

In [10]:
# Training loop with validation
for epoch in range(10):
    model.train()
    running_loss = 0.0

    for i, data in enumerate(train_dataloader):
        # Get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].unsqueeze(1).float().to(device), data[1].unsqueeze(1).float().to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Calculate loss
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Print statistics
        running_loss += loss.item()

    # Print average loss per epoch
    train_loss = running_loss / len(train_dataloader)
    print(f"Epoch {epoch + 1}, Training Loss: {train_loss}")

    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for i, data in enumerate(val_dataloader):
            # Get the inputs; data is a list of [inputs, labels]
            inputs, labels = data[0].to(device), data[1].to(device)

            # Forward pass
            outputs = model(inputs)

            # Calculate loss
            loss = criterion(outputs, labels)

            # Print statistics
            running_loss += loss.item()

        # Print average loss per epoch
        val_loss = running_loss / len(val_dataloader)
        print(f"Epoch {epoch + 1}, Validation Loss: {val_loss}")

# Save the model weights
torch.save(model.state_dict(), 'model_weights.pth')


Epoch 1, Training Loss: 0.5740189938924198


RuntimeError: Given groups=1, weight of size [32, 1, 15], expected input[1, 32, 250] to have 1 channels, but got 32 channels instead