## Testing (NeuroKit2) library to find ECG peaks


### ECG Peaks and it Bounds processing:


### Process ECG pipeline


In [None]:
import neurokit2 as nk
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json

In [None]:
def find_crossing(ecg_signal, start, end, treshhold, direction="up"):

    if start < 0:
        start = 0
    if end >= len(ecg_signal):
        end = len(ecg_signal) - 1

    segment = ecg_signal[start:end+1]

    if direction == "up":
        crossing_indexes = np.where((segment[:-1] < treshhold) & (segment[1:] >= treshhold))[0]

    else:
        crossing_indexes = np.where((segment[:-1] > treshhold) & (segment[1:] <= treshhold))[0]

    if len(crossing_indexes) > 0:
        i = crossing_indexes[0]
        x0, x1 = start + i, start + i + 1
        y0, y1 = segment[i], segment[i+1]

        if y1 == y0:
            return x0

        precise_crossing = x0 + (treshhold - y0) * (x1 - x0) / (y1 - y0)
        return precise_crossing
    return None

def plot_qrs_analysis(ecg_signal, search_start, search_end, r_idx, baseline,
                      half_amplitude, left_idx, right_idx, qrs_width_ms, beat_number):

    plt.figure(figsize=(12, 5))

    plot_segment = ecg_signal[search_start:search_end]

    x_relative = np.arange(len(plot_segment))

    plt.plot(x_relative, plot_segment, 'k-', label="ECG")

    plt.axhline(baseline, color='gray', linestyle='-', label="Baseline (PR)")
    plt.axhline(half_amplitude, color='orange', linestyle='--', label="Half R amplitude")

    r_idx_relative = r_idx - search_start
    plt.scatter([r_idx_relative], [ecg_signal[r_idx]], color='red', zorder=5,
                s=100, label=f"R-Peak (index {r_idx})")

    if left_idx is not None:
        left_idx_relative = left_idx - search_start
        plt.axvline(left_idx_relative, color='blue', linestyle=':',
                    label=f"Left Cross (abs: {left_idx:.1f})")

    if right_idx is not None:
        right_idx_relative = right_idx - search_start
        plt.axvline(right_idx_relative, color='green', linestyle=':',
                    label=f"Right Cross (abs: {right_idx:.1f})")

    plt.title(f"QRS with at half R Amplitude (Beat {beat_number}): {qrs_width_ms:.1f} ms")
    plt.xlabel("Sample (Relative to Segment)")
    plt.ylabel("Voltaje (mV)")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.show()

def calculate_qrs_width_at_half_r_amplitude(
        ecg_signal,
        qrs_complex,
        sampling_frecuency,
        plot_first_beat,
        plot_all_beats = False):

    results = []

    q_peaks = np.array(qrs_complex.get("Q_Peaks", []))
    r_peaks = np.array(qrs_complex.get("R_Peaks", []))
    s_peaks = np.array(qrs_complex.get("S_Peaks", []))

    r_peaks = r_peaks.flatten()
    q_peaks = q_peaks.flatten()
    s_peaks = s_peaks.flatten()

    for i, r_idx in enumerate(r_peaks):
        try:
            q_peaks_before_r = q_peaks[q_peaks < r_idx]
            if q_peaks_before_r.size == 0 :
                raise IndexError("No Q-peaks found before R-peaks")

            q_idx = int(q_peaks_before_r.max())

            s_peaks_after_r = s_peaks[s_peaks > r_idx]
            if s_peaks_after_r.size == 0:
                raise IndexError("No S-peaks found after R-peaks")

            s_idx = int(s_peaks_after_r.min())

            pr_window_start = max(0, q_idx - int(0.08 * sampling_frecuency))
            pr_window_end = q_idx
            if pr_window_end - pr_window_start < 5:
                continue
            baseline = np.median(ecg_signal[pr_window_start:pr_window_end])

            r_amplitude = ecg_signal[r_idx] - baseline
            half_amplitude = baseline + r_amplitude / 2

            search_start = max(0, int(q_idx) - 10)
            search_end = min(len(ecg_signal), int(s_idx) + 10)

            left_idx = find_crossing(ecg_signal, search_start, r_idx,
                                     half_amplitude, direction="up")
            right_idx = find_crossing(ecg_signal, r_idx, search_end,
                                     half_amplitude, direction="down")

            if left_idx is not None and right_idx is not None:
                qrs_samples = right_idx - left_idx
                qrs_width_ms = (qrs_samples / sampling_frecuency) * 1000

                results.append({
                    "r_peak_index": r_idx,
                    "qrs_width_ms": qrs_width_ms,
                    "baseline": baseline,
                    "half_amplitude": half_amplitude,
                    "left_crossing_index": left_idx,
                    "right_crossing_index": right_idx
                })

                if plot_all_beats:
                    print(f"Ploting beat {i+1}...")
                    plot_qrs_analysis(ecg_signal, search_start, search_end, r_idx, baseline,
                                      half_amplitude, left_idx, right_idx, qrs_width_ms, i+1)
                if i == 0 and plot_first_beat:
                    print("Ploting the first beat...")
                    plot_qrs_analysis(ecg_signal, search_start, search_end, r_idx, baseline,
                                      half_amplitude, left_idx, right_idx, qrs_width_ms, i+1)
            else:
                print(f"Could not find both crossing points for the beat at index {r_idx}")

        except IndexError:
            print(f"Skiping beat in the index {r_idx} by leak peaks Q/S associated.")
            continue

    return results

In [439]:
def convert_numpy_to_native(obj):
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, dict):
        return {key: convert_numpy_to_native(value) for key, value in obj.items()}
    elif isinstance(obj, list):
        return [convert_numpy_to_native(item) for item in obj]
    else:
        return obj

In [None]:
def export_as_json(to_export):
    file_name = f'ecg_processed.json'
    ecg_processed = []

    content = {}

    json_export = convert_numpy_to_native(to_export)

    ecg_processed = load_JSON_data(file_name)

    ecg_processed.append(json_export)

    try:
        content["ecg"] = ecg_processed

        with open(file_name, 'w', encoding='utf-8') as f:
            json.dump(content, f, ensure_ascii=False, indent=4)
            print(f'JSON file added successfully')
    except Exception as e:
        print(f'Error at save JSON: {e}')

def load_JSON_data(file_name):
    ecg_processed = []
    try:
        with open(file_name, 'r', encoding='utf-8') as f:
            ecg_processed = json.load(f).get("ecg", [])
    except json.JSONDecodeError:
        print(f'Warning: {file_name} does not contains a valid JSON. A new one will be created')
        ecg_processed = []
    except Exception as e:
        print(f'Error at read existing {file_name} JSON file: {e}')
        raise

    return ecg_processed

In [None]:
column_names = ['lead_1', 'lead_2', 'lead_3', 'lead_4', 'lead_5', 'lead_6',
                'lead_7', 'lead_8', 'lead_9', 'lead_10', 'lead_11', 'lead_12']

SAMPLING_RATE = 500

schema_report = {
    "file_name": "",
    "deriveds": []
}

r_peaks_methods = {}
waves_methods = {}

folder_path = './ECG_BY_HEARTH_DISEASES/SR/'
full_path = folder_path

csv_list = [
    "MUSE_20180111_160053_89000.csv",
    "MUSE_20180111_160140_78000.csv",
    "MUSE_20180111_160159_36000.csv",
    "MUSE_20180111_160355_88000.csv",
    "MUSE_20180111_160410_42000.csv",
]

for file_name in csv_list:
    full_path += file_name

    schema_report['file_name'] = file_name

    ecg_df = pd.read_csv(full_path, header=0, names=column_names, encoding='utf-8')

    for ecg_lead in ecg_df.columns:
        derived = {}
        samples = {}

        derived["name"] = ecg_lead

        ecg_signal = list(ecg_df.loc[:, ecg_lead])

        signals, info = nk.ecg_process(ecg_signal, sampling_rate=SAMPLING_RATE)

        clean_signal = signals["ECG_Clean"].to_numpy()
        r_peaks = info["ECG_R_Peaks"]
        q_peaks = info["ECG_Q_Peaks"]
        s_peaks = info["ECG_S_Peaks"]

        qrs_complex = {}
        qrs_complex["Q_Peaks"] = q_peaks
        qrs_complex["R_Peaks"] = r_peaks
        qrs_complex["S_Peaks"] = s_peaks

        samples['R-Peaks'] = r_peaks

        waves = {}
        waves["P_Wave"] = [info["ECG_P_Onsets"], info["ECG_P_Peaks"], info["ECG_P_Offsets"]]
        waves["Q"] = q_peaks
        waves["S"] = s_peaks

        samples['Waves'] = waves

        qrs_w_hra = calculate_qrs_width_at_half_r_amplitude(clean_signal,
                                                  qrs_complex,
                                                  SAMPLING_RATE,
                                                  False,
                                                  False)

        samples['qrs_w_hra'] = qrs_w_hra

        derived['record'] = samples

        schema_report['deriveds'].append(derived)

    export_as_json(schema_report)
    full_path = folder_path

Could not find both crossing points for the beat at index 1356
Could not find both crossing points for the beat at index 2579
JSON file added successfully
Skiping beat in the index 368 by leak peaks Q/S associated.
Skiping beat in the index 746 by leak peaks Q/S associated.
Skiping beat in the index 1186 by leak peaks Q/S associated.
Skiping beat in the index 1721 by leak peaks Q/S associated.
Could not find both crossing points for the beat at index 4893
Skiping beat in the index 368 by leak peaks Q/S associated.
Skiping beat in the index 746 by leak peaks Q/S associated.
Skiping beat in the index 1186 by leak peaks Q/S associated.
Skiping beat in the index 1721 by leak peaks Q/S associated.
Skiping beat in the index 363 by leak peaks Q/S associated.
Skiping beat in the index 367 by leak peaks Q/S associated.
Skiping beat in the index 745 by leak peaks Q/S associated.
Skiping beat in the index 1185 by leak peaks Q/S associated.
Skiping beat in the index 1719 by leak peaks Q/S associat