In [20]:
import pandas as pd
import numpy as np
import wfdb
import ast
import sklearn

import tensorflow as tf
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import MultiLabelBinarizer

In [21]:
def load_raw_data(df, sampling_rate, path):
    if sampling_rate == 100:
        data = [wfdb.rdsamp(path+f) for f in df.filename_lr]
    else:
        data = [wfdb.rdsamp(path+f) for f in df.filename_hr]
    data = np.array([signal for signal, meta in data])
    return data

def aggregate_diagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in agg_df.index:
            tmp.append(agg_df.loc[key].diagnostic_class)
    return list(set(tmp))

In [22]:
y_strings = np.array(['NORM','MI', 'CD', 'STTC', 'HYP'], dtype=object)

In [14]:
path = 'ptb-xl/'
sampling_rate=100

# load and convert annotation data
Y = pd.read_csv(path+'ptbxl_database.csv', index_col='ecg_id')
Y.scp_codes = Y.scp_codes.apply(lambda x: ast.literal_eval(x))

# Load raw signal data
X = load_raw_data(Y, sampling_rate, path)

# Load scp_statements.csv for diagnostic aggregation
agg_df = pd.read_csv(path+'scp_statements.csv', index_col=0)
agg_df = agg_df[agg_df.diagnostic == 1]

# Apply diagnostic superclass
Y['diagnostic_superclass'] = Y.scp_codes.apply(aggregate_diagnostic)

# Split data into train and test
test_fold = 10
# Train
X_train = X[np.where(Y.strat_fold != test_fold)]
y_train = Y[(Y.strat_fold != test_fold)].diagnostic_superclass

# Test
X_test = X[np.where(Y.strat_fold == test_fold)]
y_test = Y[Y.strat_fold == test_fold].diagnostic_superclass

ValueError: setting an array element with a sequence.

In [30]:
mlb = MultiLabelBinarizer()
y_train_enc = mlb.fit_transform(y_train)
y_test_enc = mlb.fit_transform(y_test)
print("Encoded Labels (y_encoded):\n", y_train_enc)
print("\nClass Names (Order of Columns):\n", mlb.classes_)

Encoded Labels (y_encoded):
 [[0 0 0 1 0]
 [0 0 0 1 0]
 [0 0 0 1 0]
 ...
 [0 0 0 0 1]
 [0 0 0 1 0]
 [0 0 0 1 0]]

Class Names (Order of Columns):
 ['CD' 'HYP' 'MI' 'NORM' 'STTC']


# Model Training

In [36]:


# THis is the number of classes. For Superclass it will be 5, if we do all of or a subset of the subclasses there can be more or less.
NUM_CLASSES = 5

# The input shape for the dataset
IN_SHAPE = (1000,12)

model = Sequential([
    Input(IN_SHAPE),
    Conv1D(filters=32, kernel_size=5, activation='relu'),
    MaxPooling1D(pool_size=2),
    Conv1D(filters=64, kernel_size=5, activation='relu'),
    MaxPooling1D(pool_size=2),
    Flatten(),
    Dense(64, activation='relu'),
    # Dropout(0.5),
    Dense(NUM_CLASSES, activation='sigmoid')]
)

In [37]:
model.compile(
    optimizer='adam', 
    loss='binary_crossentropy', # could also use categorical_crossentropy here for a single choice per input. Change output to softmax if doing that approach though
    metrics=['accuracy', tf.keras.metrics.AUC(multi_label=True)]
)

In [38]:
print("X_train dtype:", X_train.dtype)
print("y_train dtype:", y_train_enc.dtype)
model.fit(X_train, y_train_enc)

X_train dtype: float64
y_train dtype: int64
[1m613/613[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 29ms/step - accuracy: 0.5859 - auc_4: 0.8260 - loss: 0.3976


<keras.src.callbacks.history.History at 0x1b1b9be9970>

In [39]:
loss, accuracy, auc_score = model.evaluate(X_test, y_test_enc, verbose=1)

# Print the results
print(f"Test Loss: {loss:.4f}")
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test AUC: {auc_score:.4f}")

[1m69/69[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step - accuracy: 0.6406 - auc_4: 0.8664 - loss: 0.3634
Test Loss: 0.3634
Test Accuracy: 0.6406
Test AUC: 0.8664
