In [None]:
!pip install wfdb PyWavelets


Collecting wfdb
  Downloading wfdb-4.3.0-py3-none-any.whl.metadata (3.8 kB)
Collecting PyWavelets
  Downloading pywavelets-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Collecting pandas>=2.2.3 (from wfdb)
  Downloading pandas-2.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.9/89.9 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
Downloading wfdb-4.3.0-py3-none-any.whl (163 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.8/163.8 kB[0m [31m9.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pywavelets-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.5/4.5 MB[0m [31m60.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pandas-2.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import os
import numpy as np
import pandas as pd

import wfdb
import pywt
from scipy.signal import medfilt
from scipy.stats import skew, kurtosis

from sklearn.preprocessing  import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.svm           import SVC
from sklearn.metrics       import classification_report, confusion_matrix


In [None]:
# This will fetch all 48 records into "./mitbih-database"
if not os.path.isdir('mitbih-database'):
    wfdb.dl_database('mitdb', 'mitbih-database')


In [None]:
FS = 360    # sampling rate
W  = 180    # window length (samples)
H  = W // 2

# Inter-patient split IDs from the paper
DS1 = ["101","106","108","109","112","114","115","116","118","119","122","124",
       "201","203","205","207","208","209","215","220","223","230"]
DS2 = ["100","103","105","111","113","117","121","123","200","202","210","212",
       "213","219","221","222","228","231","232","233","234"]

# small prototype splits
DS1_small = DS1[:5]   # first 5 records for train
DS2_small = DS2[:5]   # first 5 records for test



In [None]:
def load_record(rec_id):
    rec = wfdb.rdrecord(f'mitbih-database/{rec_id}')
    ann = wfdb.rdann(   f'mitbih-database/{rec_id}', 'atr')
    sig = rec.p_signal[:,0]  # Lead II

    # Baseline removal: 200ms & 600ms median filters
    b1  = medfilt(sig, kernel_size=int(0.2*FS)|1)
    b2  = medfilt(b1, kernel_size=int(0.6*FS)|1)
    sig = sig - b2

    return sig, ann.sample, ann.symbol

train_raw = [load_record(r) for r in DS1_small]
test_raw  = [load_record(r) for r in DS2_small]


In [None]:
# Map MIT-BIH symbol → AAMI class index (0:N, 1:SVEB, 2:VEB, 3:Fusion)
def sym2cls(s):
    if   s in ['N','L','R']:           return 0
    elif s in ['A','a','J','S','e','j']: return 1
    elif s in ['V','E']:               return 2
    elif s == 'F':                     return 3
    else:                              return None

# R-R feature calculator (pre,post,local(10),global + normalize)
def rr_feats(peaks):
    rr = np.diff(peaks)/FS
    pre  = np.insert(rr, 0, rr[0])
    post = np.append(rr, rr[-1])
    local  = pd.Series(pre).rolling(10, min_periods=1).mean().values
    global_ = np.full_like(pre, pre.mean())
    feats = np.vstack([pre, post, local, global_,
                       pre/pre.mean(), post/post.mean(),
                       local/local.mean(), global_/global_.mean()]).T
    return feats  # shape = (len(peaks), 8)

def segment_record(sig, peaks, symbols):
    beats, labs, idxs = [], [], []
    for i,(p,s) in enumerate(zip(peaks, symbols)):
        cls = sym2cls(s)
        if cls is None or p-H < 0 or p+H > len(sig):
            continue
        beats.append(sig[p-H : p+H])
        labs .append(cls)
        idxs.append(i)
    return np.array(beats), np.array(labs), np.array(idxs)

# Process train
Xb_tr_list, y_tr_list, rr_tr_list = [], [], []
for sig, peaks, syms in train_raw:
    b, l, idx = segment_record(sig, peaks, syms)
    Xb_tr_list.append(b)
    y_tr_list.append(l)
    rr_tr_list.append(rr_feats(peaks)[idx])
Xb_tr = np.vstack(Xb_tr_list)
y_tr  = np.hstack(y_tr_list)
X_rr_tr = np.vstack(rr_tr_list)

# Process test
Xb_te_list, y_te_list, rr_te_list = [], [], []
for sig, peaks, syms in test_raw:
    b, l, idx = segment_record(sig, peaks, syms)
    Xb_te_list.append(b)
    y_te_list.append(l)
    rr_te_list.append(rr_feats(peaks)[idx])
Xb_te = np.vstack(Xb_te_list)
y_te  = np.hstack(y_te_list)
X_rr_te = np.vstack(rr_te_list)


In [None]:
# 6.1 Wavelets (db1 level 3 → ~23 coeffs)
def wave_feats(b):
    return np.hstack(pywt.wavedec(b, 'db1', level=3))

# 6.2 Higher-Order Stats on 5 segments → 10 dims
def hos_feats(b):
    segs = np.array_split(b, 5)
    out = []
    for s in segs:
        out += [ skew(s), kurtosis(s) ]
    return np.array(out)

# 6.3 1D-LBP radius=4 → 256-bin histogram
def lbp1d(b, r=4):
    codes = []
    for i in range(r, len(b)-r):
        left  = (b[i-r:i] > b[i]).astype(int)
        right = (b[i+1:i+r+1] > b[i]).astype(int)
        bits  = np.concatenate([left, right])
        codes.append(sum(bit<<j for j,bit in enumerate(bits)))
    hist, _ = np.histogram(codes, bins=256, range=(0,256))
    return hist

# 6.4 Morphological distances
def morph_feats(b):
    center = len(b)//2
    i1 = np.argmax(b[:40])
    i2 = 75 + np.argmin(b[75:85])
    i3 = 95 + np.argmin(b[95:105])
    i4 = 150 + np.argmax(b[150:])
    return np.array([
        np.hypot(i1-center, b[i1]),
        np.hypot(i2-center, b[i2]),
        np.hypot(i3-center, b[i3]),
        np.hypot(i4-center, b[i4]),
    ])


In [None]:
# Cell 7 – Extract & Scale All Feature Sets (Parallelized)
from joblib import Parallel, delayed
from sklearn.preprocessing import StandardScaler

scalers = {}

# Helper to parallelize vstack of feature-fn over beats
def extract_and_stack(feat_fn, beat_array):
    feats = Parallel(n_jobs=-1)(
        delayed(feat_fn)(b) for b in beat_array
    )
    return np.vstack(feats)

# 7.1 Wavelet features
X_wav_tr = extract_and_stack(wave_feats, Xb_tr)
X_wav_te = extract_and_stack(wave_feats, Xb_te)
scalers['wav'] = StandardScaler().fit(X_wav_tr)
X_wav_tr = scalers['wav'].transform(X_wav_tr)
X_wav_te = scalers['wav'].transform(X_wav_te)

# 7.2 HOS features
X_hos_tr = extract_and_stack(hos_feats, Xb_tr)
X_hos_te = extract_and_stack(hos_feats, Xb_te)
scalers['hos'] = StandardScaler().fit(X_hos_tr)
X_hos_tr = scalers['hos'].transform(X_hos_tr)
X_hos_te = scalers['hos'].transform(X_hos_te)

# 7.3 1D-LBP features
X_lbp_tr = extract_and_stack(lbp1d, Xb_tr)
X_lbp_te = extract_and_stack(lbp1d, Xb_te)
scalers['lbp'] = StandardScaler().fit(X_lbp_tr)
X_lbp_tr = scalers['lbp'].transform(X_lbp_tr)
X_lbp_te = scalers['lbp'].transform(X_lbp_te)

# 7.4 Morphological features
X_morph_tr = extract_and_stack(morph_feats, Xb_tr)
X_morph_te = extract_and_stack(morph_feats, Xb_te)
scalers['morph'] = StandardScaler().fit(X_morph_tr)
X_morph_tr = scalers['morph'].transform(X_morph_tr)
X_morph_te = scalers['morph'].transform(X_morph_te)

# 7.5 R-R Interval features
scalers['rr'] = StandardScaler().fit(X_rr_tr)
X_rr_tr = scalers['rr'].transform(X_rr_tr)
X_rr_te = scalers['rr'].transform(X_rr_te)

print("Feature extraction + scaling complete.")


Feature extraction + scaling complete.


In [None]:
# Auto-clean all features and align y_tr
X_clean = {}
y_clean = {}

# 1. RR: already separate
X_clean['rr'] = X_rr_tr
y_clean['rr'] = y_tr[:len(X_rr_tr)]

# 2. All beat-aligned features
X_feature_sources = {
    'wav': X_wav_tr,
    'hos': X_hos_tr,
    'lbp': X_lbp_tr,
    'morph': X_morph_tr
}

for name, X in X_feature_sources.items():
    N = min(len(X), len(y_tr))   # ensure no IndexError
    X = X[:N]
    y = y_tr[:N]

    mask = ~np.isnan(X).any(axis=1)
    X_clean[name] = X[mask]
    y_clean[name] = y[mask]

    print(f"{name.upper()}: removed {np.sum(~mask)} rows with NaNs")


WAV: removed 0 rows with NaNs
HOS: removed 0 rows with NaNs
LBP: removed 0 rows with NaNs
MORPH: removed 0 rows with NaNs


In [None]:
def train_svm(X, y):
    gamma = 1.0 / X.shape[1]
    gs = GridSearchCV(
        SVC(kernel='rbf', probability=True, gamma=gamma),
        {'C': [1e-3,1e-2,1e-1,1,10,100]},
        cv=3, n_jobs=-1
    )
    gs.fit(X, y)
    return gs.best_estimator_

svm_rr    = train_svm(X_rr_tr,    y_tr)
svm_wav   = train_svm(X_wav_tr,   y_tr)
svm_hos   = train_svm(X_hos_tr,   y_tr)
svm_lbp   = train_svm(X_lbp_tr,   y_tr)
svm_morph = train_svm(X_morph_tr, y_tr)


In [None]:
probs = np.stack([
    svm_rr   .predict_proba(X_rr_te),
    svm_wav  .predict_proba(X_wav_te),
    svm_hos  .predict_proba(X_hos_te),
    svm_lbp  .predict_proba(X_lbp_te),
    svm_morph.predict_proba(X_morph_te),
], axis=0)  # shape = (5, n_samples, 4)

prod    = np.prod(probs, axis=0)       # (n_samples, 4)
y_pred  = prod.argmax(axis=1)


In [None]:
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    accuracy_score
)
import numpy as np

# Automatically detect predicted class indices
present_classes = np.unique(np.concatenate([y_te, y_pred]))

# Full label names in order (0 = N, 1 = SVEB, 2 = VEB, 3 = Fusion)
label_names = ['N', 'SVEB', 'VEB', 'Fusion']
used_names = [label_names[i] for i in present_classes]

# 1. Classification report (precision/recall/f1)
print("📊 Classification Report:")
print(classification_report(
    y_te, y_pred,
    labels=present_classes,
    target_names=used_names,
    zero_division=0  # prevents divide-by-zero warnings
))

# 2. Confusion matrix
print("\n🧾 Confusion Matrix:")
print(confusion_matrix(
    y_te, y_pred,
    labels=present_classes
))

# 3. Accuracy
acc = accuracy_score(y_te, y_pred)
print(f"\n✅ Overall Accuracy: {acc:.4f}")


📊 Classification Report:
              precision    recall  f1-score   support

           N       0.99      1.00      1.00     10756
        SVEB       0.00      0.00      0.00        41
         VEB       0.79      0.44      0.57        43

    accuracy                           0.99     10840
   macro avg       0.60      0.48      0.52     10840
weighted avg       0.99      0.99      0.99     10840


🧾 Confusion Matrix:
[[10751     0     5]
 [   41     0     0]
 [   24     0    19]]

✅ Overall Accuracy: 0.9935
