new function in baseline module and added testing #298

Merged · 2 commits · May 27, 2024
181 changes: 112 additions & 69 deletions resurfemg/postprocessing/baseline.py
@@ -11,57 +11,48 @@


def moving_baseline(
emg_env,
window_s,
start_s,
end_s,
emg_sample_rate,
set_percentile=33,
emg_env,
window_s,
step_s,
set_percentile=33,
):
"""This function calculates a moving baseline from a filtered EMG
envelope in accordance with Graßhoff et al. (2021)
:param emg_env: filtered envelope signal of EMG data
:type emg_env: ~numpy.ndarray
:param window_s: window length in samples
:type window_s: int
:param start_s: start sample of selected window
:type start_s: int
:param end_s: end sample of selected window
:type end_s: int
:param emg_sample_rate: sample rate from recording
:type emg_sample_rate: int
:param step_s: number of consecutive samples with the same baseline
value
:type step_s: int
:param set_percentile: percentile used for the baseline estimate
:type set_percentile: float (0-100)

:returns: The rolling baseline for the filtered EMG data
:rtype: ~numpy.ndarray
"""

baseline_w_emg = int(window_s*emg_sample_rate) # window length

rolling_baseline = np.zeros(
(len(emg_env[int(start_s):int(end_s)]), ))
rolling_baseline = np.zeros((len(emg_env), ))

for idx in range(0, int(end_s)-int(start_s), int(emg_sample_rate/5)):
start_i = max([int(start_s), int(start_s)+idx-int(baseline_w_emg/2)])
end_i = min([int(end_s), int(start_s)+idx+int(baseline_w_emg/2)])
baseline_value_emg_di = np.percentile(
for idx in range(0, len(emg_env), step_s):
start_i = max([0, idx-int(window_s/2)])
end_i = min([len(emg_env), idx + int(window_s/2)])
baseline_value_y = np.percentile(
emg_env[start_i:end_i], set_percentile)

for i in range(idx, min([idx+int(emg_sample_rate/5),
int(end_s) - int(start_s)])):
rolling_baseline[i] = baseline_value_emg_di

for i in range(idx, min([idx + step_s, len(emg_env)])):
rolling_baseline[i] = baseline_value_y
return rolling_baseline
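
For reference, a minimal usage sketch of the new moving_baseline signature. The sampling rate, window length and step size below are illustrative values for a synthetic envelope, not defaults introduced by this PR; note that window_s and step_s are both given in samples.

import numpy as np
from resurfemg.postprocessing.baseline import moving_baseline

fs = 1000                                        # assumed sampling rate in Hz
t = np.arange(0, 10, 1 / fs)
emg_env = np.abs(np.sin(2 * np.pi * 0.25 * t))   # synthetic stand-in for a filtered EMG envelope

baseline = moving_baseline(
    emg_env,
    window_s=5 * fs,     # 5 s window, expressed in samples
    step_s=fs // 5,      # refresh the baseline value every 0.2 s
    set_percentile=33,
)
assert len(baseline) == len(emg_env)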


def slopesum_baseline(
emg_env,
window_s,
start_s,
end_s,
emg_sample_rate,
set_percentile=33
emg_env,
window_s,
step_s,
emg_sample_rate,
set_percentile=33,
augm_percentile=25,
ma_window=None,
perc_window=None,
):
"""This function calculates the augmented version of the moving baseline
from a filtered EMG, using a slope sum
@@ -70,59 +61,111 @@ def slopesum_baseline(
:type emg_env: ~numpy.ndarray
:param window_s: window length in seconds
:type window_s: int
:param start_s: start sample of selected window
:type start_s: int
:param end_s: end sample of selected window
:type end_s: int
:param step_s: number of consecutive samples with the same baseline
value
:type step_s: int
:param emg_sample_rate: sample rate from recording
:type emg_sample_rate: int
:param: set_percentile
:type: numpy percentile

:param set_percentile: percentile used for the moving baseline estimate
:type set_percentile: float (0-100)
:param ma_window: moving average window in samples for average dy/dt
:type ma_window: int
:param perc_window: number of consecutive samples with the same
baseline value
:type perc_window: int
:returns: The slopesum baseline for the filtered EMG data
:rtype: ~numpy.ndarray
"""

if ma_window is None:
ma_window = emg_sample_rate//2
print(ma_window)

if perc_window is None:
perc_window = emg_sample_rate
print(perc_window)

# 1. call the Graßhoff version function for moving baseline
rolling_baseline = moving_baseline(emg_env, window_s, 0*emg_sample_rate,
50*emg_sample_rate, emg_sample_rate)
rolling_baseline = moving_baseline(
emg_env,
window_s,
step_s,
set_percentile)

# 2. Calculate the augmented moving baseline for the sEAdi data
# 2.a. Rolling standard deviation and mean over provided window length
baseline_w_emg = int(window_s * emg_sample_rate) # window length
# baseline_w_emg = int(window_s * emg_sample_rate) # window length

di_baseline_series = pd.Series(rolling_baseline)
di_baseline_std = di_baseline_series.rolling(baseline_w_emg,
min_periods=1,
center=True).std().values
di_baseline_mean = di_baseline_series.rolling(baseline_w_emg,
min_periods=1,
center=True).mean().values
y_baseline_series = pd.Series(rolling_baseline)
y_baseline_std = y_baseline_series.rolling(window_s,
min_periods=1,
center=True).std().values
y_baseline_mean = y_baseline_series.rolling(window_s,
min_periods=1,
center=True).mean().values

# 2.b. Augmented signal: EMG + abs([dEMG/dt]_smoothed)
ma_window = emg_sample_rate//2
augmented_perc = 25
perc_window = emg_sample_rate

y_di_rms = emg_env[int(start_s):int(end_s)]
s_di = pd.Series(y_di_rms - rolling_baseline)
seadi_MA = s_di.rolling(window=ma_window, center=True).mean().values
dseadi_dt = (seadi_MA[1:] - seadi_MA[:-1]) * emg_sample_rate
seadi_aug = y_di_rms[:-1] + np.abs(dseadi_dt)
s_di = pd.Series(emg_env - rolling_baseline)
y_ma = s_di.rolling(window=ma_window, center=True).mean().values
dy_dt = (y_ma[1:] - y_ma[:-1]) * emg_sample_rate
y_aug = emg_env[:-1] + np.abs(dy_dt)

# 2.c. Run the moving median filter over the augmented signal to obtain
# the baseline
slopesum_baseline = np.zeros(
(len(emg_env[int(start_s):int(end_s)-1]), ))
_slopesum_baseline = np.zeros((len(emg_env), ))

for idx in range(0, int(end_s-1)-int(start_s), perc_window):
start_i = max([0, idx-int(baseline_w_emg)])
end_i = min([int(end_s-start_s-1), idx+int(baseline_w_emg)])
for idx in range(0, len(emg_env), perc_window):
start_i = max([0, idx-int(window_s)])
end_i = min([len(emg_env)-1, idx + int(window_s)])

baseline_value_emg_di = np.nanpercentile(
seadi_aug[start_i:end_i], augmented_perc)
for i in range(idx, min([idx+int(perc_window),
int(end_s-1)-int(start_s)])):
slopesum_baseline[i] = 1.2 * baseline_value_emg_di
baseline_value_y = np.nanpercentile(
y_aug[start_i:end_i], augm_percentile)
for i in range(idx, min([idx+int(perc_window), len(emg_env)-1])):
_slopesum_baseline[i] = 1.2 * baseline_value_y

return (slopesum_baseline, di_baseline_mean,
di_baseline_std, di_baseline_series)
_slopesum_baseline[i+1] = _slopesum_baseline[i]
return (_slopesum_baseline, y_baseline_mean,
y_baseline_std, y_baseline_series)
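
A similar sketch for the augmented (slopesum) baseline; the values are again illustrative. When ma_window and perc_window are left as None they fall back to emg_sample_rate//2 and emg_sample_rate, as in the code above.

import numpy as np
from resurfemg.postprocessing.baseline import slopesum_baseline

fs = 1000
t = np.arange(0, 10, 1 / fs)
emg_env = np.abs(np.sin(2 * np.pi * 0.25 * t))   # synthetic stand-in envelope

slopesum_bl, bl_mean, bl_std, bl_series = slopesum_baseline(
    emg_env,
    window_s=5 * fs,      # window length in samples
    step_s=fs // 5,       # samples between baseline updates
    emg_sample_rate=fs,
    set_percentile=33,
    augm_percentile=25,
)
assert len(slopesum_bl) == len(emg_env)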


def onoffpeak_baseline(
emg_env,
baseline,
peak_idxs
):
"""This function calculates the peaks of each breath using the
slopesum baseline from a filtered EMG

:param emg_env: filtered envelope signal of EMG data
:type emg_env: ~numpy.ndarray
:param baseline: baseline signal of EMG data for baseline detection
:type baseline: ~numpy.ndarray
:param peak_idxs: list of peak indices for which to find on- and offset
:type peak_idxs: ~numpy.ndarray
:returns: peak_idxs, peak_start_idxs, peak_end_idxs
:rtype: list
"""

# Detect the sEAdi on- and offsets
baseline_crossings_idx = np.nonzero(
np.diff(np.sign(emg_env - baseline)) != 0)[0]

peak_start_idxs = np.zeros((len(peak_idxs),), dtype=int)
peak_end_idxs = np.zeros((len(peak_idxs),), dtype=int)
for peak_nr, peak_idx in enumerate(peak_idxs):
delta_samples = peak_idx - baseline_crossings_idx[
baseline_crossings_idx < peak_idx]
if len(delta_samples) < 1:
peak_start_idxs[peak_nr] = 0
peak_end_idxs[peak_nr] = baseline_crossings_idx[
baseline_crossings_idx > peak_idx][0]
else:
a = np.argmin(delta_samples)

peak_start_idxs[peak_nr] = int(baseline_crossings_idx[a])
if a < len(baseline_crossings_idx) - 1:
peak_end_idxs[peak_nr] = int(baseline_crossings_idx[a+1])
else:
peak_end_idxs[peak_nr] = len(emg_env) - 1

return (peak_idxs, peak_start_idxs, peak_end_idxs)
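
And a sketch of how the on-/offset detection slots in after peak detection; scipy.signal.find_peaks and the prominence and width settings are used here only for illustration, mirroring the new tests below.

import numpy as np
import scipy.signal
from resurfemg.postprocessing.baseline import moving_baseline, onoffpeak_baseline

fs = 1000
t = np.arange(0, 10, 1 / fs)
emg_env = np.abs(np.sin(2 * np.pi * 0.25 * t))   # synthetic stand-in envelope

baseline = moving_baseline(emg_env, 5 * fs, fs // 5, set_percentile=33)
peak_idxs, _ = scipy.signal.find_peaks(emg_env, prominence=0.5, width=fs // 2)

peak_idxs, peak_start_idxs, peak_end_idxs = onoffpeak_baseline(
    emg_env, baseline, peak_idxs)
# one onset and one offset index per detected peak
assert len(peak_start_idxs) == len(peak_idxs) == len(peak_end_idxs)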
88 changes: 83 additions & 5 deletions tests/postprocessing_test.py
@@ -4,6 +4,7 @@
import unittest
import os
import numpy as np
import scipy
# helper_functions
from resurfemg.postprocessing.features import entropy_scipy
from resurfemg.postprocessing.features import pseudo_slope
@@ -12,7 +13,9 @@
from resurfemg.postprocessing.features import times_under_curve
from resurfemg.postprocessing.features import find_peak_in_breath
from resurfemg.postprocessing.features import variability_maker

from resurfemg.postprocessing.baseline import moving_baseline
from resurfemg.postprocessing.baseline import slopesum_baseline
from resurfemg.postprocessing.baseline import onoffpeak_baseline

sample_emg = os.path.join(
os.path.abspath(os.path.dirname(os.path.dirname(__file__))),
@@ -58,7 +61,6 @@ def test_variability_maker_std(self):
)

class TestArrayMath(unittest.TestCase):


def test_simple_area_under_curve(self):
sample_array= np.array(
@@ -79,7 +81,7 @@ def test_area_under_curve(self):
counted,
28,
)

def test_times_under_curve(self):
sample_array= np.array(
[0,1,2,3,1,5,6,-5,8,9,20,11,12,13,4,5,6,1,1,1,0]
@@ -118,7 +120,83 @@ def test_find_peak_in_breath_convy(self):
self.assertEqual(
peak,
(8,10, 11.5)
)

)


class TestBaseline(unittest.TestCase):
fs = 1000
t = np.arange(0, 10, 1/fs)
slow_component = np.sin(2 * np.pi * 0.1 * t)
fast_component = 0.5 * np.sin(2 * np.pi * 0.5 * t)
breathing_signal = np.abs(slow_component + fast_component)

def test_movingbaseline(self):
sinusbase = moving_baseline(self.breathing_signal,
self.fs,
self.fs//5,
33)
self.assertEqual(
(len(self.breathing_signal)),
len(sinusbase),
)

def test_slopesum(self):
sinusbase, _, _, _ = slopesum_baseline(
self.breathing_signal,
self.fs,
self.fs//5,
self.fs,
set_percentile=33,
augm_percentile=25)

self.assertEqual(
(len(self.breathing_signal)),
len(sinusbase),
)

def test_onoffpeak_starts(self):
baseline = 0.5 * np.ones((len(self.breathing_signal), ))
treshold = 0
width = self.fs // 2
prominence = 0.5 * \
(np.nanpercentile(self.breathing_signal - baseline, 75)
+ np.nanpercentile(self.breathing_signal - baseline, 50))

peak_idxs, _ = scipy.signal.find_peaks(
self.breathing_signal,
height=treshold,
prominence=prominence,
width=width)

_, peak_start_idxs, _ = onoffpeak_baseline(
self.breathing_signal, baseline, peak_idxs)

self.assertEqual(
len(peak_idxs),
len(peak_start_idxs),
)

def test_onoffpeak_ends(self):
baseline = 0.5 * np.ones((len(self.breathing_signal), ))
treshold = 0
width = self.fs // 2
prominence = 0.5 * \
(np.nanpercentile(self.breathing_signal - baseline, 75)
+ np.nanpercentile(self.breathing_signal - baseline, 50))

peak_idxs, _ = scipy.signal.find_peaks(
self.breathing_signal,
height=treshold,
prominence=prominence,
width=width)

_, _, peak_end_idxs = onoffpeak_baseline(
self.breathing_signal, baseline, peak_idxs)

self.assertEqual(
len(peak_idxs),
len(peak_end_idxs),
)

if __name__ == '__main__':
unittest.main()
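
To try the new tests locally, a single run along these lines should be enough, assuming the package is installed (or on PYTHONPATH) and pytest is available; the plain unittest runner works as well:

python -m pytest tests/postprocessing_test.py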