In [27]:
import pandas as pd
import numpy as np
import neurokit2 as nk
import matplotlib.pyplot as plt
import scipy.signal as sp_sig
import scipy.fft as sp_fft
from te import transfer_entropy
from cjte import calculate_cjte
import re

In [28]:
def calculate_fundamental_component(
    signal: np.array, fs: float, low_f=0.66, high_f=3
) -> tuple[float, float]:
    """
    Calculates fundamental component of signal in given frequency range
    :param signal: signal
    :param fs: sampling frequency
    :param low_f: lower frequency range
    :param high_f: higher frequency range
    :return: fundamental frequency and its amplitude
    """
    n_fft = len(signal)
    win_fft_amp = sp_fft.rfft(
        sp_sig.detrend(signal) * sp_sig.windows.hann(n_fft), n=n_fft
    )
    corr_factor = n_fft / np.sum(sp_sig.windows.hann(n_fft))
    win_fft_amp = abs(win_fft_amp) * 2 / n_fft * corr_factor

    win_fft_f = sp_fft.rfftfreq(n_fft, d=1 / fs)
    f_low = int(low_f * n_fft / fs)
    f_upp = int(high_f * n_fft / fs)
    win_fft_amp_range = win_fft_amp[f_low:f_upp]
    fund_idx = np.argmax(win_fft_amp_range) + f_low

    fund_f = win_fft_f[fund_idx]
    fund_amp = win_fft_amp[fund_idx]
    return fund_f, fund_amp

def calculate_mean_HR(signal: np.array, fs: float = 200) -> float:
    """
    Calculates mean HR from abp signal
    :param signal: abp signal
    :param fs: sampling frequency
    """
    c_f1, amp_abp = calculate_fundamental_component(signal, fs)
    HR = c_f1 * 60
    return HR

In [29]:
def find_peaks(
    signal: np.array,
    sampling_rate: int = 200,
    mindelay: float = 0.3,
) -> np.array:    
    # Fill missing values
    filled_signal = nk.signal_fillmissing(signal)
    cleaned_signal = nk.ppg_clean(filled_signal, sampling_rate=sampling_rate, method="elgendi")
    peaks_up = nk.ppg_findpeaks(
        cleaned_signal, sampling_rate=sampling_rate, method="elgendi", mindelay=mindelay
    )['PPG_Peaks']
    peaks_down = nk.ppg_findpeaks(
        cleaned_signal * -1, sampling_rate=sampling_rate, method="elgendi", mindelay=mindelay
    )['PPG_Peaks']
    return peaks_up, peaks_down

In [30]:
def get_hp(peaks: np.array, sampling_rate: int = 200):
    rr = np.diff(peaks) / sampling_rate
    hp = 1 / rr
    return hp

def get_sap(signal, peaks):
    sap = np.array([signal[peak] for peak in peaks])[1:] # skip first peak to match length of hp
    return sap

def get_map(signal, peaks_up, peaks_down):
    peaks = np.sort(np.concatenate((peaks_up, peaks_down)))
    first_peak = 0 if peaks[0] < peaks[1] else 1

    # MAP = (2*DP + SP) / 3
    map_ = []
    for i in range(first_peak, len(peaks)-2, 2):
        dp = signal[peaks[i]]
        sp = signal[peaks[i+1]]
        map_.append((2*dp + sp) / 3)
    
    return np.array(map_)

def get_mcbfv(signal, peaks):
    mcbfv = np.array([np.mean(signal[peaks[i-1]:peaks[i]]) for i, _ in enumerate(peaks) if i > 0])
    return mcbfv

In [31]:
def read_fv(df):
    fvl = df['fvl'].values
    fvr = df['fvr'].values
    fvl_mean = np.mean(fvl)
    fvr_mean = np.mean(fvr)

    if fvl_mean > 30:
        return fvl
    elif fvr_mean > 30:
        return fvr
    else:
        return None

In [32]:
def plot_signals(signals):
    fig, axs = plt.subplots(5, 1, figsize=(20, 10))
    for i, (key, value) in enumerate(signals.items()):
        axs[i].plot(value)
        axs[i].set_title(key)
    plt.show()

In [33]:
def get_signals(file_location, sampling_rate: int = 200, mindelay: float = 0.3, reference: str = 'etco2[mmHg]'):
    signals = {}
    df = pd.read_csv(file_location, sep=';', decimal=',')
    abp = df['abp_cnap[mmHg]'].values
    fv = read_fv(df)
    if fv is None:
        print('No FV signal found')
        return None
        
    abp_peaks_up, abp_peaks_down = find_peaks(abp, sampling_rate, mindelay)
    fv_peaks_up, fv_peaks_down = find_peaks(fv, sampling_rate, mindelay)

    signals['ABP_mean'] = np.mean(abp)
    signals['FV_mean'] = np.mean(fv)
    signals['HR_mean'] = calculate_mean_HR(abp, sampling_rate)
    signals['ETCO2_mean'] = np.mean(pd.read_csv(file_location, sep=';', decimal=',')['etco2[mmHg]'].values)
    signals['HP'] = get_hp(abp_peaks_up)
    signals['MAP'] = get_map(abp, abp_peaks_up, abp_peaks_down)
    signals['MCBFV'] = get_mcbfv(fv, fv_peaks_down)
    signals['R_mean'] = np.mean(df['rr[rpm]'].values)
    signals['reference'] = df[reference].values
    if any(np.isnan(signals['reference'])):
        print('reference signal contains NaN values')
        return None
    return signals

In [34]:
def get_entropies(signals, baseline_reference):
    e = {}
    te = {}
    cjte = {}
    normalised_r = signals["reference"] / baseline_reference
    r_widnowed = np.array_split(normalised_r, len(signals["HP"]))
    signalsr_matched = np.array([np.mean(window) for window in r_widnowed])
    min_len = min(len(signals["MAP"]), len(signals["MCBFV"]))
    map_shorted = signals["MAP"][:min_len]
    mcbfv_shorted = signals["MCBFV"][:min_len]
    r_widnowed = np.array_split(normalised_r, len(map_shorted))
    signals_to_r_matched = np.array([np.mean(window) for window in r_widnowed])

    e['MAP'] = nk.entropy_sample(map_shorted)[0]
    e['MCBFV'] = nk.entropy_sample(mcbfv_shorted)[0]
    e['reference'] = nk.entropy_sample(signalsr_matched)[0]
    te['MAP->MCBFV'] = transfer_entropy(mcbfv_shorted, map_shorted)
    te['MCBFV->MAP'] = transfer_entropy(map_shorted, mcbfv_shorted)
    cjte['MAP->MCBFV'] = calculate_cjte(mcbfv_shorted, map_shorted, signals_to_r_matched)
    cjte['MCBFV->MAP'] = calculate_cjte(map_shorted, mcbfv_shorted, signals_to_r_matched)

    return e, te, cjte

In [35]:
class Result:
    def __init__(self, id, abp, fv, hr, etco2, r, map, mcbfv, hp, e, te, cjte):
        self.id = id
        self.abp = abp
        self.fv = fv
        self.hr = hr
        self.etco2 = etco2
        self.r = r
        self.map = map
        self.mcbfv = mcbfv
        self.hp = hp
        self.e_map = e['MAP']
        self.e_mcbfv = e['MCBFV']
        self.e_reference = e['reference']
        self.te_mfv_on_map = te['MCBFV->MAP']
        self.te_map_on_mfv = te['MAP->MCBFV']
        self.cjte_mfv_on_map = cjte['MCBFV->MAP']
        self.cjte_map_on_mfv = cjte['MAP->MCBFV']

In [36]:
valid_datasets = [i for i in range(1, 38) if i not in [3, 4, 7, 14, 15, 10, 11, 17, 20, 24, 27, 28, 35]]

location_baseline = lambda on: f'data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_{on}/CLEAN_DATA_BASELINE/OCH_{on}_CLEAN_CB_BASELINE.csv'
location_6 = lambda on: f'data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_{on}/CLEAN_DATA_6_BREATHS/OCH_{on}_CLEAN_CB_6.csv'
location_10 = lambda on: f'data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_{on}/CLEAN_DATA_10_BREATHS/OCH_{on}_CLEAN_CB_10.csv'
location_15 = lambda on: f'data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_{on}/CLEAN_DATA_15_BREATHS/OCH_{on}_CLEAN_CB_15.csv'

In [37]:
results_baseline= []
results_6breaths = []
results_10breaths = []
results_15breaths = []
reference_sig = 'etco2[mmHg]'
for res_g, loc in zip([results_baseline, results_6breaths, results_10breaths, results_15breaths],[location_baseline, location_6, location_10, location_15]):
    for on in valid_datasets:
        location = loc(on)
        print(f'{location}')
        baseline_reference = np.mean(pd.read_csv(location_baseline(on), sep=';', decimal=',')[reference_sig].values)
        signals = get_signals(location)
        if signals is None or baseline_reference is None:
            print(f'Error in dataset {on}')
            continue
        e_results, te_results, cjte_results = get_entropies(signals, baseline_reference)
        result = Result(
            on,
            signals['ABP_mean'], 
            signals['FV_mean'],
            signals['HR_mean'],
            signals['ETCO2_mean'],
            signals['R_mean'],
            np.mean(signals['MAP']),
            np.mean(signals['MCBFV']),
            np.mean(signals['HP']),
            e_results,
            te_results,
            cjte_results
        )
        res_g.append(result)

data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_1/CLEAN_DATA_BASELINE/OCH_1_CLEAN_CB_BASELINE.csv
data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_2/CLEAN_DATA_BASELINE/OCH_2_CLEAN_CB_BASELINE.csv
data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_5/CLEAN_DATA_BASELINE/OCH_5_CLEAN_CB_BASELINE.csv
data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_6/CLEAN_DATA_BASELINE/OCH_6_CLEAN_CB_BASELINE.csv
data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_8/CLEAN_DATA_BASELINE/OCH_8_CLEAN_CB_BASELINE.csv
data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_9/CLEAN_DATA_BASELINE/OCH_9_CLEAN_CB_BASELINE.csv
data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_12/CLEAN_DATA_BASELINE/OCH_12_CLEAN_CB_BASELINE.csv
data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_13/CLEAN_DATA_BASELINE/OCH_13_CLEAN_CB_BASELINE.csv
data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_16/CLEAN_DATA_BASELINE/OCH_16_CLEAN_CB_BASELINE.csv
data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_18/CLEAN_DATA_BASELINE/OCH_18_CLEAN_CB_BASELINE.csv
data/CONTROL_BREATHING_RECORDINGS/OCHOTNIK_19/CLEAN_DATA_BASEL

In [38]:
# Save results to csv for each breathing type
df_baseline = pd.DataFrame([vars(res) for res in results_baseline])
df_baseline.to_csv(f'biomeeting/results_{reference_sig}_baseline.csv', index=False)
df_6breaths = pd.DataFrame([vars(res) for res in results_6breaths])
df_6breaths.to_csv(f'biomeeting/results_{reference_sig}_6breaths.csv', index=False)
df_10breaths = pd.DataFrame([vars(res) for res in results_10breaths])
df_10breaths.to_csv(f'biomeeting/results_{reference_sig}_10breaths.csv', index=False)
df_15breaths = pd.DataFrame([vars(res) for res in results_15breaths])
df_15breaths.to_csv(f'biomeeting/results_{reference_sig}_15breaths.csv', index=False)