In [101]:
import os
import warnings
from typing import List, Tuple

import neurokit2 as nk
import numpy as np
import pandas as pd
from scipy.signal import resample
from sklearn.preprocessing import MinMaxScaler, StandardScaler

In [102]:
# Silence every warning category for the rest of the session.
# NOTE(review): this also hides deprecation and runtime warnings raised by the
# libraries used below — consider narrowing the filter to specific categories.
warnings.simplefilter("ignore")

In [103]:
# Directory the input "segments" and "labels" CSV files are read from.
CLEAN_DATA_DIR = "clean_data"
# Directory the merged feature and label CSV files are written to.
FINAL_DATA_DIR = "final_data"

# Signal domains; used as column-name prefixes for the statistical features.
DOMAINS = ("temp", "hr", "gsr", "rr")

# Position of each domain in DOMAINS and in the list returned by split_domains.
TEMP = 0
HR = 1
GSR = 2
RR = 3

# Number of consecutive columns that make up one domain block of a segment row.
L_DOMAIN = 30

In [104]:
def extract_stat_features(segments: pd.DataFrame, domain: str) -> pd.DataFrame:
    """Compute ten row-wise summary statistics for one signal domain.

    Args:
        segments: One segment per row; the statistics are taken across the
            columns of each row (``axis=1``).
        domain: Prefix for the output column names (``f"{domain}_{stat}"``).

    Returns:
        A frame with a fresh ``RangeIndex``, one row per input row and the
        columns mean, std, skew, kurtosis, diff, diff2, q25, q75, qdev and
        max-min (each prefixed with ``domain``).
    """
    first_diff = segments.diff(axis=1)
    second_diff = first_diff.diff(axis=1)
    lower_quartile = segments.quantile(0.25, axis=1).values
    upper_quartile = segments.quantile(0.75, axis=1).values

    # Insertion order of this dict defines the column order of the result.
    stats = {
        "mean": segments.mean(axis=1).values,
        "std": segments.std(axis=1).values,
        "skew": segments.skew(axis=1).values,
        "kurtosis": segments.kurtosis(axis=1).values,
        "diff": first_diff.mean(axis=1).values,  # mean of the 1st difference
        "diff2": second_diff.mean(axis=1).values,  # mean of the 2nd difference
        "q25": lower_quartile,
        "q75": upper_quartile,
        "qdev": upper_quartile - lower_quartile,  # interquartile distance
        "max-min": segments.max(axis=1).values - segments.min(axis=1).values,
    }

    return pd.DataFrame(
        np.column_stack(list(stats.values())),
        columns=[f"{domain}_{stat_name}" for stat_name in stats],
    )

In [105]:
def split_domains(
    segments: pd.DataFrame,
) -> List[pd.DataFrame]:
    """Split a segments frame into one column block per signal domain.

    The blocks are taken left to right, ``L_DOMAIN`` columns each, one block
    per entry of ``DOMAINS`` (so the result can be indexed with TEMP/HR/GSR/RR).

    Args:
        segments: Frame whose columns are the concatenated domain blocks.

    Returns:
        A list of column slices of ``segments``, in ``DOMAINS`` order.
    """
    # Derive the block offsets from the constants instead of repeating them,
    # so changing L_DOMAIN or DOMAINS cannot leave stale offsets behind.
    domain_starts = range(0, len(DOMAINS) * L_DOMAIN, L_DOMAIN)

    return [segments.iloc[:, start : start + L_DOMAIN] for start in domain_starts]

In [106]:
def extract_eda_features(df: pd.DataFrame) -> pd.DataFrame:
    """Summarise the SCR entries returned by ``nk.eda_process`` for each row.

    Every row of ``df`` is treated as one EDA segment: NaNs are dropped, the
    signal is upsampled tenfold and processed by neurokit at
    ``sampling_rate=10``. For each of seven ``info`` entries the minimum,
    maximum and mean of the non-NaN values are kept; if an entry has no
    non-NaN value, all three are 0.

    Args:
        df: One EDA segment per row.

    Returns:
        A frame with one row per input row and 21 columns named
        ``min_<key>``, ``max_<key>``, ``mean_<key>``.
    """
    scr_keys = (
        "SCR_Onsets",
        "SCR_Peaks",
        "SCR_Height",
        "SCR_Amplitude",
        "SCR_RiseTime",
        "SCR_Recovery",
        "SCR_RecoveryTime",
    )
    column_names = [
        f"{stat}_{key}" for key in scr_keys for stat in ("min", "max", "mean")
    ]

    rows = []
    for row_idx in range(len(df)):
        eda = df.iloc[row_idx].dropna().values
        # Upsample tenfold; the result is handed to neurokit as a 10 Hz signal
        # (the original author notes neurokit needs that sampling frequency).
        eda_upsampled = resample(eda, len(eda) * 10)
        _, info = nk.eda_process(eda_upsampled, sampling_rate=10)

        row = []
        for key in scr_keys:
            detected = info[key]
            detected = detected[~np.isnan(detected)]
            if len(detected) == 0:
                # nothing detected for this entry (e.g. no peak): keep zeros
                row.extend([0, 0, 0])
            else:
                row.extend([np.min(detected), np.max(detected), np.mean(detected)])
        rows.append(row)

    return pd.DataFrame(rows, columns=column_names)

In [107]:
def extract_hrv_features(df: pd.DataFrame) -> pd.DataFrame:
    """Compute neurokit HRV features for every row of RR intervals.

    Each row is converted into a 0/1 peak train sampled at 1000 Hz (one peak at
    sample 0, then one peak after every interval) and passed to ``nk.hrv``.
    If anything in that process raises, the row is replaced by zeros for the
    57 feature names listed below, so one bad segment does not abort the run.

    Args:
        df: One segment per row; NaN entries are dropped before processing.
            NOTE(review): the ``* 1000`` conversion assumes intervals are
            given in seconds — confirm against the data.

    Returns:
        One row of HRV features per input row (row order preserved, index
        reset). For an empty ``df`` an empty frame with the 57 fallback
        columns is returned.
    """
    feature_names = [
        "HRV_RMSSD",
        "HRV_MeanNN",
        "HRV_SDNN",
        "HRV_SDSD",
        "HRV_CVNN",
        "HRV_CVSD",
        "HRV_MedianNN",
        "HRV_MadNN",
        "HRV_MCVNN",
        "HRV_IQRNN",
        "HRV_pNN50",
        "HRV_pNN20",
        "HRV_TINN",
        "HRV_HTI",
        "HRV_ULF",
        "HRV_VLF",
        "HRV_LF",
        "HRV_HF",
        "HRV_VHF",
        "HRV_LFHF",
        "HRV_LFn",
        "HRV_HFn",
        "HRV_LnHF",
        "HRV_SD1",
        "HRV_SD2",
        "HRV_SD1SD2",
        "HRV_S",
        "HRV_CSI",
        "HRV_CVI",
        "HRV_CSI_Modified",
        "HRV_PIP",
        "HRV_IALS",
        "HRV_PSS",
        "HRV_PAS",
        "HRV_GI",
        "HRV_SI",
        "HRV_AI",
        "HRV_PI",
        "HRV_C1d",
        "HRV_C1a",
        "HRV_SD1d",
        "HRV_SD1a",
        "HRV_C2d",
        "HRV_C2a",
        "HRV_SD2d",
        "HRV_SD2a",
        "HRV_Cd",
        "HRV_Ca",
        "HRV_SDNNd",
        "HRV_SDNNa",
        "HRV_ApEn",
        "HRV_SampEn",
        "HRV_MSE",
        "HRV_CMSE",
        "HRV_RCMSE",
        "HRV_DFA",
        "HRV_CorrDim",
    ]

    if len(df) == 0:
        # pd.concat raises on an empty list; return an empty, labelled frame.
        return pd.DataFrame(columns=feature_names)

    features_arr = []
    for i in range(len(df)):
        # noinspection PyBroadException
        try:
            rr = df.iloc[i].dropna()  # RR intervals of one segment

            # Sample index of every peak at 1000 Hz: first peak at 0, each
            # following peak one (truncated) interval after the previous one.
            rr_samples = (rr.to_numpy(dtype=float) * 1000).astype(int)
            peak_indices = np.concatenate(([0], np.cumsum(rr_samples)))

            # Size the peak train from the last peak position. A fixed
            # (len(rr) + 1) * 1000 buffer overflows as soon as the intervals
            # sum to more than that, which used to end up in the zero fallback.
            peaks_rr = np.zeros(peak_indices[-1] + 1)
            peaks_rr[peak_indices] = 1

            segment_features = nk.hrv(peaks_rr, sampling_rate=1000, show=False)
        except Exception:
            # Deliberate best-effort: an unprocessable segment yields zeros.
            segment_features = pd.DataFrame(
                [np.zeros(len(feature_names))], columns=feature_names
            )
        features_arr.append(segment_features)

    return pd.concat(features_arr, ignore_index=True)

In [108]:
def select_hrv_features(file_paths: List[str]) -> List[pd.DataFrame]:
    """Compute HRV features for all files and keep only the always-valid ones.

    The RR block of every file is smoothed with a 3-column rolling mean, the
    rows of all files are processed together by ``extract_hrv_features``, and
    every feature column that contains a NaN or +/-inf in any row of any file
    is discarded. The result is then cut back into one frame per file.

    Args:
        file_paths: Paths of the segment CSV files (each with an
            ``"Unnamed: 0"`` column that is dropped).

    Returns:
        One frame per path, in input order. Each frame comes from
        ``reset_index()`` and therefore carries an extra ``"index"`` column
        holding the row's position in the combined frame; the caller is
        expected to drop it.
    """
    if not file_paths:
        # Nothing to read; pd.concat would raise on an empty list.
        return []

    rr_start = RR * L_DOMAIN  # first column of the RR block

    # Read every file once; the row counts are taken from the same frames.
    rr_segments_per_file: List[pd.DataFrame] = []
    for file_path in file_paths:
        segments = pd.read_csv(file_path).drop("Unnamed: 0", axis=1)
        rr_block = segments.iloc[:, rr_start : rr_start + L_DOMAIN]
        # Rolling mean across columns. rolling(axis=1) is deprecated in pandas,
        # so roll over the rows of the transposed frame instead.
        rr_segments_per_file.append(rr_block.T.rolling(3).mean().T)

    lengths: List[int] = [len(rr_block) for rr_block in rr_segments_per_file]

    hrv_features_all: pd.DataFrame = extract_hrv_features(
        pd.concat(rr_segments_per_file)
    )
    # Treat infinities as missing, then keep only columns without any gap.
    hrv_features_all = hrv_features_all.replace([np.inf, -np.inf], np.nan).dropna(
        axis=1, how="any"
    )

    hrv_features: List[pd.DataFrame] = []
    start = 0
    for length in lengths:
        hrv_features.append(
            hrv_features_all.iloc[start : start + length, :].reset_index()
        )
        start += length

    return hrv_features

In [109]:
def extract_features(source_data_dir: str) -> List[pd.DataFrame]:
    """Build the full feature table for every segments file in a directory.

    Files whose name contains ``"segments"`` are processed in sorted name
    order. For each file the domain blocks are smoothed with a 3-column
    rolling mean, then statistical features (all domains), EDA features (GSR
    block) and the pre-selected HRV features are concatenated column-wise.

    Args:
        source_data_dir: Directory containing the segment CSV files.

    Returns:
        One feature frame per segments file, in sorted file-name order.
    """
    file_paths: List[str] = [
        os.path.join(source_data_dir, file_name)
        for file_name in sorted(os.listdir(source_data_dir))
        if "segments" in file_name
    ]

    # HRV features are selected across all files at once so that every file
    # ends up with the same set of HRV columns.
    hrv_features_per_user: List[pd.DataFrame] = select_hrv_features(
        file_paths=file_paths
    )

    all_features_per_user: List[pd.DataFrame] = []
    for file_path, hrv_features in zip(file_paths, hrv_features_per_user):
        segments: pd.DataFrame = pd.read_csv(file_path).drop("Unnamed: 0", axis=1)

        # Rolling mean across columns. rolling(axis=1) is deprecated in pandas,
        # so roll over the rows of the transposed frame instead.
        domain_segments_smoothed: List[pd.DataFrame] = [
            domain_df.T.rolling(3).mean().T
            for domain_df in split_domains(segments=segments)
        ]

        stat_features: pd.DataFrame = pd.concat(
            [
                extract_stat_features(segments=smoothed, domain=domain)
                for domain, smoothed in zip(DOMAINS, domain_segments_smoothed)
            ],
            axis=1,
        )
        gsr_features: pd.DataFrame = extract_eda_features(
            domain_segments_smoothed[GSR]
        )

        # "index" is the helper column added by reset_index() in
        # select_hrv_features; it is not a feature.
        all_features: pd.DataFrame = pd.concat(
            [stat_features, gsr_features, hrv_features], axis=1
        ).drop("index", axis=1)
        all_features_per_user.append(all_features)

    return all_features_per_user

In [110]:
def scale(raw_features: pd.DataFrame) -> pd.DataFrame:
    """Fit a ``MinMaxScaler`` on ``raw_features`` and return the transformed data.

    The result keeps the column labels of the input but gets a fresh default
    index, because the scaler output is wrapped in a new frame.
    """
    scaler = MinMaxScaler()
    scaled_values = scaler.fit_transform(raw_features)
    return pd.DataFrame(scaled_values, columns=raw_features.columns)

In [111]:
def standardize(raw_features: pd.DataFrame) -> pd.DataFrame:
    """Fit a ``StandardScaler`` on ``raw_features`` and return the transformed data.

    The result keeps the column labels of the input but gets a fresh default
    index, because the scaler output is wrapped in a new frame.
    """
    scaler = StandardScaler()
    standardized_values = scaler.fit_transform(raw_features)
    return pd.DataFrame(standardized_values, columns=raw_features.columns)

In [112]:
def merge_features(
    features_per_user: List[pd.DataFrame],
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Stack the per-file feature frames in three variants.

    Scaling and standardization are fitted on each frame separately (one
    scaler per frame) before the frames are stacked.

    Args:
        features_per_user: Feature frames to stack, in output row order.

    Returns:
        ``(raw, min-max scaled, standardized)`` frames, each with a fresh
        0..n-1 index and rows in the order of ``features_per_user``.
    """
    raw_features: pd.DataFrame = pd.concat(features_per_user, ignore_index=True)
    scaled_features: pd.DataFrame = pd.concat(
        [scale(raw_features=raw_f) for raw_f in features_per_user],
        ignore_index=True,
    )
    standardized_features: pd.DataFrame = pd.concat(
        [standardize(raw_features=raw_f) for raw_f in features_per_user],
        ignore_index=True,
    )

    return raw_features, scaled_features, standardized_features


def merge_labels(source_data_dir: str) -> pd.DataFrame:
    """Stack all label CSV files of a directory into one frame.

    Files whose name contains ``"labels"`` are read in sorted name order,
    concatenated with a fresh 0..n-1 index, and the ``"Unnamed: 0"`` column
    is dropped from the result.
    """
    label_frames = []
    for file_name in sorted(os.listdir(source_data_dir)):
        if "labels" not in file_name:
            continue
        label_frames.append(pd.read_csv(f"{source_data_dir}/{file_name}"))

    merged = pd.concat(label_frames, ignore_index=True)
    return merged.drop("Unnamed: 0", axis=1)

In [113]:
# Extract the features of every segments file, merge them, and write the
# raw / min-max scaled / standardized variants plus the labels to disk.
raw_features_per_user: List[pd.DataFrame] = extract_features(
    source_data_dir=CLEAN_DATA_DIR
)

feature_dfs: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame] = merge_features(
    features_per_user=raw_features_per_user
)

# Output file stems, in the order merge_features returns its frames.
save_names: Tuple[str, str, str] = ("features_raw", "features_0-1", "features_std")

# to_csv does not create missing directories, so make sure the target exists.
os.makedirs(FINAL_DATA_DIR, exist_ok=True)

for feature_df, save_name in zip(feature_dfs, save_names):
    feature_df.to_csv(f"{FINAL_DATA_DIR}/{save_name}.csv")

label_df: pd.DataFrame = merge_labels(source_data_dir=CLEAN_DATA_DIR)
label_df.to_csv(f"{FINAL_DATA_DIR}/labels.csv")

In [116]:
def data_equal(
    comparee_dir: str, standard_dir: str, strict_shape: bool = False
) -> bool:
    """Check whether a generated data directory matches a standard one.

    The comparee directory must contain ``features_raw.csv``,
    ``features_0-1.csv``, ``features_std.csv`` and ``labels.csv``; the standard
    directory ``features.csv`` and ``labels.csv``. Labels and (raw) features
    are joined column-wise per directory and the two tables are outer-merged
    on their shared columns.

    The row comparison is set-based: the result is True when every merged row
    occurs in both tables, so duplicated rows on one side are NOT detected by
    the merge alone. Pass ``strict_shape=True`` to also require equal shapes.

    Args:
        comparee_dir: Directory with the data to check.
        standard_dir: Directory with the data to check against.
        strict_shape: When True, differing table shapes make the result
            False. The default (False) only prints both shapes and continues,
            which is the behavior this function had before the flag existed.

    Returns:
        True if the directories are the same path, or if the three comparee
        feature files share one shape and every merged row is present in both
        tables (and, with ``strict_shape``, the table shapes match).
    """
    # A directory trivially equals itself; nothing is read in that case.
    if comparee_dir == standard_dir:
        return True

    comparee_raw_features: pd.DataFrame = pd.read_csv(
        f"{comparee_dir}/features_raw.csv", index_col=0
    )
    comparee_scaled_features: pd.DataFrame = pd.read_csv(
        f"{comparee_dir}/features_0-1.csv", index_col=0
    )
    comparee_standardized_features: pd.DataFrame = pd.read_csv(
        f"{comparee_dir}/features_std.csv", index_col=0
    )
    comparee_labels: pd.DataFrame = pd.read_csv(
        f"{comparee_dir}/labels.csv", index_col=0
    )

    # The three feature variants must describe the same rows and columns.
    if not (
        comparee_raw_features.shape
        == comparee_scaled_features.shape
        == comparee_standardized_features.shape
    ):
        return False

    comparee_data = pd.concat([comparee_labels, comparee_raw_features], axis=1)

    standard_features: pd.DataFrame = pd.read_csv(
        f"{standard_dir}/features.csv", index_col=0
    )
    standard_labels: pd.DataFrame = pd.read_csv(
        f"{standard_dir}/labels.csv", index_col=0
    )
    standard_data = pd.concat([standard_labels, standard_features], axis=1)

    if comparee_data.shape != standard_data.shape:
        # Always report the mismatch; only fail on it when asked to.
        print(comparee_data.shape, standard_data.shape)
        if strict_shape:
            return False

    # Outer merge on all shared columns; "_merge" tells where each row was found.
    merged_data: pd.DataFrame = pd.merge(
        comparee_data,
        standard_data,
        how="outer",
        indicator=True,
    )
    merge_flags = merged_data["_merge"]

    # Equal only if there is at least one row and none is left_only/right_only.
    return len(merge_flags) > 0 and bool((merge_flags == "both").all())

In [117]:
# Check the files written to FINAL_DATA_DIR against the standard data set in "end_data".
data_equal(comparee_dir=FINAL_DATA_DIR, standard_dir="end_data")

(837, 127) (838, 127)


True