# Disease prediction from ECG

## Data processing

In [None]:
import os
import numpy as np
import wfdb

In [None]:
# --- Locations ---------------------------------------------------------------
data_path = "./ecg_resources/data"              # folder the ECG records are read from
label_path = "./ecg_resources/annotations.csv"  # CSV file with the annotations
output_path = "./data"                          # folder the .npz batches are written to

# --- State shared with the batching loop -------------------------------------
filename = "batch"   # prefix of every saved batch file
file_group = 1       # number of the next record to read
batch = []           # signals collected for the batch currently being built
min_length = 2200    # signals are cut to this many samples (noted as the shortest recording length)

# --- Labels ------------------------------------------------------------------
# The CSV is read as text and its first line (the header) is skipped.
labels = np.loadtxt(label_path, dtype=str, delimiter=',', skiprows=1)

# Dropping the first four columns and the last one
# keeps only these columns -> ['1dAVb' 'RBBB' 'LBBB' 'SB' 'AF' 'ST']
dropped_columns = [0, 1, 2, 3, -1]
trimed_labels = np.delete(labels, dropped_columns, axis=1)
casted_labels = trimed_labels.astype(np.float32)

# labels preview
# [[0. 0. 0. 0. 0. 0.]
#  [0. 0. 0. 0. 1. 0.]
#  [1. 0. 0. 0. 0. 0.]
#  ...
#  [0. 0. 0. 0. 0. 0.]
#  [0. 0. 0. 0. 0. 0.]
#  [0. 0. 0. 0. 0. 0.]]

# Make sure the destination folder exists before any batch is written.
os.makedirs(output_path, exist_ok=True)

In [None]:
# Number of records stored in each saved batch file.
records_per_batch = 100

# Read the records TNMG<n>_N1 one by one, bring every signal to exactly
# min_length samples, and save them in files of `records_per_batch` signals
# together with the same number of label rows.
#
# NOTE(review): label rows are consumed strictly in order, one slice per saved
# batch, so row k is paired with the k-th record that loaded successfully.
# If annotations.csv also contains rows for records that fail to load, the
# pairing drifts after the first failure - confirm against the CSV's id column.
#
# Records still in `batch` when the loop ends (fewer than a full batch) are
# not written to disk.
while file_group <= 39_999:
    record_name = f"TNMG{file_group}_N1"
    print(f"Processing record: {record_name}")
    try:
        record_signal = wfdb.rdrecord(os.path.join(data_path, record_name)).p_signal

        # Longer signals are cut to min_length; shorter ones are zero-padded at
        # the end. Without the padding a single short record makes the batch
        # ragged and np.array() below fails for the whole batch.
        trimmed_signal = record_signal[:min_length, :]
        missing_rows = min_length - trimmed_signal.shape[0]
        if missing_rows > 0:
            trimmed_signal = np.pad(trimmed_signal, ((0, missing_rows), (0, 0)))
    except Exception as e:
        # Best effort: a record that cannot be read or prepared is reported
        # and skipped, and the run continues with the next record number.
        print(f"Can't load file: {record_name}, error: {e}\n")
        file_group += 1
        continue

    batch.append(trimmed_signal)

    if len(batch) == records_per_batch:
        if len(casted_labels) < records_per_batch:
            # Saving now would store fewer label rows than signals.
            print(f"Only {len(casted_labels)} label rows left for "
                  f"{records_per_batch} records, stopping\n")
            break

        numpy_array = np.array(batch, dtype=np.float32)
        trimmed_labels = casted_labels[:records_per_batch]

        # The file number is derived from the current record number, so the
        # numbering has gaps whenever records were skipped.
        batch_file = f"{filename}-{file_group // records_per_batch}.npz"
        np.savez(os.path.join(output_path, batch_file),
                 signals=numpy_array, labels=trimmed_labels)

        print(f"Batch saved as {batch_file}\n")

        # Start the next batch and drop the label rows that were just saved.
        batch = []
        casted_labels = casted_labels[records_per_batch:]

    file_group += 1

## Creating a PyTorch dataset

In [1]:
import os
import numpy as np
import torch

torch.__version__

'2.5.1+cu124'

In [None]:
# Collect every saved batch file into two arrays and store them in dataset.pt.
input_path = "./data"

data = []
labels = []

# Only .npz files are read, so stray entries in the folder do not break
# np.load. Sorting makes the order reproducible between runs (the order is
# lexicographic, i.e. "batch-10" comes before "batch-2").
batch_files = sorted(name for name in os.listdir(input_path) if name.endswith(".npz"))

if not batch_files:
    raise FileNotFoundError(f"No .npz batch files found in {input_path}")

for i, file in enumerate(batch_files):
    print(f"{i}. processing file: {file}")
    file_path = os.path.join(input_path, file)

    # An .npz archive stays open until it is closed; the context manager
    # releases the file handle as soon as both arrays have been read.
    with np.load(file_path) as record:
        data.append(record["signals"])
        labels.append(record["labels"])

# np.stack adds a leading "file" axis and raises if any file has a different
# shape, instead of producing an array that cannot be used downstream.
data = np.stack(data)
labels = np.stack(labels)

# The arrays are stored as NumPy objects, so loading this file needs
# torch.load(..., weights_only=False).
torch.save({'data': data, 'labels': labels}, 'dataset.pt')

## Dataset class

In [None]:
import torch
from torch.utils.data import Dataset

class ECGDataset(Dataset):
    """Dataset in which one item is a whole group of recordings.

    The shapes quoted in the comments below, (100, 2200, 8) signals and
    (100, 6) labels per item, are the ones noted by the author; the code only
    requires every data item to be 3-D and every label item to be 2-D.
    """

    def __init__(self, data, labels):
        """Copy `data` into a float32 tensor and `labels` into an int64 tensor."""
        self.data = torch.tensor(data, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        """Return the number of items, i.e. the size of the first data axis."""
        return len(self.data)

    def __getitem__(self, index):
        """Return `(signals, classes)` for the item at `index`.

        `signals` is the data item with its last two axes swapped, e.g.
        (100, 2200, 8) -> (100, 8, 2200), so channels come before time steps.
        `classes` holds, for every label row, the position of its largest
        entry, e.g. (100, 6) -> (100,).
        """
        signals = self.data[index].permute(0, 2, 1)

        # NOTE(review): argmax returns 0 for a row without any positive entry,
        # so "no label set" and "first column set" end up as the same class,
        # and only one class survives for rows with several positive entries.
        classes = torch.argmax(self.labels[index], dim=1)

        return signals, classes

## Model class

In [None]:
from torch import nn
from torch.nn import functional as F

class ECGCNN(nn.Module):
    """Small 2-D CNN that outputs one row of class logits per recording.

    A recording is treated as an image of shape (height, width), by default
    (8, 2200). Two 3x3 convolutions, each followed by ReLU and 2x2 max
    pooling, are followed by two fully connected layers.

    Args:
        num_classes: Number of logits produced per recording.
        in_channels: Number of channels of the input image. With the default
            of 1, a 3-D input is read as (batch, height, width).
        input_size: (height, width) of one recording. It determines the input
            width of the first fully connected layer; the default gives
            32 * 2 * 550 = 35200 features.
    """

    def __init__(self, num_classes=6, in_channels=1, input_size=(8, 2200)):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pool = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # The convolutions keep height and width (kernel 3, padding 1); each of
        # the two poolings halves them, rounding down.
        height, width = input_size
        self.fc1 = nn.Linear(32 * (height // 4) * (width // 4), 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return the logits for `x`.

        Args:
            x: Tensor of shape (batch, in_channels, height, width). A 3-D
                tensor is read as (batch, height, width) when the model has
                one input channel, otherwise as a single unbatched
                (in_channels, height, width) input.

        Returns:
            Tensor of shape (batch, num_classes), or (num_classes,) for a
            single unbatched input.
        """
        unbatched = False
        if x.dim() == 3:
            if self.conv1.in_channels == 1:
                x = x.unsqueeze(1)  # add the channel axis
            else:
                x = x.unsqueeze(0)  # add the batch axis
                unbatched = True

        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))

        # Flatten everything except the batch axis so that every recording
        # keeps its own row of features.
        x = x.flatten(start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x.squeeze(0) if unbatched else x

## Model training

In [None]:
import torch
from model import ECGCNN
from torch import nn, optim
from dataset import ECGDataset
from torch.utils.data import DataLoader

# Train ECGCNN on the saved dataset, report its accuracy and save its weights.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Instance of model, loss function, and optimizer
model = ECGCNN().to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

# Create dataloader
# dataset.pt stores NumPy arrays, hence weights_only=False.
loaded_data = torch.load('dataset.pt', weights_only=False)

# Every dataset item is a whole group of recordings: ECGDataset returns the
# signals as (recordings, 8, 2200) and one class index per recording.
dataset = ECGDataset(loaded_data['data'], loaded_data['labels'])
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)
dataloader_length = len(dataloader)


def _flatten_recordings(inputs, labels):
    """Merge the loader's batch axis with the per-item recordings axis.

    (batch, recordings, 8, 2200) becomes (batch * recordings, 8, 2200) and
    (batch, recordings) becomes (batch * recordings,), so that every recording
    is one sample with one class index. Training and evaluation both use this
    so they always see the data in the same layout.
    """
    return inputs.reshape(-1, 8, 2200), labels.reshape(-1)


# Training loop
epochs = 300
model.train()
for epoch in range(epochs):
    running_loss = 0.0  # To accumulate loss for each epoch
    for i, (inputs, labels) in enumerate(dataloader):
        # Move data to the correct device
        inputs, labels = inputs.to(device), labels.to(device)

        # The dataset already put channels before time steps, so only the two
        # leading axes are merged; no permutation is needed.
        inputs, labels = _flatten_recordings(inputs, labels)

        # Forward pass
        # NOTE(review): this requires model(inputs) to return one row of
        # logits per recording, i.e. shape (N, num_classes) - verify that
        # ECGCNN.forward keeps the batch dimension.
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)  # labels are already class indices

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate loss
        running_loss += loss.item()

    # Print average loss for the epoch
    epoch_loss = running_loss / dataloader_length
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss:.4f}')

# Evaluation
# NOTE(review): this reuses the training dataloader, so the number below is
# the accuracy on the training data, not on held-out data.
model.eval()  # Set the model to evaluation mode
with torch.inference_mode():
    correct = 0
    total = 0
    for inputs, labels in dataloader:
        # Move data to the correct device
        inputs, labels = inputs.to(device), labels.to(device)
        inputs, labels = _flatten_recordings(inputs, labels)

        # Forward pass
        outputs = model(inputs)
        predicted = outputs.argmax(dim=1)

        # Calculate accuracy
        total += labels.numel()
        correct += (predicted == labels).sum().item()

    print(f'Accuracy of the model on the training data: {100 * correct / total:.2f} %')

# Save trained model
torch.save(model.state_dict(), "ecg_classifier.pth")
print("Training complete, model saved!")
