1. Load required libraries

In [None]:
import pandas as pd

import numpy as np
print(np.__version__)

import matplotlib.pyplot as plt

import sklearn
print(sklearn.__version__)

import tensorflow as tf
print(tf.__version__)

from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Bidirectional
from tensorflow.keras.utils import to_categorical

2. Load and prepare the data

In [None]:
import os
from pathlib import Path

# The CSV files are expected in a 'csvdata' directory, resolved relative to
# the current working directory (the path below is relative).
data_dir = Path('csvdata')
print(data_dir)  # echo the configured path as a quick sanity check



In [None]:
def load_data(ecg_file, labels_file):
    """Read one recording and its region-label table from CSV.

    Parameters
    ----------
    ecg_file : path-like
        Header-less CSV holding the signal samples.
    labels_file : path-like
        CSV with a header row describing the labelled regions.

    Returns
    -------
    tuple
        ``(ecg_signal, labels)``: the signal flattened to a 1-D NumPy
        array and the label table as a DataFrame.
    """
    signal_frame = pd.read_csv(ecg_file, header=None)
    region_labels = pd.read_csv(labels_file)
    # Collapse the frame to one dimension regardless of how the samples
    # were laid out in the file.
    flat_signal = signal_frame.to_numpy().flatten()
    return flat_signal, region_labels

def create_mask(ecg_signal, labels):
    """Build a per-sample integer class mask from labelled regions.

    Parameters
    ----------
    ecg_signal : sequence
        Only its length is used; the mask has one entry per sample.
    labels : pandas.DataFrame
        Must provide the columns ``'ROILimits_1'`` (region start),
        ``'ROILimits_2'`` (region end) and ``'Value'`` (one of
        ``'P'``, ``'QRS'``, ``'T'``).

    Returns
    -------
    numpy.ndarray
        Integer array of ``len(ecg_signal)`` where 0 marks unlabelled
        samples and 1, 2, 3 mark ``'P'``, ``'QRS'`` and ``'T'`` regions.

    Raises
    ------
    ValueError
        If a row carries a label other than ``'P'``, ``'QRS'`` or ``'T'``.
    """
    # Built once instead of re-creating and scanning a list for every row.
    class_ids = {'P': 1, 'QRS': 2, 'T': 3}

    mask = np.zeros(len(ecg_signal), dtype=int)
    region_rows = zip(
        labels['ROILimits_1'], labels['ROILimits_2'], labels['Value']
    )
    for start, end, value in region_rows:
        if value not in class_ids:
            raise ValueError(
                f"Unknown region label {value!r}; "
                f"expected one of {sorted(class_ids)}"
            )
        # pandas stores the limits as floats whenever the column contains a
        # missing value, and floats are not valid slice indices.
        # NOTE(review): the limits are used as a half-open slice [start, end).
        # If the label files hold 1-based inclusive limits, every region is
        # shifted by one sample. Confirm against the data export.
        mask[int(start):int(end)] = class_ids[value]
    return mask

def resize_data(ecg_signal, mask, segment_length=5000):
    """Cut a signal and its mask into equal, non-overlapping segments.

    As many full ``segment_length``-sample segments as possible are
    produced; trailing samples that do not fill a whole segment are
    discarded.

    Parameters
    ----------
    ecg_signal : array-like, 1-D
        The signal samples.
    mask : array-like, 1-D
        Per-sample labels, the same length as ``ecg_signal``.
    segment_length : int, optional
        Number of samples per segment (default 5000).

    Returns
    -------
    tuple of numpy.ndarray
        ``(ecg_segments, mask_segments)``, each of shape
        ``(num_segments, segment_length)``. When the signal is shorter
        than one segment both arrays have shape ``(0, segment_length)``.
    """
    ecg_signal = np.asarray(ecg_signal)
    mask = np.asarray(mask)

    num_segments = len(ecg_signal) // segment_length
    usable = num_segments * segment_length

    # A reshape (unlike np.array_split) is well defined for zero segments, so
    # a too-short recording yields an empty result instead of a ValueError.
    # The copies keep the returned segments independent of the inputs.
    ecg_segments = ecg_signal[:usable].reshape(num_segments, segment_length).copy()
    mask_segments = mask[:usable].reshape(num_segments, segment_length).copy()
    return ecg_segments, mask_segments

# Passing very long input signals into the LSTM network can degrade estimation performance and cause excessive memory usage.
# To avoid these effects, break the ECG signals and their corresponding label masks into shorter segments:
# create as many 5000-sample segments as possible and discard the remaining samples.

# Normalise data
def normalise_signal(signal):
    """Standardise a signal to zero mean and unit variance (z-score).

    Parameters
    ----------
    signal : array-like
        The samples to normalise.

    Returns
    -------
    numpy.ndarray
        ``(signal - mean) / std``. A constant signal has zero standard
        deviation; it is returned mean-centred (all zeros) rather than
        divided by zero, which would fill the result with NaN.
    """
    centred = signal - np.mean(signal)
    spread = np.std(signal)
    if spread == 0:
        # Flat recording: nothing to scale, and dividing would produce NaN.
        return centred
    return centred / spread


# Accumulators shared by every recording: one list entry per segment.
all_ecg_segments_norm = []   # normalised ECG segments
all_mask_segments_norm = []  # per-sample class masks matching those segments

# Process each recording in turn. Files are named
# ecg<N>_ecgSignal.csv / ecg<N>_signalRegionLabels.csv with N = 1..210.
for i in range(1, 211):
    ecg_file = data_dir.joinpath(f'ecg{i}_ecgSignal.csv')
    labels_file = data_dir.joinpath(f'ecg{i}_signalRegionLabels.csv')
    ecg_signal, labels = load_data(ecg_file, labels_file)

    # Normalise per recording, then build the label mask for it.
    ecg_signal_norm = normalise_signal(ecg_signal)
    mask = create_mask(ecg_signal_norm, labels)

    # Cut signal and mask into equal segments and collect them.
    ecg_segments, mask_segments = resize_data(ecg_signal_norm, mask)
    all_ecg_segments_norm.extend(ecg_segments)
    all_mask_segments_norm.extend(mask_segments)

# Stack the collected segments into 2-D arrays (segments x samples).
X_norm = np.array(all_ecg_segments_norm)
y_norm = np.array(all_mask_segments_norm)


Array verification

# Load the first recording on its own (same steps as load_data) so the raw
# inputs can be inspected.
ecg1_signal, ecg1_labels = load_data(
    data_dir / 'ecg1_ecgSignal.csv',
    data_dir / 'ecg1_signalRegionLabels.csv',
)
print(type(ecg1_signal), type(ecg1_labels), sep='\n')
print(ecg1_signal[:5])
print(ecg1_signal.ndim, ecg1_signal.shape, sep='\n')
print(ecg1_labels.head())

# The same recording after z-score normalisation.
ecg1_signal_norm = normalise_signal(ecg1_signal)
print(type(ecg1_signal_norm))
print(ecg1_signal_norm[:5])
print(ecg1_signal_norm.ndim, ecg1_signal_norm.shape, sep='\n')

# Sizes and types of the accumulated segment lists.
print("all_ecg_segments_norm list length:", len(all_ecg_segments_norm))
print(type(all_ecg_segments_norm))
print("all_mask_segments_norm length:", len(all_mask_segments_norm))
print(type(all_mask_segments_norm))

# The stacked arrays that feed the train/test split.
print(type(X_norm))
print("X_norm shape:", X_norm.shape)
print(type(y_norm))
print("y_norm shape:", y_norm.shape)
print("X_norm", X_norm[:5])
print("y_norm", y_norm[:5])

3. Split the data into training and testing sets

test_size=0.3: This parameter specifies that 30% of the data should be allocated to the test set, while the remaining 70% will be used for training.

random_state=42: This parameter sets a seed for the random number generator, ensuring that the split is reproducible. Using the same random_state will always produce the same split, which is crucial for reproducibility in machine learning experiments.

In [None]:
# Hold out 30% of the segments for testing; the fixed seed keeps the
# partition identical from run to run.
X_norm_train, X_norm_test, y_norm_train, y_norm_test = train_test_split(
    X_norm,
    y_norm,
    test_size=0.3,
    random_state=42,
)


4. Prepare the data for LSTM

## one-hot encoding for categorical labels
One-hot encoding is a technique used to convert categorical variables into a format suitable for machine learning algorithms. It transforms categorical data into a binary representation, allowing models to process and interpret non-numeric information effectively.
How One-Hot Encoding Works
The process of one-hot encoding involves the following steps:
	1.	Identify unique categories within a categorical variable.
	2.	Create new binary columns, one for each unique category.
	3.	For each data point, assign a value of 1 in the column corresponding to its category and 0 in all other columns.

In [None]:
# One-hot encode the integer masks. The class count is fixed at 4
# (0 = unlabelled, 1 = P, 2 = QRS, 3 = T) instead of being inferred from the
# data: to_categorical otherwise sizes the last axis from each array's own
# maximum, so a split that lacks the highest class would come out narrower
# than the model's 4-unit output layer.
y_norm_train_cat = to_categorical(y_norm_train, num_classes=4)
y_norm_test_cat = to_categorical(y_norm_test, num_classes=4)

Array dimension analysis

# Compare the integer masks with their one-hot encoded counterparts.
# The first label now names the array that is actually printed
# (it previously read "y_norm_train" while showing y_norm_train_cat).
print("y_norm_train_cat shape:", type(y_norm_train_cat), y_norm_train_cat.shape)
print("y_norm_train shape:", type(y_norm_train), y_norm_train.shape)
print("X_norm_train shape:", X_norm_train.shape)
print("y_norm_train_cat shape:", y_norm_train_cat.shape)


5. Create and compile the LSTM model

Selection of Layer Sizes
	1.	64 units in the first LSTM layer:
	•	This is likely chosen to capture a rich set of features from the input sequence.
	•	A larger number allows for more complex pattern recognition.
	2.	32 units in the second LSTM layer:
	•	Reduction in units helps in distilling the most important features.
	•	It’s common to reduce the number of units in deeper layers to prevent overfitting.
	3.	4 neurons in the output layer:
	•	This directly corresponds to the number of classes in the classification task (unlabelled, P, QRS and T).
	•	Each neuron represents the probability of a time step belonging to one of the 4 classes.

In [None]:
# Import from tensorflow.keras, the namespace the notebook's first cell
# already uses, rather than mixing in the standalone `keras` package.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Bidirectional, LSTM, Dense, Input, TimeDistributed

# Sequence-to-sequence classifier: every time step of a 5000-sample,
# single-channel segment gets its own 4-way class distribution.
model_norm = Sequential([
    Input(shape=(5000, 1)),
    # return_sequences=True keeps one output per time step so the labels
    # stay aligned with the input samples.
    Bidirectional(LSTM(64, return_sequences=True)),
    Bidirectional(LSTM(32, return_sequences=True)),
    # Dense acts on the last axis, i.e. independently at each time step;
    # 4 units = unlabelled, P, QRS, T.
    Dense(4, activation='softmax')
])

# categorical_crossentropy expects one-hot targets.
model_norm.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])



6. Train the model

Key Components
	1.	model.fit(): This is the Keras method used to train the model on data.
	2.	X_norm_train: The input training data. This contains the normalised ECG segments the model will learn from.
	3.	y_norm_train_cat: The target labels for the training data. The ‘_cat’ suffix indicates these are categorical (one-hot encoded) labels.
	4.	validation_split=0.2: This parameter sets aside 20% of the training data for validation. The model won’t train on this data but will use it to evaluate performance after each epoch.
	5.	epochs=10: The number of times the model will iterate over the entire training dataset. Here, it’s set to 10 complete passes.
	6.	batch_size=32: This defines how many samples the model will process before updating its internal parameters. A batch size of 32 is a common choice, balancing between computational efficiency and model update frequency.
	7.	history: The variable that stores the output of the training process. It contains information about the training metrics (like loss and accuracy) for each epoch.

In [None]:
# Train for 10 epochs in mini-batches of 32, holding back 20% of the
# training data for validation; `history` receives the return value of fit().
history = model_norm.fit(
    X_norm_train,
    y_norm_train_cat,
    validation_split=0.2,
    epochs=10,
    batch_size=32,
)


7. Evaluate the model

In [None]:
# Score the trained model on the held-out test split.
test_loss, test_accuracy = model_norm.evaluate(
    X_norm_test,
    y_norm_test_cat,
)
print(f"Test accuracy: {test_accuracy:.4f}")

# Report the tensor shapes the model consumes and produces.
print("Input shape:", model_norm.input_shape)
print("Output shape:", model_norm.output_shape)

