## Data Preparation

The purpose of this notebook is to prepare and understand the working dataset for ML purposes. Important part is to split the data into modeling data  
and testing data - this is important as we must completely isolate the testing data in order to avoid any data and statistical leaks.

#### Goals:
- understand data structure
- load and compile data into one dataset
- map metadata to corresponding ECG signals

### Data structure

Shape:
- 45 152 patients -> signals 10-second 12-lead ECG at 500 Hz -> 5000 samples per lead
- WFDB format (WaveForm Database)

ECG = pair of files:
- JS0000X.hea - ASCII WFDB header + metadata -> description of the data
- JS0000X.mat - binary data matrix (val matrix in raw units) - 10-second 12-lead ECG samples in binary form

In [115]:
'''
ecg-arrhythmia/
    WFDBRecords/
        01/
            010/
                JS00001.hea
                JS00001.mat
                JS00002.hea
                JS00002.mat
                ...
        02/
        ...
        46/
'''
pass

### Implementation

In [116]:
import os
import sys

sys.dont_write_bytecode = True
root_dir = os.path.abspath(os.pardir)
if root_dir not in sys.path:
    sys.path.append(root_dir)

In [117]:
import wfdb
import json
import numpy as np
import pandas as pd
from configs.constants import *

configuration

In [118]:
data_dir_folder_path = os.path.abspath(os.path.join(root_dir, DATA_DIR_FOLDER))
data_dir_path = os.path.abspath(os.path.join(data_dir_folder_path, DATA_DIR))

#### Data Load Pipeline

1. Load pipeline
2. Feature Extraction pipeline

common load - metadata

In [119]:
def iter_header_paths(root_dir, dirname="WFDBRecords"):
    wfdb_root = os.path.join(root_dir, dirname)

    for d1 in sorted(os.listdir(wfdb_root)):
        p1 = os.path.join(wfdb_root, d1)
        if not os.path.isdir(p1):
            continue

        for d2 in sorted(os.listdir(p1)):
            p2 = os.path.join(p1, d2)
            if not os.path.isdir(p2):
                continue

            for fname in sorted(os.listdir(p2)):
                if fname.endswith(".hea"):
                    yield os.path.join(p2, fname)

In [120]:
def parse_header(header_path):
    header_path = os.fspath(header_path)
    with open (header_path, encoding='utf-8') as f:
        lines = f.read().splitlines()

    if not lines:
        raise ValueError(f'Empty header file: {header_path}')
    
    first = lines[0].strip().split()

    # bugfix -> ked chyba prvy metadata riadok
    offset = 1
    try:
        record_name = first[0]
        n_signals = first[1]
        freq = int(first[2])
        n_samples = int(first[3])
    except:
        record_name = None
        n_signals = None
        freq = None
        n_samples = None
        offset = 0

    lines = lines[offset:]

    age = None
    sex = None
    y_dx_codes = []

    for line in lines:
        line = line.strip()
        if line.startswith('#Age:'):
            _, v = line.split(':', 1)
            v = v.strip()
            if v and v.lower() not in ['unknown', 'nan']:
                try:
                    age = int(v)
                except ValueError:
                    age = None
        elif line.startswith('#Sex:'):
            _, v = line.split(":", 1)
            sex = v.strip() or None

        elif line.startswith("#Dx:"):
            _, v = line.split(":", 1)
            codes = [c.strip() for c in v.split(",") if c.strip()]
            dx_codes = []
            for c in codes:
                try:
                    dx_codes.append(int(c))
                except ValueError:
                    pass

    base, _ = os.path.splitext(header_path)
    record_path = base

    return {
        "record": record_name,
        "hea_path": header_path,
        "record_path": record_path,
        "n_sig": n_signals,
        "fs": freq,
        "n_samples": n_samples,
        "age": age,
        "sex": sex,
        "dx_codes": dx_codes,
    }


label loading function

In [121]:
def load_labels(dirpath, filepath, as_index=False, index_column='Snomed_CT'):
    filepath = os.path.join(dirpath, filepath)
    data = pd.read_csv(filepath)
    if as_index:
        data = data.set_index(index_column)
    return data

In [122]:
load_labels(data_dir_folder_path, 'ConditionNames_SNOMED-CT.csv', as_index=True).head()

Unnamed: 0_level_0,Acronym Name,Full Name
Snomed_CT,Unnamed: 1_level_1,Unnamed: 2_level_1
270492004,1AVB,1 degree atrioventricular block
195042002,2AVB,2 degree atrioventricular block
54016002,2AVB1,2 degree atrioventricular block(Type one)
28189009,2AVB2,2 degree atrioventricular block(Type two)
27885002,3AVB,3 degree atrioventricular block


In [123]:
run_ = False
if run_:
    rows = []
    for hea in iter_header_paths(data_dir_folder_path):
        rows.append(parse_header(hea))

    meta_df = pd.DataFrame(rows)
    print("Number of records:", len(meta_df))
    print(meta_df.head())

In [124]:
save_ = False
if save_:
    meta_df.to_csv('../data/results/complete_metadata_mapping_2.csv', index=False)

common load - ECG signal

In [125]:
head_paths = list(iter_header_paths(data_dir_folder_path))

In [126]:
parse_header(head_paths[0])

{'record': 'JS00001',
 'hea_path': 'c:\\Users\\samue\\Desktop\\VS\\Mgr\\mAIN\\Strojove Ucenie\\projekt\\data\\ecg-arrhythmia\\WFDBRecords\\01\\010\\JS00001.hea',
 'record_path': 'c:\\Users\\samue\\Desktop\\VS\\Mgr\\mAIN\\Strojove Ucenie\\projekt\\data\\ecg-arrhythmia\\WFDBRecords\\01\\010\\JS00001',
 'n_sig': '12',
 'fs': 500,
 'n_samples': 5000,
 'age': 85,
 'sex': 'Male',
 'dx_codes': [164889003, 59118001, 164934002]}

### Working with loaded compiled metadata

Workflow:
1. Build metadata mapping dataset -> each row = 1 patient and their recording
2. Map ECG signals to the corresponding -> ECG signal data of the patient (row to signal)
3. Utilize streaming -> data on disk has 6gb size

4. Multiple modeling pipelines -> model dependent, e.g. we will pass raw ECG to CNN but we need to do some feature extraction first for models such as SVM


Resources:
- Models:
    - scikit-multiflow (incremental learning): https://scikit-multiflow.readthedocs.io/en/stable/index.html
    - sklearn streaming: https://scikit-learn.org/stable/computing/scaling_strategies.html

- Features:
    - ECG data processing tips: https://www.frontiersin.org/journals/digital-health/articles/10.3389/fdgth.2025.1649923/full?utm_source=chatgpt.com


Approaches:
1. ML Pipeline - traditional ML models - e.g. Log regression, SVM, forests, ... - doesnt need streaming we compute features and discard signals
2. DL Pipeline - deep learning algos - e.g. CNN, Transformers, ... - we will need to somehow stream rows/batches for fitting


#### Research Questions (ideas)

1. Build custom rhytm classification models - evaluate performance and compare to ECG-FM
2. Adversarial inputs and explanability - test adversarial inputs and critical intervals (which part of ECG is the most critical for output change), compare to ECG-FM
    - the idea here is to find which "regions" of the ECG signal is the most critical and also compare with explainability (due dilligence) -> does critical region match param importance?
    

implementation:

In [127]:
def load_ecg_signal(record_path, dataframe=False):
    sig, fields = wfdb.rdsamp(record_path)
    sig = sig.T.astype(np.float32)
    fs = fields['fs']
    lead_names = fields.get('sig_name')

    if dataframe:
        return pd.DataFrame(sig.T, columns=lead_names)

    return sig, fs, lead_names


working on metadata df

In [128]:
file = '../data/results/complete_metadata_mapping_2.csv'

In [129]:
data = pd.read_csv(file)

In [130]:
ecg_example = load_ecg_signal(data.loc[0]['record_path'], dataframe=True)

In [131]:
ecg_example

Unnamed: 0,I,II,III,aVR,aVL,aVF,V1,V2,V3,V4,V5,V6
0,-0.254,0.264,0.517,-0.005,-0.386,0.390,-0.098,-0.312,-0.098,0.810,0.810,0.527
1,-0.254,0.264,0.517,-0.005,-0.386,0.390,-0.098,-0.312,-0.098,0.810,0.810,0.527
2,-0.254,0.264,0.517,-0.005,-0.386,0.390,-0.098,-0.312,-0.098,0.810,0.810,0.527
3,-0.254,0.264,0.517,-0.005,-0.386,0.390,-0.098,-0.312,-0.098,0.810,0.810,0.527
4,-0.264,0.244,0.508,0.010,-0.386,0.376,-0.083,-0.259,-0.063,0.756,0.756,0.517
...,...,...,...,...,...,...,...,...,...,...,...,...
4995,-0.044,-0.044,0.000,0.044,-0.024,-0.024,-0.029,0.590,0.151,-0.185,-0.190,0.122
4996,-0.034,-0.063,-0.029,0.049,-0.005,-0.049,0.000,0.620,0.166,-0.181,-0.176,0.122
4997,-0.034,-0.068,-0.034,0.054,0.000,-0.054,-0.024,0.595,0.137,-0.205,-0.200,0.102
4998,0.024,-0.049,-0.073,0.015,0.049,-0.063,-0.015,0.590,0.132,-0.200,-0.195,0.093


### Project Pipeline

Workflow:
1. common load pipeline
2. train/test split - in order to avoid stat leaks we split at the beginning
3. ML/DL pipelines
4. Model fit -> other notebook

configuration

In [132]:
file = '../data/results/complete_metadata_mapping_2.csv'
target_file = os.path.join(data_dir_folder_path, 'ConditionNames_SNOMED-CT.csv')

**1. common load pipeline**

In [133]:
def load_row_metadata(dirpath):
    rows = []
    for hea in iter_header_paths(dirpath):
        rows.append(parse_header(hea))

    meta_df = pd.DataFrame(rows)
    print("Number of records:", len(meta_df))
    print(meta_df.head())

    return meta_df

In [134]:
meta_df = pd.read_csv(file)
meta_df['dx_codes'] = meta_df['dx_codes'].map(json.loads)

label encoding pipeline

In [135]:
from sklearn.preprocessing import MultiLabelBinarizer

In [136]:
y = meta_df['dx_codes']

label_df = pd.read_csv(target_file)
classes = label_df['Acronym Name'].unique()

these are mappings in some medical db

In [137]:
label_df['Snomed_CT'].nunique()

55

This is the actual class

In [138]:
label_df['Acronym Name'].nunique()

63

we will predict snomed CT codes as they map similiar conditions to one ID

In [139]:
codes = np.concat(meta_df['dx_codes'].map(np.array).to_numpy())

In [140]:
codes = set(codes)

In [141]:
len(codes)

94

basic target encoding

In [142]:
mlb = MultiLabelBinarizer(classes=classes, sparse_output=False)

In [143]:
y_transformed = mlb.fit_transform(y)



In [144]:
label_df['Snomed_CT'].duplicated().sum()

np.int64(8)

**2. train/test split**

- test split will be created only once and we will only report metrics once
- validation sets during cross-validation can be changed

In [145]:
from sklearn.model_selection import train_test_split

In [146]:
X = meta_df.drop('dx_codes', axis=1)
y = meta_df['dx_codes']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE, random_state=TTS_SEED)

In [147]:
print("train shape:", X_train.shape)
print("test shape:", X_test.shape)

train shape: (40636, 8)
test shape: (4516, 8)


**3. ML/DL pipeline**

In [148]:
import neurokit2 as nk

ML pipeline transformations

In [149]:
LEADS_12 = ['I', 'II', 'III', 'aVR', 'aVL', 'aVF', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6']

In [150]:
import numpy as np

def pick_reference_lead(sig: np.ndarray, lead_names):
    """
    Always pick Lead II (exactly 'II', case-insensitive).
    Raises if Lead II is not present in lead_names.
    """
    if lead_names is None:
        raise ValueError("lead_names is None; cannot select Lead II")

    lead_names = [str(x).strip() for x in lead_names]

    # case-insensitive match for "II"
    for i, nm in enumerate(lead_names):
        if nm.lower() == "ii":
            return i, nm

    raise ValueError(f"Lead II ('II') not found in lead_names={lead_names}")


def compute_beat_features(sig: np.ndarray, fs: float, lead_names=None):
    """
    Beat-timing features from Lead II using NeuroKit2:
      - qrs_count
      - vent_rate_bpm
      - RR interval stats
      - first/last R-peak time (seconds)

    NeuroKit2 exposes R-peak sample indices via info['ECG_R_Peaks']
    """
    if fs is None or fs <= 0:
        raise ValueError("fs must be a positive sampling rate in Hz")

    lead_idx, lead_used = pick_reference_lead(sig, lead_names)
    x = np.asarray(sig[lead_idx], dtype=np.float32)

    duration_s = x.shape[0] / float(fs)

    x_clean = nk.ecg_clean(x, sampling_rate=fs)
    _, info = nk.ecg_peaks(x_clean, sampling_rate=fs)
    rpeaks = np.asarray(info["ECG_R_Peaks"], dtype=int)

    qrs_count = int(rpeaks.size)
    vent_rate_bpm = (qrs_count / duration_s) * 60.0 if duration_s > 0 else np.nan

    rr_s = (np.diff(rpeaks) / float(fs)) if rpeaks.size >= 2 else np.array([], dtype=float)

    return {
        "beat_lead": lead_used,
        "qrs_count": qrs_count,
        "vent_rate_bpm": float(vent_rate_bpm),
        "rr_count": int(rr_s.size),
        "rr_mean_s": float(np.mean(rr_s)) if rr_s.size else np.nan,
        "rr_var_s": float(np.var(rr_s)) if rr_s.size else np.nan,
        "rr_min_s": float(np.min(rr_s)) if rr_s.size else np.nan,
        "rr_max_s": float(np.max(rr_s)) if rr_s.size else np.nan,
        "first_rpeak_s": float(rpeaks[0] / fs) if rpeaks.size else np.nan,
        "last_rpeak_s": float(rpeaks[-1] / fs) if rpeaks.size else np.nan,
    }


In [151]:
def compute_signal_stats_1d(x: np.ndarray):
    x = np.asarray(x, dtype=np.float32)
    x = np.nan_to_num(x, nan=0.0)

    xmax = float(np.max(x))
    xmin = float(np.min(x))
    return {
        "mean": float(np.mean(x)),
        "std": float(np.std(x)),
        "min": xmin,
        "max": xmax,
        "ptp": float(xmax - xmin),
        "rms": float(np.sqrt(np.mean(x * x))),
        "energy": float(np.sum(x * x)),
        "zc": int(np.sum((x[:-1] * x[1:]) < 0)),
    }


def compute_signal_features(sig: np.ndarray, fs: float, lead_names):
    """
    Per-lead features for all 12 standard lead names if available.
    Missing leads are filled with NaN.
    """
    if lead_names is None:
        lead_names = [f"lead{i}" for i in range(sig.shape[0])]
    lead_names = [str(x) for x in lead_names]
    name_to_idx = {nm: i for i, nm in enumerate(lead_names)}

    feats = {
        "fs": float(fs),
        "n_leads": int(sig.shape[0]),
        "n_samples": int(sig.shape[1]),
        "duration_s": float(sig.shape[1] / float(fs)) if fs else np.nan,
    }

    for lead in LEADS_12:
        if lead in name_to_idx:
            st = compute_signal_stats_1d(sig[name_to_idx[lead]])
        else:
            st = {k: np.nan for k in ["mean","std","min","max","ptp","rms","energy","zc"]}
        for k, v in st.items():
            feats[f"{lead}__{k}"] = v

    return feats

In [None]:
def load_meta_df_stream(meta_df, **kwargs):
    save_to_file = kwargs.get('save_to_file', False)
    dirpath = kwargs.get('dirpath', './')
    dataframe = kwargs.get('dataframe', True)

    rows = []
    logs = {}

    for row in meta_df.itertuples(index=False):
        record_path = getattr(row, 'record_path')
        record_id = getattr(row, 'record')

        data = {
                'record_id' : record_id,
                'age' : getattr(row, 'age', None),
                'sex' : getattr(row, 'sex', None)
            }

        try:
            sig, fs, lead_names = load_ecg_signal(record_path)

            ecg_beat_features = compute_beat_features(sig, fs, lead_names)
            ecg_signal_features = compute_signal_features(sig, fs, lead_names)

            data.update(ecg_beat_features)
            data.update(ecg_signal_features)
        except Exception as e:
            log = {
                'error' : str(e)
            }
            data.update(log)
            logs.update(log)

        rows.append(data)

    if dataframe:
        rows = pd.DataFrame(rows)

    if save_to_file:
        file_df = pd.DataFrame(rows)
        file_df.to_csv(dirpath, index=False)

    return rows

testing extraction

In [186]:
test_record = meta_df.loc[0]['record_path']

In [187]:
sig, fs, lead_names = load_ecg_signal(test_record, dataframe=False)

In [188]:
beat_features = compute_beat_features(sig, fs, lead_names)
signal_features = compute_signal_features(sig, fs, lead_names)

full dataset processing

In [189]:
dirpath = '../data/preprocessing/ML/meta_df_result_1.csv'

In [None]:
result = load_meta_df_stream(meta_df, save_to_file=False, dirpath=dirpath)

In [194]:
result

Unnamed: 0,record_id,age,sex,beat_lead,qrs_count,vent_rate_bpm,rr_count,rr_mean_s,rr_var_s,rr_min_s,...,V5__energy,V5__zc,V6__mean,V6__std,V6__min,V6__max,V6__ptp,V6__rms,V6__energy,V6__zc
0,JS00001,85.0,Male,II,19.0,114.0,18.0,0.512444,0.002408,0.380,...,361.236450,85.0,-0.032806,0.755254,-1.649,2.752,4.401,0.755966,2857.425293,43.0
1,JS00002,59.0,Female,II,8.0,48.0,7.0,1.160571,0.000699,1.128,...,155.006012,61.0,0.006308,0.125265,-0.156,1.035,1.191,0.125424,78.655914,78.0
2,JS00004,66.0,Male,II,9.0,54.0,8.0,1.125500,0.000268,1.098,...,342.756165,44.0,0.018101,0.257159,-0.361,1.752,2.113,0.257795,332.291321,24.0
3,JS00005,73.0,Female,II,27.0,162.0,26.0,0.369923,0.000029,0.364,...,432.174774,227.0,-0.001625,0.165089,-0.332,0.888,1.220,0.165097,136.284698,239.0
4,JS00006,46.0,Female,II,9.0,54.0,8.0,1.059750,0.000556,1.020,...,199.287277,28.0,0.010836,0.185355,-0.142,1.254,1.396,0.185671,172.368942,21.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
45147,JS45547,34.0,Male,II,19.0,114.0,18.0,0.494222,0.005276,0.310,...,2021.443848,78.0,-0.013376,0.508324,-2.284,2.084,4.368,0.508500,1292.862305,79.0
45148,JS45548,41.0,Female,II,0.0,0.0,0.0,,,,...,630.414673,58.0,0.002603,0.261121,-0.781,0.400,1.181,0.261134,340.954041,106.0
45149,JS45549,36.0,Male,II,20.0,120.0,19.0,0.483368,0.000109,0.454,...,1860.205200,105.0,-0.003526,0.478872,-1.742,0.820,2.562,0.478885,1146.652710,73.0
45150,JS45550,30.0,Female,II,17.0,102.0,16.0,0.574500,0.000013,0.568,...,119.843964,65.0,0.002437,0.099351,-0.468,0.351,0.819,0.099381,49.382824,56.0


DL pipeline transformations
 - mostly passing raw ECG leads signals (all 12)

common ECG load -> per row loading

**4. Model fit**

**full pipeline**