Setup (imports, seeds, config)

In [1]:
import itertools
import os
import random
import numpy as np
import joblib

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, f1_score, roc_auc_score

# Reproducibility: seed the NumPy, Python and PyTorch RNGs with one shared value.
RANDOM_STATE = 42
for _seed_fn in (np.random.seed, random.seed, torch.manual_seed):
    _seed_fn(RANDOM_STATE)

# Run on the GPU when CUDA is available, otherwise on the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
DEVICE = torch.device(_device_name)
print("Device:", DEVICE)


Device: cpu


Load data & label-encoding

In [2]:
# Chunk 1: Load data and encode labels
NPZ_PATH = "../src/Preprocessed_data/ecg_data.npz"   # adjust if needed
ENCODED_LABELS_OUT = "ecg_labels_encoded.npy"       # saved integer labels
BINARY_LABELS_OUT = "ecg_binary_labels.npy"         # optional binary 0/1 file

# NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which can
# execute arbitrary code. Only point NPZ_PATH at archives you trust; if the
# archive holds plain numeric/string arrays, allow_pickle=False is safer.
# The `with` block closes the underlying .npz archive once both arrays are read.
with np.load(NPZ_PATH, allow_pickle=True) as data:
    beats = data['beats']        # expected shapes: (N, 90), (N, 90, 1) or (N,1,90)
    labels_raw = data['labels']  # could be strings like 'N','V',... or ints

# Make sure string bytes decode properly
if labels_raw.dtype.type is np.bytes_:
    labels_raw = labels_raw.astype(str)

print("Raw beats shape:", beats.shape)
print("Raw labels dtype:", labels_raw.dtype, "unique:", np.unique(labels_raw)[:10])

# 1) Label encode (multi-class)
le = LabelEncoder()
labels_encoded = le.fit_transform(labels_raw)  # integers 0..K-1
np.save(ENCODED_LABELS_OUT, labels_encoded)
print("Saved integer-encoded labels ->", ENCODED_LABELS_OUT)
print("Label classes (index->class):", {i:c for i,c in enumerate(le.classes_)})

# 2) Create binary labels (Normal vs Abnormal) if you want binary task
#    Convention: 'N' means Normal. If your labels are integer-only and you don't know which int==N,
#    use the string labels (labels_raw) or modify map.
#    Vectorized comparison on the string view of the labels: 0 where the label
#    is exactly 'N', 1 everywhere else. The result is always an integer array,
#    even when there are no labels.
binary_labels = (labels_raw.astype(str) != 'N').astype(int)
np.save(BINARY_LABELS_OUT, binary_labels)
print("Saved binary labels (0=Normal,1=Abnormal) ->", BINARY_LABELS_OUT)
print("Binary counts: normal=", int((binary_labels==0).sum()), "abnormal=", int((binary_labels==1).sum()))


Raw beats shape: (109487, 90, 1)
Raw labels dtype: <U1 unique: ['F' 'N' 'Q' 'S' 'V']
Saved integer-encoded labels -> ecg_labels_encoded.npy
Label classes (index->class): {0: np.str_('F'), 1: np.str_('N'), 2: np.str_('Q'), 3: np.str_('S'), 4: np.str_('V')}
Saved binary labels (0=Normal,1=Abnormal) -> ecg_binary_labels.npy
Binary counts: normal= 90625 abnormal= 18862


Stratified split into Train / Val / Test

In [3]:
# Target selection:
# - binary task     -> `binary_labels`
# - multiclass task -> `labels_encoded`

USE_BINARY = True   # set False if you want multiclass
if USE_BINARY:
    y_all = binary_labels
else:
    y_all = labels_encoded

# Conv1d expects (N, channels, length); bring `beats` into (N, 1, L).
if beats.ndim == 2:
    # (N, L) -> insert a singleton channel axis
    beats = beats[:, None, :]
elif beats.ndim == 3 and beats.shape[-1] == 1:
    # (N, L, 1) -> swap the last two axes
    beats = beats.transpose(0, 2, 1)
# any other layout is assumed to already be (N, 1, L)

print("Prepared beats shape:", beats.shape)

# Stage 1: hold out 20% of all samples as the test set (stratified).
X_trainval, X_test, y_trainval, y_test = train_test_split(
    beats,
    y_all,
    test_size=0.20,
    random_state=RANDOM_STATE,
    stratify=y_all,
)

# Stage 2: 25% of the remaining 80% becomes validation,
# giving 60% train / 20% val / 20% test overall.
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval,
    y_trainval,
    test_size=0.25,
    random_state=RANDOM_STATE,
    stratify=y_trainval,
)

print("Train / Val / Test shapes:", X_train.shape, X_val.shape, X_test.shape)
for _caption, _split_labels in (
    ("Train label counts:", y_train),
    ("Val   label counts:", y_val),
    ("Test  label counts:", y_test),
):
    print(_caption, np.bincount(_split_labels))


Prepared beats shape: (109487, 1, 90)
Train / Val / Test shapes: (65691, 1, 90) (21898, 1, 90) (21898, 1, 90)
Train label counts: [54374 11317]
Val   label counts: [18126  3772]
Test  label counts: [18125  3773]


Prepare PyTorch DataLoaders for Autoencoder (train + val only)

In [4]:
# Build the autoencoder DataLoaders. Reconstruction needs only the signals,
# so each dataset wraps a single tensor and yields 1-tuples.
BATCH_SIZE = 256

# Training split: float32 tensor placed on DEVICE, reshuffled every epoch.
X_train_t = torch.tensor(X_train, dtype=torch.float32).to(DEVICE)
train_ds = TensorDataset(X_train_t)
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)

# Validation split: same preparation, fixed order.
X_val_t = torch.tensor(X_val, dtype=torch.float32).to(DEVICE)
val_ds = TensorDataset(X_val_t)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False)

print("Train loader batches:", len(train_loader), "Val loader batches:", len(val_loader))


Train loader batches: 257 Val loader batches: 86


Define the 1D Conv Autoencoder (flexible, extractable encoder)

In [10]:
# --- Chunk 4 ---
import torch
import torch.nn as nn

class Conv1dAutoencoder(nn.Module):
    """1D convolutional autoencoder for single-channel, fixed-length signals.

    The encoder is three stride-2 convolutions followed by a linear layer that
    produces the latent vector. The decoder mirrors it with a linear layer and
    three stride-2 transposed convolutions, then crops or zero-pads the result
    to the original signal length.

    Args:
        input_length: Number of samples per input signal.
        latent_dim: Size of the latent vector returned by ``encode``.
    """

    def __init__(self, input_length=90, latent_dim=64):
        super().__init__()
        # Remembered so decode() can force its output back to this length.
        self.target_len = input_length

        # ----- Encoder: channels 1 -> 16 -> 32 -> 64 -----
        down = dict(kernel_size=3, stride=2, padding=1)
        self.encoder_conv = nn.Sequential(
            nn.Conv1d(1, 16, **down),
            nn.ReLU(),
            nn.Conv1d(16, 32, **down),
            nn.ReLU(),
            nn.Conv1d(32, 64, **down),
            nn.ReLU(),
        )

        # Push a zero signal through the encoder once to learn the shape of
        # its feature map; the linear layers are sized from that.
        with torch.no_grad():
            probe = self.encoder_conv(torch.zeros(1, 1, input_length))
        self._enc_out_shape = probe.shape   # (1, channels, reduced_length)
        flat_dim = probe.numel()

        # Bottleneck: flattened features <-> latent vector.
        self.fc_enc = nn.Linear(flat_dim, latent_dim)
        self.fc_dec = nn.Linear(latent_dim, flat_dim)

        # ----- Decoder: channels 64 -> 32 -> 16 -> 1 -----
        up = dict(kernel_size=3, stride=2, padding=1, output_padding=1)
        self.decoder_conv = nn.Sequential(
            nn.ConvTranspose1d(64, 32, **up),
            nn.ReLU(),
            nn.ConvTranspose1d(32, 16, **up),
            nn.ReLU(),
            nn.ConvTranspose1d(16, 1, **up),
        )

    def encode(self, x):
        """Map a batch of signals ``(B, 1, L)`` to latent vectors ``(B, latent_dim)``."""
        features = self.encoder_conv(x)
        flattened = features.view(features.size(0), -1)
        return self.fc_enc(flattened)

    def decode(self, z):
        """Reconstruct signals of length ``target_len`` from latent vectors ``z``."""
        batch = z.size(0)
        feature_shape = tuple(self._enc_out_shape[1:])
        x = self.fc_dec(z).view(batch, *feature_shape)
        x = self.decoder_conv(x)

        # The transposed convolutions may not land exactly on the original
        # length: crop surplus samples or zero-pad on the right.
        missing = self.target_len - x.size(-1)
        if missing < 0:
            x = x[..., :self.target_len]
        elif missing > 0:
            x = nn.functional.pad(x, (0, missing))
        return x

    def forward(self, x):
        """Encode then decode ``x``; returns the reconstruction."""
        return self.decode(self.encode(x))


# Define input length from data
INPUT_LEN = beats.shape[-1]

# Single source of truth for the latent size, so the model and the log line
# below cannot drift apart.
LATENT_DIM = 64

# Instantiate AE
ae = Conv1dAutoencoder(input_length=INPUT_LEN, latent_dim=LATENT_DIM).to(DEVICE)
print("AE instantiated. Latent dim:", LATENT_DIM, "Encoder output shape:", ae._enc_out_shape)


AE instantiated. Latent dim: 64 Encoder output shape: torch.Size([1, 64, 12])


Train the autoencoder (with validation & best-checkpoint save)

In [11]:
# --- Chunk 5 ---
# Train the autoencoder on reconstruction MSE, checkpointing the weights each
# time the validation loss improves, then reload the best checkpoint.
EPOCHS = 30
LR = 1e-3
AE_CHECKPOINT_PATH = "best_autoencoder.pth"   # written on every validation improvement
best_val = float('inf')
optimizer = optim.Adam(ae.parameters(), lr=LR)
criterion = nn.MSELoss()

for epoch in range(1, EPOCHS+1):
    # ---- training pass ----
    ae.train()
    train_loss = 0.0
    for (xb,) in train_loader:
        optimizer.zero_grad()
        recon = ae(xb)
        loss = criterion(recon, xb)
        loss.backward()
        optimizer.step()
        # criterion returns a batch mean; weight it by batch size so the epoch
        # figure is a true per-sample average even with a short last batch.
        train_loss += loss.item() * xb.size(0)
    train_loss /= len(train_loader.dataset)

    # ---- validation pass (no gradients) ----
    ae.eval()
    val_loss = 0.0
    with torch.no_grad():
        for (xb,) in val_loader:
            recon = ae(xb)
            val_loss += criterion(recon, xb).item() * xb.size(0)
    val_loss /= len(val_loader.dataset)

    print(f"Epoch {epoch:02d} Train MSE: {train_loss:.6f}  Val MSE: {val_loss:.6f}")
    if val_loss < best_val:
        best_val = val_loss
        torch.save(ae.state_dict(), AE_CHECKPOINT_PATH)

# If the validation loss never improved (e.g. it was NaN every epoch), nothing
# was written in this run; loading now would silently pick up a stale file
# from an earlier run, so fail loudly instead.
if best_val == float('inf'):
    raise RuntimeError(
        "Validation loss never improved; no checkpoint was saved during this run."
    )

# weights_only=True restricts unpickling to tensors and plain containers,
# which is all a state_dict holds.
ae.load_state_dict(torch.load(AE_CHECKPOINT_PATH, map_location=DEVICE, weights_only=True))


Epoch 01 Train MSE: 0.035765  Val MSE: 0.001683
Epoch 02 Train MSE: 0.000986  Val MSE: 0.000585
Epoch 03 Train MSE: 0.000501  Val MSE: 0.000390
Epoch 04 Train MSE: 0.000380  Val MSE: 0.000477
Epoch 05 Train MSE: 0.000320  Val MSE: 0.000255
Epoch 06 Train MSE: 0.000267  Val MSE: 0.000224
Epoch 07 Train MSE: 0.000246  Val MSE: 0.000267
Epoch 08 Train MSE: 0.000214  Val MSE: 0.000181
Epoch 09 Train MSE: 0.000215  Val MSE: 0.000167
Epoch 10 Train MSE: 0.000183  Val MSE: 0.000902
Epoch 11 Train MSE: 0.000183  Val MSE: 0.000158
Epoch 12 Train MSE: 0.000209  Val MSE: 0.000135
Epoch 13 Train MSE: 0.000139  Val MSE: 0.000129
Epoch 14 Train MSE: 0.000146  Val MSE: 0.000156
Epoch 15 Train MSE: 0.000137  Val MSE: 0.000249
Epoch 16 Train MSE: 0.000174  Val MSE: 0.000089
Epoch 17 Train MSE: 0.000098  Val MSE: 0.000082
Epoch 18 Train MSE: 0.000110  Val MSE: 0.000075
Epoch 19 Train MSE: 0.000100  Val MSE: 0.000068
Epoch 20 Train MSE: 0.000108  Val MSE: 0.000064
Epoch 21 Train MSE: 0.000078  Val MSE: 0

<All keys matched successfully>

Extract latent features for train / val / test

In [12]:
# Extract latents
def extract_latents(model, X_np, batch_size=512):
    """Encode samples with ``model.encode`` in batches and return the latents.

    Args:
        model: A module exposing ``encode(tensor)``; its first parameter
            determines the device the batches are moved to.
        X_np: Array-like of samples, converted to a float32 tensor. The first
            axis is the sample axis.
        batch_size: Number of samples encoded per forward pass.

    Returns:
        A NumPy array with the encoded batches stacked along the first axis
        (zero rows when ``X_np`` is empty).

    The model is switched to eval mode for the duration of the call and its
    previous train/eval mode is restored afterwards.
    """
    device = next(model.parameters()).device
    inputs = torch.tensor(X_np, dtype=torch.float32)

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            if inputs.shape[0] == 0:
                # A DataLoader over zero samples yields nothing and np.vstack([])
                # raises, so encode the empty batch directly to get (0, latent).
                return model.encode(inputs.to(device)).cpu().numpy()

            loader = DataLoader(TensorDataset(inputs), batch_size=batch_size, shuffle=False)
            latents = [model.encode(xb.to(device)).cpu().numpy() for (xb,) in loader]
    finally:
        # Do not leave the caller's model stuck in eval mode.
        model.train(was_training)
    return np.vstack(latents)

# Encode each split with the trained autoencoder, in train / val / test order.
X_train_latent, X_val_latent, X_test_latent = (
    extract_latents(ae, split) for split in (X_train, X_val, X_test)
)

print("Latent shapes:", X_train_latent.shape, X_val_latent.shape, X_test_latent.shape)


Latent shapes: (65691, 64) (21898, 64) (21898, 64)


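Scale the latent features (important for many classifiers; tree ensembles such as the Random Forest used below are largely insensitive to it, but the saved scaler keeps the features ready for scale-sensitive models)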

In [13]:
#  Scale features
# Fit the standardization statistics on the training latents only, then apply
# the same transform to every split.
scaler = StandardScaler().fit(X_train_latent)
X_train_scaled, X_val_scaled, X_test_scaled = (
    scaler.transform(latents)
    for latents in (X_train_latent, X_val_latent, X_test_latent)
)

# Persist the fitted scaler so inference can reuse the same statistics.
joblib.dump(scaler, "latent_scaler.joblib")
print("Scaler saved -> latent_scaler.joblib")


Scaler saved -> latent_scaler.joblib


Train Random Forest (with simple validation-based tuning)

In [14]:
# Train RandomForest and tune using validation set
# Every combination in `param_grid` is fitted on the training latents and
# scored on the validation latents; the best-scoring forest is kept and saved.
# (`itertools` and `f1_score` come from the notebook's top import cell.)
param_grid = {
    "n_estimators": [100, 200],
    "max_depth": [None, 10, 20],
}
best_rf = None
best_score = -1.0
best_params = None

for n_est, md in itertools.product(param_grid["n_estimators"], param_grid["max_depth"]):
    rf = RandomForestClassifier(n_estimators=n_est, max_depth=md, class_weight='balanced', random_state=RANDOM_STATE, n_jobs=-1)
    rf.fit(X_train_scaled, y_train)   # train on train set
    val_pred = rf.predict(X_val_scaled)
    # Selection metric: macro-F1 weights every class equally, which suits the
    # imbalanced label distribution better than plain accuracy.
    score = f1_score(y_val, val_pred, average='macro')
    print(f"Params: n_est={n_est}, max_depth={md} -> val macro-F1={score:.4f}")
    # Strict '>' keeps the first configuration seen when scores tie.
    if score > best_score:
        best_score = score
        best_rf = rf
        best_params = {"n_estimators": n_est, "max_depth": md}

print("Best RF params:", best_params, "val macro-F1:", best_score)
joblib.dump(best_rf, "rf_best.joblib")
print("Saved best RF -> rf_best.joblib")


Params: n_est=100, max_depth=None -> val macro-F1=0.9621
Params: n_est=100, max_depth=10 -> val macro-F1=0.9475
Params: n_est=100, max_depth=20 -> val macro-F1=0.9606
Params: n_est=200, max_depth=None -> val macro-F1=0.9628
Params: n_est=200, max_depth=10 -> val macro-F1=0.9465
Params: n_est=200, max_depth=20 -> val macro-F1=0.9613
Best RF params: {'n_estimators': 200, 'max_depth': None} val macro-F1: 0.9628343066073933
Saved best RF -> rf_best.joblib


Final evaluation on test set (report & metrics)

In [22]:
# Final evaluation on test set
# Reload the persisted best forest so the evaluation uses exactly what was saved.
clf = joblib.load("rf_best.joblib")
y_test_pred = clf.predict(X_test_scaled)

report_text = classification_report(y_test, y_test_pred, digits=4)
conf_mat = confusion_matrix(y_test, y_test_pred)

print("Classification report (test):")
print(report_text)
print("Confusion matrix:")
print(conf_mat)

# ROC-AUC is reported only for the two-class task, scored on the predicted
# probability of the second class (column 1).
if np.unique(y_all).size == 2:
    y_proba = clf.predict_proba(X_test_scaled)[:, 1]
    auc = roc_auc_score(y_test, y_proba)
    print("Test ROC-AUC:", auc)


Classification report (test):
              precision    recall  f1-score   support

           0     0.9773    0.9960    0.9866     18125
           1     0.9790    0.8889    0.9318      3773

    accuracy                         0.9776     21898
   macro avg     0.9782    0.9425    0.9592     21898
weighted avg     0.9776    0.9776    0.9771     21898

Confusion matrix:
[[18053    72]
 [  419  3354]]
Test ROC-AUC: 0.9933185227158484
