In [1]:
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


In [2]:
src = pd.read_csv('src_data.csv')
lbl = pd.read_csv('lbl_data.csv')

In [3]:
mask = np.zeros(len(src), dtype=int)

# Populate the mask with 1 for QRS and 2 for P
for _, row in lbl.iterrows():
    if row['Value'] == 'QRS':
        mask[row['ROILimits_1']:row['ROILimits_2'] + 1] = 1
    elif row['Value'] == 'P':
        mask[row['ROILimits_1']:row['ROILimits_2'] + 1] = 2

# Now we have the mask, we can save it to a CSV file
mask_df = pd.DataFrame(mask, columns=['mask'])

ROILimits_1      0
ROILimits_2     12
Value          QRS
Name: 0, dtype: object
ROILimits_1     35
ROILimits_2     51
Value          QRS
Name: 1, dtype: object
ROILimits_1     77
ROILimits_2     89
Value          QRS
Name: 2, dtype: object
ROILimits_1    113
ROILimits_2    127
Value          QRS
Name: 3, dtype: object
ROILimits_1    153
ROILimits_2    166
Value          QRS
Name: 4, dtype: object
ROILimits_1    192
ROILimits_2    207
Value          QRS
Name: 5, dtype: object
ROILimits_1    229
ROILimits_2    245
Value          QRS
Name: 6, dtype: object
ROILimits_1    269
ROILimits_2    284
Value          QRS
Name: 7, dtype: object
ROILimits_1    310
ROILimits_2    324
Value          QRS
Name: 8, dtype: object
ROILimits_1    348
ROILimits_2    362
Value          QRS
Name: 9, dtype: object
ROILimits_1    387
ROILimits_2    400
Value          QRS
Name: 10, dtype: object
ROILimits_1    427
ROILimits_2    440
Value          QRS
Name: 11, dtype: object
ROILimits_1    464
ROILimits_2    480


In [5]:
X = src.values.flatten()
Y = mask_df.values.flatten()
print(X.shape)
print(Y.shape)

(2935,)
(2935,)


In [6]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [7]:
# Parameters
sequence_length = 50  # Length of input sequences for the LSTM
hidden_size = 64      # Number of features in the hidden state of the LSTM
num_classes = 3       # Number of output classes (0: nothing, 1: QRS, 2: P)
batch_size = 64       # Batch size for training
learning_rate = 0.001 # Learning rate for the optimizer
epochs = 10           # Number of epochs to train for

# Custom dataset
class ECGDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return (self.sequences[idx], self.labels[idx])

# Create sequences for LSTM
def create_sequences(data, sequence_length):
    sequences = []
    for i in range(len(data) - sequence_length):
        sequences.append(data[i:i+sequence_length])
    return sequences

X_seq = create_sequences(X, sequence_length)
Y_seq = Y[sequence_length:]
for seq, lab in zip(X_seq, Y_seq):
    # plt.plot(seq)
    # plt.plot(lab*200)
    # plt.show()
    print(lab)
# Split into training and testing sets
X_train, X_test, Y_train, Y_test = train_test_split(
    X_seq, Y_seq, test_size=0.2, random_state=42
)

# Convert to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
Y_train_tensor = torch.tensor(Y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
Y_test_tensor = torch.tensor(Y_test, dtype=torch.long)

# Create DataLoader for training and testing
train_dataset = ECGDataset(X_train_tensor, Y_train_tensor)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = ECGDataset(X_test_tensor, Y_test_tensor)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# Define LSTM Model
class LSTMClassifier(nn.Module):
    def __init__(self, sequence_length, hidden_size, num_classes):
        super(LSTMClassifier, self).__init__()
        self.lstm = nn.LSTM(input_size=1, hidden_size=hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # Initialize hidden and cell states with zeros
        h0 = torch.zeros(1, x.size(0), hidden_size)
        c0 = torch.zeros(1, x.size(0), hidden_size)
        
        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))
        
        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        # print(f'outsize: {out.size()}')
        return out

# Initialize model, loss, and optimizer
model = LSTMClassifier(sequence_length, hidden_size, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    for sequences, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(sequences.unsqueeze(-1)) # Add channel dimension
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# Evaluation
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for sequences, labels in test_loader:
        outputs = model(sequences.unsqueeze(-1))
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f'Accuracy on test set: {accuracy:.2f}%')

1
1
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
2
2
0
0
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
2
2
2
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
2
2
2
2
2
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
2
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
0
0
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
2
0
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
2
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
2
2
2
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
2
2
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
2
2
2
2
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
0
2
2
2
2
2
2
2
2
2
2
2
0
1
1
1
1
1
1
1
1
1


  X_train_tensor = torch.tensor(X_train, dtype=torch.float32)


Epoch [1/10], Loss: 0.5142
Epoch [2/10], Loss: 0.6615
Epoch [3/10], Loss: 0.9270
Epoch [4/10], Loss: 0.4953
Epoch [5/10], Loss: 0.2626
Epoch [6/10], Loss: 0.0539
Epoch [7/10], Loss: 0.0456
Epoch [8/10], Loss: 0.8703
Epoch [9/10], Loss: 0.1396
Epoch [10/10], Loss: 0.5917
Accuracy on test set: 92.55%


In [None]:
import matplotlib.pyplot as plt

for sequences, labels in test_loader:
    print(sequences.unsqueeze(-1).size())
    print(labels)

torch.Size([64, 50, 1])
tensor([0, 2, 0, 2, 0, 2, 0, 0, 2, 2, 2, 0, 2, 2, 0, 0, 1, 0, 2, 0, 0, 0, 1, 0,
        1, 0, 0, 0, 0, 2, 1, 2, 0, 2, 2, 0, 0, 0, 2, 0, 1, 1, 0, 0, 0, 2, 0, 0,
        2, 2, 1, 0, 1, 1, 0, 2, 2, 2, 0, 1, 0, 2, 2, 1])
torch.Size([64, 50, 1])
tensor([0, 0, 0, 0, 2, 0, 2, 0, 1, 1, 1, 1, 1, 0, 2, 0, 2, 1, 0, 0, 1, 1, 0, 1,
        1, 2, 1, 1, 2, 1, 0, 2, 0, 0, 0, 0, 1, 1, 1, 0, 2, 1, 1, 1, 2, 1, 2, 2,
        1, 0, 2, 1, 2, 0, 1, 1, 2, 0, 0, 1, 2, 0, 1, 2])
torch.Size([64, 50, 1])
tensor([0, 0, 1, 0, 0, 0, 2, 0, 1, 0, 1, 2, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1,
        0, 1, 1, 1, 1, 1, 0, 2, 0, 1, 2, 1, 2, 0, 1, 0, 2, 1, 0, 2, 0, 2, 1, 1,
        0, 2, 0, 2, 2, 2, 0, 2, 2, 0, 0, 2, 1, 0, 2, 0])
torch.Size([64, 50, 1])
tensor([1, 1, 2, 1, 1, 1, 0, 0, 2, 0, 2, 1, 0, 0, 0, 1, 1, 2, 1, 0, 1, 0, 2, 1,
        1, 1, 2, 1, 1, 1, 0, 2, 2, 0, 2, 2, 0, 1, 2, 1, 2, 0, 2, 1, 1, 0, 1, 1,
        2, 0, 0, 2, 2, 1, 1, 1, 0, 1, 0, 0, 1, 2, 1, 1])
torch.Size([64, 50, 1])
tensor([1, 2

In [1]:
import numpy as np
import pandas as pd

# Your label array
labels = np.array([0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,2,2,2,2,2,2])

# Identify the changes in the label array
changes = np.diff(labels, prepend=labels[0])

# Identify the start and end indices
starts = np.where(changes != 0)[0]
ends = np.where(changes != 0)[0][1:]

# Include the end of the last segment
ends = np.append(ends, len(labels))

# Get the wave types
waves = labels[starts]

# Create the DataFrame
df = pd.DataFrame({
    'wave': waves,
    'start_index': starts,
    'end_index': ends - 1
})

# Filter out the segments with wave type 0
df = df[df['wave'] != 0].reset_index(drop=True)

# Map the wave types to their names
wave_mapping = {1: 'p', 2: 'qrs'}
df['wave'] = df['wave'].map(wave_mapping)

print(df)


  wave  start_index  end_index
0    p            6         11
1  qrs           18         23


array([-1251., -1251., -1137., ...,  -167.,     8.,   -15.])

In [1]:
(32.0 - 13.0)/512

0.037109375