In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import wfdb
import torch
import os
from pathlib import Path
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.utils import to_categorical
import re
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split, WeightedRandomSampler
from collections import Counter
import random
from mealpy.swarm_based.AO import OriginalAO
from torch.nn.functional import cosine_similarity

In [2]:
file_path = Path('C:/Users/vinay/Downloads/mit-bih-arrhythmia-database-1.0.0/mit-bih-arrhythmia-database-1.0.0')

In [3]:
data_files=[]
annot_files=[]
for file in os.listdir(file_path):
    if('.dat' in file):
        data_files.append(file[:-4])
    elif('.atr' in file):
        annot_files.append(file[:-4])

In [4]:
all_signals = []
all_labels = []

for i in range(48):
    data, field = wfdb.rdsamp(os.path.join(file_path, data_files[i]))
    data = data[:, 0]
    
    annot = wfdb.rdann(os.path.join(file_path, annot_files[i]), 'atr')
    segmented_signals = [data[max(0, peak - 100):min(len(data), peak + 100)] for peak in annot.sample]
    
    segmented_array = np.array([
        np.pad(signal, (0, 200 - len(signal)), mode='edge') if len(signal) < 200 else signal
        for signal in segmented_signals
    ])
    
    labels = annot.symbol[:len(segmented_array)]  

    all_signals.append(segmented_array)
    all_labels.append(labels)

In [5]:
all_signals = np.concatenate(all_signals, axis=0)
all_labels = np.concatenate(all_labels, axis=0)

print(f"Final Signal Shape: {all_signals.shape}")  # (Total Samples, 200)
print(f"Total Labels: {len(all_labels)}")  # Should match number of signals

Final Signal Shape: (112647, 200)
Total Labels: 112647


In [6]:
char_to_int = {}
count = 0 

for file in annot_files:
    path_file = os.path.join(file_path, file)
    annotation = wfdb.rdann(path_file, 'atr') 
    
    for symbol in annotation.symbol:
        if symbol not in char_to_int: 
            char_to_int[symbol] = count
            count += 1 

In [7]:
# Convert symbolic labels to numerical classes
numeric_labels = np.array([char_to_int[label] for label in all_labels if label in char_to_int])

# Remove signals that don't have a mapped label
valid_indices = [i for i, label in enumerate(all_labels) if label in char_to_int]
filtered_signals = all_signals[valid_indices]

print(f"Filtered Signal Shape: {filtered_signals.shape}")
print(f"Filtered Labels Shape: {numeric_labels.shape}")

Filtered Signal Shape: (112647, 200)
Filtered Labels Shape: (112647,)


In [8]:
class ECGDataset(Dataset):
    def __init__(self, signals, labels):
        # Reshape signals to (num_samples, 1, sequence_length) to add channel dim
        self.data = torch.tensor(signals, dtype=torch.float32).unsqueeze(1)  
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Create dataset
ecg_dataset = ECGDataset(filtered_signals, numeric_labels)

In [10]:
# Split dataset into training (80%), validation (10%), and test (10%)
total_samples = len(ecg_dataset)
train_size = int(0.8 * total_samples)
val_size = int(0.1 * total_samples)
test_size = total_samples - train_size - val_size  # Ensures exact split

train_dataset, val_dataset, test_dataset = random_split(ecg_dataset, [train_size, val_size, test_size])

# Compute class weights for balanced training sampling
label_counts = Counter(numeric_labels)
total_samples = sum(label_counts.values())
class_weights = {label: total_samples / count for label, count in label_counts.items()}

# Assign sample weights based on class distribution
train_labels = [train_dataset[i][1].item() for i in range(len(train_dataset))]  # Extract labels from training set
sample_weights = np.array([class_weights[label] for label in train_labels])

# Convert to torch tensor
sample_weights = torch.tensor(sample_weights, dtype=torch.float32)

# Weighted sampler for balanced training
sampler = WeightedRandomSampler(sample_weights, num_samples=len(sample_weights), replacement=True)

# Create DataLoaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)  # Balanced training
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)  # No sampler for validation
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)  # No sampler for testing

In [11]:
class HybridModel(nn.Module):
    def __init__(self, config, num_classes):
        super(HybridModel, self).__init__()

        # Extract hyperparameters from config
        feature_extractor = config[8]  # CNN (0) or RCNN (1)
        sequence_model = config[9]  # BiLSTM (0) or GRU (1)
        num_cnn_layers = int(config[0])
        num_rnn_layers = int(config[1])
        dropout = config[3]
        initial_filters = 2 ** int(config[4])  # Convert to power of 2
        initial_kernel = int(config[5])  # Initial kernel size
        stride = int(config[6])
        initial_hidden_size = 2 ** int(config[10])  # Convert to power of 2

        # 🟢 Convolutional Feature Extractor (CNN Layers)
        self.conv_layers = nn.ModuleList()
        num_filters = initial_filters
        kernel_size = initial_kernel
        in_channels = 1  # ECG has 1 channel

        for _ in range(num_cnn_layers):
            kernel_size = max(2, min(kernel_size, in_channels))  # Ensure kernel size is valid
            stride = min(stride, kernel_size)  # Ensure stride is not larger than kernel
            padding = max(0, (kernel_size - stride) // 2)  # Ensure non-negative padding
            
            self.conv_layers.append(nn.Conv1d(in_channels, num_filters, kernel_size, stride=stride, padding=padding))
            self.conv_layers.append(nn.BatchNorm1d(num_filters))
            self.conv_layers.append(nn.ReLU())
            self.conv_layers.append(nn.Dropout(dropout))

            in_channels = num_filters  # Update for next layer
            num_filters = min(256, num_filters * 2)  # Cap filters at 256
            kernel_size = max(3, kernel_size - 1)  # Decrease kernel size

        # 🟢 Handle RCNN (CNN + 1 LSTM/GRU Layer if enabled)
        self.use_rcnn = feature_extractor == 1
        rnn_input_size = in_channels  # Ensure input size matches CNN output

        if self.use_rcnn:
            self.rnn_layers_rcnn = nn.ModuleList()
            hidden_size = initial_hidden_size

            # RCNN should contain ONLY ONE LSTM/GRU layer
            rnn_layer = (nn.LSTM if sequence_model == 0 else nn.GRU)(
                rnn_input_size, hidden_size, bidirectional=True, batch_first=True
            )
            self.rnn_layers_rcnn.append(rnn_layer)
            self.rnn_layers_rcnn.append(nn.Dropout(dropout))

            # Update input size for the next LSTM/GRU layers
            rnn_input_size = hidden_size * 2  # Account for bidirectional RNN

        # 🟢 Sequence Model (BiLSTM or GRU)
        self.rnn_layers = nn.ModuleList()
        hidden_size = initial_hidden_size

        for _ in range(num_rnn_layers):
            rnn_layer = (nn.LSTM if sequence_model == 0 else nn.GRU)(
                rnn_input_size, hidden_size, bidirectional=True, batch_first=True
            )
            self.rnn_layers.append(rnn_layer)
            self.rnn_layers.append(nn.Dropout(dropout))

            # Update input size for next layers
            rnn_input_size = hidden_size * 2  # Account for bidirectional RNN
            hidden_size = max(16, hidden_size // 2)  # Reduce hidden size

        # 🟢 Fully Connected Layers
        self.global_pool = nn.AdaptiveAvgPool1d(1)  # Reduce time dimension to 1
        self.fc = nn.Linear(rnn_input_size, rnn_input_size // 2)  # Use dynamic size
        self.output_layer = nn.Linear(rnn_input_size // 2, num_classes)  # Final layer

        # 🟢 Initialize weights
        self._initialize_weights()

    def forward(self, x):
        # 🟢 CNN Feature Extraction
        for layer in self.conv_layers:
            x = layer(x)

        x = x.permute(0, 2, 1)  # (Batch, TimeSteps, Features)

        # 🟢 Handle RCNN (Reshape CNN output for RNN)
        if self.use_rcnn:            
            for layer in self.rnn_layers_rcnn:
                if isinstance(layer, (nn.LSTM, nn.GRU)):
                    x, _ = layer(x)  # Apply LSTM/GRU
                else:
                    x = layer(x)  # Apply Dropout

        for layer in self.rnn_layers:
            if isinstance(layer, (nn.LSTM, nn.GRU)):
                x, _ = layer(x)  # Get only output
            else:
                x = layer(x)  # Apply Dropout

        # 🟢 Global Pooling & Fully Connected Layers
        x = x.permute(0, 2, 1)  # (Batch, Features, TimeSteps)
        x = self.global_pool(x)  # (Batch, Features, 1)
        x = x.squeeze(-1)  # (Batch, Features)

        x = F.relu(self.fc(x))
        x = self.output_layer(x)

        return x

    def _initialize_weights(self):
        """Initialize weights using Xavier initialization."""
        for m in self.modules():
            if isinstance(m, nn.Conv1d) or isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

In [12]:
config = [3, 3, 0.0002, 0.3707, 7, 3, 3, 5, 1, 1, 8]
model = HybridModel(config, 23)  # Your model initialization
model.load_state_dict(torch.load("models/model_30000_30000_00002_03707_70000_30000_30000_50000_10000_10000_80000.pt", weights_only=True))

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

HybridModel(
  (conv_layers): ModuleList(
    (0): Conv1d(1, 128, kernel_size=(2,), stride=(2,))
    (1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Dropout(p=0.3707, inplace=False)
    (4): Conv1d(128, 256, kernel_size=(3,), stride=(2,))
    (5): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): Dropout(p=0.3707, inplace=False)
    (8): Conv1d(256, 256, kernel_size=(3,), stride=(2,))
    (9): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU()
    (11): Dropout(p=0.3707, inplace=False)
  )
  (rnn_layers_rcnn): ModuleList(
    (0): GRU(256, 256, batch_first=True, bidirectional=True)
    (1): Dropout(p=0.3707, inplace=False)
  )
  (rnn_layers): ModuleList(
    (0): GRU(512, 256, batch_first=True, bidirectional=True)
    (1): Dropout(p=0.3707, inplace=False)
    (2): GRU(512, 128, batch_first=True, bidirectional=True)
    (3)

In [13]:
class FocalLoss(nn.Module):
    def __init__(self, alpha=None, gamma=2.0, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction
    
    def forward(self, inputs, targets):
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = (1 - pt) ** self.gamma * ce_loss

        if self.alpha is not None:
            alpha_factor = self.alpha[targets]
            focal_loss *= alpha_factor
        
        return focal_loss.mean() if self.reduction == 'mean' else focal_loss.sum()

In [14]:
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
    
    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2)
        loss = (1 - label) * torch.pow(euclidean_distance, 2) + \
               label * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2)
        return loss.mean()

In [15]:
def hard_negative_mining(model, dataloader, criterion, device):
    model.eval()  
    hard_samples = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

            loss_per_sample = criterion(outputs, labels)
            loss_per_sample = loss_per_sample.unsqueeze(0) if loss_per_sample.dim() == 0 else loss_per_sample  

            loss_mean = loss_per_sample.mean()
            misclassified = loss_per_sample > loss_mean
            misclassified_indices = misclassified.nonzero(as_tuple=True)[0]

            for i in misclassified_indices:
                hard_samples.append((inputs[i], labels[i]))

    return hard_samples

In [16]:
def train_model(model, train_loader, val_loader, optimizer, criterion, contrastive_loss, num_epochs=20, device='cuda'):
    model.to(device)
    best_val_loss = float('inf')
    checkpoint_path = "best_model.pth"

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs = inputs.view(inputs.shape[0], 1, inputs.shape[-1])
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
        
        # Hard Negative Mining every 5 epochs
        if epoch % 5 == 0:
            hard_samples = hard_negative_mining(model, train_loader, criterion, device)
            if hard_samples:
                model.train()
                for input_hard, label_hard in hard_samples:
                    input_hard, label_hard = input_hard.unsqueeze(0).to(device), torch.tensor([label_hard], device=device)
                    optimizer.zero_grad()
                    output_hard = model(input_hard)
                    hard_loss = criterion(output_hard, label_hard)
                    hard_loss.backward()
                    optimizer.step()

        # Validation Step
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for val_inputs, val_labels in val_loader:
                val_inputs, val_labels = val_inputs.to(device), val_labels.to(device)
                val_outputs = model(val_inputs)
                loss = criterion(val_outputs, val_labels)
                val_loss += loss.item()

        val_loss /= len(val_loader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Val Loss: {val_loss:.4f}")

        # Save Best Model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), checkpoint_path)
            print("Saved best model:", checkpoint_path)

    return model

In [17]:
def aoa_optimizer(model, train_loader, val_loader, device='cuda'):
    def fitness_function(params):
        """ Fitness function for AOA, evaluating hyperparameters """
        learning_rate = params[0]
        batch_size = int(params[1])
        dropout_rate = params[2]

        # Apply dropout rate
        model.apply(lambda m: setattr(m, 'p', dropout_rate) if hasattr(m, 'p') else None)

        # Define loss and optimizer
        criterion = FocalLoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        # Train for a few epochs
        trained_model = train_model(model, train_loader, val_loader, optimizer, criterion, None, num_epochs=5, device=device)

        # Compute validation loss
        val_loss = 0.0
        model.eval()
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
        return val_loss / len(val_loader)

    # Define AOA parameters
    problem_dict = {
        "fit_func": fitness_function,
        "lb": [0.0001, 16, 0.2],  # Lower bounds [Learning Rate, Batch Size, Dropout]
        "ub": [0.002, 64, 0.5],   # Upper bounds [Learning Rate, Batch Size, Dropout]
        "minmax": "min",
        "log_to": None
    }

    # Run AOA
    optimizer = OriginalAO(epoch=10, pop_size=10)
    best_params, best_fitness = optimizer.solve(problem_dict)

    print(f"Best Hyperparameters Found: {best_params}, Best Validation Loss: {best_fitness}")

    return best_params

In [22]:
def run_pipeline(model, train_loader, val_loader, aoa_optimizer, device='cuda'):
    config = [3, 3, 0.0002, 0.3707, 7, 3, 3, 5, 1, 1, 8]
    model = HybridModel(config, 23)
    checkpoint_path = "models/model_30000_30000_00002_03707_70000_30000_30000_50000_10000_10000_80000.pt"

    # Load Model if Checkpoint Exists
    if os.path.exists(checkpoint_path):
        model.load_state_dict(torch.load(checkpoint_path, weights_only=True, map_location=device))
        print("Loaded model from checkpoint:", checkpoint_path)

    # Compute Class Weights for Focal Loss
    class_counts = Counter(train_loader.dataset.labels.numpy())
    total_samples = sum(class_counts.values())
    class_weights = {label: total_samples / count for label, count in class_counts.items()}
    alpha = torch.tensor([class_weights[i] for i in range(len(class_counts))], dtype=torch.float32).to(device)
    
    # Define Loss & Optimizer
    criterion = FocalLoss(alpha=alpha)
    contrastive_loss = ContrastiveLoss()
    
    # Use AOA for Hyperparameter Optimization
    print("AO initializing")
    best_hyperparams = aoa_optimizer.optimize()  # Assuming AOA is implemented separately
    lr = best_hyperparams["learning_rate"]
    
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    # Train the Model
    trained_model = train_model(model, train_loader, val_loader, optimizer, criterion, contrastive_loss, num_epochs=40, device=device)
    
    # Save the Best Model
    torch.save(trained_model.state_dict(), checkpoint_path)
    print("Saved best model:", checkpoint_path)

    # Load Best Model & Set to Eval for Testing
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.eval()
    
    return model

In [21]:
trained_model = run_pipeline(model, train_loader, val_loader, aoa_optimizer)

Loaded model from checkpoint: models/model_30000_30000_00002_03707_70000_30000_30000_50000_10000_10000_80000.pt


AttributeError: 'Subset' object has no attribute 'labels'