In [10]:
import pandas as pd
import numpy as np
import wfdb
import ast
import sklearn

import tensorflow as tf
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, Input
from tensorflow.keras.models import Sequential
from sklearn.preprocessing import MultiLabelBinarizer
from perturbations import PerturbationConfig, apply_perturbation


In [11]:
def load_raw_data(df, sampling_rate, path):
    if sampling_rate == 100:
        data = [wfdb.rdsamp(path+f) for f in df.filename_lr]
    else:
        data = [wfdb.rdsamp(path+f) for f in df.filename_hr]
    data = np.array([signal for signal, meta in data])
    return data

def aggregate_diagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in agg_df.index:
            tmp.append(agg_df.loc[key].diagnostic_class)
    return list(set(tmp))

In [12]:
path = 'ptb-xl/'
sampling_rate=100

# load and convert annotation data
Y = pd.read_csv(path+'ptbxl_database.csv', index_col='ecg_id')
Y.scp_codes = Y.scp_codes.apply(lambda x: ast.literal_eval(x))

# Load raw signal data
X = load_raw_data(Y, sampling_rate, path)

# Load scp_statements.csv for diagnostic aggregation
agg_df = pd.read_csv(path+'scp_statements.csv', index_col=0)
agg_df = agg_df[agg_df.diagnostic == 1]

# Apply diagnostic superclass
Y['diagnostic_superclass'] = Y.scp_codes.apply(aggregate_diagnostic)

# Split data into train and test
test_fold = 10
# Train
X_train = X[np.where(Y.strat_fold != test_fold)]
y_train = Y[(Y.strat_fold != test_fold)].diagnostic_superclass

# Test
X_test = X[np.where(Y.strat_fold == test_fold)]
y_test = Y[Y.strat_fold == test_fold].diagnostic_superclass

### Fixing Y data for proper training format

In [13]:
mlb = MultiLabelBinarizer()
y_train_enc = mlb.fit_transform(y_train)
y_test_enc = mlb.fit_transform(y_test)
print("Encoded Labels (y_encoded):\n", y_train_enc)
print("\nClass Names (Order of Columns):\n", mlb.classes_)

Encoded Labels (y_encoded):
 [[0 0 0 1 0]
 [0 0 0 1 0]
 [0 0 0 1 0]
 ...
 [0 0 0 0 1]
 [0 0 0 1 0]
 [0 0 0 1 0]]

Class Names (Order of Columns):
 ['CD' 'HYP' 'MI' 'NORM' 'STTC']


# Model Training

## Model structure

In [14]:
# THis is the number of classes. For Superclass it will be 5, if we do all of or a subset of the subclasses there can be more or less.
NUM_CLASSES = 5

# The input shape for the dataset
IN_SHAPE = (1000,12)

model = Sequential([
    Input(IN_SHAPE),
    Conv1D(filters=32, kernel_size=5, activation='relu'),
    MaxPooling1D(pool_size=2),
    Conv1D(filters=64, kernel_size=5, activation='relu'),
    MaxPooling1D(pool_size=2),
    Flatten(),
    Dense(64, activation='relu'),
    # Dropout(0.5),
    Dense(NUM_CLASSES, activation='sigmoid')]
)

## Compiling and Fitting

In [15]:
model.compile(
    optimizer='adam', 
    loss='binary_crossentropy', # could also use categorical_crossentropy here for a single choice per input. Change output to softmax if doing that approach though
    metrics=['accuracy', tf.keras.metrics.AUC(multi_label=True)]
)

In [16]:
model.fit(X_train, y_train_enc)

[1m613/613[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 24ms/step - accuracy: 0.5970 - auc_1: 0.8336 - loss: 0.3910


<keras.src.callbacks.history.History at 0x1e219365730>

## Testing

In [17]:
loss, accuracy, auc_score = model.evaluate(X_test, y_test_enc, verbose=1)

# Print the results
print(f"Test Loss: {loss:.4f}")
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test AUC: {auc_score:.4f}")

[1m69/69[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 9ms/step - accuracy: 0.6146 - auc_1: 0.8715 - loss: 0.3614
Test Loss: 0.3614
Test Accuracy: 0.6146
Test AUC: 0.8715


## Noise Robustness Evaluation

We compare model performance on clean signals versus signals with band-limited noise applied
through the perturbation API (strength = 0.3). Adjust the configuration to explore different artefacts.


In [23]:
noise_seed = 42
noise_config = PerturbationConfig(
    ptype='band_noise',
    strength=1,
    center_time=5.0,
    window_seconds=None,
    extra={'beta': 0.1, 'band': (5.0, 40.0)}
)

def evaluate_clean_and_noisy(model, X_eval, y_eval, fs, noise_cfg, seed=None):
    rng = np.random.default_rng(seed) if seed is not None else None
    print('=== Clean evaluation ===')
    clean_metrics = model.evaluate(X_eval, y_eval, verbose=1)
    print(f'Clean -> Loss: {clean_metrics[0]:.4f}, Acc: {clean_metrics[1]:.4f}, AUC: {clean_metrics[2]:.4f}')
    print('=== Noisy evaluation ===')
    X_noisy = np.stack([
        apply_perturbation(x, fs=fs, config=noise_cfg, rng=rng)
        for x in X_eval
    ])
    noisy_metrics = model.evaluate(X_noisy, y_eval, verbose=1)
    print(f'Noisy -> Loss: {noisy_metrics[0]:.4f}, Acc: {noisy_metrics[1]:.4f}, AUC: {noisy_metrics[2]:.4f}')
    return clean_metrics, noisy_metrics, X_noisy

clean_metrics, noisy_metrics, X_test_noisy = evaluate_clean_and_noisy(
    model,
    X_test,
    y_test_enc,
    fs=sampling_rate,
    noise_cfg=noise_config,
    seed=noise_seed,
)


=== Clean evaluation ===
[1m69/69[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 14ms/step - accuracy: 0.6146 - auc_1: 0.8715 - loss: 0.3614
Clean -> Loss: 0.3614, Acc: 0.6146, AUC: 0.8715
=== Noisy evaluation ===
[1m69/69[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 12ms/step - accuracy: 0.6106 - auc_1: 0.8714 - loss: 0.3625
Noisy -> Loss: 0.3625, Acc: 0.6106, AUC: 0.8714
