### In this study we perform a multiclass analysis combining both datasets - training2017 and ECGData.mat

In [3]:
# ==============================================================
# 1 – Imports + global config
# ==============================================================

import os, math, random
from pathlib import Path
from typing import List, Tuple

import numpy as np
import pandas as pd
from scipy.io import loadmat
from scipy.signal import resample_poly

from sklearn.model_selection import GroupShuffleSplit
from sklearn.metrics import classification_report, confusion_matrix

import tensorflow as tf
from tensorflow.keras.layers import (Input, Conv1D, BatchNormalization, ReLU,
                                     GlobalAveragePooling1D, Dense, Dropout)
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

SEED = 42
random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)

# ---- unified sampling rate (both datasets already 300 Hz) ----
FS_TARGET = 300        # Hz
WIN_SEC   = 15         # length of each segment in seconds
WIN_SAMPLES = WIN_SEC * FS_TARGET   # 4 500 samples
STEP      = WIN_SAMPLES // 2        # 50 % overlap


In [4]:
# ==============================================================
# 2 – Shared helpers
# ==============================================================

def zscore(x: np.ndarray) -> np.ndarray:
    return (x - x.mean()) / (x.std() + 1e-7)

def resample_to_target(x: np.ndarray, fs_src: int) -> np.ndarray:
    if fs_src == FS_TARGET:
        return x
    gcd  = math.gcd(fs_src, FS_TARGET)
    up   = FS_TARGET // gcd
    down = fs_src   // gcd
    return resample_poly(x, up, down)

def segment_signal(x: np.ndarray,
                   win: int = WIN_SAMPLES,
                   step: int = STEP) -> List[np.ndarray]:
    if len(x) < win:
        x = np.pad(x, (0, win - len(x)))
    return [x[s:s+win] for s in range(0, len(x)-win+1, step)]


In [5]:
# ==============================================================
# 3 – Load PhysioNet-2017 (4-class) → binary
# ==============================================================

def load_physionet2017_binary(root: str) -> Tuple[List[np.ndarray], List[int], List[str]]:
    """Return signals, labels (0=Normal,1=Arrhythmia), groups(rec-id)."""
    ref = pd.read_csv(Path(root) / "REFERENCE.csv", header=None,
                      names=["record", "label"])
    map4to2 = {"N": 0, "A": 1, "O": 1, "~": 1}
    sigs, ys, gids = [], [], []
    for rec, lbl4 in zip(ref.record, ref.label):
        mat = loadmat(Path(root) / f"{rec}.mat")["val"][0]
        sig = zscore(mat.astype(np.float32))
        sigs.append(resample_to_target(sig, fs_src=300))   # 2017 is 300 Hz
        ys.append(map4to2[lbl4])
        gids.append(rec)                                   # group by record
    return sigs, ys, gids


# ==============================================================
# 3b – Load ECGData.mat subset → binary
# ==============================================================

def load_ecgdata_binary(mat_path: str) -> Tuple[List[np.ndarray], List[int], List[str]]:
    """Map: NSR→0, ARR/CHF→1, others dropped."""
    d      = loadmat(mat_path)["ECGData"][0,0]
    signals = d["Data"]        # (162, 65536)
    labels  = [lbl[0] for lbl in d["Labels"][:,0]]  # to list of strings

    keep_mask = [lbl in ("NSR","ARR","CHF") for lbl in labels]
    signals = signals[keep_mask]
    labels  = np.array(labels)[keep_mask]

    sigs, ys, gids = [], [], []
    for i, (sig_raw, lbl) in enumerate(zip(signals, labels)):
        sig = zscore(sig_raw.astype(np.float32))
        sigs.append(resample_to_target(sig, fs_src=300))   # file documented 300 Hz
        ys.append(0 if lbl=="NSR" else 1)
        gids.append(f"ECGData_{i}")
    return sigs, ys, gids


In [6]:
# ==============================================================
# 4 – Merge, segment, stack
# ==============================================================

def build_dataset() -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    # --- paths ---
    phys_root = "training2017"
    ecg_mat   = "ECGData.mat"

    sig_a, y_a, gid_a = load_physionet2017_binary(phys_root)
    sig_b, y_b, gid_b = load_ecgdata_binary(ecg_mat)

    sigs   = sig_a + sig_b
    labels = y_a   + y_b
    gids   = gid_a + gid_b

    segments, y_seg, g_seg = [], [], []
    for sig, y, gid in zip(sigs, labels, gids):
        for seg in segment_signal(sig):
            segments.append(seg)
            y_seg.append(y)
            g_seg.append(gid)   # propagate record id

    X = np.stack(segments, dtype=np.float32)      # (N, 4500)
    y = np.array(y_seg, dtype=np.int32)           # (N,)
    g = np.array(g_seg)
    return X, y, g


In [7]:
# ==============================================================
# 5 – Generator & tiny baseline CNN
# ==============================================================

class BatchGen(tf.keras.utils.Sequence):
    def __init__(self, X, y, batch=64, shuffle=True):
        self.X, self.y = X, y
        self.batch = batch
        self.idxs  = np.arange(len(X))
        self.shuffle = shuffle
        if shuffle:
            np.random.shuffle(self.idxs)

    def __len__(self): return math.ceil(len(self.X)/self.batch)
    def __getitem__(self, i):
        idx = self.idxs[i*self.batch:(i+1)*self.batch]
        x   = self.X[idx][...,None]     # add channel dim
        y   = self.y[idx][:,None]       # shape (B,1)
        return x, y
    def on_epoch_end(self):
        if self.shuffle: np.random.shuffle(self.idxs)


def build_tiny_cnn(input_len: int = WIN_SAMPLES) -> Model:
    inp = Input(shape=(input_len,1))
    x   = Conv1D(32, 15, strides=2, padding='same', activation='relu')(inp)
    x   = BatchNormalization()(x)
    x   = Conv1D(64,15,strides=2,padding='same',activation='relu')(x)
    x   = BatchNormalization()(x)
    x   = GlobalAveragePooling1D()(x)
    out = Dense(1, activation='sigmoid')(x)
    return Model(inp, out)


In [8]:
# ==============================================================
# 6 – Split, train, evaluate
# ==============================================================

X, y, groups = build_dataset()
print("Segments shape:", X.shape, "| Positive ratio:", y.mean().round(3))

gss  = GroupShuffleSplit(test_size=0.2, n_splits=1, random_state=SEED)
tr_i, te_i = next(gss.split(X, y, groups))
gss2 = GroupShuffleSplit(test_size=0.2, n_splits=1, random_state=SEED)
tr_i, va_i = next(gss2.split(X[tr_i], y[tr_i], groups[tr_i]))

X_tr, y_tr = X[tr_i], y[tr_i]
X_va, y_va = X[va_i], y[va_i]
X_te, y_te = X[te_i], y[te_i]

train_gen = BatchGen(X_tr, y_tr, batch=64, shuffle=True)
val_gen   = BatchGen(X_va, y_va, batch=64, shuffle=False)

model = build_tiny_cnn()
model.compile(optimizer=Adam(1e-3),
              loss='binary_crossentropy',
              metrics=['accuracy',
                       tf.keras.metrics.AUC(curve='PR', name='AUPRC')])
model.summary()

cb_es = tf.keras.callbacks.EarlyStopping(patience=5,
                                         restore_best_weights=True,
                                         monitor='val_AUPRC',
                                         mode='max')

hist = model.fit(train_gen, epochs=30, validation_data=val_gen,
                 callbacks=[cb_es], verbose=1)


Segments shape: (32455, 4500) | Positive ratio: 0.468


Epoch 1/30


  self._warn_if_super_not_called()


[1m325/325[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 73ms/step - AUPRC: 0.5476 - accuracy: 0.6208 - loss: 0.6515 - val_AUPRC: 0.5785 - val_accuracy: 0.6243 - val_loss: 0.6588
Epoch 2/30
[1m325/325[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 84ms/step - AUPRC: 0.6256 - accuracy: 0.6605 - loss: 0.6268 - val_AUPRC: 0.6485 - val_accuracy: 0.6421 - val_loss: 0.6430
Epoch 3/30
[1m325/325[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 80ms/step - AUPRC: 0.6442 - accuracy: 0.6775 - loss: 0.6110 - val_AUPRC: 0.6608 - val_accuracy: 0.6090 - val_loss: 0.7234
Epoch 4/30
[1m325/325[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 84ms/step - AUPRC: 0.6565 - accuracy: 0.6824 - loss: 0.6040 - val_AUPRC: 0.6619 - val_accuracy: 0.6724 - val_loss: 0.6166
Epoch 5/30
[1m325/325[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 80ms/step - AUPRC: 0.6908 - accuracy: 0.6990 - loss: 0.5860 - val_AUPRC: 0.6830 - val_accuracy: 0.6859 - val_loss: 0.6074
Epoch 6/30


In [9]:
# ==============================================================
# 7 – Test metrics
# ==============================================================

y_pred_prob = model.predict(X_te[...,None], batch_size=128).ravel()
y_pred = (y_pred_prob > 0.5).astype(int)

print("\nConfusion matrix:")
print(confusion_matrix(y_te, y_pred))

print("\nClassification report:")
print(classification_report(y_te, y_pred, digits=4))


[1m51/51[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 26ms/step

Confusion matrix:
[[3030  525]
 [1167 1797]]

Classification report:
              precision    recall  f1-score   support

           0     0.7219    0.8523    0.7817      3555
           1     0.7739    0.6063    0.6799      2964

    accuracy                         0.7405      6519
   macro avg     0.7479    0.7293    0.7308      6519
weighted avg     0.7456    0.7405    0.7354      6519

