# PEP Benchmarking: Dataset, Plotting, and Algorithm Tests

## Setup and Helper Functions

In [1]:
import json
from pathlib import Path

import matplotlib.pyplot as plt
import seaborn as sns
from fau_colors import cmaps, register_fausans_font

from pepbench.datasets import EmpkinsDataset, GuardianDataset
from pepbench.evaluation import PepEvaluationChallenge

%matplotlib widget
%load_ext autoreload
%autoreload 2

In [2]:
# Register the FAU Sans font (fau_colors helper) and close any figures that are still open.
register_fausans_font()
plt.close("all")

# Build the seaborn palette from the FAU "faculties" colormap and use it for the global theme.
palette = sns.color_palette(cmaps.faculties)
sns.set_theme(
    context="notebook",
    style="ticks",
    font="sans-serif",
    palette=palette,
)

# Matplotlib overrides are kept after set_theme(), which itself modifies rcParams.
plt.rcParams.update(
    {
        "figure.figsize": (10, 5),
        "pdf.fonttype": 42,
        "mathtext.default": "regular",
    }
)

# Bare expression: show the palette as the cell output.
palette

In [3]:
# Key of the top-level section in config.json that holds the paths for this machine.
deploy_type = "local"

# Open the config file in a context manager so the handle is closed deterministically
# (passing `Path.open()` straight into `json.load` leaves the file open).
with Path("./config.json").open(encoding="utf-8") as config_file:
    config_dict = json.load(config_file)

# Base directory of the Guardian dataset for the selected deployment section.
guardian_base_path = Path(config_dict[deploy_type]["guardian_path"])
guardian_base_path

PosixPath('/Users/richer/Documents/PhD/Projects/EmpkinS/HealthPsychology_D03/Data/2024_08_PEP_Benchmarking/Guardian_Dataset')

## Guardian Dataset

In [4]:
# Instantiate the Guardian dataset from the configured base path; `use_cache` and `only_labeled`
# are forwarded to GuardianDataset (their exact semantics are defined in pepbench).
dataset_guardian = GuardianDataset(guardian_base_path, use_cache=True, only_labeled=True)
# Bare expression: render the dataset (participant/phase table) as the cell output.
dataset_guardian

Unnamed: 0,participant,phase
0,GDN0005,Pause
1,GDN0005,Valsalva
2,GDN0005,HoldingBreath
3,GDN0005,TiltUp
4,GDN0005,TiltDown
...,...,...
101,GDN0029,TiltUp
102,GDN0029,TiltDown
103,GDN0030,Valsalva
104,GDN0030,HoldingBreath


In [14]:
# Narrow the Guardian dataset down to one participant and one phase.
subset = dataset_guardian.get_subset(participant="GDN0005", phase="TiltUp")

# Reference annotations exposed by the subset.
labeling_borders = subset.labeling_borders
reference_labels_icg = subset.reference_labels_icg
reference_labels_ecg = subset.reference_labels_ecg

# Signal data of the same subset.
ecg_data, icg_data = subset.ecg, subset.icg

# Show the labeling borders and the ECG reference labels; the ECG signal itself is not displayed here.
display(labeling_borders, reference_labels_ecg)

Unnamed: 0_level_0,description,sample_absolute,sample_relative
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2018-02-16 10:45:04.940000+01:00,{'TiltUp': 'start'},79516,79516
2018-02-16 10:46:04.940000+01:00,{'TiltUp': 'end'},109516,109516


Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,sample_absolute,sample_relative
heartbeat_id,channel,label,Unnamed: 3_level_1,Unnamed: 4_level_1
0,heartbeat,start,79532,16
0,ECG,Q-wave_onset,79661,145
0,heartbeat,end,79959,443
1,heartbeat,start,79959,443
1,ECG,Q-wave_onset,80089,573
...,...,...,...,...
80,ECG,Q-wave_onset,108802,29286
80,heartbeat,end,109049,29533
81,heartbeat,start,109049,29533
81,ECG,Q-wave_onset,109154,29638


## EmpkinS Dataset

In [None]:
# Base directory of the EmpkinS dataset, read from the same deployment section of the config.
empkins_base_path = Path(config_dict[deploy_type]["empkins_path"])
# Bare expression: echo the resolved path as the cell output.
empkins_base_path

In [None]:
# Instantiate the EmpkinS dataset from the configured base path.
# NOTE(review): caching is disabled here (use_cache=False) while the Guardian dataset is created
# with use_cache=True — confirm this difference is intended.
dataset_empkins = EmpkinsDataset(empkins_base_path, use_cache=False, only_labeled=True)
# Bare expression: render the dataset as the cell output.
dataset_empkins

In [None]:
# Narrow the EmpkinS dataset down to one participant, condition, and phase.
subset = dataset_empkins.get_subset(
    participant="VP_001",
    condition="tsst",
    phase="Math",
)

# Reference annotations exposed by the subset.
labeling_borders = subset.labeling_borders
reference_labels_icg = subset.reference_labels_icg
reference_labels_ecg = subset.reference_labels_ecg

# Signal data of the same subset.
ecg_data, icg_data = subset.ecg, subset.icg

# Show the labeling borders, the ECG reference labels, and the ECG signal, in that order.
display(labeling_borders, reference_labels_ecg, ecg_data)

## Plot Tests

In [None]:
# Re-display the ECG reference labels currently bound in the kernel
# (when run top to bottom, these come from the preceding subset cell).
reference_labels_ecg

In [None]:
# fig, ax = plt.subplots()
#
# subset = dataset_empkins.get_subset(participant="VP_001", condition="tsst", phase="Pause_1")
#
# ecg_data = subset.ecg
# reference_labels_ecg = subset.reference_labels_ecg
# labeling_borders = subset.labeling_borders
#
# start_end_borders = reference_labels_ecg.xs("heartbeat", level="channel")
# qwave_labels = reference_labels_ecg.xs("Q-wave_onset", level="label")
#
# ecg_data.plot(ax=ax)
#
# ax.vlines(x=ecg_data.index[start_end_borders["sample_relative"]], ymin=0, ymax=1, colors=cmaps.tech[2], transform=ax.get_xaxis_transform())
# ax.scatter(x=ecg_data.index[qwave_labels["sample_relative"]], y=ecg_data["ecg"][ecg_data.index[qwave_labels["sample_relative"]]], color=cmaps.nat[0], zorder=3)
#
# fig.tight_layout()

## Signal Processing Tests

In [None]:
# subset = dataset_empkins.get_subset(participant="VP_001", condition="tsst", phase="Math")
# icg_data = subset.icg

# fig, ax = plt.subplots()
# icg_data.plot(ax=ax)
# fig.tight_layout()

In [None]:
# Select one EmpkinS recording (participant VP_001, condition tsst, phase Math) and take its ECG.
subset = dataset_empkins.get_subset(participant="VP_001", condition="tsst", phase="Math")
ecg_data = subset.ecg

# Create the figure/axes explicitly and let pandas draw onto them.
fig, ax = plt.subplots()
ecg_data.plot(ax=ax)
fig.tight_layout()

In [None]:
# Rebind `subset` to the Pause_1 phase of EmpkinS participant VP_001 (condition tsst).
subset = dataset_empkins.get_subset(participant="VP_001", condition="tsst", phase="Pause_1")

In [None]:
# Rebind `subset` to a Guardian recording (participant GDN0005, phase Pause); this replaces
# whatever `subset` was bound to before this cell ran.
subset = dataset_guardian.get_subset(participant="GDN0005", phase="Pause")
# subset.reference_pep

## Algorithm Tests

In [None]:
from biopsykit.signals.ecg.event_extraction import (
    QPeakExtractionVanLien2013,
)
from biopsykit.signals.ecg.segmentation import HeartbeatSegmentationNeurokit
from biopsykit.signals.icg.event_extraction import (
    BPointExtractionForouzanfar2018,
    CPointExtractionScipyFindPeaks,
)
from biopsykit.signals.icg.outlier_correction import (
    OutlierCorrectionForouzanfar2018,
    OutlierCorrectionLinearInterpolation,
)

In [None]:
# Recording used by the algorithm tests that follow; the EmpkinS alternative is kept commented out.
# subset = dataset_empkins.get_subset(participant="VP_001", condition="tsst", phase="Pause_1")
subset = dataset_guardian.get_subset(participant="GDN0005", phase="Pause")

In [None]:
# ECG and ICG signals of the selected subset, used as input for the extraction algorithms.
ecg, icg = subset.ecg, subset.icg
# NOTE(review): this ECG sampling rate is later also passed to the ICG extractors — presumably
# both channels share one sampling rate; confirm.
fs = subset.sampling_rate_ecg

In [None]:
# Quick visual check of the ICG signal of the selected subset.
fig, ax = plt.subplots()
icg.plot(ax=ax)
# Same closing call as the other plotting cells; it also keeps the Axes repr from being
# echoed as the cell output.
fig.tight_layout()

In [None]:
# Heartbeat borders provided by the subset; passed as `heartbeats=` to the event extractors below.
heartbeats = subset.heartbeats
# Bare expression: show the heartbeat table as the cell output.
heartbeats

In [None]:
# Q-peak extraction on the ECG with the Van Lien (2013) extractor, configured with
# time_interval_ms=40 and handle_missing_events="ignore".
ecg_extraction = QPeakExtractionVanLien2013(time_interval_ms=40, handle_missing_events="ignore")
ecg_extraction.extract(ecg=ecg, heartbeats=heartbeats, sampling_rate_hz=fs)
# Results are read from the `points_` attribute after `extract()` has run.
ecg_extraction.points_

In [None]:
# Segmentation instance; it is not used by the extraction steps in this cell.
heartbeat_segmentation = HeartbeatSegmentationNeurokit()

# Step 1: C-point extraction on the ICG signal, per heartbeat.
cpoint_extraction = CPointExtractionScipyFindPeaks()
cpoint_extraction.extract(icg=icg, heartbeats=heartbeats, sampling_rate_hz=fs)

display(cpoint_extraction.points_)

# Step 2: B-point extraction (Forouzanfar 2018), which takes the C-points from step 1 as input.
bpoint_extraction = BPointExtractionForouzanfar2018(correct_outliers=True)
bpoint_extraction.extract(
    icg=icg, heartbeats=heartbeats, c_points=cpoint_extraction.points_, sampling_rate_hz=fs
)
display(bpoint_extraction.points_)

# Step 3: separate outlier correction (linear interpolation) applied to the B-points from step 2.
outlier_correction = OutlierCorrectionLinearInterpolation()
outlier_correction.correct_outlier(
    b_points=bpoint_extraction.points_, c_points=cpoint_extraction.points_, sampling_rate_hz=fs
)
# Bare expression: show the corrected points as the cell output.
outlier_correction.points_

In [None]:
from pepbench.pipelines import (
    PepExtractionPipeline,
    PepExtractionPipelineReferenceBPoints,
    PepExtractionPipelineReferenceQPeaks,
)

In [None]:
# Algorithm instances shared by the PEP extraction pipelines constructed in the following cells.
heartbeat_segmentation_algo = HeartbeatSegmentationNeurokit()
q_wave_algo = QPeakExtractionVanLien2013()
b_point_algo = BPointExtractionForouzanfar2018(correct_outliers=True)
outlier_correction_algo = OutlierCorrectionForouzanfar2018()
# NOTE(review): OutlierCorrectionDummy is not imported in the visible import cells — add the
# import before enabling this alternative.
# outlier_correction_algo = OutlierCorrectionDummy()

In [None]:
# Pipeline variant `PepExtractionPipelineReferenceQPeaks` (presumably uses reference Q-peaks
# instead of extracted ones — confirm against pepbench), built from the shared algorithm instances.
pipeline_qpoints = PepExtractionPipelineReferenceQPeaks(
    heartbeat_segmentation_algo=heartbeat_segmentation_algo,
    q_peak_algo=q_wave_algo,
    b_point_algo=b_point_algo,
    outlier_correction_algo=outlier_correction_algo,
    handle_missing_events="ignore",
)

In [None]:
# Pipeline variant `PepExtractionPipelineReferenceBPoints` (presumably uses reference B-points
# instead of extracted ones — confirm against pepbench).
# Uses `heartbeat_segmentation_algo` like the sibling pipelines, rather than the
# `heartbeat_segmentation` instance created in the earlier extraction cell, so this cell
# depends only on the shared algorithm instances.
pipeline_bpoints = PepExtractionPipelineReferenceBPoints(
    heartbeat_segmentation_algo=heartbeat_segmentation_algo,
    q_peak_algo=q_wave_algo,
    b_point_algo=b_point_algo,
    outlier_correction_algo=outlier_correction_algo,
    handle_missing_events="ignore",
)

In [None]:
# Full PEP extraction pipeline built from the shared algorithm instances. In addition to the
# arguments used by the reference-based variants, it is configured with handle_negative_pep="nan".
pipeline = PepExtractionPipeline(
    heartbeat_segmentation_algo=heartbeat_segmentation_algo,
    q_peak_algo=q_wave_algo,
    b_point_algo=b_point_algo,
    outlier_correction_algo=outlier_correction_algo,
    handle_negative_pep="nan",
    handle_missing_events="ignore",
)

In [None]:
# Restrict the evaluation to two Guardian participants and wrap the subset in an evaluation challenge.
subset_guardian = dataset_guardian.get_subset(participant=["GDN0005", "GDN0006"])
pep_challenge = PepEvaluationChallenge(dataset=subset_guardian)

# Run the challenge with the full extraction pipeline; results are read in the following cells.
pep_challenge.run(pipeline)

In [None]:
# Show the challenge results via `results_as_df()` (presumably a DataFrame, per the method name).
pep_challenge.results_as_df()

In [None]:
# Show the aggregated results attribute (presumably mean/std aggregates, per the attribute name).
pep_challenge.results_agg_mean_std_

In [None]:
# subset_test = dataset_empkins.get_subset(participant="VP_002", condition="ftsst", phase="Talk")
# match_heartbeat_lists(heartbeats_reference=subset_test.reference_heartbeats, heartbeats_extracted=subset_test.heartbeats, tolerance_ms=20, sampling_rate_hz=subset_test.sampling_rate_ecg)

In [None]:
fig, ax = plt.subplots()

# ICG signal and reference annotations of the currently selected subset.
icg_data = subset.icg
reference_labels_icg = subset.reference_labels_icg
labeling_borders = subset.labeling_borders

# Split the reference labels into heartbeat borders and B-point annotations.
start_end_borders = reference_labels_icg.xs("heartbeat", level="channel")
bpoint_labels = reference_labels_icg.xs("B-point", level="label")
# bpoint_artefacts = reference_labels_icg.xs("Artefact", level="label")

# B-point samples from the extraction algorithm and from the separate outlier-correction step;
# rows with a missing B-point sample are dropped.
bpoint_labels_algo = bpoint_extraction.points_["b_point_sample"].dropna()
bpoint_labels_outlier = outlier_correction.points_["b_point_sample"].dropna()

icg_data.plot(ax=ax)

# Heartbeat borders as vertical lines; with the x-axis transform, ymin/ymax are axes fractions,
# so the lines span the full height of the plot.
border_times = icg_data.index[start_end_borders["sample_relative"]]
ax.vlines(
    x=border_times,
    ymin=0,
    ymax=1,
    colors=cmaps.tech[2],
    transform=ax.get_xaxis_transform(),
)

# One scatter series per B-point source: (sample positions, marker color, legend label).
bpoint_series = (
    (bpoint_labels["sample_relative"], cmaps.nat[0], "Reference"),
    # (bpoint_artefacts["sample_relative"], cmaps.med[0], "Artefacts"),
    (bpoint_labels_algo.astype(int), cmaps.phil[0], "Algorithm"),
    (bpoint_labels_outlier.astype(int), cmaps.wiso[0], "Outlier Correction"),
)
for sample_positions, marker_color, series_label in bpoint_series:
    # Map sample positions to index labels, then look up the ICG derivative at those labels.
    point_times = icg_data.index[sample_positions]
    ax.scatter(
        x=point_times,
        y=icg_data["icg_der"][point_times],
        color=marker_color,
        zorder=3,
        label=series_label,
    )

ax.legend()

fig.tight_layout()

In [None]:
fig, ax = plt.subplots()

# ECG signal and reference annotations of the currently selected subset.
ecg_data = subset.ecg
reference_labels_ecg = subset.reference_labels_ecg
labeling_borders = subset.labeling_borders

# Split the reference labels into heartbeat borders and Q-wave onset annotations.
start_end_borders = reference_labels_ecg.xs("heartbeat", level="channel")
qwave_labels = reference_labels_ecg.xs("Q-wave_onset", level="label")
# Q-peak samples from the extraction algorithm; rows with a missing sample are dropped.
qwave_labels_algo = ecg_extraction.points_["q_peak_sample"].dropna()

ecg_data.plot(ax=ax)

# Heartbeat borders as vertical lines; with the x-axis transform, ymin/ymax are axes fractions,
# so the lines span the full height of the plot.
ax.vlines(
    x=ecg_data.index[start_end_borders["sample_relative"]],
    ymin=0,
    ymax=1,
    colors=cmaps.tech[2],
    transform=ax.get_xaxis_transform(),
)
# Reference Q-wave onsets; labeled so the legend distinguishes them from the algorithm output.
ax.scatter(
    x=ecg_data.index[qwave_labels["sample_relative"]],
    y=ecg_data["ecg"][ecg_data.index[qwave_labels["sample_relative"]]],
    color=cmaps.nat[0],
    zorder=3,
    label="Reference",
)
# Algorithmically extracted Q-peaks, cast to int so they can be used as positions into the index.
ax.scatter(
    x=ecg_data.index[qwave_labels_algo.astype(int)],
    y=ecg_data["ecg"][ecg_data.index[qwave_labels_algo.astype(int)]],
    color=cmaps.phil[0],
    zorder=3,
    label="Algorithm",
)

# Legend added for parity with the ICG B-point plot; without it the two marker colors are ambiguous.
ax.legend()

fig.tight_layout()