In [1]:
import matplotlib.pyplot as plt
from scipy.io import wavfile
import pandas as pd
import numpy as np
import os
import pathlib
import sys

import warnings

code_paths = {}
code_paths["repo_name"] = "p5-stress-classifier"


def find_repo_root(start_dir, repo_name):
    """Return the closest directory named ``repo_name`` at or above ``start_dir``.

    The search walks from ``start_dir`` towards the filesystem root and
    returns the absolute path of the first directory whose base name equals
    ``repo_name``. ``None`` is returned when the root is reached without a
    match.
    """
    current = os.path.abspath(start_dir)
    while os.path.basename(current) != repo_name:
        parent = os.path.dirname(current)
        if parent == current:
            # dirname() of the filesystem root is the root itself; without
            # this check the walk would loop forever.
            return None
        current = parent
    return current


repo_root = find_repo_root(os.getcwd(), code_paths["repo_name"])
if repo_root is None:
    # Keep going with the working directory: the import below still succeeds
    # if the package is importable by other means, and fails loudly otherwise.
    warnings.warn(
        "No directory named '"
        + code_paths["repo_name"]
        + "' found above "
        + os.getcwd()
        + "; using the current working directory as repo path."
    )
    repo_root = os.getcwd()
code_paths["repo_path"] = repo_root
base_dir = os.path.basename(code_paths["repo_path"])

package_dir = pathlib.Path(code_paths["repo_path"], "src")
# Avoid stacking duplicate entries when the cell is re-run.
if str(package_dir) not in sys.path:
    sys.path.append(str(package_dir))
from stresspred import (
    peak_time_to_rri,
    P5_StressDataLoader,
    timestamp_to_samp,
    resample_nonuniform,
    code_paths,
    timestamps_to_audacity_txt,
    find_files_with_string,
    get_camel_case
)

  "class": algorithms.Blowfish,


In [2]:
# Put every entry of code_paths["neurokit2_paths"] on sys.path so that the
# neurokit2 imports that follow can be resolved.
sys.path.extend(str(path) for path in code_paths["neurokit2_paths"])
    
from neurokit2 import signal_interpolate, find_outliers
import neurokit2 as nk

In [3]:
from stresspred import get_frame_start_stop, get_frame, append_samples_to_wav
def clean_sig(
    sig,
    sig_time=None,
    wav_path="out.wav",
    sampling_rate=1000,
    frame_len=np.inf,
    method="own_filt",
    show=False
):
    """Filter a signal frame by frame and append the result to a WAV file.

    The frames come from ``get_frame_start_stop``. Each input frame is
    filtered with the routine selected by ``method``; the output part of the
    filtered frame is then handed to ``append_samples_to_wav`` with
    ``rewrite=True`` for the first frame and ``rewrite=False`` afterwards.

    Parameters
    ----------
    sig
        Signal samples.
    sig_time
        Time stamps matching ``sig``; forwarded to the framing helpers.
    wav_path
        Path of the WAV file the cleaned samples are written to.
    sampling_rate
        Sampling rate passed to the filters and to the WAV writer.
    frame_len
        Frame length forwarded to ``get_frame_start_stop``.
    method
        ``"own_filt"`` uses ``nk.signal_filter`` (lowcut=0.5, highcut=8),
        ``"nk_ecg_clean"`` uses ``nk.ecg_clean`` with ``"pantompkins1985"``,
        any other value uses ``nk.ppg_clean``.
    show
        When true, plot input, filtered and output signal of every frame.

    Returns
    -------
    dict or None
        Only when ``frame_len`` equals ``np.inf``: a dict with the keys
        ``"sig"``, ``"time"`` and ``"sampling_rate"`` built from the last
        processed frame. ``None`` for any other ``frame_len``.
    """

    def _filter(frame):
        # Dispatch on the requested cleaning method.
        if method == "own_filt":
            return nk.signal_filter(
                frame,
                sampling_rate=sampling_rate,
                lowcut=0.5,
                highcut=8,
            )
        if method == "nk_ecg_clean":
            return nk.ecg_clean(frame, sampling_rate=sampling_rate, method="pantompkins1985")
        return nk.ppg_clean(frame, sampling_rate=sampling_rate)

    # Normalisation of `sig` (mean removal, NaN -> 0, peak scaling) is
    # currently disabled.
    in_start_stop, out_start_stop = get_frame_start_stop(sig_time, frame_len=frame_len)
    for frame_n in np.arange(1, len(in_start_stop) + 1):
        raw = get_frame(sig, sig_time, in_start_stop, frame_n)
        raw_time = get_frame(sig_time, sig_time, in_start_stop, frame_n)
        filtered = _filter(raw)
        # The filtered frame is indexed with the time stamps of the raw frame.
        filtered_time = raw_time
        kept = get_frame(filtered, filtered_time, out_start_stop, frame_n)
        kept_time = get_frame(filtered_time, filtered_time, out_start_stop, frame_n)

        if show:
            plt.figure(figsize=(15, 3))
            plt.plot(raw_time, raw)
            plt.plot(filtered_time, filtered)
            plt.plot(kept_time, kept)
            plt.xlabel("Time (s)")
            plt.title(f"Frame {frame_n}")
            plt.legend(["Original input signal", "Processed input signal", "Output signal"])
            plt.show()

        # The first frame starts the file, later frames are appended to it.
        append_samples_to_wav(
            kept, wav_path, sampling_rate, rewrite=bool(frame_n == 1)
        )

    if frame_len != np.inf:
        return None

    last_time = get_frame(filtered_time, filtered_time, out_start_stop, frame_n)
    return {
        "sig": kept,
        "time": last_time,
        "sampling_rate": sampling_rate,
    }

In [4]:
def get_name_conv(
    dataset_name="P5_Stress",
    sub_label=None,
    sub_id=1,
    part_id=1,
    sig_name="zephyr_ecg",
    info_types_str="",
    v_str="",
    end_add="",
    ext=".txt",
):
    """Build the relative path of a derivative file from its naming parts.

    The file name is
    ``<dataset_name>-<sub_label>_<part_id>-<sig_name>-<info_types_str><v_str><end_add><ext>``
    and it is placed under ``<sub_label>/part<part_id>/<sig_name>/``, with an
    additional version sub-folder when ``v_str`` is not empty.

    Parameters
    ----------
    dataset_name : str
        Prefix of the file name.
    sub_label : str, optional
        Subject label. When ``None`` it is derived from ``sub_id`` as ``"P"``
        followed by the id zero-padded to two digits (``1 -> "P01"``,
        ``12 -> "P12"``).
    sub_id : int or str
        Subject id, only used when ``sub_label`` is ``None``.
    part_id : int or str
        Part number; appears in the file name and in the ``part<part_id>``
        folder.
    sig_name : str
        Signal name; appears in the file name and as a folder.
    info_types_str, v_str, end_add, ext : str
        Appended to the file name in this order.

    Returns
    -------
    str
        Relative path, rendered with the separators of the running platform.
    """
    if sub_label is None:
        # zfill keeps ids >= 10 at two digits ("P12") instead of "P012".
        sub_label = "P" + str(sub_id).zfill(2)

    new_name = (
        f"{dataset_name}-{sub_label}_{part_id}-{sig_name}-"
        f"{info_types_str}{v_str}{end_add}{ext}"
    )

    new_file_parent_path = pathlib.Path(sub_label, "part" + str(part_id), sig_name)
    if v_str != "":
        # "_v2" -> "v2". A version string without an underscore ("v2") is used
        # as-is instead of raising IndexError.
        v_parts = v_str.split("_")
        version_dir = v_parts[1] if len(v_parts) > 1 else v_parts[0]
        new_file_parent_path = new_file_parent_path / version_dir
    return str(new_file_parent_path / new_name)

In [7]:
# Root folder for the derivative files written by the cells below. The default
# is the original shared-drive location; set the P5_STRESS_DERIVATIVES_DIR
# environment variable to use another folder without editing this cell.
root_dir = str(
    pathlib.Path(
        os.environ.get(
            "P5_STRESS_DERIVATIVES_DIR",
            r"Z:\Shared\Documents\RD\RD2\_AudioRD\datasets\Biosignals\CritiasStress\data_derivatives\Stage_ltallon",
        )
    )
)

In [9]:
# Clean the "ieml" signal of subjects 1-6 and write each result to a WAV file
# under root_dir. Subject 4 is processed for parts 1 and 2, every other subject
# for part 1 only.
for sub_id in [1, 2, 3, 4, 5, 6]:
    part_ids = [1, 2] if sub_id == 4 else [1]
    for sig_name in ["ieml"]:
        for part_id in part_ids:
            loader = P5_StressDataLoader(sub_id=sub_id, part_id=part_id)
            sig_info = loader.get_sig(sig_name, data_format="DB8k")

            # Relative location of the output file, following the naming
            # convention implemented by get_name_conv.
            rel_wav_path = get_name_conv(
                sub_id=sub_id,
                part_id=part_id,
                sig_name=sig_name,
                ext=".wav",
                info_types_str="Sig-Clean",
            )
            wav_path = str(pathlib.Path(root_dir, rel_wav_path))

            cleaned_info = clean_sig(
                sig=sig_info["sig"],
                sig_time=sig_info["time"],
                wav_path=wav_path,
                sampling_rate=sig_info["sampling_rate"],
                frame_len=180,
                show=False,
            )

In [8]:
# Build and display the "_v2" output path for the current sub_id / part_id /
# sig_name.
# NOTE(review): new_root_dir, sub_id, part_id and sig_name are not assigned in
# this cell, and new_root_dir is not assigned in any cell visible in this
# notebook, so the cell raises NameError on a fresh kernel. Define
# new_root_dir before running it.
wav_path = str(
                pathlib.Path(
                    new_root_dir,
                    get_name_conv(
                        sub_id=sub_id,
                        part_id=part_id,
                        sig_name=sig_name,
                        ext=".wav",
                        info_types_str="Sig-Clean",
                        v_str="_v2"
                    ),
                )
            )
wav_path

'C:\\Users\\dbenesch\\Documents\\Stage_ltallon_new\\P01\\part1\\zephyr_ecg\\v2\\P5_Stress-P01_1-zephyr_ecg-Sig-Clean_v2.wav'

In [None]:
# Pick one task folder next to anno_info_path, locate the WAV file of the
# signal of interest in it and load that signal.
# TODO: an unfinished loop header ("for") used to open this cell; presumably
# the steps below were meant to run once per task folder. Confirm and wrap them.
# NOTE(review): anno_info_path and part_id are not assigned in this cell; they
# have to be set by an earlier cell.
sig_name = "ieml"

anno_parent_paths = [d for d in anno_info_path.parent.glob("Task*") if d.is_dir()]
anno_parent_path = anno_parent_paths[6]
wav_file_paths = [
    os.path.join(anno_parent_path, f)
    for f in os.listdir(anno_parent_path)
    if ".wav" in f and get_camel_case(sig_name, first_upper=True) in f
]
file_path = wav_file_paths[0]
# The subject label is the first three characters of the second
# "-"-separated field of the file name.
sub_label = pathlib.Path(file_path).stem.split("-")[1][:3]

# Load only after file_path and sub_label are known; both are needed here.
loader = P5_StressDataLoader(sub_id=sub_label, part_id=part_id)
sig_info = loader.get_sig(sig_name, data_format="DB8k", file_path=file_path)

In [None]:

# Reload the signal stored at file_path and clean it frame by frame, plotting
# every frame.
sig_info = loader.get_sig(sig_name, data_format="DB8k", file_path=file_path)

# NOTE(review): this call used to target `clean_ppg`, which is not defined in
# the visible cells; `clean_sig` accepts exactly these keyword arguments.
# Confirm that its default method ("own_filt") is the intended one here.
ppg_cleaned_info = clean_sig(
    sig=sig_info["sig"],
    sig_time=sig_info["time"],
    wav_path=wav_path,
    sampling_rate=sig_info["sampling_rate"],
    frame_len=180,
    show=True
)