# Preparation

In [1]:
# data science
import pandas as pd
import numpy as np

# signal processing
from scipy import signal
from scipy.ndimage import label
from scipy.stats import zscore
from scipy.interpolate import interp1d
from scipy.integrate import trapz

import pythermalcomfort as ptc
import warnings

In [2]:
data = pd.read_excel("../9_plotting_report/HR_compilation.xlsx")

# Insert PMV into V6

In [3]:
master_data = pd.read_excel("../0_dataset_master/Experiment_edited_V6.xlsx")

In [6]:
master_data['pmv'] = master_data.apply(lambda row: ptc.models.pmv(row['T'], row['Tgl'], row['v_adj'], row['H'], 1, row['clo'], limit_inputs=False), axis=1)

# Frequency Domain Analysis

In [7]:
def frequency_domain(rri, fs):
    # Estimate the spectral density using Welch's method
    fxx, pxx = signal.welch(x=rri, fs=fs)
    
    '''
    Segement found frequencies in the bands 
     - Very Low Frequency (VLF): 0-0.04Hz 
     - Low Frequency (LF): 0.04-0.15Hz 
     - High Frequency (HF): 0.15-0.4Hz
    '''
    cond_vlf = (fxx >= 0) & (fxx < 0.04)
    cond_lf = (fxx >= 0.04) & (fxx < 0.15)
    cond_hf = (fxx >= 0.15) & (fxx < 0.4)
    
    # calculate power in each band by integrating the spectral density 
    vlf = trapz(pxx[cond_vlf], fxx[cond_vlf])
    lf = trapz(pxx[cond_lf], fxx[cond_lf])
    hf = trapz(pxx[cond_hf], fxx[cond_hf])
    
    # sum these up to get total power
    total_power = vlf + lf + hf

    # find which frequency has the most power in each band
    peak_vlf = fxx[cond_vlf][np.argmax(pxx[cond_vlf])]
    peak_lf = fxx[cond_lf][np.argmax(pxx[cond_lf])]
    peak_hf = fxx[cond_hf][np.argmax(pxx[cond_hf])]

    # fraction of lf and hf
    lf_nu = 100 * lf / (lf + hf)
    hf_nu = 100 * hf / (lf + hf)
    
    results = {}
    results['Power VLF (ms2)'] = vlf
    results['Power LF (ms2)'] = lf
    results['Power HF (ms2)'] = hf   
    results['Power Total (ms2)'] = total_power

    results['LF/HF'] = (lf/hf)
    results['Peak VLF (Hz)'] = peak_vlf
    results['Peak LF (Hz)'] = peak_lf
    results['Peak HF (Hz)'] = peak_hf

    results['Fraction LF (nu)'] = lf_nu
    results['Fraction HF (nu)'] = hf_nu
    return results, fxx, pxx

In [8]:
def FDA(rr, fs, correction=False):
    if correction:
        rr = rr.copy()
        rr[np.abs(zscore(rr)) > 2] = np.median(rr)

    # create interpolation function based on the rr-samples. 
    x = np.cumsum(rr) / 1000.0
    f = interp1d(x, rr, kind='cubic')

    # sample rate for interpolation
    steps = 1 / fs

    # now we can sample from interpolation function
    xx = np.arange(1, np.max(x), steps)
    try:
        rr_interpolated = f(xx)
    except Exception as e:
        xx = np.arange(min(x), np.max(x), steps)
        rr_interpolated = f(xx)
    results, fxx, pxx = frequency_domain(rr_interpolated, fs)
    return results

## Insert HRV into V7

In [9]:
new = master_data['Time_thermal'].str.split("-", n=1, expand=True)
master_data['New_Start_Date'] = pd.to_datetime(master_data['Start Date']).dt.date
master_data['New_End_Date'] = pd.to_datetime(master_data['Start Date']).dt.date
master_data['New_Start_Date'] = master_data['New_Start_Date'].astype(str) + ' ' + new[0]
master_data['New_End_Date'] = master_data['New_End_Date'].astype(str) + ' ' + new[1]
master_data['New_Start_Date'] = master_data['New_Start_Date'].astype('datetime64[ms]')
master_data['New_End_Date'] = master_data['New_End_Date'].astype('datetime64[ms]')

In [10]:
hrv = [
        'Power VLF (ms2)', 'Power LF (ms2)','Power HF (ms2)','Power Total (ms2)',
        'LF/HF', 'Fraction LF (nu)', 'Fraction HF (nu)', 'rr_mean'
    ]

result = []
for index, row in master_data.iterrows():
    # print(row['New_Start_Date'])
    # print(row['New_End_Date'])
    sliced = data[(row['New_Start_Date']<=data['Time']) & (row['New_End_Date']>=data['Time'])]
    # print(f"Slice data not found: {timestamp}")
    # print(sliced)
    # break
    try:
        results = FDA(sliced.RR, 1)
    except Exception as e:
        print(sliced)
        print(e)
        raise e
    resTemp = []
    for item in hrv:
        if item == "rr_mean":
            resTemp.append(sliced.RR.mean())
            continue
        resTemp.append(results[item])
    result.append(resTemp)

master_data[hrv] = result

In [11]:
master_data.to_excel("../0_dataset_master/Experiment_edited_V7_rr_without_correction.xlsx", index=False)

In [12]:
hrv = [
        'Power VLF (ms2)', 'Power LF (ms2)','Power HF (ms2)','Power Total (ms2)',
        'LF/HF', 'Fraction LF (nu)', 'Fraction HF (nu)', 'rr_mean'
    ]

result = []
for index, row in master_data.iterrows():
    # print(row['New_Start_Date'])
    # print(row['New_End_Date'])
    sliced = data[(row['New_Start_Date']<=data['Time']) & (row['New_End_Date']>=data['Time'])]
    # print(f"Slice data not found: {timestamp}")
    # print(sliced)
    # break
    try:
        results = FDA(sliced.RR, 1, True)
    except Exception as e:
        print(sliced)
        print(e)
        raise e
    resTemp = []
    for item in hrv:
        if item == "rr_mean":
            resTemp.append(sliced.RR.mean())
            continue
        resTemp.append(results[item])
    result.append(resTemp)

master_data[hrv] = result

In [13]:
master_data.to_excel("../0_dataset_master/Experiment_edited_V7_rr_with_correction.xlsx", index=False)