In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import wfdb
from scipy import signal
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.model_selection import train_test_split
import neurokit2 as nk
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import butter, lfilter, freqz, iirnotch
import tensorflow as tf
from tensorflow.keras import layers

pd.reset_option('display.max_columns')  # Show all columns
pd.reset_option('display.expand_frame_repr')  # Disable wrapping
pd.reset_option('display.max_colwidth')  # Show full contents of each cell


In [None]:
# Load the CSV file
superclass_df = pd.read_csv('superclass.csv')

# Initialize empty arrays to store the data
data = []

# Specify the root directory
root_dir = '../ptb-xl/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.3/'

# Loop over all records in the DataFrame
for idx, row in superclass_df.iterrows():
    if (idx+1)%100 == 0:
        print(f"Processing record {idx+1}/{len(superclass_df)}")
    # Get filename without extension
    filename = os.path.splitext(row['filename_lr'])[0]

    # Construct the full path
    full_path = os.path.join(root_dir, filename)

    # Load the raw ECG signal data from the .dat file
    dat, fields = wfdb.rdsamp(full_path)

    # Load the labels data from the .hea file
    hea = wfdb.rdheader(full_path)

    # Add data to the list
    data.append({
        'record_name': filename,
        'superclass': row['diagnostic_superclass'],
        'signal': dat,
        'age': row['age'],
        'sex': row['sex']
    })

# Convert the list to a DataFrame
df = pd.DataFrame(data)

# Save the DataFrame
df.to_pickle('df_processed.pkl')

df_orig = pd.read_pickle('df_processed.pkl')
df_orig

# Preprocessing

## One Hot

In [None]:
# Make sure each label is in a separate list
df['superclass'] = df['superclass'].apply(lambda x: [x] if isinstance(x, str) else x)

mlb = MultiLabelBinarizer()
label_encoded = mlb.fit_transform(df['superclass'])
df_encoded = pd.concat([df.drop('superclass', axis=1), pd.DataFrame(label_encoded, columns=mlb.classes_)], axis=1)

df_encoded



In [None]:
# Create a new column 'DISEASE' which is the sum of the four disease columns
df_encoded['DISEASE'] = df_encoded[['[\'CD\']', '[\'HYP\']', '[\'MI\']', '[\'STTC\']']].sum(axis=1)

# Any row where 'DISEASE' is greater than 0 means the patient has some disease, so set it to 1
df_encoded['DISEASE'] = df_encoded['DISEASE'].apply(lambda x: 1 if x > 0 else 0)

# Now you can drop the original four disease columns
df_encoded = df_encoded.drop(columns=['[\'CD\']', '[\'HYP\']', '[\'MI\']', '[\'STTC\']'])



## SPLIT


In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Separate the target labels (heart condition classes) and the input features
X_ecg = np.array(df_encoded['signal'].tolist())
X_info = df_encoded[[]]  # age and sex as additional features
y = df_encoded.drop(['record_name', 'signal', 'age', 'sex'], axis=1)

# Split the data into training, validation, and test sets
X_ecg_train, X_ecg_test, X_info_train, X_info_test, y_train, y_test = train_test_split(
    X_ecg, X_info, y, test_size=0.3, random_state=12
)
X_ecg_val, X_ecg_test, X_info_val, X_info_test, y_val, y_test = train_test_split(
    X_ecg_test, X_info_test, y_test, test_size=0.5, random_state=12
)

# Apply feature scaling (if needed) on the ECG signal data
scaler = StandardScaler()
X_ecg_train = np.array([scaler.fit_transform(sample) for sample in X_ecg_train])
X_ecg_val = np.array([scaler.transform(sample) for sample in X_ecg_val])
X_ecg_test = np.array([scaler.transform(sample) for sample in X_ecg_test])

# Check the shapes of the datasets
print("ECG Data Shapes:")
print("Training data:", X_ecg_train.shape)
print("Validation data:", X_ecg_val.shape)
print("Test data:", X_ecg_test.shape)
print("\nInfo Data Shapes:")
print("Training data:", X_info_train.shape)
print("Validation data:", X_info_val.shape)
print("Test data:", X_info_test.shape)
print("\nTarget Labels Shape:")
print("Training labels:", y_train.shape)
print("Validation labels:", y_val.shape)
print("Test labels:", y_test.shape)


## test display

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(15, 8))

# Create a list to store all ECG signals
all_ecgs = []

# Iterate over the 12 leads
for i in range(12):
    single_ecg = []
    for sublist in df_encoded.loc[3,'signal']:  # Get the data for one patient
        single_ecg.append(sublist[i])  # Get data for one lead

    signals, info = nk.ecg_process(single_ecg, sampling_rate=100)
    cleaned_ecg = signals["ECG_Clean"]

    all_ecgs.append(cleaned_ecg)

# Combine all ECG signals into a single 2D array
all_ecgs = np.stack(all_ecgs)

# Plot each ECG lead with a slight vertical shift for better visibility
for i, ecg in enumerate(all_ecgs):
    plt.plot(ecg + i*2)  # Shift each lead by 200 units

plt.title('ECG signals for all 12 leads')
plt.show()


## Remove last beat if incomplete

In [None]:
cpt=0
def clean_heartbeats(row):
    global cpt
    all_leads = []
    for sublist in row['signal']:
        all_leads.append(sublist)  # Get all leads

    # Process lead II to get R-peaks
    single_ecg_lead_II = [sublist[1] for sublist in row['signal']]  # LEAD II
    try:
        signals, info = nk.ecg_process(single_ecg_lead_II, sampling_rate=500)
    except ValueError as e:
        print(f"An error occurred while processing the ECG data for record {row['record_name']}: {e}")
        return single_ecg_lead_II  # Return the original ECG signal if an error occurs

    rpeaks = info["ECG_R_Peaks"]
    # Remove NaN values from rpeaks
    rpeaks = rpeaks[~np.isnan(rpeaks)]
    try:
        heartbeats_II = np.split(single_ecg_lead_II, rpeaks.astype(int))
    except ValueError as e:
        print(f"An error occurred while splitting the ECG data for record {row['record_name']}: {e}")
        return single_ecg_lead_II  # Return the original ECG signal if an error occurs

    # Check if the last heartbeat is incomplete (i.e., significantly shorter than the others)
    average_heartbeat_length = np.mean([len(heartbeat) for heartbeat in heartbeats_II])
    if len(heartbeats_II[-1]) < average_heartbeat_length * 0.75:
        rpeaks = rpeaks[:-1]  # Remove the last R-peak
        cpt += 1

    # Use the R-peaks to segment all leads and remove incomplete heartbeats
    cleaned_ecg_leads = []
    for lead in all_leads:
        try:
            heartbeats = np.split(lead, rpeaks.astype(int))
        except ValueError as e:
            print(f"An error occurred while splitting the ECG data for record {row['record_name']}: {e}")
            return lead  # Return the original ECG signal if an error occurs

        if len(heartbeats[-1]) < average_heartbeat_length * 0.75:
            heartbeats = heartbeats[:-1]  # Remove the last heartbeat if incomplete
        # Concatenate the heartbeats back into a single ECG signal
        cleaned_ecg_lead = np.concatenate(heartbeats)
        cleaned_ecg_leads.append(cleaned_ecg_lead)

    # Stack the cleaned leads back together
    cleaned_ecg = np.stack(cleaned_ecg_leads, axis=-1)
    return cleaned_ecg


# Apply the function to the 'signal' column of the DataFrame
df_encoded_removed_beats = df_encoded
df_encoded_removed_beats['signal'] = df_encoded.apply(clean_heartbeats, axis=1)
print(df_encoded_removed_beats)
print(df_encoded_removed_beats['signal'][0])
print(df_encoded_removed_beats['signal'][0][0])
print(df_encoded_removed_beats['signal'][0][0][0])
print(df_encoded_removed_beats.shape)
print(f"number of incomplete beats patients: {cpt}/{len(df_encoded.index)}")



In [None]:
# Define the sample rate and the frequencies for the notch and low-pass filters
sample_rate = 100
notch_freq = 50
low_pass_freq = 15  # Adjust the low-pass frequency to maintain similar filtering characteristics

def design_filters(sample_rate, notch_freq, low_pass_freq):
    # Design the notch filter
    nyquist = 0.5 * sample_rate
    freq_ratio = notch_freq / nyquist
    notch_filter = iirnotch(freq_ratio, 30)  # 30 is the quality factor of the filter

    # Design the low-pass filter
    low_pass_filter = butter(5, low_pass_freq / nyquist, btype='low')

    return notch_filter, low_pass_filter

# Design the filters
notch_filter, low_pass_filter = design_filters(sample_rate, notch_freq, low_pass_freq)
# Define a function to apply the filters
def apply_filters(data, notch_filter, low_pass_filter):
    # Apply the notch filter
    data = lfilter(*notch_filter, data)

    # Apply the low-pass filter
    data = lfilter(*low_pass_filter, data)

    return data
# Apply the filters to the data
print(df_encoded.shape)
df_encoded['signal'] = df_encoded['signal'].apply(lambda signal: apply_filters(signal, notch_filter, low_pass_filter))
print(df_encoded.shape)


## Remove Outliers

In [None]:

# Compute the mean and standard deviation for each record
means = np.array([np.mean(record) for record in df_encoded['signal']])
std_devs = np.array([np.std(record) for record in df_encoded['signal']])

# Compute the average mean and standard deviation
avg_mean = np.mean(means)
avg_std_dev = np.mean(std_devs)

# Compute the standard deviation of the means and standard deviations
std_mean = np.std(means)
std_std_dev = np.std(std_devs)

# Identify outliers as records where the mean or standard deviation is more than 3 standard deviations from the average
outlier_indices = np.where(
    (np.abs(means - avg_mean) > 3 * std_mean) |
    (np.abs(std_devs - avg_std_dev) > 3 * std_std_dev)
)[0]

# Remove outliers from df
df_cleaned = df_encoded.drop(outlier_indices)
print(df_cleaned.shape)
print(len(df_cleaned['signal'][0]))
print(df_cleaned['signal'][0][0])
#df_cleaned = df_encoded

ResNet


In [None]:
from tensorflow.keras.layers import Input, Conv1D, BatchNormalization, Activation, Add, MaxPooling1D, Flatten, Dense, Dropout
from tensorflow.keras.models import Model

def residual_block(X, filters):
    # Save the input value
    X_shortcut = X

    # First component of main path
    X = Conv1D(filters, kernel_size=8, strides=1, padding="same")(X)
    X = BatchNormalization()(X)
    X = Activation('relu')(X)

    # Second component of main path
    X = Conv1D(filters, kernel_size=5, strides=1, padding="same")(X)
    X = BatchNormalization()(X)

    # Add shortcut value to main path
    X_shortcut = Conv1D(filters, kernel_size=3, strides=1, padding="same")(X_shortcut)
    X_shortcut = BatchNormalization()(X_shortcut)

    X = Add()([X, X_shortcut])
    X = Activation('relu')(X)

    return X

def create_model(input_shape=(5000,12), classes=2):
    # Define the input
    X_input = Input(input_shape)

    # Stage 1
    X = Conv1D(64, kernel_size=7, strides=2)(X_input)
    X = BatchNormalization()(X)
    X = Activation('relu')(X)
    X = MaxPooling1D(pool_size=3, strides=2)(X)

    # Stage 2
    X = residual_block(X, filters=64)

    # Stage 3
    X = residual_block(X, filters=128)

    # Stage 4
    X = residual_block(X, filters=256)

    # Output layer
    X = Flatten()(X)
    X = Dense(classes, activation='sigmoid')(X)

    # Create model
    model = Model(inputs=X_input, outputs=X, name='ResNet')

    return model

# Call the function to create the model
model_resnet = create_model(input_shape=(5000, 12), classes=1)

# Compile the model
model_resnet.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Print the model summary
model_resnet.summary()

history_resnet = model_resnet.fit(
    X_ecg_train,
    y_train,
    validation_data=(X_ecg_val, y_val),
    epochs=20,  # specify the number of epochs
    batch_size=32,  # specify your batch size
    #class_weight = class_weight
)


In [None]:
from sklearn.metrics import roc_auc_score, classification_report

y_pred = model_resnet.predict(X_ecg_test)  # assuming X_test is your test set
threshold = 0.5
y_pred_binary = (y_pred > threshold).astype(int)
y_test_binary = (y_test > threshold).astype(int)
# ROC AUC
roc_auc = roc_auc_score(y_test_binary, y_pred_binary)
print(f'ROC AUC Score: {roc_auc}')

# Classification Report
target_names = ['NORM', 'DISEASE']
print(classification_report(y_test_binary, y_pred_binary, target_names=target_names))



In [None]:
from sklearn.metrics import confusion_matrix

# Assuming you have the true labels in `y_true` and the predicted labels in `y_pred`


cm = confusion_matrix(y_test_binary, y_pred_binary)

print(cm)


In [None]:
def plot_smooth_loss(history, key='loss', window=5):
    plt.figure(figsize=(16,10))

    val = pd.Series(history.history[key])
    val_smooth = val.rolling(window=window, center=True).mean().fillna(val)
    plt.plot(val_smooth, label='Validation Loss')

    train = pd.Series(history.history[key])
    train_smooth = train.rolling(window=window, center=True).mean().fillna(train)
    plt.plot(train_smooth, label='Training Loss')

    plt.xlabel('Epochs')
    plt.ylabel(key.replace('_',' ').title())
    plt.legend()

    # Find the maximum epoch for the x-axis limit
    max_epoch = max(history.epoch)
    plt.xlim([0, max_epoch])

    # Set the x-axis ticks to integers
    plt.xticks(np.arange(0, max_epoch+1, step=1))
    plt.title('Loss per Epoch')

# Plot the loss
plot_smooth_loss(history_resnet)

def plot_smooth_accuracy(history, key='accuracy', window=5):
    plt.figure(figsize=(16,10))

    val = pd.Series(history.history['val_'+key])
    val_smooth = val.rolling(window=window, center=True).mean().fillna(val)
    plt.plot(val_smooth, label='Validation Accuracy')

    train = pd.Series(history.history[key])
    train_smooth = train.rolling(window=window, center=True).mean().fillna(train)
    plt.plot(train_smooth, label='Training Accuracy')

    plt.xlabel('Epochs')
    plt.ylabel(key.replace('_',' ').title())
    plt.legend()

    # Find the maximum epoch for the x-axis limit
    max_epoch = max(history.epoch)
    plt.xlim([0, max_epoch])

    # Set the x-axis ticks to integers
    plt.xticks(np.arange(0, max_epoch+1, step=1))
    plt.title('Accuracy per Epoch')

# Plot the accuracy
plot_smooth_accuracy(history_resnet)


## Transformer Test

In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from keras.callbacks import ModelCheckpoint, EarlyStopping

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    # Normalization and Attention
    x = layers.LayerNormalization(epsilon=1e-6)(inputs)
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(x, x)
    x = layers.Dropout(dropout)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.LayerNormalization(epsilon=1e-6)(res)
    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    return x + res

inputs = layers.Input(shape=(5000, 12))
x = transformer_encoder(inputs, head_size=64, num_heads=2, ff_dim=512, dropout=0.1)

# The Global Average Pooling layer and output layer
x = layers.GlobalAveragePooling1D()(x)
outputs = layers.Dense(1, activation="sigmoid")(x)

model_tran = tf.keras.Model(inputs=inputs, outputs=outputs)

model_tran.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
    loss='binary_crossentropy',
    metrics=['accuracy']
)


# Define early stopping
early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1)

# Pass it as a callback to the fit function
history_tran = model_tran.fit(X_ecg_train, y_train, batch_size=32, epochs=100, validation_data=(X_ecg_val, y_val), callbacks=[early_stopping])


In [None]:
def plot_smooth_loss(history, key='loss', window=5):
    plt.figure(figsize=(16,10))

    val = pd.Series(history.history[key])
    val_smooth = val.rolling(window=window, center=True).mean().fillna(val)
    plt.plot(val_smooth, label='Validation Loss')

    train = pd.Series(history.history[key])
    train_smooth = train.rolling(window=window, center=True).mean().fillna(train)
    plt.plot(train_smooth, label='Training Loss')

    plt.xlabel('Epochs')
    plt.ylabel(key.replace('_',' ').title())
    plt.legend()

    # Find the maximum epoch for the x-axis limit
    max_epoch = max(history.epoch)
    plt.xlim([0, max_epoch])

    # Set the x-axis ticks to integers
    plt.xticks(np.arange(0, max_epoch+1, step=1))
    plt.title('Loss per Epoch')

# Plot the loss
plot_smooth_loss(history_tran)

def plot_smooth_accuracy(history, key='accuracy', window=5):
    plt.figure(figsize=(16,10))

    val = pd.Series(history.history['val_'+key])
    val_smooth = val.rolling(window=window, center=True).mean().fillna(val)
    plt.plot(val_smooth, label='Validation Accuracy')

    train = pd.Series(history.history[key])
    train_smooth = train.rolling(window=window, center=True).mean().fillna(train)
    plt.plot(train_smooth, label='Training Accuracy')

    plt.xlabel('Epochs')
    plt.ylabel(key.replace('_',' ').title())
    plt.legend()

    # Find the maximum epoch for the x-axis limit
    max_epoch = max(history.epoch)
    plt.xlim([0, max_epoch])

    # Set the x-axis ticks to integers
    plt.xticks(np.arange(0, max_epoch+1, step=1))
    plt.title('Accuracy per Epoch')

# Plot the accuracy
plot_smooth_accuracy(history_tran)


In [None]:
from sklearn.metrics import confusion_matrix

# Assuming you have the true labels in `y_true` and the predicted labels in `y_pred`


cm = confusion_matrix(y_test_binary, y_pred_binary)

print(cm)


RNN


In [None]:
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

def create_model(input_shape=(5000, 12), num_classes=1):
    # Define an input for the series
    input_series = layers.Input(shape=input_shape)

    # Apply LSTM to the series
    x = layers.LSTM(64)(input_series)

    # Add a dense layer
    x = layers.Dense(64, activation="relu")(x)

    # Add a dropout layer for regularization
    x = layers.Dropout(0.5)(x)

    # Add the final classification (output) layer
    outputs = layers.Dense(num_classes, activation="sigmoid")(x)

    # Define the model
    model = Model(inputs=input_series, outputs=outputs)

    # Compile the model
    model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])

    return model

# Call the function to create the model
model_rnn = create_model(input_shape=(5000, 12), num_classes=1)

# Print the model summary
model_rnn.summary()

# Then pass the data to the fit method
# Define early stopping
early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1)

# Pass it as a callback to the fit function
history_rnn = model_rnn.fit(X_ecg_train, y_train, batch_size=32, epochs=100, validation_data=(X_ecg_val, y_val), callbacks=[early_stopping])


In [None]:
def plot_smooth_loss(history, key='loss', window=5):
    plt.figure(figsize=(16,10))

    val = pd.Series(history.history['val_'+key])
    val_smooth = val.rolling(window=window, center=True).mean().fillna(val)
    plt.plot(val_smooth, label='Validation Loss')

    train = pd.Series(history.history[key])
    train_smooth = train.rolling(window=window, center=True).mean().fillna(train)
    plt.plot(train_smooth, label='Training Loss')

    plt.xlabel('Epochs')
    plt.ylabel(key.replace('_',' ').title())
    plt.legend()

    # Find the maximum epoch for the x-axis limit
    max_epoch = max(history.epoch)
    plt.xlim([0, max_epoch])

    # Set the x-axis ticks to integers
    plt.xticks(np.arange(0, max_epoch+1, step=1))
    plt.title('Loss per Epoch')

# Plot the loss
plot_smooth_loss(history_rnn)

def plot_smooth_accuracy(history, key='accuracy', window=5):
    plt.figure(figsize=(16,10))

    val = pd.Series(history.history['val_'+key])
    val_smooth = val.rolling(window=window, center=True).mean().fillna(val)
    plt.plot(val_smooth, label='Validation Accuracy')

    train = pd.Series(history.history[key])
    train_smooth = train.rolling(window=window, center=True).mean().fillna(train)
    plt.plot(train_smooth, label='Training Accuracy')

    plt.xlabel('Epochs')
    plt.ylabel(key.replace('_',' ').title())
    plt.legend()

    # Find the maximum epoch for the x-axis limit
    max_epoch = max(history.epoch)
    plt.xlim([0, max_epoch])

    # Set the x-axis ticks to integers
    plt.xticks(np.arange(0, max_epoch+1, step=1))
    plt.title('Accuracy per Epoch')

# Plot the accuracy
plot_smooth_accuracy(history_rnn)


## 10-Fold


In [None]:
from sklearn.model_selection import KFold
import numpy as np
import matplotlib.pyplot as plt

X = df_encoded['signal'].to_numpy()
X = np.array([np.array(x) for x in df_encoded['signal']])

y = df_encoded.drop(['record_name', 'signal', 'age', 'sex'], axis=1)

n_folds = 10
kfold = KFold(n_splits=n_folds, shuffle=True, random_state=42)
scores = []

for train_index, val_index in kfold.split(X):
    X_train, X_val = X[train_index], X[val_index]
    y_train, y_val = y.iloc[train_index], y.iloc[val_index]

    # Define the ECG branch of the model
    ecg_input = layers.Input(shape=(5000, 12))
    ecg_layer = layers.Conv1D(64, 7, activation='relu')(ecg_input)
    ecg_layer = layers.MaxPooling1D(3)(ecg_layer)
    ecg_layer = layers.Conv1D(64, 7, activation='relu')(ecg_layer)
    ecg_layer = layers.MaxPooling1D(3)(ecg_layer)
    ecg_layer = layers.Dropout(0.5)(ecg_layer)
    ecg_layer = layers.Flatten()(ecg_layer)


    # Add a couple of Dense layers
    output = layers.Dense(64, activation='relu')(ecg_layer)
    output = layers.Dropout(0.5)(output)
    output = layers.Dense(y_train.shape[1], activation='sigmoid')(output)

    model_cnn = tf.keras.Model(inputs=[ecg_input], outputs=output)
    model_cnn.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    model_cnn.fit(X_train, y_train, epochs=10)
    score = model_cnn.evaluate(X_val, y_val)  # Set verbose=0 to not print evaluation results
    scores.append(score)

scores = np.array(scores)
mean_score = np.mean(scores[:, 1])
std_score = np.std(scores[:, 1])

# Compute the 95% confidence interval
confidence_interval = 1.96 * (std_score / np.sqrt(n_folds))
lower_bound = mean_score - confidence_interval
upper_bound = mean_score + confidence_interval

print(f'Mean validation accuracy over {n_folds}-fold cross-validation: {mean_score * 100:.2f}%')
print(f'Standard deviation: {std_score * 100:.2f}%')
print(f'95% confidence interval: ({lower_bound * 100:.2f}%, {upper_bound * 100:.2f}%)')

plt.hist(scores[:, 1], bins=10)
plt.title(f'Histogram of validation accuracy over {n_folds}-fold cross-validation')
plt.xlabel('Validation Accuracy')
plt.ylabel('Count')
plt.show()



In [None]:
plt.boxplot(scores[:, 1])
plt.title('Box plot of accuracies over 10-fold cross-validation')
plt.ylabel('Accuracy')
plt.show()

In [None]:
plt.errorbar(x=[1], y=[mean_score], yerr=[confidence_interval], fmt='o', color='blue', ecolor='red', capsize=5)
plt.title('Mean accuracy with 95% confidence interval')
plt.ylabel('Accuracy')
plt.xticks([])
plt.show()
print(confidence_interval)

## 8 leads

In [None]:
df_cleaned_8 = df_encoded
# Extract only the first eight leads from every ECG signal
df_cleaned_8['signal'] = df_cleaned_4['signal'].apply(lambda signal: signal[:, :8])

X_8 = df_cleaned_8['signal'].to_numpy()
X_8 = np.array([np.array(x) for x in df_cleaned_8['signal']])

y_8 = df_cleaned_8.drop(['record_name', 'signal', 'age', 'sex'], axis=1)

n_folds_8 = 10
kfold_8 = KFold(n_splits=n_folds_8, shuffle=True, random_state=42)
scores_8 = []

for train_index, val_index in kfold_8.split(X_8):
    X_train_8, X_val_8 = X_8[train_index], X_8[val_index]
    y_train_8, y_val_8 = y_8.iloc[train_index], y_8.iloc[val_index]

    # Define the ECG branch of the model
    ecg_input_8 = layers.Input(shape=(5000, 8))  # Adjust to match the number of leads
    ecg_layer_8 = layers.Conv1D(64, 7, activation='relu')(ecg_input_8)
    ecg_layer_8 = layers.MaxPooling1D(3)(ecg_layer_8)
    ecg_layer_8 = layers.Conv1D(64, 7, activation='relu')(ecg_layer_8)
    ecg_layer_8 = layers.MaxPooling1D(3)(ecg_layer_8)
    ecg_layer_8 = layers.Dropout(0.5)(ecg_layer_8)
    ecg_layer_8 = layers.Flatten()(ecg_layer_8)

    # Add a couple of Dense layers
    output_8 = layers.Dense(64, activation='relu')(ecg_layer_8)
    output_8 = layers.Dropout(0.5)(output_8)
    output_8 = layers.Dense(y_train_8.shape[1], activation='sigmoid')(output_8)

    model_cnn_8 = tf.keras.Model(inputs=[ecg_input_8], outputs=output_8)
    model_cnn_8.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    model_cnn_8.fit(X_train_8, y_train_8, epochs=10)
    score_8 = model_cnn_8.evaluate(X_val_8, y_val_8, verbose=0)  # Set verbose=0 to not print evaluation results
    scores_8.append(score_8)

scores_8 = np.array(scores_8)
mean_score_8 = np.mean(scores_8[:, 1])
std_score_8 = np.std(scores_8[:, 1])

# Compute the 95% confidence interval
confidence_interval_8 = 1.96 * (std_score_8 / np.sqrt(n_folds_8))
lower_bound_8 = mean_score_8 - confidence_interval_8
upper_bound_8 = mean_score_8 + confidence_interval_8

print(f'Mean validation accuracy over {n_folds_8}-fold cross-validation: {mean_score_8 * 100:.2f}%')
print(f'Standard deviation: {std_score_8 * 100:.2f}%')
print(f'95% confidence interval: ({lower_bound_8 * 100:.2f}%, {upper_bound_8 * 100:.2f}%)')

plt.hist(scores_8[:, 1], bins=10)
plt.title(f'Histogram of validation accuracy over {n_folds_8}-fold cross-validation')
plt.xlabel('Validation Accuracy')
plt.ylabel('Count')
plt.show()


In [None]:
plt.boxplot(scores_8[:, 1])
plt.title('Box plot of accuracies over 10-fold cross-validation')
plt.ylabel('Accuracy')
plt.show()

In [None]:
plt.errorbar(x=[1], y=[mean_score_8], yerr=[confidence_interval_8], fmt='o', color='blue', ecolor='red', capsize=5)
plt.title('Mean accuracy with 95% confidence interval')
plt.ylabel('Accuracy')
plt.xticks([])
plt.show()
print(confidence_interval_8)

## 4 leads

In [None]:
df_cleaned_4 = df_encoded
# Extract only the first four leads from every ECG signal
df_cleaned_4['signal'] = df_cleaned_4['signal'].apply(lambda signal: signal[:, :4])

X_4 = df_cleaned_4['signal'].to_numpy()
X_4 = np.array([np.array(x) for x in df_cleaned_4['signal']])

y_4 = df_cleaned_4.drop(['record_name', 'signal', 'age', 'sex'], axis=1)

n_folds_4 = 10
kfold_4 = KFold(n_splits=n_folds_4, shuffle=True, random_state=42)
scores_4 = []

for train_index, val_index in kfold_4.split(X_4):
    X_train_4, X_val_4 = X_4[train_index], X_4[val_index]
    y_train_4, y_val_4 = y_4.iloc[train_index], y_4.iloc[val_index]

    # Define the ECG branch of the model
    ecg_input_4 = layers.Input(shape=(5000, 4))  # Adjust to match the number of leads
    ecg_layer_4 = layers.Conv1D(64, 7, activation='relu')(ecg_input_4)
    ecg_layer_4 = layers.MaxPooling1D(3)(ecg_layer_4)
    ecg_layer_4 = layers.Conv1D(64, 7, activation='relu')(ecg_layer_4)
    ecg_layer_4 = layers.MaxPooling1D(3)(ecg_layer_4)
    ecg_layer_4 = layers.Dropout(0.5)(ecg_layer_4)
    ecg_layer_4 = layers.Flatten()(ecg_layer_4)

    # Add a couple of Dense layers
    output_4 = layers.Dense(64, activation='relu')(ecg_layer_4)
    output_4 = layers.Dropout(0.5)(output_4)
    output_4 = layers.Dense(y_train_4.shape[1], activation='sigmoid')(output_4)

    model_cnn_4 = tf.keras.Model(inputs=[ecg_input_4], outputs=output_4)
    model_cnn_4.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    model_cnn_4.fit(X_train_4, y_train_4, epochs=10)
    score_4 = model_cnn_4.evaluate(X_val_4, y_val_4, verbose=0)  # Set verbose=0 to not print evaluation results
    scores_4.append(score_4)

scores_4 = np.array(scores_4)
mean_score_4 = np.mean(scores_4[:, 1])
std_score_4 = np.std(scores_4[:, 1])

# Compute the 95% confidence interval
confidence_interval_4 = 1.96 * (std_score_4 / np.sqrt(n_folds_4))
lower_bound_4 = mean_score_4 - confidence_interval_4
upper_bound_4 = mean_score_4 + confidence_interval_4

print(f'Mean validation accuracy over {n_folds_4}-fold cross-validation: {mean_score_4 * 100:.2f}%')
print(f'Standard deviation: {std_score_4 * 100:.2f}%')
print(f'95% confidence interval: ({lower_bound_4 * 100:.2f}%, {upper_bound_4 * 100:.2f}%)')

plt.hist(scores_4[:, 1], bins=10)
plt.title(f'Histogram of validation accuracy over {n_folds_4}-fold cross-validation')
plt.xlabel('Validation Accuracy')
plt.ylabel('Count')
plt.show()


In [None]:
plt.boxplot(scores_4[:, 1])
plt.title('Box plot of accuracies over 10-fold cross-validation')
plt.ylabel('Accuracy')
plt.show()

In [None]:
plt.errorbar(x=[1], y=[mean_score_4], yerr=[confidence_interval_4], fmt='o', color='blue', ecolor='red', capsize=5)
plt.title('Mean accuracy with 95% confidence interval')
plt.ylabel('Accuracy')
plt.xticks([])
plt.show()
print(confidence_interval_4)

## 2 lead (II)

In [None]:
df_cleaned_2 = df_encoded
# Extract only the first two leads from every ECG signal
df_cleaned_2['signal'] = df_cleaned_2['signal'].apply(lambda signal: signal[:, :2])

X_2 = df_cleaned_2['signal'].to_numpy()
X_2 = np.array([np.array(x) for x in df_cleaned_2['signal']])

y_2 = df_cleaned_2.drop(['record_name', 'signal', 'age', 'sex'], axis=1)

n_folds_2 = 10
kfold_2 = KFold(n_splits=n_folds_2, shuffle=True, random_state=42)
scores_2 = []

for train_index, val_index in kfold_2.split(X_2):
    X_train_2, X_val_2 = X_2[train_index], X_2[val_index]
    y_train_2, y_val_2 = y_2.iloc[train_index], y_2.iloc[val_index]

    # Define the ECG branch of the model
    ecg_input_2 = layers.Input(shape=(5000, 2))
    ecg_layer_2 = layers.Conv1D(64, 7, activation='relu')(ecg_input_2)
    ecg_layer_2 = layers.MaxPooling1D(3)(ecg_layer_2)
    ecg_layer_2 = layers.Conv1D(64, 7, activation='relu')(ecg_layer_2)
    ecg_layer_2 = layers.MaxPooling1D(3)(ecg_layer_2)
    ecg_layer_2 = layers.Dropout(0.5)(ecg_layer_2)
    ecg_layer_2 = layers.Flatten()(ecg_layer_2)

    # Add a couple of Dense layers
    output_2 = layers.Dense(64, activation='relu')(ecg_layer_2)
    output_2 = layers.Dropout(0.5)(output_2)
    output_2 = layers.Dense(y_train_2.shape[1], activation='sigmoid')(output_2)

    model_cnn_2 = tf.keras.Model(inputs=[ecg_input_2], outputs=output_2)
    model_cnn_2.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    model_cnn_2.fit(X_train_2, y_train_2, epochs=10)
    score_2 = model_cnn_2.evaluate(X_val_2, y_val_2, verbose=0)
    scores_2.append(score_2)

scores_2 = np.array(scores_2)
mean_score_2 = np.mean(scores_2[:, 1])
std_score_2 = np.std(scores_2[:, 1])

# Compute the 95% confidence interval
confidence_interval_2 = 1.96 * (std_score_2 / np.sqrt(n_folds_2))
lower_bound_2 = mean_score_2 - confidence_interval_2
upper_bound_2 = mean_score_2 + confidence_interval_2

print(f'Mean validation accuracy over {n_folds_2}-fold cross-validation: {mean_score_2 * 100:.2f}%')
print(f'Standard deviation: {std_score_2 * 100:.2f}%')
print(f'95% confidence interval: ({lower_bound_2 * 100:.2f}%, {upper_bound_2 * 100:.2f}%)')

plt.hist(scores_2[:, 1], bins=10)
plt.title(f'Histogram of validation accuracy over {n_folds_2}-fold cross-validation')
plt.xlabel('Validation Accuracy')
plt.ylabel('Count')
plt.show()


In [None]:
df_cleaned_2 = df_encoded
# Extract only the first two leads from every ECG signal
df_cleaned_2['signal'] = df_cleaned_2['signal'].apply(lambda signal: signal[:, :2])

X_2 = df_cleaned_2['signal'].to_numpy()
X_2 = np.array([np.array(x) for x in df_cleaned_2['signal']])

y_2 = df_cleaned_2.drop(['record_name', 'signal', 'age', 'sex'], axis=1)

n_folds_2 = 10
kfold_2 = KFold(n_splits=n_folds_2, shuffle=True, random_state=42)
scores_2 = []

for train_index, val_index in kfold_2.split(X_2):
    X_train_2, X_val_2 = X_2[train_index], X_2[val_index]
    y_train_2, y_val_2 = y_2.iloc[train_index], y_2.iloc[val_index]

    # Define the ECG branch of the model
    ecg_input_2 = layers.Input(shape=(5000, 2))
    ecg_layer_2 = layers.Conv1D(64, 7, activation='relu')(ecg_input_2)
    ecg_layer_2 = layers.MaxPooling1D(3)(ecg_layer_2)
    ecg_layer_2 = layers.Conv1D(64, 7, activation='relu')(ecg_layer_2)
    ecg_layer_2 = layers.MaxPooling1D(3)(ecg_layer_2)
    ecg_layer_2 = layers.Dropout(0.5)(ecg_layer_2)
    ecg_layer_2 = layers.Flatten()(ecg_layer_2)

    # Add a couple of Dense layers
    output_2 = layers.Dense(64, activation='relu')(ecg_layer_2)
    output_2 = layers.Dropout(0.5)(output_2)
    output_2 = layers.Dense(y_train_2.shape[1], activation='sigmoid')(output_2)

    model_cnn_2 = tf.keras.Model(inputs=[ecg_input_2], outputs=output_2)
    model_cnn_2.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    model_cnn_2.fit(X_train_2, y_train_2, epochs=10)
    score_2 = model_cnn_2.evaluate(X_val_2, y_val_2, verbose=0)
    scores_2.append(score_2)

scores_2 = np.array(scores_2)
mean_score_2 = np.mean(scores_2[:, 1])
std_score_2 = np.std(scores_2[:, 1])

# Compute the 95% confidence interval
confidence_interval_2 = 1.96 * (std_score_2 / np.sqrt(n_folds_2))
lower_bound_2 = mean_score_2 - confidence_interval_2
upper_bound_2 = mean_score_2 + confidence_interval_2

print(f'Mean validation accuracy over {n_folds_2}-fold cross-validation: {mean_score_2 * 100:.2f}%')
print(f'Standard deviation: {std_score_2 * 100:.2f}%')
print(f'95% confidence interval: ({lower_bound_2 * 100:.2f}%, {upper_bound_2 * 100:.2f}%)')

plt.hist(scores_2[:, 1], bins=10)
plt.title(f'Histogram of validation accuracy over {n_folds_2}-fold cross-validation')
plt.xlabel('Validation Accuracy')
plt.ylabel('Count')
plt.show()


In [None]:
plt.errorbar(x=[1], y=[mean_score_2], yerr=[confidence_interval_2], fmt='o', color='blue', ecolor='red', capsize=5)
plt.title('Mean accuracy with 95% confidence interval')
plt.ylabel('Accuracy')
plt.xticks([])
plt.show()
print(confidence_interval_2)

In [None]:
plt.boxplot(scores_2[:, 1])
plt.title('Box plot of accuracies over 10-fold cross-validation')
plt.ylabel('Accuracy')
plt.show()

## 1 lead (II)

In [None]:
df_cleaned_1 = df_encoded
# Extract only the first lead from every ECG signal
df_cleaned_1['signal'] = df_cleaned_1['signal'].apply(lambda signal: signal[:, :1])

X_1 = df_cleaned_1['signal'].to_numpy()
X_1 = np.array([np.array(x) for x in df_cleaned_1['signal']])

y_1 = df_cleaned_1.drop(['record_name', 'signal', 'age', 'sex'], axis=1)

n_folds_1 = 10
kfold_1 = KFold(n_splits=n_folds_1, shuffle=True, random_state=42)
scores_1 = []

for train_index, val_index in kfold_1.split(X_1):
    X_train_1, X_val_1 = X_1[train_index], X_1[val_index]
    y_train_1, y_val_1 = y_1.iloc[train_index], y_1.iloc[val_index]

    # Define the ECG branch of the model
    ecg_input_1 = layers.Input(shape=(5000, 1))
    ecg_layer_1 = layers.Conv1D(64, 7, activation='relu')(ecg_input_1)
    ecg_layer_1 = layers.MaxPooling1D(3)(ecg_layer_1)
    ecg_layer_1 = layers.Conv1D(64, 7, activation='relu')(ecg_layer_1)
    ecg_layer_1 = layers.MaxPooling1D(3)(ecg_layer_1)
    ecg_layer_1 = layers.Dropout(0.5)(ecg_layer_1)
    ecg_layer_1 = layers.Flatten()(ecg_layer_1)

    # Add a couple of Dense layers
    output_1 = layers.Dense(64, activation='relu')(ecg_layer_1)
    output_1 = layers.Dropout(0.5)(output_1)
    output_1 = layers.Dense(y_train_1.shape[1], activation='sigmoid')(output_1)

    model_cnn_1 = tf.keras.Model(inputs=[ecg_input_1], outputs=output_1)
    model_cnn_1.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    model_cnn_1.fit(X_train_1, y_train_1, epochs=10)
    score_1 = model_cnn_1.evaluate(X_val_1, y_val_1, verbose=0)
    scores_1.append(score_1)

scores_1 = np.array(scores_1)
mean_score_1 = np.mean(scores_1[:, 1])
std_score_1 = np.std(scores_1[:, 1])

# Compute the 95% confidence interval
confidence_interval_1 = 1.96 * (std_score_1 / np.sqrt(n_folds_1))
lower_bound_1 = mean_score_1 - confidence_interval_1
upper_bound_1 = mean_score_1 + confidence_interval_1

print(f'Mean validation accuracy over {n_folds_1}-fold cross-validation: {mean_score_1 * 100:.2f}%')
print(f'Standard deviation: {std_score_1 * 100:.2f}%')
print(f'95% confidence interval: ({lower_bound_1 * 100:.2f}%, {upper_bound_1 * 100:.2f}%)')

plt.hist(scores_1[:, 1], bins=10)
plt.title(f'Histogram of validation accuracy over {n_folds_1}-fold cross-validation')
plt.xlabel('Validation Accuracy')
plt.ylabel('Count')
plt.show()


In [None]:
plt.errorbar(x=[1], y=[mean_score_1], yerr=[confidence_interval_1], fmt='o', color='blue', ecolor='red', capsize=5)
plt.title('Mean accuracy with 95% confidence interval')
plt.ylabel('Accuracy')
plt.xticks([])
plt.show()
print(confidence_interval_1)

In [None]:
plt.boxplot(scores_1[:, 1])
plt.title('Box plot of accuracies over 10-fold cross-validation')
plt.ylabel('Accuracy')
plt.show()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Extracting just the accuracies (assuming they are the second value in each list)
accuracies_1 = [s[1] for s in scores_1]
accuracies_2 = [s[1] for s in scores_2]
accuracies_4 = [s[1] for s in scores_4]
accuracies_8 = [s[1] for s in scores_8]
accuracies_12 = [s[1] for s in scores]

data = {
    '1-lead': accuracies_1,
    '2-lead': accuracies_2,
    '4-lead': accuracies_4,
    '8-lead': accuracies_8,
    '12-lead': accuracies_12
}

# Convert the dictionary to a format suitable for seaborn
labels, values = [], []
for k, v in data.items():
    labels.extend([k] * len(v))
    values.extend(v)

# Create a DataFrame
import pandas as pd
df = pd.DataFrame({'Lead Configuration': labels, 'Accuracy': values})

# Plot
plt.figure(figsize=(10, 6))
sns.boxplot(x='Lead Configuration', y='Accuracy', data=df, order=['1-lead', '2-lead', '4-lead', '8-lead', '12-lead'])
plt.title('Boxplots of Accuracies for Each Lead Configuration')
plt.show()

