## Importing Dependencies

In [1]:
import os
import pandas as pd
import numpy as np
import wfdb
from scipy import signal
from sklearn.preprocessing import MultiLabelBinarizer, StandardScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt

## Path to PTBXL, please adjust accordingly

In [2]:
DATASET_PATH = '/Users/aroraji/Desktop/PersonalProjects/ML/GermanyProject/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.3'

## Loading the Data

In [3]:
df = pd.read_csv(os.path.join(DATASET_PATH, 'ptbxl_database.csv'), index_col='ecg_id')
scp_df = pd.read_csv(os.path.join(DATASET_PATH, 'scp_statements.csv'), index_col=0)
df['scp_codes'] = df['scp_codes'].apply(lambda x: eval(x))

In [4]:
def aggregate_diagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in scp_df.index:
            diagnostic_class = scp_df.loc[key]['diagnostic_class']
            if isinstance(diagnostic_class, str) and diagnostic_class.strip():
                tmp.append(diagnostic_class)
    return list(set(tmp))


In [5]:
df['diagnostic_superclass'] = df['scp_codes'].apply(aggregate_diagnostic)

In [6]:
print(df['diagnostic_superclass'].head())

ecg_id
1    [NORM]
2    [NORM]
3    [NORM]
4    [NORM]
5    [NORM]
Name: diagnostic_superclass, dtype: object


In [7]:
df = df[df['diagnostic_superclass'].map(len) > 0].reset_index(drop=True)

In [8]:
print(df['diagnostic_superclass'].head(1000))


0           [NORM]
1           [NORM]
2           [NORM]
3           [NORM]
4           [NORM]
          ...     
995    [HYP, STTC]
996         [NORM]
997         [NORM]
998         [NORM]
999           [CD]
Name: diagnostic_superclass, Length: 1000, dtype: object


In [9]:
def aggregate_subdiagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in scp_df.index:
            diagnostic_subclass = scp_df.loc[key]['diagnostic_subclass']
            if isinstance(diagnostic_subclass, str) and diagnostic_subclass.strip():
                tmp.append(diagnostic_subclass)
    return list(set(tmp))

In [10]:
df['diagnostic_subclass'] = df['scp_codes'].apply(aggregate_subdiagnostic)

In [11]:
print(df['diagnostic_subclass'].head(1000))

0           [NORM]
1           [NORM]
2           [NORM]
3           [NORM]
4           [NORM]
          ...     
995    [LVH, STTC]
996         [NORM]
997         [NORM]
998         [NORM]
999        [CRBBB]
Name: diagnostic_subclass, Length: 1000, dtype: object


## Refining the data and End Pre-Processing

In [12]:
df['diagnostic_superclass'] = df['diagnostic_superclass'].apply(
    lambda lst: [x for x in lst if isinstance(x, str) and x.strip()]
)
df = df[df['diagnostic_superclass'].map(len) > 0].reset_index(drop=True)

In [13]:
df['diagnostic_subclass'] = df['diagnostic_subclass'].apply(
    lambda lst: [x for x in lst if isinstance(x, str) and x.strip()]
)
df = df[df['diagnostic_subclass'].map(len) > 0].reset_index(drop=True)

In [14]:
scp_df = pd.read_csv(
    os.path.join(DATASET_PATH, 'scp_statements.csv'),
    index_col=0
)
print("Unique diagnostic classes:")
print(scp_df['diagnostic_class'].unique())

Unique diagnostic classes:
['STTC' 'NORM' 'MI' 'HYP' 'CD' nan]


In [15]:
print(df['diagnostic_superclass'].head())

0    [NORM]
1    [NORM]
2    [NORM]
3    [NORM]
4    [NORM]
Name: diagnostic_superclass, dtype: object


In [16]:
print(df['diagnostic_subclass'].head(1000))

0           [NORM]
1           [NORM]
2           [NORM]
3           [NORM]
4           [NORM]
          ...     
995    [LVH, STTC]
996         [NORM]
997         [NORM]
998         [NORM]
999        [CRBBB]
Name: diagnostic_subclass, Length: 1000, dtype: object


## Setting up a Multilabel Binarizer

In [17]:
from sklearn.preprocessing import MultiLabelBinarizer
all_labels = df['diagnostic_superclass'].tolist() + df['diagnostic_subclass'].tolist()

mlb = MultiLabelBinarizer()
mlb.fit(all_labels)
labels = mlb.transform(df['diagnostic_superclass'])  # For initial labeling

print("All Classes:")
print(mlb.classes_)

All Classes:
['AMI' 'CD' 'CLBBB' 'CRBBB' 'HYP' 'ILBBB' 'IMI' 'IRBBB' 'ISCA' 'ISCI'
 'ISC_' 'IVCD' 'LAFB/LPFB' 'LAO/LAE' 'LMI' 'LVH' 'MI' 'NORM' 'NST_' 'PMI'
 'RAO/RAE' 'RVH' 'SEHYP' 'STTC' 'WPW' '_AVB']


In [18]:
df['filename'] = df['filename_hr'].apply(lambda x: os.path.join(DATASET_PATH, x))

## Continued Preprocessing of Signals (Plus Downsampling to reduce Computations for testing)

In [19]:
def load_signals(df):
    signals = []
    for filename in df['filename']:
        signal, fields = wfdb.rdsamp(filename)
        signals.append(signal)
    return np.array(signals)

X = load_signals(df)

def downsample_signals(X, target_length=1000):
    X_downsampled = signal.resample(X, target_length, axis=1)
    return X_downsampled

X = downsample_signals(X, target_length=1000)

def normalize_signals(X):
    N, L, C = X.shape
    X_normalized = np.zeros_like(X)
    for i in range(N):
        scaler = StandardScaler()
        X_normalized[i] = scaler.fit_transform(X[i])
    return X_normalized

X = normalize_signals(X)


## Splitting Train and Test

In [50]:
df = df.reset_index()
train_indices = df[df.strat_fold < 10].index
test_indices = df[df.strat_fold == 10].index

X_train = X[train_indices]
X_test = X[test_indices]

y_train = labels[train_indices]
y_test = labels[test_indices]

## Resnet

In [51]:
input_shape = X.shape[1:]
num_classes = len(mlb_all.classes_)

def build_resnet(input_shape, num_classes):
    inputs = layers.Input(shape=input_shape)
    x = layers.Conv1D(64, kernel_size=7, strides=2, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling1D(pool_size=3, strides=2, padding='same')(x)

    def res_block(x, filters, kernel_size=3, stride=1):
        shortcut = x
        x = layers.Conv1D(filters, kernel_size, strides=stride, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        x = layers.Conv1D(filters, kernel_size, strides=1, padding='same')(x)
        x = layers.BatchNormalization()(x)
        if stride != 1 or x.shape[-1] != shortcut.shape[-1]:
            shortcut = layers.Conv1D(filters, kernel_size=1, strides=stride, padding='same')(shortcut)
            shortcut = layers.BatchNormalization()(shortcut)
        x = layers.add([x, shortcut])
        x = layers.ReLU()(x)
        return x

    x = res_block(x, 64)
    x = res_block(x, 64)
    
    x = res_block(x, 128, stride=2)
    x = res_block(x, 128)
    
    x = res_block(x, 256, stride=2)
    x = res_block(x, 256)
    
    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dense(128, activation='relu')(x)
    outputs = layers.Dense(num_classes, activation='sigmoid')(x)
    
    model = models.Model(inputs, outputs)
    return model

def build_model():
    return build_resnet(input_shape, num_classes)

In [52]:
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [53]:
batch_size = 32
epochs = 50

history = model.fit(
    X_train, y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(X_test, y_test)
)


In [54]:
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], 'b-', label='Training Accuracy')
plt.plot(history.history['val_accuracy'], 'r-', label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], 'b-', label='Training Loss')
plt.plot(history.history['val_loss'], 'r-', label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.show()

## CNN

In [55]:
def build_cnn(input_shape, num_classes):
    model = models.Sequential()
    model.add(layers.Conv1D(filters=64, kernel_size=7, activation='relu', input_shape=input_shape))
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPooling1D(pool_size=2))
    model.add(layers.Conv1D(filters=128, kernel_size=5, activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPooling1D(pool_size=2))
    model.add(layers.Conv1D(filters=256, kernel_size=3, activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPooling1D(pool_size=2))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(num_classes, activation='sigmoid'))

    return model
input_shape = X_train.shape[1:]
num_classes = y_train.shape[1]
model = build_cnn(input_shape, num_classes)


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [56]:
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

batch_size = 32
epochs = 50
history = model.fit(
    X_train, y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(X_test, y_test)
)

## ViT

In [57]:
def build_transformer(input_shape, num_classes):
    inputs = layers.Input(shape=input_shape)
    x = inputs

    position_embedding = layers.Embedding(
        input_dim=input_shape[0],
        output_dim=input_shape[1]
    )
    positions = tf.range(start=0, limit=input_shape[0], delta=1)
    positions = position_embedding(positions)
    x = x + positions

    num_heads = 4
    ff_dim = 128
    def transformer_encoder_block(inputs, head_size, ff_dim, dropout=0.1):
        x = layers.LayerNormalization(epsilon=1e-6)(inputs)
        x = layers.MultiHeadAttention(num_heads=num_heads, key_dim=head_size, dropout=dropout)(x, x)
        x = layers.Dropout(dropout)(x)
        x = layers.Add()([x, inputs])

        x_ff = layers.LayerNormalization(epsilon=1e-6)(x)
        x_ff = layers.Dense(ff_dim, activation='relu')(x_ff)
        x_ff = layers.Dropout(dropout)(x_ff)
        x_ff = layers.Dense(inputs.shape[-1], activation='relu')(x_ff)
        x_ff = layers.Dropout(dropout)(x_ff)
        x = layers.Add()([x, x_ff])

        return x
    x = transformer_encoder_block(x, head_size=input_shape[1], ff_dim=ff_dim)
    x = transformer_encoder_block(x, head_size=input_shape[1], ff_dim=ff_dim)
    x = transformer_encoder_block(x, head_size=input_shape[1], ff_dim=ff_dim)

    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dense(128, activation='relu')(x)
    outputs = layers.Dense(num_classes, activation='sigmoid')(x)
    model = models.Model(inputs, outputs)
    return model

input_shape = X_train.shape[1:] 
num_classes = y_train.shape[1]

model = build_transformer(input_shape, num_classes)


In [58]:
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

batch_size = 128
epochs = 50
history = model.fit(
    X_train, y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(X_test, y_test)
)
