Importing Libraries

In [1]:
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import math
import numpy as np
from sklearn.metrics import roc_auc_score
import os
from pprint import pprint
from tqdm import tqdm

Setting path variables and dataset size and number of abnormalities to be classified. Also change model_name to the name of the model that is being tested.

In [2]:

model_path='/mnt/Velocity_Vault/ECG/Model/'
memmap_path='/mnt/Velocity_Vault/ECG/Dataset/'

model_name="final_model.pth"

disease_size=7
dataset_size=2184

If the dataset has not been splitted and is going to be entirely used for testing set org=True, if it has been splitted change org=False, also remember to change the respective dataset sizes

In [3]:

org=False

if org:
    signal_name=memmap_path+'ecg_signal'
    feature_name=memmap_path+'features'
    label_name=memmap_path+'labels'
else:
    signal_name=memmap_path+'test_signal'
    feature_name=memmap_path+'test_feat'
    label_name=memmap_path+'test_labels'

In [4]:
ecg_signal=np.memmap(signal_name, dtype='int16', mode='r', shape=(dataset_size, 12,5000))
features=np.memmap(feature_name, dtype='float32', mode='r', shape=(dataset_size, 12))
ground_labels=np.memmap(label_name, dtype='int', mode='r', shape=(dataset_size, disease_size))

# Convert np.memmap arrays to PyTorch tensors
ecg_signal = torch.tensor(ecg_signal, dtype=torch.float32)  # Convert to float32
features = torch.tensor(features, dtype=torch.float32)      # Convert to float32
labels = torch.tensor(ground_labels, dtype=torch.float32)          # Convert to float32 for multi-label classification


Code of the Model

In [5]:
# Define the Noam optimizer (as explained earlier)
class NoamOpt:
    def __init__(self, model, warmup_steps, factor=1, optimizer=None):
        self.model = model
        self.warmup_steps = warmup_steps
        self.factor = factor
        self.optimizer = optimizer if optimizer else optim.Adam(model.parameters(), lr=1e-4, weight_decay=0.0)
        self.step_num = 0

    def rate(self):
        """Calculate the learning rate based on the Noam scheduler."""
        if self.step_num < self.warmup_steps:
            return self.factor * (self.step_num + 1) / self.warmup_steps
        else:
            return self.factor * (self.step_num + 1) ** -0.5

    def step(self):
        """Update the model's parameters."""
        self.step_num += 1
        lr = self.rate()
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = lr
        self.optimizer.step()

    def zero_grad(self):
        """Clear gradients for the optimizer."""
        self.optimizer.zero_grad()


# Define your model architecture (CTN model and Transformer as already defined)
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        #x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)
        x = x + self.pe[:, :x.size(1)].clone().detach().to(dtype=torch.float32, device=x.device)
        return self.dropout(x)

class Transformer(nn.Module):
    def __init__(self, d_model, nhead, d_ff, num_layers, dropout):
        super(Transformer, self).__init__()
        self.d_model = d_model
        self.h = nhead
        self.d_ff = d_ff
        self.num_layers = num_layers
        
        # Add dropout to the encoder layers
        self.pe = PositionalEncoding(d_model, dropout)

        # Transformer Encoder Layer
        encode_layer = nn.TransformerEncoderLayer(
            d_model=self.d_model, 
            nhead=self.h, 
            dim_feedforward=self.d_ff, 
            dropout=dropout,
            batch_first=True 
        )
        self.transformer_encoder = nn.TransformerEncoder(encode_layer, self.num_layers)

    def forward(self, x):
        # Permute dimensions to match Transformer expectations: (batch_size, seq_len, d_model)
        out = x.permute(0, 2, 1)
        out = self.pe(out)  # Add positional encoding
        out = out.permute(1, 0, 2)  # Permute back for transformer encoder
        out = self.transformer_encoder(out)
        out = out.mean(0)  # Global average pooling over sequence length
        return out


class CTN(nn.Module):
    def __init__(self, d_model, nhead, d_ff, num_layers, dropout_rate, deepfeat_sz, nb_feats, classes):
        super(CTN, self).__init__()
        
        self.encoder = nn.Sequential(
            nn.Conv1d(12, 128, kernel_size=14, stride=3, padding=2, bias=False),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.Conv1d(128, 256, kernel_size=14, stride=3, padding=0, bias=False),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.Conv1d(256, d_model, kernel_size=10, stride=2, padding=0, bias=False),
            nn.BatchNorm1d(d_model),
            nn.ReLU(inplace=True),
            nn.Conv1d(d_model, d_model, kernel_size=10, stride=2, padding=0, bias=False),
            nn.BatchNorm1d(d_model),
            nn.ReLU(inplace=True),
            nn.Conv1d(d_model, d_model, kernel_size=10, stride=1, padding=0, bias=False),
            nn.BatchNorm1d(d_model),
            nn.ReLU(inplace=True),
            nn.Conv1d(d_model, d_model, kernel_size=10, stride=1, padding=0, bias=False),
            nn.BatchNorm1d(d_model),
            nn.ReLU(inplace=True)
        )
        self.transformer = Transformer(d_model, nhead, d_ff, num_layers, dropout=0.1)
        self.fc1 = nn.Linear(d_model, deepfeat_sz)
        self.fc2 = nn.Linear(deepfeat_sz + nb_feats, len(classes))
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, wide_feats):
        z = self.encoder(x)  # Encoded sequence
        out = self.transformer(z)  # Transformer output
        out = self.dropout(F.relu(self.fc1(out)))
        out = self.fc2(torch.cat([wide_feats, out], dim=1))
        return out




The model is loaded

In [6]:
# 1. Initialize the model again (same as during training)
d_model = 256
nhead = 8
d_ff = 512
num_layers = 8
dropout_rate = 0.1
deepfeat_sz = 64
nb_feats = 12
classes = [0, 1, 2, 3, 4,5,6]  # 5 classes
model = CTN(d_model=d_model, nhead=nhead, d_ff=d_ff, num_layers=num_layers,
            dropout_rate=dropout_rate, deepfeat_sz=deepfeat_sz,
            nb_feats=nb_feats, classes=classes)

# 2. Load the saved model weights (make sure to specify the correct path)
model_save_path = model_path+model_name  # Example: Load the model saved at epoch 5
model.load_state_dict(torch.load(model_save_path))

# 3. Set the model to evaluation mode
model.eval()






CTN(
  (encoder): Sequential(
    (0): Conv1d(12, 128, kernel_size=(14,), stride=(3,), padding=(2,), bias=False)
    (1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv1d(128, 256, kernel_size=(14,), stride=(3,), bias=False)
    (4): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): Conv1d(256, 256, kernel_size=(10,), stride=(2,), bias=False)
    (7): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU(inplace=True)
    (9): Conv1d(256, 256, kernel_size=(10,), stride=(2,), bias=False)
    (10): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): ReLU(inplace=True)
    (12): Conv1d(256, 256, kernel_size=(10,), stride=(1,), bias=False)
    (13): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (14): ReLU(inplace=True)
    (15): Co

The predictions are made and sigmoid functions are converted into boolean values so that they can be compared to the ground truths

In [7]:
test_dataset = TensorDataset(ecg_signal, features, labels)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=True,num_workers=0)


# 6. Testing the model
predictions = []
labels_list = []

with torch.no_grad():  # No need to track gradients during testing
    for batch in tqdm(test_loader):
        ecg_signal, wide_feats, labels = batch
        outputs = model(ecg_signal, wide_feats)
        
        # Store the predictions and ground truth
        predictions.append(torch.sigmoid(outputs).cpu().numpy())
        labels_list.append(labels.cpu().numpy())

# Convert predictions and labels to numpy arrays
predictions = np.concatenate(predictions, axis=0)
labels_list = np.concatenate(labels_list, axis=0)




100%|██████████| 18/18 [00:21<00:00,  1.17s/it]


In [8]:
pred_labels=np.array(labels_list,dtype=int)

pprint(ground_labels.shape)
pprint(pred_labels.shape)

(2184, 7)
(2184, 7)


Interpretation of ROC AUC score values:

    0.1 - 0.5 → Worse than Random Guessing
    0.5 → Random guessing (no discrimination)
    0.7 - 0.8 → Fair model
    0.8 - 0.9 → Good model
    0.9 - 1.0 → Excellent model
    1.0 → Perfect model (likely overfitting)

In [9]:
# 7. Calculate ROC AUC score for multi-label classification
roc_auc = roc_auc_score(labels_list, predictions, average='macro', multi_class='ovr')
print(f"Test ROC AUC: {roc_auc:.4f}")

Test ROC AUC: 0.9098
