In [None]:
from utils import *
import neurokit2 as nk
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import numpy as np
import pyarrow as pa

In [None]:
def find_peaks(ecg, sampling_rate, method):
    """Run a NeuroKit2 R-peak detector and return only the peak locations.

    Parameters
    ----------
    ecg
        ECG signal, passed unchanged as the first argument of ``nk.ecg_peaks``.
    sampling_rate
        Sampling frequency of ``ecg``, forwarded as ``sampling_rate``.
    method
        Name of the detection algorithm, forwarded as ``method``.

    Returns
    -------
    The ``"ECG_R_Peaks"`` entry of the info mapping returned by
    ``nk.ecg_peaks``.
    NOTE(review): presumably sample positions counted from the start of
    ``ecg`` -- confirm against the NeuroKit2 documentation.
    """
    # The first element of the returned pair is not needed here.
    _, peaks_info = nk.ecg_peaks(ecg, sampling_rate=sampling_rate, method=method)
    r_peaks = peaks_info["ECG_R_Peaks"]
    return r_peaks


def rolling_idxmax(series, window_size, closed = 'both', center = False, **kwargs):
    """Return, for each rolling window, the index label of its maximum.

    Parameters
    ----------
    series : pandas.Series
        Input data. Its index labels must be convertible to ``int``,
        because the result is cast with ``astype(int)``.
    window_size
        Window specification, passed as the first argument of
        ``Series.rolling``.
    closed, center
        Forwarded to ``Series.rolling`` under the same names.
    **kwargs
        Any further keyword arguments for ``Series.rolling``.

    Returns
    -------
    pandas.Series
        Integer labels of the per-window maxima. Windows that produced
        NaN are dropped and the result index is renumbered from 0.
    """
    windows = series.rolling(window_size, closed=closed, center=center, **kwargs)
    # Each window reaches the callback as a Series, so idxmax yields a label
    # of the original index rather than a position inside the window.
    max_labels = windows.apply(lambda chunk: chunk.idxmax())
    max_labels = max_labels.dropna(ignore_index=True)
    return max_labels.astype(int)

In [None]:
# Record to analyse.
record_num = 103
# NOTE(review): load_record is not defined in this notebook; presumably it
# comes from `from utils import *` -- confirm.
record, ann = load_record(record_num)
fs = record.fs  # sampling frequency; multiplied by seconds below to get sample counts

# Analysis window, in seconds.
total_time = 30 # seconds
offset = 60 # seconds
discard_time = 2 # seconds skipped at the start of the window

# The same window expressed in samples. Every count is cast to int so the
# values can be used as integer sample indices even if fs is a float.
samples = int(total_time * fs)
start_samples = int(offset * fs)
end_samples = start_samples + samples
discard_samples = int(discard_time * fs)
first_used_sample = start_samples + discard_samples

In [None]:
# ECG signal: column 0 of the record's signal matrix, cut to the analysis
# window. The slice keeps the original integer labels, so ecg is indexed by
# record-wide sample numbers.
ecg = pd.Series(record.p_signal[:, 0]).iloc[start_samples:end_samples]

# Reference annotations: sample positions and their symbols, row-aligned.
ann_samples = pd.Series(ann.sample)
ann_symbols = pd.Series(ann.symbol)

# Desired annotations (N - normal beat)
derised_anns = ['N']

# Annotation lies inside the half-open window [start_samples, end_samples)
mask_time_window = ann_samples.ge(start_samples) & ann_samples.lt(end_samples)
# Annotation carries one of the desired symbols
mask_derised_ann = ann_symbols.isin(derised_anns)
# Annotation is at or after the first sample that is actually used
mask_not_discard = ann_samples.ge(first_used_sample)

# An annotation is kept only when all three conditions hold
mask_used_ann = mask_time_window & mask_derised_ann & mask_not_discard

# Apply the combined mask to both series so they stay aligned
ann_samples, ann_symbols = ann_samples.loc[mask_used_ann], ann_symbols.loc[mask_used_ann]

In [None]:
# Detection algorithms to compare; each name is passed to find_peaks as `method`.
methods = ['hamilton2002', 'kalidas2017', 'rodrigues2020']

# Label of the first sample currently held in `ecg`. Used as the offset instead
# of a fixed start_samples because this cell trims `ecg` at the end: with a
# fixed offset, re-running the cell would shift every peak by discard_samples.
ecg_first_label = ecg.index[0]

dict_results = {}

for method in methods:
    result = find_peaks(ecg, fs, method)
    # Fix index: the detected positions are treated as relative to the first
    # sample of `ecg`, so move them onto the record-wide sample numbering.
    # A new array is built rather than updating in place.
    result = result + ecg_first_label
    # Drop peaks that fall in the discarded lead-in part of the window.
    result = result[result >= first_used_sample]

    # Store results in dict
    dict_results[method] = result

# From here on only the used part of the signal is kept.
ecg = ecg.loc[first_used_sample:]

In [None]:
# Interactive figure: ECG trace, reference annotations and the peaks found by
# each detection method.

# Multiplier applied to sample numbers on the x axis.
x_xis_factor = 1 #1/fs # 1/fs = seconds, 1 = samples

fig = go.Figure()

# ECG signal
fig.add_trace(
    go.Scatter(
        x=ecg.index*x_xis_factor,
        y=ecg,
        name="ECG",
    )
)

# R peaks from the annotations
fig.add_trace(
    go.Scatter(
        x=ann_samples*x_xis_factor,
        y=ecg.loc[ann_samples],
        mode="markers",
        name="R peaks",
        marker={"color": "red", "size": 7},
    )
)

# R peaks found by each detection method, one marker trace per method
for method in methods:
    result = dict_results[method]
    fig.add_trace(
        go.Scatter(
            x=result*x_xis_factor,
            y=ecg.loc[result],
            mode="markers",
            name=method,
            marker={"size": 7},
        )
    )

# Initial x zoom: from the first used sample to 10 * fs samples after it
fig.update_xaxes(
    range=[
        first_used_sample * x_xis_factor,
        (first_used_sample + 10 * fs) * x_xis_factor,
    ]
)

# Tight margins on a white background, plus a range slider on a linear x axis.
# This call is the last expression of the cell.
fig.update_layout(
    margin={"l": 0, "r": 0, "t": 15, "b": 0},
    paper_bgcolor="white",
    xaxis={
        "rangeslider": {"visible": True},
        "type": "linear",
    },
)