In [1]:
import numpy as np
from sklearn.model_selection import train_test_split

# 1. Load dataset
data = np.load("mitdb_windows_5s_binary_afib.npz")
X, y = data["X"], data["y"]

print("Raw dataset shape:", X.shape, y.shape)  # e.g. (936, 1800), (936,)

# 2. Split into Train (70%), Temp (30%)
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=42
)

# 3. Split Temp into Validation (15%) and Test (15%)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42
)

# 4. Print split sizes
print("Train:", X_train.shape, y_train.shape)
print("Validation:", X_val.shape, y_val.shape)
print("Test:", X_test.shape, y_test.shape)

# 5. Class balance check
def summarize_split(name, y):
    n_total = len(y)
    n_afib  = int(y.sum())
    n_norm  = n_total - n_afib
    print(f"{name}: {n_total} samples → AFib {n_afib} ({n_afib/n_total:.2%}), Normal {n_norm} ({n_norm/n_total:.2%})")

summarize_split("Train", y_train)
summarize_split("Validation", y_val)
summarize_split("Test", y_test)


Raw dataset shape: (28085, 1800) (28085,)
Train: (19659, 1800) (19659,)
Validation: (4213, 1800) (4213,)
Test: (4213, 1800) (4213,)
Train: 19659 samples → AFib 2224 (11.31%), Normal 17435 (88.69%)
Validation: 4213 samples → AFib 477 (11.32%), Normal 3736 (88.68%)
Test: 4213 samples → AFib 476 (11.30%), Normal 3737 (88.70%)


In [None]:

import numpy as np
from scipy.signal import welch
from scipy.stats import skew, kurtosis
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


DATA_PATH = "mitdb_windows_5s_binary_afib.npz"
FS = 360.0   # sampling rate for MIT-BIH
SAVE_PATH = "mitdb_features_v1.npz"

# Welch PSD params (tuned for ~5s windows)
WELCH_NPERSEG = 512
WELCH_NOVERLAP = 256

# load data and split x/y
d = np.load(DATA_PATH)
X, y = d["X"], d["y"]
print("Raw:", X.shape, y.shape)

# Stratified 70/15/15 split
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.30, stratify=y, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.50, stratify=y_temp, random_state=42
)

def summarize(name, yv):
    n = len(yv)
    af = int(yv.sum())
    print(f"{name}: {n} | AFib {af} ({af/n:.2%}), Normal {n-af} ({1-af/n:.2%})")

summarize("Train", y_train)
summarize("Val", y_val)
summarize("Test", y_test)

# feature extraction
def zero_crossing_rate(x: np.ndarray) -> float:
    # Count sign changes normalized by length
    s = np.signbit(x)
    return np.count_nonzero(s[:-1] ^ s[1:]) / len(x)

def welch_psd(x: np.ndarray, fs: float):
    f, Pxx = welch(x, fs=fs, nperseg=WELCH_NPERSEG, noverlap=WELCH_NOVERLAP)
    return f, Pxx

def bandpower(f: np.ndarray, Pxx: np.ndarray, fmin: float, fmax: float) -> float:
    mask = (f >= fmin) & (f <= fmax)
    # Trapezoidal integral of PSD over the band
    return np.trapz(Pxx[mask], f[mask]) if np.any(mask) else 0.0

def spectral_centroid(f: np.ndarray, Pxx: np.ndarray) -> float:
    num = np.trapz(f * Pxx, f)
    den = np.trapz(Pxx, f) + 1e-12
    return num / den

def dominant_frequency(f: np.ndarray, Pxx: np.ndarray) -> float:
    return float(f[np.argmax(Pxx)]) if len(Pxx) else 0.0

def extract_features_batch(Xbatch: np.ndarray, fs: float) -> np.ndarray:
    """
    Vectorized-ish batch extractor (loops over windows, but each window’s
    ops are NumPy/Scipy). Returns feature matrix [n_samples, n_features].
    """
    feats = []
    for x in Xbatch:
    
        x_mean = float(np.mean(x))
        x_std  = float(np.std(x))
        x_min  = float(np.min(x))
        x_max  = float(np.max(x))
        x_ptp  = float(np.ptp(x))
        x_rms  = float(np.sqrt(np.mean(x**2)))
        x_zcr  = float(zero_crossing_rate(x))
        x_skew = float(skew(x, bias=False))
        x_kurt = float(kurtosis(x, bias=False))

        # ---- Frequency-domain (Welch PSD) ----
        f, Pxx = welch_psd(x, fs)
        bp_lo   = bandpower(f, Pxx, 0.5, 5.0)    # baseline/respiratory-ish
        bp_mid  = bandpower(f, Pxx, 5.0, 15.0)   # P/QRS dominant band
        bp_hi   = bandpower(f, Pxx, 15.0, 40.0)  # higher freq (QRS sharpness)
        spec_c  = spectral_centroid(f, Pxx)
        f_dom   = dominant_frequency(f, Pxx)
        ratio_hl = bp_hi / (bp_lo + 1e-12)

        feats.append([
            x_mean, x_std, x_min, x_max, x_ptp, x_rms, x_zcr, x_skew, x_kurt,
            bp_lo, bp_mid, bp_hi, spec_c, f_dom, ratio_hl
        ])
    return np.array(feats, dtype=np.float32)

feature_names = [
    "mean","std","min","max","ptp","rms","zcr","skew","kurt",
    "bp_0.5_5","bp_5_15","bp_15_40","spec_centroid","f_dominant","ratio_hi_lo"
]

print("Extracting train features...")
Xtr_feat = extract_features_batch(X_train, FS)
print("Extracting val features...")
Xva_feat = extract_features_batch(X_val, FS)
print("Extracting test features...")
Xte_feat = extract_features_batch(X_test, FS)

print("Feature shapes:", Xtr_feat.shape, Xva_feat.shape, Xte_feat.shape)
print("Num features:", len(feature_names))

# avoid leak
scaler = StandardScaler()
Xtr_scaled = scaler.fit_transform(Xtr_feat)
Xva_scaled = scaler.transform(Xva_feat)
Xte_scaled = scaler.transform(Xte_feat)


np.savez_compressed(
    SAVE_PATH,
    X_train_feat=Xtr_scaled,
    X_val_feat=Xva_scaled,
    X_test_feat=Xte_scaled,
    y_train=y_train,
    y_val=y_val,
    y_test=y_test,
    feature_names=np.array(feature_names, dtype=object),
    scaler_mean_=scaler.mean_,
    scaler_scale_=scaler.scale_
)
print(f"Saved features → {SAVE_PATH}")


Raw: (28085, 1800) (28085,)
Train: 19659 | AFib 2224 (11.31%), Normal 17435 (88.69%)
Val: 4213 | AFib 477 (11.32%), Normal 3736 (88.68%)
Test: 4213 | AFib 476 (11.30%), Normal 3737 (88.70%)
Extracting train features...


  return np.trapz(Pxx[mask], f[mask]) if np.any(mask) else 0.0
  num = np.trapz(f * Pxx, f)
  den = np.trapz(Pxx, f) + 1e-12


Extracting val features...
Extracting test features...
Feature shapes: (19659, 15) (4213, 15) (4213, 15)
Num features: 15
Saved features → mitdb_features_v1.npz
