In [1]:
import os
import itertools
import time
import random

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file 
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import accuracy_score, auc, f1_score, precision_score, recall_score


import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import LearningRateScheduler, Callback
from tensorflow.data import Dataset

In [None]:
# Read the two MIT-BIH CSV splits. header=None: the files are read without a
# header row, so pandas assigns integer column labels.
mit_train_df = pd.read_csv('../Dataset/mitbih_train.csv', header=None)
mit_test_df = pd.read_csv('../Dataset/mitbih_test.csv', header=None)

# ignore_index=True gives the combined frame a fresh 0..n-1 index; without it
# the row labels of the second frame would repeat those of the first.
mit_df = pd.concat([mit_train_df, mit_test_df], axis=0, ignore_index=True)

In [None]:
# Column 187 is the one later mapped to class names; give it a descriptive label.
mit_df = mit_df.rename(columns={187: 'class'})

In [None]:
# Human-readable names for the five integer class codes.
labels = {
    0: "Normal",
    1: "Atrial Premature",  # spelling fixed (was "Artial Premature")
    2: "Premature ventricular contraction",
    3: "Fusion of ventricular and normal",
    4: "Fusion of paced and normal"
}

# Add a text column next to the numeric 'class' column, then summarise the frame.
mit_df['label'] = mit_df['class'].map(labels)
mit_df.info()

In [None]:
# Persist the merged, labelled frame (row index omitted) so later cells can reload it.
mit_df.to_csv(path_or_buf='../Dataset/mitbih.csv', index=False)

# Basic EDA

In [None]:
# Loading Dataset
mit_df = pd.read_csv('../Dataset/mitbih.csv')

# Absolute and relative frequency of every text label.
label_counts = mit_df['label'].value_counts()
label_percentages = label_counts / label_counts.sum() * 100

fig, ax = plt.subplots()
label_counts.plot(kind='bar', color=plt.cm.Paired(range(len(label_percentages))), width=0.9, ax=ax)

ax.set_xlabel('Labels')
ax.set_ylabel('Frequency')
ax.set_title('Frequency of Labels in LabelColumn')

# Annotate each bar with its percentage and frequency on top.
# The bars are drawn from the raw counts, so the text has to be anchored at
# the count (the bar height) rather than at the percentage value.
for i, (percentage, frequency) in enumerate(zip(label_percentages, label_counts)):
    ax.annotate(f'{percentage:.2f}%\n({frequency})', (i, frequency), ha='center', va='bottom', fontsize=8)

# Rotate x-axis labels for better visibility
plt.xticks(rotation=20, ha='right')

# Write the files before plt.show(): calling plt.savefig after show() can end
# up saving a new, empty current figure instead of this one.
fig.savefig('data_dist.png', facecolor='w', edgecolor='w', format='png',
        transparent=False, bbox_inches='tight', pad_inches=0.1)
fig.savefig('data_dist.svg', facecolor='w', edgecolor='w', format='svg',
        transparent=False, bbox_inches='tight', pad_inches=0.1)
plt.show()

In [None]:
N = 5  # number of randomly drawn rows per class
# Take the class codes from the `labels` mapping so that changing N only
# changes how many rows are drawn, not which classes are plotted.
class_ids = sorted(labels)
samples = [mit_df.loc[mit_df['class'] == cls].sample(N) for cls in class_ids]
titles = [labels[cls] for cls in class_ids]

with plt.style.context('_classic_test_patch'):
    fig, axs = plt.subplots(3, 2, figsize=(20, 7))
    panels = axs.ravel()
    for ax, class_samples, title in zip(panels, samples, titles):
        # Leave out the last two columns ('class' and 'label'); after the
        # transpose every sampled row is drawn as one line.
        ax.plot(class_samples.iloc[:, :-2].to_numpy().transpose())
        ax.set_title(title)
        #plt.ylabel("Amplitude")

    # The 3x2 grid has more panels than there are classes; hide the spare ones.
    for spare_ax in panels[len(samples):]:
        spare_ax.set_visible(False)

    plt.tight_layout()
    plt.suptitle("ECG Signals", fontsize=20, y=1.05, weight="bold")
    # plt.savefig(f"signals_per_class.svg",
    #                 format="svg",bbox_inches='tight', pad_inches=0.2)

    # plt.savefig(f"signals_per_class.png", 
    #                 format="png",bbox_inches='tight', pad_inches=0.2) 

In [None]:
%%time
signals = [' '.join(df_mitbih.iloc[i, :-1].apply(str).values) for i in range(df_mitbih.shape[0])]
y = df_mitbih.iloc[:, -1].values.tolist()
print(len(signals), len(y))

print(f'data has {len(set([sig for line in signals for sig in line.split()]))} out of 16 372 411 unique values.')

## Dataset and DataLoader

In [None]:
class ECGDataset:
    """Holds the feature matrix and integer labels of an ECG DataFrame.

    Parameters:
        df: DataFrame whose last two columns are not features. When a column
            named 'class' exists it supplies the labels; otherwise the last
            column is used (the previous behaviour).

    Attributes (set in __init__):
        df: the frame passed in.
        features: float32 array built from every column except the last two.
        labels: int32 array of class codes.
    """
    def __init__(self, df):
        self.df = df
        self.features = df.iloc[:, :-2].values.astype('float32')
        # The CSV written by this notebook ends with 'class' (numeric code)
        # followed by 'label' (text). Casting the text column to int32 fails,
        # so prefer the numeric 'class' column whenever it is present.
        label_source = df['class'] if 'class' in df.columns else df.iloc[:, -1]
        self.labels = label_source.values.astype('int32')

    def generator(self):
        """Yield one ``(features, label)`` pair per row, in row order."""
        for features, label in zip(self.features, self.labels):
            yield features, label

def create_tf_dataset(df, batch_size=32):
    """Wrap ``df`` in a batched ``tf.data.Dataset`` of (features, label) pairs.

    Parameters:
        df: DataFrame accepted by ``ECGDataset``.
        batch_size: Number of samples per batch.
    Returns:
        A ``tf.data.Dataset`` built from ``ECGDataset(df).generator`` and
        batched with ``batch_size``.
    """
    dataset_creator = ECGDataset(df)
    # `output_signature` replaces the deprecated `output_types` /
    # `output_shapes` pair. The specs are unchanged: a 1-D float32 vector of
    # unspecified length and a scalar int32 label.
    return tf.data.Dataset.from_generator(
        dataset_creator.generator,
        output_signature=(
            tf.TensorSpec(shape=(None,), dtype=tf.float32),
            tf.TensorSpec(shape=(), dtype=tf.int32),
        ),
    ).batch(batch_size)


In [None]:
# NOTE(review): this cell defines `create_tf_dataset` a second time; the
# definition here is the one in effect once the cell has run.
def create_tf_dataset(df, batch_size=32):
    """Wrap ``df`` in a batched ``tf.data.Dataset`` of (features, label) pairs.

    Parameters:
        df: DataFrame accepted by ``ECGDataset``.
        batch_size: Number of samples per batch.
    Returns:
        A ``tf.data.Dataset`` built from ``ECGDataset(df).generator`` and
        batched with ``batch_size``.
    """
    dataset_creator = ECGDataset(df)
    # `output_signature` replaces the deprecated `output_types` /
    # `output_shapes` pair. The specs are unchanged: a 1-D float32 vector of
    # unspecified length and a scalar int32 label.
    return tf.data.Dataset.from_generator(
        dataset_creator.generator,
        output_signature=(
            tf.TensorSpec(shape=(None,), dtype=tf.float32),
            tf.TensorSpec(shape=(), dtype=tf.int32),
        ),
    ).batch(batch_size)

def get_tf_dataset(phase: str, batch_size: int = 96):
    '''
    Prepare TensorFlow dataset.
    Parameters:
        phase: 'train' for the training split; 'val' (or 'validation') for the
            held-out 15% split.
        batch_size: Number of samples per batch.
    Returns:
        A TensorFlow dataset ready for training or validation.
    Raises:
        ValueError: if `phase` is not one of the accepted names.
    '''
    # Reject unknown names up front: previously anything other than 'train'
    # (including a typo) silently returned the validation split.
    if phase not in ('train', 'val', 'validation'):
        raise ValueError(
            f"phase must be 'train', 'val' or 'validation', got {phase!r}"
        )

    # Assuming config.train_csv_path and config.seed are defined elsewhere
    df = pd.read_csv(config.train_csv_path)
    # Stratified split with a fixed seed, so the 'train' and 'val' calls see
    # the same partition of the file.
    train_df, val_df = train_test_split(
        df, test_size=0.15, random_state=config.seed, stratify=df['label']
    )
    train_df, val_df = train_df.reset_index(drop=True), val_df.reset_index(drop=True)

    # Selecting the appropriate dataframe based on the phase
    df = train_df if phase == 'train' else val_df

    # Creating the TensorFlow dataset
    tf_dataset = create_tf_dataset(df, batch_size)

    return tf_dataset

# Models

In [None]:
# Swish activation, written with TensorFlow ops
def swish(x):
    """Return ``x`` multiplied element-wise by ``sigmoid(x)``."""
    gate = tf.sigmoid(x)
    return x * gate

# 100 evenly spaced points on [-10, 10], as a NumPy array and as a float32 tensor
x = np.linspace(-10.0, 10.0, 100)
x_tensor = tf.constant(x, dtype=tf.float32)

# Evaluate both activations on the same inputs
relu_out = tf.nn.relu(x_tensor)
swish_out = swish(x_tensor)

# Draw the two curves on one set of axes (tensors are converted with .numpy()
# because matplotlib works on NumPy arrays)
plt.plot(x, swish_out.numpy(), label='Swish')
plt.plot(x, relu_out.numpy(), label='ReLU')
plt.title('Swish function')
plt.legend()
plt.show()


In [None]:
class ConvNormPool(tf.keras.Model):
    """Three Conv1D -> normalization -> Swish stages followed by max pooling.

    Each convolution uses 'valid' padding; after every stage the sequence axis
    is zero-padded on the left by ``kernel_size - 1`` steps before the next op.

    NOTE(review): the original summary called this a "Conv Skip-connection
    module", but ``call`` performs no residual addition — confirm whether a
    skip connection was intended.
    NOTE(review): ``norm_type='group'`` builds ``LayerNormalization`` layers,
    not a group normalization — confirm the intent.

    Parameters:
        input_size: accepted but not used by the layers built here.
        hidden_size: number of filters of each Conv1D layer.
        kernel_size: kernel width of each Conv1D layer (also sets the padding).
        norm_type: 'group' selects LayerNormalization(axis=-1); any other
            value selects BatchNormalization.
    """
    def __init__(self, input_size, hidden_size, kernel_size, norm_type='batchnorm'):
        super().__init__()
        self.kernel_size = kernel_size

        # Layers are created in the same order as before (convs, activations,
        # normalizations, pool) so automatically generated layer names stay put.
        self.conv_1 = layers.Conv1D(hidden_size, kernel_size, padding='valid')
        self.conv_2 = layers.Conv1D(hidden_size, kernel_size, padding='valid')
        self.conv_3 = layers.Conv1D(hidden_size, kernel_size, padding='valid')

        self.swish_1 = layers.Activation(swish)
        self.swish_2 = layers.Activation(swish)
        self.swish_3 = layers.Activation(swish)

        if norm_type == 'group':
            make_norm = lambda: layers.LayerNormalization(axis=-1)
        else:
            make_norm = layers.BatchNormalization
        self.normalization_1 = make_norm()
        self.normalization_2 = make_norm()
        self.normalization_3 = make_norm()

        self.pool = layers.MaxPooling1D(pool_size=2)

    def call(self, inputs):
        """Run the three stages in order, then max-pool the result."""
        # Pad only axis 1, and only at its start.
        left_pad = [[0, 0], [self.kernel_size - 1, 0], [0, 0]]
        stages = (
            (self.conv_1, self.normalization_1, self.swish_1),
            (self.conv_2, self.normalization_2, self.swish_2),
            (self.conv_3, self.normalization_3, self.swish_3),
        )

        x = inputs
        for conv, norm, activation in stages:
            x = activation(norm(conv(x)))
            x = tf.pad(x, paddings=left_pad, mode='CONSTANT')

        return self.pool(x)

# Swish activation. NOTE(review): `swish` is also defined in an earlier cell;
# this second definition is the one in effect after this cell runs.
def swish(x):
    """Return ``x`` multiplied element-wise by ``sigmoid(x)``."""
    gate = tf.sigmoid(x)
    return x * gate


In [None]:
class CNN(tf.keras.Model):
    """1-D CNN classifier: three ConvNormPool blocks, global average pooling
    and a softmax Dense layer.

    Parameters:
        input_size: forwarded to the first ConvNormPool block.
        hid_size: filters of the first block; the second and third blocks use
            ``hid_size // 2`` and ``hid_size // 4``.
        kernel_size: kernel width used by every block.
        num_classes: size of the softmax output.

    The output of ``call`` is a probability distribution (softmax), not logits.
    """
    def __init__(self, input_size=1, hid_size=256, kernel_size=5, num_classes=5):
        super().__init__()

        # Define the convolutional blocks
        self.conv1 = ConvNormPool(input_size=input_size, hidden_size=hid_size, kernel_size=kernel_size)
        self.conv2 = ConvNormPool(input_size=hid_size, hidden_size=hid_size // 2, kernel_size=kernel_size)
        self.conv3 = ConvNormPool(input_size=hid_size // 2, hidden_size=hid_size // 4, kernel_size=kernel_size)

        # Define the adaptive average pooling equivalent in TensorFlow
        self.avgpool = layers.GlobalAveragePooling1D()

        # Define the fully connected layer
        self.fc = layers.Dense(num_classes, activation='softmax')

    def call(self, inputs):
        """Return class probabilities for a batch of signals.

        Accepts either (batch, steps, channels) input or (batch, steps) input;
        the latter gets a single channel axis appended.
        """
        # The tf.data pipeline in this notebook yields rank-2 (batch, steps)
        # batches, while Conv1D requires rank-3 input. Add the channel axis
        # here so both layouts work.
        if len(inputs.shape) == 2:
            inputs = tf.expand_dims(inputs, axis=-1)

        x = self.conv1(inputs)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.avgpool(x)
        x = self.fc(x)
        return x


# Training Stage

In [None]:
class Meter:
    """Accumulates per-batch loss / classification metrics and a confusion matrix.

    Parameters:
        n_classes: side length of the square confusion matrix.

    ``metrics`` holds running sums (one addition per ``update`` call); the
    caller divides by the number of batches to obtain means.
    """
    def __init__(self, n_classes=5):
        self.metrics = {}
        self.confusion = np.zeros((n_classes, n_classes))
        # Start from zeroed accumulators so update() works even when the caller
        # does not call init_metrics() first.
        self.init_metrics()

    @staticmethod
    def _to_numpy(value):
        """Return ``value`` as a NumPy value; objects exposing ``.numpy()``
        (e.g. eager TensorFlow tensors) are converted through it."""
        return value.numpy() if hasattr(value, 'numpy') else np.asarray(value)

    def update(self, x, y, loss):
        """Add one batch to the running sums.

        Parameters:
            x: per-class scores, one row per sample (tensor or array).
            y: integer targets (tensor or array).
            loss: scalar loss of the batch (tensor, array or float).
        """
        preds = np.argmax(self._to_numpy(x), axis=1)
        targets = self._to_numpy(y)
        self.metrics['loss'] += float(self._to_numpy(loss))
        # scikit-learn metrics take (y_true, y_pred); the reversed order swaps
        # precision and recall.
        self.metrics['accuracy'] += accuracy_score(targets, preds)
        self.metrics['f1'] += f1_score(targets, preds, average='macro')
        self.metrics['precision'] += precision_score(targets, preds, average='macro', zero_division=1)
        self.metrics['recall'] += recall_score(targets, preds, average='macro', zero_division=1)

        self._compute_cm(preds, targets)

    def _compute_cm(self, x, y):
        """Count each (target, prediction) pair: rows are targets, columns predictions."""
        preds = np.asarray(x).astype(int).ravel()
        targets = np.asarray(y).astype(int).ravel()
        # np.add.at accumulates correctly when the same cell appears repeatedly.
        np.add.at(self.confusion, (targets, preds), 1)

    def init_metrics(self):
        """Reset every running sum to zero."""
        self.metrics['loss'] = 0
        self.metrics['accuracy'] = 0
        self.metrics['f1'] = 0
        self.metrics['precision'] = 0
        self.metrics['recall'] = 0

    def get_metrics(self):
        """Return the dict of running sums."""
        return self.metrics

    def get_confusion_matrix(self):
        """Return the accumulated confusion matrix."""
        return self.confusion


In [None]:
class Trainer:
    """Custom training / validation loop around a Keras model.

    Parameters:
        net: model called as ``net(data, training=...)``; expected to return
            class probabilities (the CNN in this notebook ends in softmax).
        lr: initial learning rate.
        batch_size: batch size passed to ``get_tf_dataset``.
        num_epochs: number of epochs run by ``run``.

    After ``run`` the per-epoch mean metrics are available in
    ``train_df_logs`` and ``val_df_logs`` (one row per epoch).
    """
    def __init__(self, net, lr, batch_size, num_epochs):
        self.net = net
        self.num_epochs = num_epochs
        # The model output is already softmax-normalised, so the loss must not
        # treat it as logits (that would apply softmax a second time).
        self.criterion = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
        # CosineDecay's `alpha` is a fraction of the initial rate, so the floor
        # here is lr * 5e-6.
        # NOTE(review): if an absolute floor of 5e-6 was intended, use alpha=5e-6 / lr.
        self.scheduler = tf.keras.optimizers.schedules.CosineDecay(initial_learning_rate=lr, decay_steps=num_epochs, alpha=5e-6)
        self.best_loss = float('inf')
        self.phases = ['train', 'val']
        self.dataloaders = {
            phase: get_tf_dataset(phase, batch_size) for phase in self.phases
        }
        self.train_df_logs = pd.DataFrame()
        self.val_df_logs = pd.DataFrame()

    @tf.function
    def _train_step(self, data, target):
        """One optimisation step; returns ``(loss, output)`` for the batch."""
        with tf.GradientTape() as tape:
            output = self.net(data, training=True)
            loss = self.criterion(target, output)
        gradients = tape.gradient(loss, self.net.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.net.trainable_variables))
        return loss, output

    @tf.function
    def _val_step(self, data, target):
        """Forward pass without weight updates; returns ``(loss, output)``."""
        output = self.net(data, training=False)
        loss = self.criterion(target, output)
        return loss, output

    def _train_epoch(self, phase):
        """Run one pass over ``phase`` ('train' or 'val').

        Logs and plots the epoch's metrics and returns the mean loss of the
        epoch as a float.
        """
        print(f"{phase} mode | time: {time.strftime('%H:%M:%S')}")
        meter = Meter(n_classes=5)  # Assuming n_classes is defined
        meter.init_metrics()

        step_fn = self._train_step if phase == 'train' else self._val_step
        n_batches = 0
        for data, target in self.dataloaders[phase]:
            # The step functions return the predictions as well: the meter
            # needs them, and they are not otherwise visible outside the step.
            loss, output = step_fn(data, target)
            # Tensors are passed as-is; Meter.update converts them itself.
            meter.update(output, target, loss)
            n_batches += 1

        # Batches are counted in the loop because len() is not available for a
        # dataset built with from_generator (its cardinality is unknown).
        metrics = meter.get_metrics()
        metrics = {k: v / max(n_batches, 1) for k, v in metrics.items()}
        df_logs = pd.DataFrame([metrics])
        cm = meter.get_confusion_matrix()

        # Update logs
        if phase == 'train':
            self.train_df_logs = pd.concat([self.train_df_logs, df_logs], axis=0)
        else:
            self.val_df_logs = pd.concat([self.val_df_logs, df_logs], axis=0)

        # Show logs
        print('{}: {}, {}: {}, {}: {}, {}: {}, {}: {}'
              .format(*(x for kv in metrics.items() for x in kv)))

        # Plot confusion matrix
        fig, ax = plt.subplots(figsize=(5, 5))
        cm_ = ax.imshow(cm, cmap='hot')
        ax.set_title('Confusion matrix', fontsize=15)
        plt.colorbar(cm_)
        plt.show()

        # Mean loss over the epoch (not the last batch's loss), so the
        # checkpoint decision in run() reflects the whole validation set.
        return float(metrics['loss'])

    def run(self):
        """Train for ``num_epochs`` epochs, saving weights whenever the
        validation loss improves."""
        for epoch in range(self.num_epochs):
            print(f"Epoch {epoch+1}/{self.num_epochs}")
            # Evaluate the schedule once per epoch and hand the optimizer a
            # plain float.
            self.optimizer.learning_rate = float(self.scheduler(epoch))
            self._train_epoch(phase='train')
            val_loss = self._train_epoch(phase='val')

            if val_loss < self.best_loss:
                self.best_loss = val_loss
                print('\nNew checkpoint\n')
                self.net.save_weights(f"best_model_epoch{epoch}.tf")


In [None]:
# CNN classifier with five output classes; hid_size=128 sets the filter count
# of the first convolutional block.
model = CNN(hid_size=128, num_classes=5)

In [None]:
# Wrap `model` in the training loop and run all 10 epochs.
trainer = Trainer(model, lr=1e-3, batch_size=96, num_epochs=10)
trainer.run()

In [None]:
# add_prefix returns renamed copies, so the trainer's own log frames keep
# their column names and this cell can be re-run safely. The index is reset
# before the concat so rows are matched by position.
train_logs = trainer.train_df_logs.add_prefix("train_").reset_index(drop=True)
val_logs = trainer.val_df_logs.add_prefix("val_").reset_index(drop=True)

logs = pd.concat([train_logs, val_logs], axis=1)
# Interleave the train/val column of each metric.
logs = logs.loc[:, [
    'train_loss', 'val_loss', 
    'train_accuracy', 'val_accuracy', 
    'train_f1', 'val_f1',
    'train_precision', 'val_precision',
    'train_recall', 'val_recall']
                                 ]
logs.to_csv('cnn.csv', index=False)
# Last expression of the cell, so the preview is actually displayed.
logs.head()

# Experiments and Results

In [None]:
# Rebuild the CNN with the same arguments used when `model` was created, then
# restore its saved weights.
# NOTE(review): `config` is not defined in the visible part of this notebook.
cnn_model = CNN(hid_size=128, num_classes=5)
cnn_model.load_weights(config.cnn_state_path)

# Logged metrics for this model
logs = pd.read_csv(filepath_or_buffer=config.cnn_logs)


In [None]:
# Base colours; the three palettes below take 2, 4 and 6 colours from them.
color_scheme = ['#C042FF', '#03C576FF', '#FF355A', '#03C5BF', '#96C503', '#C5035B']
plot_palettes = [
    sns.color_palette(color_scheme, n_colors=2),
    sns.color_palette(color_scheme, n_colors=4),
    sns.color_palette(
        color_scheme[:2] + color_scheme[-2:] + color_scheme[2:4],
        n_colors=6,
    ),
]

figure, axes = plt.subplots(1, 2, figsize=(12, 4))

# Left panel: training vs validation loss
sns.lineplot(
    data=logs[['train_loss', 'val_loss']],
    palette=plot_palettes[0],
    markers=True,
    ax=axes[0],
    linewidth=2.5,
)
axes[0].set_xlabel("Training Epoch", fontsize=14)
axes[0].set_title("Model Training Loss", fontsize=14)

# Right panel: accuracy and F1 for both splits
sns.lineplot(
    data=logs[['train_accuracy', 'val_accuracy', 'train_f1', 'val_f1']],
    palette=plot_palettes[1],
    markers=True,
    ax=axes[1],
    linewidth=2.5,
    legend="full",
)
axes[1].set_xlabel("Training Epoch", fontsize=14)
axes[1].set_title("Performance Metrics Through Training", fontsize=15)

plt.suptitle('Performance of the CNN Model', fontsize=18)
plt.tight_layout()

# Export the figure in both raster and vector form
figure.savefig("model_performance.png", format="png", pad_inches=0.2, transparent=False, bbox_inches='tight')
figure.savefig("model_performance.svg", format="svg", pad_inches=0.2, transparent=False, bbox_inches='tight')


In [None]:
# NOTE(review): `RNNModel` and `config` are not defined in the visible part of
# this notebook; `load_weights` implies a Keras model.
# The former `.to(device=...)` and `.eval()` calls are PyTorch module methods
# that a Keras model does not provide, so they are dropped. Inference mode is
# selected per call with `training=False`, as `make_test_stage` does.
model_lstm = RNNModel(input_size=1, hid_size=64, rnn_type='lstm', bidirectional=True)
model_lstm.load_weights(config.lstm_state_path)

# Logged metrics for this model
training_logs = pd.read_csv(filepath_or_buffer=config.lstm_logs)


In [None]:
custom_colors = ['#C042FF', '#03C576FF', '#FF355A', '#03C5BF', '#96C503', '#C5035B']
plot_colors = [
    sns.color_palette(custom_colors, n_colors=2),
    sns.color_palette(custom_colors, n_colors=4), 
    sns.color_palette(custom_colors[:2] + custom_colors[-2:] + custom_colors[2:4], n_colors=6)
]

figure, axes = plt.subplots(1, 2, figsize=(12, 4))

# Plot `training_logs` (the logs loaded for this model in the previous cell),
# not `logs`, which holds the CNN run.
# Columns 0-1 are drawn on the left panel and columns 2-5 on the right.
# NOTE(review): presumably loss and accuracy/F1 respectively, as in the CNN
# log layout — confirm against the CSV.
sns.lineplot(data=training_logs.iloc[:, 0:2], palette=plot_colors[0], markers=True, ax=axes[0], linewidth=2.5)
axes[0].set_title("Model Training Loss Evolution", fontsize=14)
axes[0].set_xlabel("Training Epochs", fontsize=14)

sns.lineplot(data=training_logs.iloc[:, 2:6], palette=plot_colors[1], markers=True, ax=axes[1], linewidth=2.5, legend=True)
axes[1].set_title("Training Performance Metrics", fontsize=15)
axes[1].set_xlabel("Training Epochs", fontsize=14)

plt.suptitle('Performance Overview: CNN+LSTM Model', fontsize=18)

plt.tight_layout()

figure.savefig("model_evaluation_cnn_lstm.png", format="png", pad_inches=0.2, transparent=False, bbox_inches='tight')
figure.savefig("model_evaluation_cnn_lstm.svg", format="svg", pad_inches=0.2, transparent=False, bbox_inches='tight')


In [None]:
# Build the attention-based RNN and restore its saved weights.
# NOTE(review): `RNNAttentionModel` and `config` are not defined in the
# visible part of this notebook.
attention_rnn_model = RNNAttentionModel(
    input_size=1,
    hid_size=64,
    rnn_type='lstm',
    bidirectional=False,
)
attention_rnn_model.load_weights(config.attn_state_path)

# Logged metrics for this model
training_logs = pd.read_csv(filepath_or_buffer=config.attn_logs)

In [None]:
unique_colors = ['#C042FF', '#03C576FF', '#FF355A', '#03C5BF', '#96C503', '#C5035B']
visualization_palettes = [
    sns.color_palette(unique_colors, n_colors=2),
    sns.color_palette(unique_colors, n_colors=4), 
    sns.color_palette(unique_colors[:2] + unique_colors[-2:] + unique_colors[2:4], n_colors=6)
]

figure, axes = plt.subplots(1, 2, figsize=(12, 4))

# Plot `training_logs` (the logs loaded for this model in the previous cell),
# not `logs`, which holds the CNN run.
# Columns 0-1 are drawn on the left panel and columns 2-5 on the right.
# NOTE(review): presumably loss and accuracy/F1 respectively, as in the CNN
# log layout — confirm against the CSV.
sns.lineplot(data=training_logs.iloc[:, 0:2], palette=visualization_palettes[0], markers=True, ax=axes[0], linewidth=2.5)
axes[0].set_title("Training Loss Over Time", fontsize=14)
axes[0].set_xlabel("Training Epoch", fontsize=14)

sns.lineplot(data=training_logs.iloc[:, 2:6], palette=visualization_palettes[1], markers=True, ax=axes[1], linewidth=2.5, legend=True)
axes[1].set_title("Key Training Metrics Evolution", fontsize=15)
axes[1].set_xlabel("Training Epoch", fontsize=14)

plt.suptitle('Evaluation of CNN+LSTM+Attention Model', fontsize=18)
plt.tight_layout()

figure.savefig("model_attn_evaluation.png", format="png", pad_inches=0.2, transparent=False, bbox_inches='tight')
figure.savefig("model_attn_evaluation.svg", format="svg", pad_inches=0.2, transparent=False, bbox_inches='tight')


## Experiments and Results for Test Stage

In [None]:
# Load the test CSV and slice it into (features, target) pairs: every column
# but the last is a feature, the last column is the target.
# NOTE(review): the schema of config.test_csv_path is not visible here — confirm
# that the last column is the numeric class.
test_df = pd.read_csv(config.test_csv_path)

test_data = tf.data.Dataset.from_tensor_slices(
    (test_df.iloc[:, :-1].to_numpy(), test_df.iloc[:, -1].to_numpy())
)

def preprocess(features, label):
    """Pass-through mapping for ``Dataset.map``: returns the pair unchanged."""
    sample = (features, label)
    return sample

# Apply the (currently pass-through) preprocessing, then group the samples
# into batches of 96.
test_data = test_data.map(preprocess).batch(96)

In [None]:
def make_test_stage(dataloader, model, probs=False):
    """Run ``model`` over every batch of ``dataloader`` and collect the results.

    Parameters:
        dataloader: iterable of ``(data, target)`` batches whose elements
            expose ``.numpy()``.
        model: callable invoked as ``model(data, training=False)``.
        probs: when True keep the raw model output; otherwise reduce it to the
            arg-max index along axis 1.
    Returns:
        ``(predictions, ground_truths)`` — two NumPy arrays obtained by
        concatenating the per-batch results in iteration order.
    """
    collected_preds, collected_targets = [], []

    for batch, batch_target in dataloader:
        # training=False requests inference behaviour from the model
        batch_out = model(batch, training=False)
        batch_out = batch_out if probs else tf.argmax(batch_out, axis=1)

        collected_preds.append(batch_out.numpy())
        collected_targets.append(batch_target.numpy())

    return np.concatenate(collected_preds), np.concatenate(collected_targets)


# Use the names the models were actually bound to in the loading cells
# (`model_lstm`, `attention_rnn_model`); `lstm_model` / `attn_model` are never defined.
# Order: CNN, CNN+LSTM, CNN+LSTM+Attention.
models = [cnn_model, model_lstm, attention_rnn_model]

### cnn+lstm model report

In [None]:
# `test_data` is the batched test dataset built above (`test_dataloader` is
# never defined); models[1] is the CNN+LSTM model.
y_pred, y_true = make_test_stage(test_data, models[1])
assert y_pred.shape == y_true.shape, (y_pred.shape, y_true.shape)

# classification_report expects (y_true, y_pred); with the arguments reversed
# precision and recall are swapped.
report = pd.DataFrame(classification_report(y_true, y_pred, output_dict=True)).transpose()

In [None]:
# One colour per metric (precision, recall, f1-score)
chosen_colors = ['#00FA9A', '#D2B48C', '#FF69B4']

# Same table expressed in percent
percentage_report = report.mul(100)

figure, plot_ax = plt.subplots(figsize=(13, 4))
percentage_report[["precision", "recall", "f1-score"]].plot(
    kind='bar',
    color=chosen_colors,
    legend=True,
    fontsize=15,
    ax=plot_ax,
)

plt.title("Performance Metrics for the CNN+LSTM Model", fontsize=20)
plot_ax.set_ylabel("Percent", fontsize=15)
plot_ax.set_xlabel("Classifiers", fontsize=15)
# Tick labels: the class names followed by the three summary rows of the report.
# NOTE(review): `id_to_label` is not defined in the visible part of this notebook.
plot_ax.set_xticklabels(
    list(id_to_label.values()) + ["average accuracy", "macro avg", "weighted avg"],
    rotation=15, fontsize=11)

# Pair each report row with a bar patch (zip stops at the shorter sequence)
# and write that row's three percentages next to the patch.
for values, patch in zip(report[['precision', 'recall', 'f1-score']].values, plot_ax.patches):
    annotation_text = " ".join(f"{round(val * 100, 2)}%" for val in values)

    patch_x = patch.get_x() + patch.get_width() - 0.4
    patch_y = patch.get_y() + patch.get_height() / 4
    plot_ax.annotate(annotation_text, (patch_x, patch_y), fontsize=8, rotation=15, fontweight='bold')

# Export before showing the figure
figure.savefig("model_performance_metrics.png", format="png", pad_inches=0.2, transparent=False, bbox_inches='tight')
figure.savefig("model_performance_metrics.svg", format="svg", pad_inches=0.2, transparent=False, bbox_inches='tight')
plt.show()
