# Final code to build and compare ResNet and autoencoder models to predict COVID-19 using electrocardiogram (ECG) data from wearable devices

## Import needed libraries and basic setup

In [4]:
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, Conv1D, BatchNormalization, Activation
from tensorflow.keras.layers import Add, GlobalAveragePooling1D, Reshape, Flatten, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix

In [5]:
# Seed TensorFlow's and NumPy's global random generators for reproducibility.
# The same value (42) is also used as random_state for the train/validation
# split in preprocess_data. Python's built-in `random` module is not seeded here.
tf.random.set_seed(42)
np.random.seed(42)

## Functions to load and preprocess data

In [7]:
# Load and preprocess data
def load_data(file_path):
    """
    Read a headerless CSV of ECG records and separate signals from labels

    Each row is treated as one record: every column except the last is
    returned as signal data, and the last column is returned as the label.

    Args:
        file_path: Path to the CSV file (read with header=None)

    Returns:
        X: 2-D array holding all but the last column of every row
        y: 1-D array holding the last column of every row
    """
    frame = pd.read_csv(file_path, header=None)

    # Column layout: [signal columns ..., label]
    signals = frame.iloc[:, :-1].values
    labels = frame.iloc[:, -1].values
    return signals, labels

In [8]:
# Data preprocessing
def preprocess_data(X, y, X_test, y_test, validation_size=0.2):
    """
    Preprocess data: split off a validation set, normalize the signals and
    prepare signals and labels for the Keras models

    The train/validation data is split first. The scaler is then fitted on the
    training part only and applied unchanged to the validation and test
    signals, so all three sets share one normalization and no validation or
    test statistics leak into it.

    Args:
        X: ECG signals used for training and validation, 2-D (n_samples, signal_length)
        y: Class labels for X (values 0 .. NUM_CLASSES - 1)
        X_test: ECG signals for testing, with the same number of columns as X
        y_test: Class labels for X_test
        validation_size: Proportion of X/y held out as validation set

    Returns:
        X_train, X_val, X_test: Normalized signals reshaped to
            (n_samples, signal_length, 1)
        y_train, y_val, y_test: One-hot encoded labels with NUM_CLASSES columns

    Note:
        Reads the module-level NUM_CLASSES, which must be defined before this
        function is called.
    """
    # Split into train and validation (stratified so class ratios are kept)
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=validation_size,
                                                        random_state=42, stratify=y)

    # Normalize: fit on the training split only, then reuse the fitted scaler.
    # StandardScaler standardizes every column (sample position) independently.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    # The test signals must go through the same transform as the training data;
    # otherwise the models are evaluated on inputs in a different scale.
    X_test = scaler.transform(X_test)

    # Reshape for Conv1D layers: (batch_size, signal_length, channels)
    X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
    X_val = X_val.reshape(X_val.shape[0], X_val.shape[1], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

    # Convert labels to categorical (one-hot encoding)
    y_train = tf.keras.utils.to_categorical(y_train, NUM_CLASSES)
    y_val = tf.keras.utils.to_categorical(y_val, NUM_CLASSES)
    y_test = tf.keras.utils.to_categorical(y_test, NUM_CLASSES)

    return X_train, X_val, X_test, y_train, y_val, y_test

## Function to build Autoencoder

In [9]:
# 1. Autoencoder Model
def build_autoencoder(input_shape):
    """
    Build the convolutional autoencoder and the classifier stacked on its encoder

    All convolutions use kernel size 3 with 'same' padding and the default
    stride, so the encoded tensor keeps the input's signal length and has
    4 channels.

    Args:
        input_shape: Shape of input signal (signal_length, channels)

    Returns:
        encoder: Model mapping the input signal to the 'encoded' layer output
        autoencoder: Model mapping the input signal to its reconstruction
        classifier: Model mapping encoded features of shape
            (signal_length, 4) to NUM_CLASSES softmax probabilities
        combined_model: encoder followed by classifier on the raw input,
            sharing their layers (used for fine-tuning)

    Note:
        Reads the module-level NUM_CLASSES for the size of the output layer.
    """
    inputs = Input(shape=input_shape)

    # Encoder stack: (filters, followed by batch norm?)
    x = inputs
    for n_filters, with_bn in ((32, True), (16, True), (8, False)):
        x = Conv1D(n_filters, 3, padding='same', activation='relu')(x)
        if with_bn:
            x = BatchNormalization()(x)

    # Bottleneck layer
    encoded = Conv1D(4, 3, padding='same', activation='relu', name='encoded')(x)

    # Decoder stack mirrors the encoder's filter counts
    x = encoded
    for n_filters, with_bn in ((8, True), (16, True), (32, False)):
        x = Conv1D(n_filters, 3, padding='same', activation='relu')(x)
        if with_bn:
            x = BatchNormalization()(x)
    decoded = Conv1D(1, 3, padding='same', activation='linear')(x)

    autoencoder = Model(inputs, decoded, name='autoencoder')
    encoder = Model(inputs, encoded, name='encoder')

    # Stand-alone classifier head that consumes encoded features
    encoded_inputs = Input(shape=(input_shape[0], 4))
    c = Flatten()(encoded_inputs)
    for units, drop_rate in ((64, 0.5), (32, 0.3)):
        c = Dense(units, activation='relu')(c)
        c = Dropout(drop_rate)(c)
    outputs = Dense(NUM_CLASSES, activation='softmax')(c)
    classifier = Model(encoded_inputs, outputs, name='classifier')

    # Chain encoder and classifier on the raw input for end-to-end fine-tuning
    combined_model = Model(inputs, classifier(encoder(inputs)),
                           name='autoencoder_classifier')

    return encoder, autoencoder, classifier, combined_model

## Functions to build ResNet

In [10]:
# 2. ResNet Model
def residual_block(x, filters, kernel_size=3, stride=1):
    """
    Apply one 1-D residual block (two convolutions plus a skip connection)

    Args:
        x: Input tensor
        filters: Number of filters for Conv1D layers
        kernel_size: Size of the kernel
        stride: Stride for the first Conv1D layer (and for the projection
            on the skip path, when one is needed)

    Returns:
        Output tensor: ReLU applied to the sum of main path and skip path
    """
    block_input = x

    # Main path: conv -> batch norm -> ReLU -> conv -> batch norm
    main = Conv1D(filters, kernel_size, strides=stride, padding='same')(block_input)
    main = BatchNormalization()(main)
    main = Activation('relu')(main)
    main = Conv1D(filters, kernel_size, padding='same')(main)
    main = BatchNormalization()(main)

    # Skip path: identity unless length or channel count changes, in which
    # case a 1x1 convolution projects the input to the main path's shape
    needs_projection = stride != 1 or block_input.shape[-1] != filters
    if needs_projection:
        skip = Conv1D(filters, 1, strides=stride, padding='same')(block_input)
        skip = BatchNormalization()(skip)
    else:
        skip = block_input

    merged = Add()([main, skip])
    return Activation('relu')(merged)

In [11]:
def build_resnet(input_shape):
    """
    Assemble the 1-D ResNet classifier for ECG signals

    Layout: a stem convolution (64 filters, kernel 7, stride 2), three stages
    of two residual blocks each (64, 128 and 256 filters; the 128 and 256
    stages start with stride 2), global average pooling, a 128-unit dense
    layer with dropout 0.5 and a softmax output.

    Args:
        input_shape: Shape of input signal (signal_length, channels)

    Returns:
        model: Uncompiled ResNet model with NUM_CLASSES softmax outputs

    Note:
        Reads the module-level NUM_CLASSES for the size of the output layer.
    """
    inputs = Input(shape=input_shape)

    # Stem
    x = Conv1D(64, 7, strides=2, padding='same')(inputs)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    # Residual stages: (filters, stride of the stage's first block)
    for stage_filters, first_stride in ((64, 1), (128, 2), (256, 2)):
        x = residual_block(x, stage_filters, stride=first_stride)
        x = residual_block(x, stage_filters)

    # Classification head
    x = GlobalAveragePooling1D()(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.5)(x)
    outputs = Dense(NUM_CLASSES, activation='softmax')(x)

    return Model(inputs, outputs, name='resnet')

## Functions for model training and evaluation


In [12]:
# Training procedure
def train_models(X_train, X_val, y_train, y_val, input_shape):
    """
    Train the autoencoder pipeline and the ResNet on the same data

    The autoencoder pipeline runs in stages: (1) the autoencoder learns to
    reconstruct the signals, (2) the encoder turns the signals into encoded
    features, (3) the classifier is trained on those features, and (4) the
    combined encoder + classifier model is fine-tuned on the raw signals with
    a lower learning rate. The ResNet is then trained from scratch.

    Args:
        X_train, X_val: Training and validation signals
        y_train, y_val: Training and validation labels (one-hot)
        input_shape: Shape of input signals

    Returns:
        trained_models: Dictionary with the models ('autoencoder', 'encoder',
            'classifier', 'combined_model', 'resnet') and the matching
            '*_history' dictionaries of per-epoch metrics
    """
    # One EarlyStopping / ReduceLROnPlateau pair, reused by every fit() below
    callbacks = [
        EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-6),
    ]
    # Settings shared by all four training runs
    fit_options = dict(epochs=50, batch_size=32, callbacks=callbacks, verbose=1)

    # ---- 1. Autoencoder pipeline ----
    print("Training Autoencoder...")
    encoder, autoencoder, classifier, combined_model = build_autoencoder(input_shape)

    # Stage 1: reconstruction (the input is its own target)
    autoencoder.compile(optimizer=Adam(), loss='mse')
    autoencoder_history = autoencoder.fit(
        X_train, X_train,
        validation_data=(X_val, X_val),
        **fit_options
    )

    # Stage 2: encoded features from the trained encoder
    encoded_train = encoder.predict(X_train)
    encoded_val = encoder.predict(X_val)

    # Stage 3: classifier on top of the encoded features
    classifier.compile(optimizer=Adam(learning_rate=0.001),
                       loss='categorical_crossentropy',
                       metrics=['accuracy'])
    classifier_history = classifier.fit(
        encoded_train, y_train,
        validation_data=(encoded_val, y_val),
        **fit_options
    )

    # Stage 4: end-to-end fine-tuning at a ten times smaller learning rate
    combined_model.compile(optimizer=Adam(learning_rate=0.0001),
                           loss='categorical_crossentropy',
                           metrics=['accuracy'])
    combined_history = combined_model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        **fit_options
    )

    # ---- 2. ResNet ----
    print("Training ResNet...")
    resnet = build_resnet(input_shape)
    resnet.compile(optimizer=Adam(),
                   loss='categorical_crossentropy',
                   metrics=['accuracy'])
    resnet_history = resnet.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        **fit_options
    )

    trained = {
        'autoencoder': autoencoder,
        'encoder': encoder,
        'classifier': classifier,
        'combined_model': combined_model,
        'resnet': resnet,
    }
    # Keep only the plain metric dictionaries, not the History objects
    trained['autoencoder_history'] = autoencoder_history.history
    trained['classifier_history'] = classifier_history.history
    trained['combined_history'] = combined_history.history
    trained['resnet_history'] = resnet_history.history
    return trained

In [13]:
# Evaluate models
def evaluate_models(models, X_test, y_test):
    """
    Evaluate the fine-tuned autoencoder classifier and the ResNet on test data

    Args:
        models: Dictionary containing trained models; the 'combined_model'
            and 'resnet' entries are used
        X_test: Test signals
        y_test: One-hot encoded test labels

    Returns:
        results: Dictionary with, for each model, the test accuracy
            ('*_accuracy'), the text classification report ('*_report') and
            the confusion matrix ('*_cm'). The confusion matrices always have
            one row/column per entry of the class-name list.
    """
    class_names = ['Normal', 'Abnormal', 'History of MI', 'MI', 'COVID-19']
    # Pin the label set explicitly. Without it scikit-learn infers the labels
    # from the data, so a class missing from both y_true and the predictions
    # makes classification_report reject the 5 target names and shrinks the
    # confusion matrix below the 5x5 grid the plotting code expects.
    class_ids = list(range(len(class_names)))

    # Evaluate combined autoencoder model
    print("\nEvaluating Autoencoder + Classifier...")
    _, autoencoder_acc = models['combined_model'].evaluate(X_test, y_test)
    print(f"Autoencoder + Classifier - Test Accuracy: {autoencoder_acc:.4f}")

    # Evaluate ResNet
    print("\nEvaluating ResNet...")
    _, resnet_acc = models['resnet'].evaluate(X_test, y_test)
    print(f"ResNet - Test Accuracy: {resnet_acc:.4f}")

    # Generate predictions
    autoencoder_preds = models['combined_model'].predict(X_test)
    resnet_preds = models['resnet'].predict(X_test)

    # Convert from one-hot / probabilities to class indices
    y_true = np.argmax(y_test, axis=1)
    autoencoder_pred_classes = np.argmax(autoencoder_preds, axis=1)
    resnet_pred_classes = np.argmax(resnet_preds, axis=1)

    # Classification reports
    autoencoder_report = classification_report(y_true, autoencoder_pred_classes,
                                               labels=class_ids,
                                               target_names=class_names)
    resnet_report = classification_report(y_true, resnet_pred_classes,
                                          labels=class_ids,
                                          target_names=class_names)

    # Confusion matrices (rows: true class, columns: predicted class)
    autoencoder_cm = confusion_matrix(y_true, autoencoder_pred_classes, labels=class_ids)
    resnet_cm = confusion_matrix(y_true, resnet_pred_classes, labels=class_ids)

    return {
        'autoencoder_accuracy': autoencoder_acc,
        'resnet_accuracy': resnet_acc,
        'autoencoder_report': autoencoder_report,
        'resnet_report': resnet_report,
        'autoencoder_cm': autoencoder_cm,
        'resnet_cm': resnet_cm
    }

In [14]:
# Plot training history
def plot_training_history(history_dict):
    """
    Draw the training curves of all four training runs

    Two figures with two panels each are shown: autoencoder reconstruction
    loss next to classifier accuracy, then fine-tuned autoencoder classifier
    accuracy next to ResNet accuracy. Every panel plots the training and the
    validation curve against the epoch.

    Args:
        history_dict: Dictionary containing training history under the keys
            'autoencoder_history', 'classifier_history', 'combined_history'
            and 'resnet_history'
    """
    def draw_panel(position, history_key, train_key, val_key, title, y_label):
        # One subplot: training vs. validation curve for a single metric
        plt.subplot(1, 2, position)
        plt.plot(history_dict[history_key][train_key], label='Train')
        plt.plot(history_dict[history_key][val_key], label='Validation')
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()

    # Each entry describes one figure as its (left, right) panel
    figure_layouts = (
        (
            ('autoencoder_history', 'loss', 'val_loss',
             'Autoencoder Reconstruction Loss', 'MSE Loss'),
            ('classifier_history', 'accuracy', 'val_accuracy',
             'Classifier Accuracy (on encoded features)', 'Accuracy'),
        ),
        (
            ('combined_history', 'accuracy', 'val_accuracy',
             'Autoencoder + Classifier Accuracy', 'Accuracy'),
            ('resnet_history', 'accuracy', 'val_accuracy',
             'ResNet Accuracy', 'Accuracy'),
        ),
    )

    for panel_specs in figure_layouts:
        plt.figure(figsize=(12, 4))
        for position, spec in enumerate(panel_specs, start=1):
            draw_panel(position, *spec)
        plt.tight_layout()
        plt.show()

In [15]:
# Plot confusion matrices
def plot_confusion_matrices(results):
    """
    Show the confusion matrices of both models side by side

    Each matrix is drawn as a heat map with a colour bar, class names on both
    axes and the count written into every cell (white text on cells above
    half of the matrix maximum, black text otherwise).

    Args:
        results: Dictionary containing evaluation results; the
            'autoencoder_cm' and 'resnet_cm' entries are plotted and are
            indexed as 5x5 arrays
    """
    class_names = ['Normal', 'Abnormal', 'History of MI', 'MI', 'COVID-19']
    n_classes = len(class_names)

    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    panels = (
        (axes[0], 'autoencoder_cm', 'Autoencoder + Classifier'),
        (axes[1], 'resnet_cm', 'ResNet'),
    )
    for ax, matrix_key, title in panels:
        matrix = results[matrix_key]

        heatmap = ax.imshow(matrix, interpolation='nearest', cmap=plt.cm.Blues)
        ax.set_title(title)
        fig.colorbar(heatmap, ax=ax)

        # One tick per class on both axes, labelled with the class names
        ax.set_xticks(np.arange(n_classes))
        ax.set_yticks(np.arange(n_classes))
        ax.set_xticklabels(class_names)
        ax.set_yticklabels(class_names)

        # Slant the x labels so the longer names do not overlap
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

        # Cell annotations; text colour flips on dark cells for contrast
        half_max = matrix.max() / 2
        for row, col in np.ndindex(n_classes, n_classes):
            count = matrix[row, col]
            ax.text(col, row, count,
                    ha="center", va="center",
                    color="white" if count > half_max else "black")

    # The y label is set on the left panel only
    axes[0].set_ylabel('True Label')
    axes[0].set_xlabel('Predicted Label')
    axes[1].set_xlabel('Predicted Label')

    plt.tight_layout()
    plt.show()

In [16]:
# Compare model performances
def compare_models(results):
    """
    Print a side-by-side summary of both models and name the more accurate one

    Args:
        results: Dictionary containing evaluation metrics; reads
            'autoencoder_accuracy', 'resnet_accuracy', 'autoencoder_report'
            and 'resnet_report'
    """
    rule = "=" * 50
    print("\n" + rule)
    print("MODEL COMPARISON")
    print(rule)

    ae_accuracy = results['autoencoder_accuracy']
    print(f"Autoencoder + Classifier Accuracy: {ae_accuracy:.4f}")
    resnet_accuracy = results['resnet_accuracy']
    print(f"ResNet Accuracy: {resnet_accuracy:.4f}")

    report_sections = (
        ("\nAutoencoder + Classifier Classification Report:", 'autoencoder_report'),
        ("\nResNet Classification Report:", 'resnet_report'),
    )
    for heading, report_key in report_sections:
        print(heading)
        print(results[report_key])

    # Verdict is based on test accuracy alone
    if ae_accuracy > resnet_accuracy:
        verdict = "\n=> Autoencoder + Classifier outperformed ResNet"
    elif resnet_accuracy > ae_accuracy:
        verdict = "\n=> ResNet outperformed Autoencoder + Classifier"
    else:
        verdict = "\n=> Both models performed equally well"
    print(verdict)

In [17]:
# 1. Load data
# The train/validation CSV is split into training and validation sets later by
# preprocess_data; the test CSV is kept as a separate hold-out set.
# NOTE: both paths are absolute Kaggle input paths, so this cell only runs where
# that dataset is mounted under /kaggle/input.
# The message below mentions preprocessing, but this cell only loads the files.
print("Loading and preprocessing data...")
train_path = '/kaggle/input/ecg-heartbeat-covid-19/heartbeat_125_lead_ii_train_validation_dataset.csv'
test_path = '/kaggle/input/ecg-heartbeat-covid-19/heartbeat_125_lead_ii_test_dataset.csv'
X, y = load_data(train_path)
X_test, y_test = load_data(test_path)

Loading and preprocessing data...


In [21]:
# Define parameters
# SIGNAL_LENGTH: number of signal columns per record in the loaded training data.
SIGNAL_LENGTH = X.shape[1]
# NUM_CLASSES is read as a global by preprocess_data, build_autoencoder and
# build_resnet, so this cell must run before any of them is called.
NUM_CLASSES = 5      # 0: Normal, 1: Abnormal, 2: History of MI, 3: MI, 4: COVID-19

In [22]:
# 2. Preprocess data
# NOTE: X_test and y_test are rebound to their preprocessed versions, so this
# cell is not idempotent: re-running it without re-running the load cell feeds
# the already reshaped / one-hot encoded test arrays back into preprocess_data.
X_train, X_val, X_test, y_train, y_val, y_test = preprocess_data(X, y, X_test, y_test)
# Model input shape: (signal_length, channels) with a single channel
input_shape = (SIGNAL_LENGTH, 1)

In [None]:
# 3. Train models
# Runs the autoencoder stages (reconstruction, classifier on encoded features,
# fine-tuning) and then the ResNet; each fit is capped at 50 epochs and may
# stop earlier through the EarlyStopping callback.
print("\nTraining models...")
trained_models = train_models(X_train, X_val, y_train, y_val, input_shape)


Training models...
Training Autoencoder...
Epoch 1/50
[1m39546/39546[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m123s[0m 3ms/step - loss: 0.0102 - val_loss: 5.3538e-04 - learning_rate: 0.0010
Epoch 2/50
[1m39546/39546[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m113s[0m 3ms/step - loss: 7.1291e-04 - val_loss: 5.6770e-04 - learning_rate: 0.0010
Epoch 3/50
[1m39546/39546[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m115s[0m 3ms/step - loss: 4.8966e-04 - val_loss: 6.0520e-04 - learning_rate: 0.0010
Epoch 4/50
[1m39546/39546[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m112s[0m 3ms/step - loss: 3.9631e-04 - val_loss: 7.4586e-04 - learning_rate: 0.0010
Epoch 5/50
[1m39546/39546[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m112s[0m 3ms/step - loss: 3.4140e-04 - val_loss: 5.7587e-04 - learning_rate: 0.0010
Epoch 6/50
[1m39546/39546[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m112s[0m 3ms/step - loss: 2.9335e-04 - val_loss: 5.7824e-04 - learning_rate: 0.0010
Epoch 7/50


In [None]:
# 4. Evaluate models
# Scores the fine-tuned autoencoder classifier ('combined_model') and the ResNet
# on the separately loaded test set.
print("\nEvaluating models...")
evaluation_results = evaluate_models(trained_models, X_test, y_test)

In [None]:
# 5. Plot results
# trained_models also carries the '*_history' entries that plot_training_history reads.
print("\nPlotting training history...")
plot_training_history(trained_models)

In [None]:
# Show both models' confusion matrices side by side
print("\nPlotting confusion matrices...")
plot_confusion_matrices(evaluation_results)

In [None]:
# 6. Compare models
# Prints both accuracies and classification reports, then names the model with
# the higher test accuracy.
compare_models(evaluation_results)