## Importing Dependencies

In [1]:
import os
import pandas as pd
import numpy as np
import wfdb
from scipy import signal
from sklearn.preprocessing import MultiLabelBinarizer, StandardScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt

## Path to PTBXL, please adjust accordingly

In [2]:
DATASET_PATH = '/Users/aroraji/Desktop/PersonalProjects/ML/GermanyProject/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.3'

## Loading the Data

In [3]:
df = pd.read_csv(os.path.join(DATASET_PATH, 'ptbxl_database.csv'), index_col='ecg_id')
scp_df = pd.read_csv(os.path.join(DATASET_PATH, 'scp_statements.csv'), index_col=0)
df['scp_codes'] = df['scp_codes'].apply(lambda x: eval(x))

In [4]:
def aggregate_diagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in scp_df.index:
            diagnostic_class = scp_df.loc[key]['diagnostic_class']
            if isinstance(diagnostic_class, str) and diagnostic_class.strip():
                tmp.append(diagnostic_class)
    return list(set(tmp))


In [5]:
df['diagnostic_superclass'] = df['scp_codes'].apply(aggregate_diagnostic)

In [6]:
print(df['diagnostic_superclass'].head())

ecg_id
1    [NORM]
2    [NORM]
3    [NORM]
4    [NORM]
5    [NORM]
Name: diagnostic_superclass, dtype: object


In [7]:
df = df[df['diagnostic_superclass'].map(len) > 0].reset_index(drop=True)

In [8]:
print(df['diagnostic_superclass'].head(1000))


0           [NORM]
1           [NORM]
2           [NORM]
3           [NORM]
4           [NORM]
          ...     
995    [STTC, HYP]
996         [NORM]
997         [NORM]
998         [NORM]
999           [CD]
Name: diagnostic_superclass, Length: 1000, dtype: object


In [9]:
def aggregate_subdiagnostic(y_dic):
    tmp = []
    for key in y_dic.keys():
        if key in scp_df.index:
            diagnostic_subclass = scp_df.loc[key]['diagnostic_subclass']
            if isinstance(diagnostic_subclass, str) and diagnostic_subclass.strip():
                tmp.append(diagnostic_subclass)
    return list(set(tmp))

In [10]:
df['diagnostic_subclass'] = df['scp_codes'].apply(aggregate_subdiagnostic)

In [11]:
print(df['diagnostic_subclass'].head(1000))

0           [NORM]
1           [NORM]
2           [NORM]
3           [NORM]
4           [NORM]
          ...     
995    [STTC, LVH]
996         [NORM]
997         [NORM]
998         [NORM]
999        [CRBBB]
Name: diagnostic_subclass, Length: 1000, dtype: object


## Refining the data and End Pre-Processing

In [12]:
df['diagnostic_superclass'] = df['diagnostic_superclass'].apply(
    lambda lst: [x for x in lst if isinstance(x, str) and x.strip()]
)
df = df[df['diagnostic_superclass'].map(len) > 0].reset_index(drop=True)

In [13]:
df['diagnostic_subclass'] = df['diagnostic_subclass'].apply(
    lambda lst: [x for x in lst if isinstance(x, str) and x.strip()]
)
df = df[df['diagnostic_subclass'].map(len) > 0].reset_index(drop=True)

In [14]:
scp_df = pd.read_csv(
    os.path.join(DATASET_PATH, 'scp_statements.csv'),
    index_col=0
)
print("Unique diagnostic classes:")
print(scp_df['diagnostic_class'].unique())

Unique diagnostic classes:
['STTC' 'NORM' 'MI' 'HYP' 'CD' nan]


In [15]:
print(df['diagnostic_superclass'].head())

0    [NORM]
1    [NORM]
2    [NORM]
3    [NORM]
4    [NORM]
Name: diagnostic_superclass, dtype: object


In [16]:
print(df['diagnostic_subclass'].head(1000))

0           [NORM]
1           [NORM]
2           [NORM]
3           [NORM]
4           [NORM]
          ...     
995    [STTC, LVH]
996         [NORM]
997         [NORM]
998         [NORM]
999        [CRBBB]
Name: diagnostic_subclass, Length: 1000, dtype: object


## Setting up a Multilabel Binarizer

In [17]:
from sklearn.preprocessing import MultiLabelBinarizer
all_labels = df['diagnostic_superclass'].tolist() + df['diagnostic_subclass'].tolist()

mlb = MultiLabelBinarizer()
mlb.fit(all_labels)
labels = mlb.transform(df['diagnostic_superclass'])  # For initial labeling

print("All Classes:")
print(mlb.classes_)

All Classes:
['AMI' 'CD' 'CLBBB' 'CRBBB' 'HYP' 'ILBBB' 'IMI' 'IRBBB' 'ISCA' 'ISCI'
 'ISC_' 'IVCD' 'LAFB/LPFB' 'LAO/LAE' 'LMI' 'LVH' 'MI' 'NORM' 'NST_' 'PMI'
 'RAO/RAE' 'RVH' 'SEHYP' 'STTC' 'WPW' '_AVB']


In [18]:
df['filename'] = df['filename_hr'].apply(lambda x: os.path.join(DATASET_PATH, x))

## Continued Preprocessing of Signals (Plus Downsampling to reduce Computations for testing)

In [19]:
def load_signals(df):
    signals = []
    for filename in df['filename']:
        signal, fields = wfdb.rdsamp(filename)
        signals.append(signal)
    return np.array(signals)

X = load_signals(df)

def downsample_signals(X, target_length=1000):
    X_downsampled = signal.resample(X, target_length, axis=1)
    return X_downsampled

X = downsample_signals(X, target_length=1000)

def normalize_signals(X):
    N, L, C = X.shape
    X_normalized = np.zeros_like(X)
    for i in range(N):
        scaler = StandardScaler()
        X_normalized[i] = scaler.fit_transform(X[i])
    return X_normalized

X = normalize_signals(X)


## Splitting Train and Test

In [20]:
df = df.reset_index()
train_indices = df[df.strat_fold < 10].index
test_indices = df[df.strat_fold == 10].index

X_train = X[train_indices]
X_test = X[test_indices]

y_train = labels[train_indices]
y_test = labels[test_indices]

## Resnet

In [21]:
mlb_super = MultiLabelBinarizer()
y_super = mlb_super.fit_transform(df['diagnostic_superclass'])
mlb_sub = MultiLabelBinarizer()
y_sub = mlb_sub.fit_transform(df['diagnostic_subclass'])

all_classes = list(set(mlb_super.classes_).union(set(mlb_sub.classes_)))
mlb_all = MultiLabelBinarizer(classes=all_classes)
mlb_all.fit(all_classes)

y_super_all = mlb_all.transform(df['diagnostic_superclass'])
y_sub_all = mlb_all.transform(df['diagnostic_subclass'])

In [22]:
input_shape = X.shape[1:]
num_classes = len(mlb_all.classes_)

def build_resnet(input_shape, num_classes):
    inputs = layers.Input(shape=input_shape)
    x = layers.Conv1D(64, kernel_size=7, strides=2, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling1D(pool_size=3, strides=2, padding='same')(x)

    def res_block(x, filters, kernel_size=3, stride=1):
        shortcut = x
        x = layers.Conv1D(filters, kernel_size, strides=stride, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        x = layers.Conv1D(filters, kernel_size, strides=1, padding='same')(x)
        x = layers.BatchNormalization()(x)
        if stride != 1 or x.shape[-1] != shortcut.shape[-1]:
            shortcut = layers.Conv1D(filters, kernel_size=1, strides=stride, padding='same')(shortcut)
            shortcut = layers.BatchNormalization()(shortcut)
        x = layers.add([x, shortcut])
        x = layers.ReLU()(x)
        return x

    x = res_block(x, 64)
    x = res_block(x, 64)
    
    x = res_block(x, 128, stride=2)
    x = res_block(x, 128)
    
    x = res_block(x, 256, stride=2)
    x = res_block(x, 256)
    
    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dense(128, activation='relu')(x)
    outputs = layers.Dense(num_classes, activation='sigmoid')(x)
    
    model = models.Model(inputs, outputs)
    return model

def build_model():
    return build_resnet(input_shape, num_classes)

In [23]:
# model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [24]:
# batch_size = 32
# epochs = 1

# history = model.fit(
#     X_train, y_train,
#     batch_size=batch_size,
#     epochs=epochs,
#     validation_data=(X_test, y_test)
# )


In [25]:
# plt.figure(figsize=(12, 4))

# plt.subplot(1, 2, 1)
# plt.plot(history.history['accuracy'], 'b-', label='Training Accuracy')
# plt.plot(history.history['val_accuracy'], 'r-', label='Validation Accuracy')
# plt.title('Model Accuracy')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.legend()

# plt.subplot(1, 2, 2)
# plt.plot(history.history['loss'], 'b-', label='Training Loss')
# plt.plot(history.history['val_loss'], 'r-', label='Validation Loss')
# plt.title('Model Loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.legend()

# plt.show()

## CNN

In [26]:
def build_cnn(input_shape, num_classes):
    model = models.Sequential()
    model.add(layers.Conv1D(filters=64, kernel_size=7, activation='relu', input_shape=input_shape))
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPooling1D(pool_size=2))
    model.add(layers.Conv1D(filters=128, kernel_size=5, activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPooling1D(pool_size=2))
    model.add(layers.Conv1D(filters=256, kernel_size=3, activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPooling1D(pool_size=2))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(num_classes, activation='sigmoid'))

    return model
input_shape = X_train.shape[1:]
num_classes = y_train.shape[1]
model = build_cnn(input_shape, num_classes)


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [27]:
# model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# batch_size = 32
# epochs = 10
# history = model.fit(
#     X_train, y_train,
#     batch_size=batch_size,
#     epochs=epochs,
#     validation_data=(X_test, y_test)
# )

## ViT

In [28]:
def build_transformer(input_shape, num_classes):
    inputs = layers.Input(shape=input_shape)
    x = inputs

    position_embedding = layers.Embedding(
        input_dim=input_shape[0],
        output_dim=input_shape[1]
    )
    positions = tf.range(start=0, limit=input_shape[0], delta=1)
    positions = position_embedding(positions)
    x = x + positions

    num_heads = 4
    ff_dim = 128
    def transformer_encoder_block(inputs, head_size, ff_dim, dropout=0.1):
        x = layers.LayerNormalization(epsilon=1e-6)(inputs)
        x = layers.MultiHeadAttention(num_heads=num_heads, key_dim=head_size, dropout=dropout)(x, x)
        x = layers.Dropout(dropout)(x)
        x = layers.Add()([x, inputs])

        x_ff = layers.LayerNormalization(epsilon=1e-6)(x)
        x_ff = layers.Dense(ff_dim, activation='relu')(x_ff)
        x_ff = layers.Dropout(dropout)(x_ff)
        x_ff = layers.Dense(inputs.shape[-1], activation='relu')(x_ff)
        x_ff = layers.Dropout(dropout)(x_ff)
        x = layers.Add()([x, x_ff])

        return x
    x = transformer_encoder_block(x, head_size=input_shape[1], ff_dim=ff_dim)
    x = transformer_encoder_block(x, head_size=input_shape[1], ff_dim=ff_dim)
    x = transformer_encoder_block(x, head_size=input_shape[1], ff_dim=ff_dim)

    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dense(128, activation='relu')(x)
    outputs = layers.Dense(num_classes, activation='sigmoid')(x)
    model = models.Model(inputs, outputs)
    return model

input_shape = X_train.shape[1:] 
num_classes = y_train.shape[1]

model = build_transformer(input_shape, num_classes)


In [29]:
# model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# batch_size = 128
# epochs = 5
# history = model.fit(
#     X_train, y_train,
#     batch_size=batch_size,
#     epochs=epochs,
#     validation_data=(X_test, y_test)
# )


## Setup of CL benchmarks

In [30]:
tasks = [
    {
        'X': X,
        'y': y_super_all,
        'classes': mlb_super.classes_,
        'task_name': 'Superdiagnostic Classification'
    },
    {
        'X': X,
        'y': y_sub_all,
        'classes': mlb_sub.classes_,
        'task_name': 'Subdiagnostic Classification'
    }
]

for idx, task in enumerate(tasks):
    print(f"Task {idx+1} - {task['task_name']}")
    print("Classes:", task['classes'])
    print()

for task in tasks:
    X_train_task, X_test_task, y_train_task, y_test_task = train_test_split(
        task['X'], task['y'], test_size=0.2, random_state=42
    )
    task['X_train'] = X_train_task
    task['X_test'] = X_test_task
    task['y_train'] = y_train_task
    task['y_test'] = y_test_task

Task 1 - Superdiagnostic Classification
Classes: ['CD' 'HYP' 'MI' 'NORM' 'STTC']

Task 2 - Subdiagnostic Classification
Classes: ['AMI' 'CLBBB' 'CRBBB' 'ILBBB' 'IMI' 'IRBBB' 'ISCA' 'ISCI' 'ISC_' 'IVCD'
 'LAFB/LPFB' 'LAO/LAE' 'LMI' 'LVH' 'NORM' 'NST_' 'PMI' 'RAO/RAE' 'RVH'
 'SEHYP' 'STTC' 'WPW' '_AVB']



## Setup for Elastic Weights Consolidation

1. **Fisher Information Matrix Computation**
$$
F = \frac{1}{N} \sum_{i=1}^{N} \left( \nabla_\theta L(y_i, f(x_i; \theta)) \right)^2
$$

2. **EWC Loss Function**
$$
L_{\text{EWC}}(\theta) = L_{\text{current}}(\theta) + \sum_{k} \frac{\lambda}{2} \sum_{j} F_{k}^{(j)} \left( \theta^{(j)} - \theta_k^{*(j)} \right)^2
$$

In [31]:
class EWC:
    def __init__(self, model, lambda_ewc=1000):
        self.model = model
        self.lambda_ewc = lambda_ewc
        self.fisher_matrices = []
        self.star_vars = []

    def compute_fisher(self, X, y, batch_size=32):
        fisher = [np.zeros(var.shape.as_list(), dtype=np.float32) for var in self.model.trainable_variables]
        dataset = tf.data.Dataset.from_tensor_slices((X, y)).batch(batch_size)
        for X_batch, y_batch in dataset:
            with tf.GradientTape() as tape:
                preds = self.model(X_batch, training=False)
                labels = tf.cast(y_batch, dtype=tf.float32)  # Cast labels to float32
                loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=preds))
            grads = tape.gradient(loss, self.model.trainable_variables)
            for n, grad in enumerate(grads):
                fisher[n] += np.square(grad.numpy())
        num_batches = len(list(dataset))
        for n in range(len(fisher)):
            fisher[n] /= num_batches
        return fisher

    def consolidate(self, X, y):
        fisher = self.compute_fisher(X, y)
        self.fisher_matrices.append(fisher)
        self.star_vars.append([var.numpy() for var in self.model.trainable_variables])

    def ewc_loss(self):
        if not self.fisher_matrices:
            return 0
        else:
            ewc_loss = 0
            for task in range(len(self.fisher_matrices)):
                for var, var_star, fisher in zip(self.model.trainable_variables, self.star_vars[task], self.fisher_matrices[task]):
                    ewc_loss += (self.lambda_ewc / 2) * tf.reduce_sum(
                        tf.multiply(tf.convert_to_tensor(fisher), tf.square(var - var_star))
                    )
            return ewc_loss

## Setup for Synaptic Intelligence

1. Parameter change:

   $$
   \Delta \theta_i = \theta_i - \theta_i^{\text{prev}}
   $$

2. Accumulating importance:

   $$
   w_i \leftarrow w_i + g_i \cdot \Delta \theta_i
   $$

3. Update Omega:

   $$
   \Omega_i \leftarrow \Omega_i + \frac{w_i}{(\Delta \theta_i)^2 + \epsilon}
   $$

4. SI loss function:

   $$
   L_{\text{SI}} = L_{\text{current}} + \lambda \sum_i \Omega_i (\theta_i - \theta_i^{\text{prev}})^2
   $$

In [32]:
class SI:
    def __init__(self, model, lambda_si=1.0, epsilon=0.1):
        self.model = model
        self.lambda_si = lambda_si
        self.epsilon = epsilon
        self.prev_params = [var.numpy() for var in self.model.trainable_variables]
        self.w = [np.zeros(var.shape, dtype=np.float32) for var in self.model.trainable_variables]
        self.omega = [np.zeros(var.shape, dtype=np.float32) for var in self.model.trainable_variables]

    def update_omega(self):
        delta_params = [var.numpy() - prev for var, prev in zip(self.model.trainable_variables, self.prev_params)]
        for i in range(len(self.omega)):
            self.omega[i] += self.w[i] / (np.square(delta_params[i]) + self.epsilon)
        self.w = [np.zeros_like(var) for var in self.model.trainable_variables]
        self.prev_params = [var.numpy() for var in self.model.trainable_variables]

    def accumulate_importance(self, grads):
        for i in range(len(grads)):
            self.w[i] += grads[i].numpy() * (self.model.trainable_variables[i].numpy() - self.prev_params[i])

    def si_loss(self):
        loss = 0
        for var, var_prev, omega in zip(self.model.trainable_variables, self.prev_params, self.omega):
            loss += tf.reduce_sum(omega.astype(np.float32) * tf.square(var - var_prev))
        return self.lambda_si * loss

## Train Setups

### Please pardon the bad results they are due to training over a single epoch

### EWC Setup

In [33]:
model_ewc = build_model()
model_si = build_model()

optimizer_ewc = tf.keras.optimizers.Adam()
optimizer_si = tf.keras.optimizers.Adam()
batch_size = 32
epochs = 1

loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits=True)

ewc = EWC(model_ewc, lambda_ewc=1000)
si = SI(model_si, lambda_si=1.0, epsilon=0.1)

for task_idx, task in enumerate(tasks):
    print(f"\nTraining on Task {task_idx+1} with EWC: {task['task_name']}")
    train_dataset = tf.data.Dataset.from_tensor_slices((task['X_train'], task['y_train'])).batch(batch_size)
    test_dataset = tf.data.Dataset.from_tensor_slices((task['X_test'], task['y_test'])).batch(batch_size)

    relevant_class_indices = [mlb_all.classes_.tolist().index(c) for c in task['classes']]

    for epoch in range(epochs):
        for X_batch, y_batch in train_dataset:
            with tf.GradientTape() as tape:
                logits = model_ewc(X_batch, training=True)
                y_batch_task = tf.gather(y_batch, relevant_class_indices, axis=1)
                logits_task = tf.gather(logits, relevant_class_indices, axis=1)
                y_batch_task = tf.cast(y_batch_task, dtype=tf.float32)
                loss = loss_fn(y_batch_task, logits_task)
                ewc_penalty = ewc.ewc_loss()
                total_loss = loss + ewc_penalty
            grads = tape.gradient(total_loss, model_ewc.trainable_variables)
            optimizer_ewc.apply_gradients(zip(grads, model_ewc.trainable_variables))
    ewc.consolidate(task['X_train'], task['y_train'])

    test_loss = tf.keras.metrics.Mean()
    test_accuracy = tf.keras.metrics.BinaryAccuracy()
    for X_batch, y_batch in test_dataset:
        logits = model_ewc(X_batch, training=False)
        y_batch_task = tf.gather(y_batch, relevant_class_indices, axis=1)
        logits_task = tf.gather(logits, relevant_class_indices, axis=1)
        y_batch_task = tf.cast(y_batch_task, dtype=tf.float32)
        loss = loss_fn(y_batch_task, logits_task)
        test_loss(loss)
        test_accuracy.update_state(y_batch_task, tf.sigmoid(logits_task))
    print(f"Test Loss: {test_loss.result()}, Test Accuracy: {test_accuracy.result()}")


Training on Task 1 with EWC: Superdiagnostic Classification
Test Loss: 0.6661294102668762, Test Accuracy: 0.47106125950813293

Training on Task 2 with EWC: Subdiagnostic Classification
Test Loss: 0.6889272332191467, Test Accuracy: 0.24824687838554382


### SI Setup

In [34]:
for task_idx, task in enumerate(tasks):
    print(f"\nTraining on Task {task_idx+1} with SI: {task['task_name']}")
    train_dataset = tf.data.Dataset.from_tensor_slices((task['X_train'], task['y_train'])).batch(batch_size)
    test_dataset = tf.data.Dataset.from_tensor_slices((task['X_test'], task['y_test'])).batch(batch_size)

    relevant_class_indices = [mlb_all.classes_.tolist().index(c) for c in task['classes']]

    for epoch in range(epochs):
        for X_batch, y_batch in train_dataset:
            with tf.GradientTape() as tape:
                logits = model_si(X_batch, training=True)
                y_batch_task = tf.gather(y_batch, relevant_class_indices, axis=1)
                logits_task = tf.gather(logits, relevant_class_indices, axis=1)
                y_batch_task = tf.cast(y_batch_task, dtype=tf.float32)
                loss = loss_fn(y_batch_task, logits_task)
                si_penalty = si.si_loss()
                total_loss = loss + si_penalty
            grads = tape.gradient(total_loss, model_si.trainable_variables)
            si.accumulate_importance(grads)
            optimizer_si.apply_gradients(zip(grads, model_si.trainable_variables))
    si.update_omega()

    test_loss = tf.keras.metrics.Mean()
    test_accuracy = tf.keras.metrics.BinaryAccuracy()
    for X_batch, y_batch in test_dataset:
        logits = model_si(X_batch, training=False)
        y_batch_task = tf.gather(y_batch, relevant_class_indices, axis=1)
        logits_task = tf.gather(logits, relevant_class_indices, axis=1)
        y_batch_task = tf.cast(y_batch_task, dtype=tf.float32)
        loss = loss_fn(y_batch_task, logits_task)
        test_loss(loss)
        test_accuracy.update_state(y_batch_task, tf.sigmoid(logits_task))
    print(f"Test Loss: {test_loss.result()}, Test Accuracy: {test_accuracy.result()}")


Training on Task 1 with SI: Superdiagnostic Classification
Test Loss: 0.6764861345291138, Test Accuracy: 0.6427768468856812

Training on Task 2 with SI: Subdiagnostic Classification
Test Loss: 0.6928368806838989, Test Accuracy: 0.9415207505226135
