In [49]:
import pandas as pd
import numpy as np
import os
import wfdb
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.utils import to_categorical
import json
from mealpy.evolutionary_based.GA import BaseGA
from mealpy.evolutionary_based import GA
from mealpy import FloatVar
from mealpy import IntegerVar
import re
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import time
from sklearn.metrics import accuracy_score

In [50]:
file_path = Path('C:/Users/vinay/Downloads/mit-bih-arrhythmia-database-1.0.0/mit-bih-arrhythmia-database-1.0.0')

In [51]:
data_files=[]
annot_files=[]
for file in os.listdir(file_path):
    if('.dat' in file):
        data_files.append(file[:-4])
    elif('.atr' in file):
        annot_files.append(file[:-4])

In [52]:
char_to_int = {}
count = 0 

for file in annot_files:
    path_file = os.path.join(file_path, file)
    annotation = wfdb.rdann(path_file, 'atr') 
    
    for symbol in annotation.symbol:
        if symbol not in char_to_int: 
            char_to_int[symbol] = count
            count += 1 

print(char_to_int)

{'+': 0, 'N': 1, 'A': 2, 'V': 3, '~': 4, '|': 5, 'Q': 6, '/': 7, 'f': 8, 'x': 9, 'F': 10, 'j': 11, 'L': 12, 'a': 13, 'J': 14, 'R': 15, '[': 16, '!': 17, ']': 18, 'E': 19, 'S': 20, '"': 21, 'e': 22}


In [53]:
for file in annot_files:
    path_file = os.path.join(file_path, file)
    annotation = wfdb.rdann(path_file, 'atr')
    unique, counts = np.unique(annotation.symbol, return_counts=True)
    print(file, dict(zip(unique, counts)), counts.sum(), len(annotation.sample))

100 {'+': 1, 'A': 33, 'N': 2239, 'V': 1} 2274 2274
101 {'+': 1, 'A': 3, 'N': 1860, 'Q': 2, '|': 4, '~': 4} 1874 1874
102 {'+': 5, '/': 2028, 'N': 99, 'V': 4, 'f': 56} 2192 2192
103 {'+': 1, 'A': 2, 'N': 2082, '~': 6} 2091 2091
104 {'+': 45, '/': 1380, 'N': 163, 'Q': 18, 'V': 2, 'f': 666, '~': 37} 2311 2311
105 {'+': 1, 'N': 2526, 'Q': 5, 'V': 41, '|': 30, '~': 88} 2691 2691
106 {'+': 41, 'N': 1507, 'V': 520, '~': 30} 2098 2098
107 {'+': 1, '/': 2078, 'V': 59, '~': 2} 2140 2140
108 {'+': 1, 'A': 4, 'F': 2, 'N': 1739, 'V': 17, 'j': 1, 'x': 11, '|': 8, '~': 41} 1824 1824
109 {'+': 1, 'F': 2, 'L': 2492, 'V': 38, '~': 2} 2535 2535
111 {'+': 1, 'L': 2123, 'V': 1, '~': 8} 2133 2133
112 {'+': 1, 'A': 2, 'N': 2537, '~': 10} 2550 2550
113 {'+': 1, 'N': 1789, 'a': 6} 1796 1796
114 {'+': 3, 'A': 10, 'F': 4, 'J': 2, 'N': 1820, 'V': 43, '|': 1, '~': 7} 1890 1890
115 {'+': 1, 'N': 1953, '|': 6, '~': 2} 1962 1962
116 {'+': 1, 'A': 1, 'N': 2302, 'V': 109, '~': 8} 2421 2421
117 {'+': 1, 'A': 1, 'N': 153

In [72]:
all_signals = []
all_labels = []

for i in range(48):
    data, field = wfdb.rdsamp(os.path.join(file_path, data_files[i]))
    data = data[:, 0]
    
    annot = wfdb.rdann(os.path.join(file_path, annot_files[i]), 'atr')
    segmented_signals = [data[max(0, peak - 100):min(len(data), peak + 100)] for peak in annot.sample]
    
    segmented_array = np.array([
        np.pad(signal, (0, 200 - len(signal)), mode='edge') if len(signal) < 200 else signal
        for signal in segmented_signals
    ])
    
    labels = annot.symbol[:len(segmented_array)]  

    all_signals.append(segmented_array)
    all_labels.append(labels)

In [73]:
f_X_train, f_X_test, f_y_train, f_y_test = [], [], [], []
for i in range(48):
    X = np.array(all_signals[i]).reshape(-1, 200)
    y = np.array(all_labels[i]).flatten()
    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=0.25,
        random_state=42
    )
    f_X_train.append(X_train)
    f_X_test.append(X_test)
    f_y_train.extend(y_train)  
    f_y_test.extend(y_test)

In [71]:
f_X_train = np.vstack(f_X_train).reshape(-1, 200, 1)
f_X_test = np.vstack(f_X_test).reshape(-1, 200, 1)
y_train_int = [char_to_int.get(char, -1) for char in f_y_train]
y_test_int = [char_to_int.get(char, -1) for char in f_y_test]
y_train_int = [y for y in y_train_int if y != -1]
y_test_int = [y for y in y_test_int if y != -1]
num_classes = len(char_to_int)
f_y_train = to_categorical(y_train_int, num_classes=num_classes)
f_y_test = to_categorical(y_test_int, num_classes=num_classes)
print(f"Final Train Shape: {f_X_train.shape}, Test Shape: {f_X_test.shape}")
print(f"Final Train Labels Shape: {f_y_train.shape}, Test Labels Shape: {f_y_test.shape}")

Final Train Shape: (84470, 200, 1), Test Shape: (28177, 200, 1)
Final Train Labels Shape: (84470, 23), Test Labels Shape: (28177, 23)


In [74]:
import numpy as np
from sklearn.utils import resample

# Stack and reshape the data
f_X_train = np.vstack(f_X_train).reshape(-1, 200, 1)
f_X_test = np.vstack(f_X_test).reshape(-1, 200, 1)

# Convert labels to integers
y_train_int = [char_to_int.get(char, -1) for char in f_y_train]
y_test_int = [char_to_int.get(char, -1) for char in f_y_test]
y_train_int = [y for y in y_train_int if y != -1]
y_test_int = [y for y in y_test_int if y != -1]

# One-hot encode the labels
num_classes = len(char_to_int)
f_y_train = to_categorical(y_train_int, num_classes=num_classes)
f_y_test = to_categorical(y_test_int, num_classes=num_classes)

# Print original shapes
print(f"Original Train Shape: {f_X_train.shape}, Test Shape: {f_X_test.shape}")
print(f"Original Train Labels Shape: {f_y_train.shape}, Test Labels Shape: {f_y_test.shape}")

# Reduce dataset size to 1%
def reduce_dataset_size(X, y, fraction=0.01):
    num_samples = int(X.shape[0] * fraction)
    indices = np.random.choice(X.shape[0], num_samples, replace=False)
    return X[indices], y[indices]

# Reduce training and testing data
f_X_train, f_y_train = reduce_dataset_size(f_X_train, f_y_train, fraction=0.25)
f_X_test, f_y_test = reduce_dataset_size(f_X_test, f_y_test, fraction=0.1)

# Print reduced shapes
print(f"Reduced Train Shape: {f_X_train.shape}, Test Shape: {f_X_test.shape}")
print(f"Reduced Train Labels Shape: {f_y_train.shape}, Test Labels Shape: {f_y_test.shape}")

Original Train Shape: (84470, 200, 1), Test Shape: (28177, 200, 1)
Original Train Labels Shape: (84470, 23), Test Labels Shape: (28177, 23)
Reduced Train Shape: (21117, 200, 1), Test Shape: (2817, 200, 1)
Reduced Train Labels Shape: (21117, 23), Test Labels Shape: (2817, 23)


In [57]:
os.makedirs("models", exist_ok=True)
os.makedirs("results", exist_ok=True)
os.makedirs("history", exist_ok=True)

In [58]:
class ECGModel(nn.Module):
    def __init__(self, config, num_classes, input_size=200):  # ✅ Explicit input size
        super(ECGModel, self).__init__()

        # Extract hyperparameters from config
        feature_extractor = config[8]  # CNN (0) or RCNN (1)
        sequence_model = config[9]  # BiLSTM (0) or GRU (1)
        num_cnn_layers = int(config[0])
        num_rnn_layers = int(config[1])
        dropout = config[3]

        # Ensure CNN filters, hidden size, and FC neurons follow power-of-2 convention
        num_filters = max(32, 2 ** (int(config[4])))
        hidden_size = max(64, 2 ** (int(config[10])))
        fc_neurons = max(32, 2 ** (int(config[11])))

        kernel_size = int(config[5])
        stride = int(config[6])

        # ✅ Set initial input size explicitly
        self.initial_input_size = input_size  

        # ✅ Feature Extractor (CNN)
        layers = []
        in_channels = 1  # First CNN layer expects 1D ECG input (batch, 1, time_steps)

        # ✅ Dynamically apply CNN layers and adjust input size
        for i in range(num_cnn_layers):
            out_channels = max(16, num_filters // (2 ** i))  # Reduce channels gradually
            layers.append(nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=1))
            layers.append(nn.BatchNorm1d(out_channels))  # Batch normalization for stability
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))  # Dropout for regularization
            in_channels = out_channels  # Update for next CNN layer

            # ✅ Update input size correctly after each CNN layer
            input_size = max(4, (input_size - kernel_size + 2 * 1) // stride + 1)  

        self.feature_extractor = nn.Sequential(*layers)
        self.final_cnn_output_size = input_size  # ✅ Store the shrunk size for RNN input
        self.final_num_filters = out_channels  # ✅ Last CNN layer's channels

        # ✅ If RCNN, apply LSTM immediately after CNN
        if feature_extractor == 1:
            self.rnn = nn.LSTM(input_size=self.final_num_filters, hidden_size=hidden_size, num_layers=num_rnn_layers, batch_first=True, dropout=dropout)
        else:
            self.rnn = None  # No extra LSTM if pure CNN is selected

        # ✅ Sequence Model (RNN: BiLSTM or GRU)
        if sequence_model == 0:
            self.rnn_layer = nn.LSTM(input_size=self.final_num_filters, hidden_size=hidden_size, num_layers=num_rnn_layers, batch_first=True, dropout=dropout)
        else:
            self.rnn_layer = nn.GRU(input_size=self.final_num_filters, hidden_size=hidden_size, num_layers=num_rnn_layers, batch_first=True, dropout=dropout)

        # ✅ Fully Connected Layers
        self.fc = nn.Linear(hidden_size, fc_neurons)
        self.output = nn.Linear(fc_neurons, num_classes)  # Final classification layer

        # ✅ Weight Initialization
        self.apply(self.init_weights)

    def forward(self, x):
        # ✅ Ensure input shape is correct
        assert x.shape[-1] == self.initial_input_size, f"Expected input size {self.initial_input_size}, but got {x.shape[-1]}"

        # ✅ Feature extraction (CNN)
        x = self.feature_extractor(x)  # (batch, channels, time_steps)

        # ✅ Ensure RNN gets correct input size
        assert x.shape[-1] == self.final_cnn_output_size, f"Expected RNN input size {self.final_cnn_output_size}, but got {x.shape[-1]}"

        # ✅ Permute CNN output for RNN (batch, channels, time_steps) → (batch, time_steps, features)
        x = x.permute(0, 2, 1)  

        # ✅ If RCNN (CNN + RNN), process through additional LSTM layer
        if self.rnn is not None:
            x, _ = self.rnn(x)

        # ✅ Process through final sequence model (LSTM/GRU)
        x, _ = self.rnn_layer(x)

        # ✅ Take the last time step's output for classification
        x = self.fc(x[:, -1, :])  
        x = self.output(x)  
        return x

    def init_weights(self, m):
        """ Initialize weights for all layers """
        if isinstance(m, nn.Linear) or isinstance(m, nn.Conv1d):
            nn.init.xavier_uniform_(m.weight)
        elif isinstance(m, nn.LSTM) or isinstance(m, nn.GRU):
            for name, param in m.named_parameters():
                if 'weight' in name:
                    nn.init.xavier_uniform_(param)
                elif 'bias' in name:
                    nn.init.constant_(param, 0)

In [59]:
def format_filename(config):
    """ Convert model config into a valid filename """
    formatted_config = "_".join([f"{x:.4f}" if isinstance(x, float) else str(x) for x in config])
    formatted_config = re.sub(r'[^\w\-_]', '', formatted_config)  # Remove special characters
    return formatted_config

In [60]:
def measure_latency(model, dataloader, device="cpu"):
    """
    Measures average inference latency (in milliseconds) per sample.
    """
    model.eval()
    total_time = 0.0
    num_samples = 0

    with torch.no_grad():
        for inputs, _ in dataloader:  # We only need inputs, not labels
            inputs = inputs.to(device)
            batch_size = inputs.shape[0]
            
            # Start time measurement
            if device == "cuda":
                torch.cuda.synchronize()  # Ensure GPU operations are finished
                start_time = time.time()
            else:
                start_time = time.perf_counter()

            _ = model(inputs)  # Forward pass

            # End time measurement
            if device == "cuda":
                torch.cuda.synchronize()
                end_time = time.time()
            else:
                end_time = time.perf_counter()

            total_time += (end_time - start_time) * 1000  # Convert to milliseconds
            num_samples += batch_size

    return total_time / num_samples

In [61]:
def convert_to_serializable(obj):
    """ Converts NumPy arrays and other non-serializable objects to lists for JSON serialization """
    if isinstance(obj, np.ndarray):
        return obj.tolist()  # Convert NumPy array to a list
    elif isinstance(obj, list):
        return [convert_to_serializable(item) for item in obj]  # Recursively process lists
    return obj  # Return normal values unchanged

In [62]:
def train_and_evaluate(config):
    """
    Train and evaluate the model using real ECG data.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(config)
    # Load preprocessed dataset (Ensure you have f_X_train, f_X_test, f_y_train, f_y_test)
    X_train, X_test = torch.tensor(f_X_train, dtype=torch.float32), torch.tensor(f_X_test, dtype=torch.float32)
    y_train, y_test = torch.tensor(f_y_train, dtype=torch.float32), torch.tensor(f_y_test, dtype=torch.float32)

    # Reshape input to match PyTorch format: (batch, channels, time_steps)
    X_train = X_train.permute(0, 2, 1)  # Convert from (batch, time_steps, 1) → (batch, 1, time_steps)
    X_test = X_test.permute(0, 2, 1)

    # Use batch size from config
    batch_size = int(config[7]) if len(config) > 11 else 64
    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

    # Initialize model
    num_classes = f_y_train.shape[1]
    model = ECGModel(config, num_classes).to(device)

    # Define loss function & optimizer with weighted loss for imbalance
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config[2])

    # Training loop
    num_epochs = 10
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        
        print(f"Epoch [{epoch+1}/{num_epochs}] - Loss: {running_loss/len(train_loader):.4f}")

    # Evaluate on test set
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            true_labels = torch.argmax(labels, dim=1).cpu().numpy()

            all_preds.extend(preds)
            all_labels.extend(true_labels)

    accuracy = accuracy_score(all_labels, all_preds)

    # Measure real inference latency
    latency = measure_latency(model, test_loader, device)

    # Define fitness score
    fitness = accuracy - (latency / 100)

    # Save model
    model_name = f"model_{format_filename(config)}.pt"
    torch.save(model.state_dict(), f"models/{model_name}")

    # Save results
    serializable_config = convert_to_serializable(config)

    result = {
        "config": serializable_config,
        "accuracy": accuracy,
        "latency": latency,
        "fitness": fitness
    }
    result_filename = f"results/{model_name}.json"
    with open(result_filename, "w") as f:
        json.dump(result, f, indent=4)

    print(f"✅Accuracy: {accuracy:.4f}, Latency: {latency:.2f}ms, Fitness: {fitness:.4f} | Model Saved: {model_name}")
    return fitness

In [63]:
# Define the search space for the GA
bounds = [
    IntegerVar(lb=1, ub=5),         # 0: Number of CNN layers (Integer)
    IntegerVar(lb=1, ub=5),         # 1: Number of RNN layers (Integer)
    FloatVar(lb=0.0001, ub=0.01),   # 2: Learning rate (Float)
    FloatVar(lb=0.1, ub=0.5),       # 3: Dropout rate (Float)
    IntegerVar(lb=5, ub=8),         # 4: Number of filters in CNN (Integer)
    IntegerVar(lb=3, ub=6),         # 5: Kernel size for CNN (Integer)
    IntegerVar(lb=1, ub=3),         # 6: Stride for CNN (Integer)
    IntegerVar(lb=4, ub=7),         # 7: Batch size (Integer)
    IntegerVar(lb=0, ub=1),         # 8: CNN/RCNN (Integer: 0 = CNN, 1 = RCNN)
    IntegerVar(lb=0, ub=1),         # 9: GRU/BiLSTM (Integer: 0 = BiLSTM, 1 = GRU)
    IntegerVar(lb=6, ub=9),         # 10: Hidden size for RNN (Integer)
    IntegerVar(lb=5, ub=8),         # 11: Fully connected layer neurons (Integer)
]

In [75]:
# Define problem dictionary for Mealpy
problem_dict = {
    "obj_func": train_and_evaluate,
    "bounds": bounds,
    "minmax": "max"  # We want to maximize the fitness function
}

# Run Genetic Algorithm optimization
print("\nRunning Genetic Algorithm for Model Optimization...\n")
ga_model = GA.BaseGA(epoch=10, pop_size=5, pc=0.9, pm=0.05)  # 10 generations, 5 models per generation
best_solution = ga_model.solve(problem_dict)

# Extract best architecture and fitness score
best_architecture = best_solution.solution
best_fitness = best_solution.target.fitness

# Save best model details
best_model_info = {
    "best_architecture": best_architecture,
    "best_fitness": best_fitness
}
with open("results/best_model.json", "w") as f:
    json.dump(best_model_info, f, indent=4)

# Print best model
print(f"\n🏆 Best Model Configuration: {best_architecture}, Fitness Score: {best_fitness}")


Running Genetic Algorithm for Model Optimization...

[1.00000000e+00 3.00000000e+00 7.94793848e-03 4.48172803e-01
 7.00000000e+00 3.00000000e+00 1.00000000e+00 7.00000000e+00
 0.00000000e+00 1.00000000e+00 9.00000000e+00 7.00000000e+00]


KeyboardInterrupt: 