In [None]:
from google.colab import drive
drive.mount("/content/drive")

PROJECT_ROOT = "/content/drive/MyDrive/stress-heart-ml-wesad"


In [None]:
import os

PROJECT_ROOT = "/content/stress-heart-ml-wesad"
DATA_DIR = f"{PROJECT_ROOT}/data/processed"
RESULTS_DIR = f"{PROJECT_ROOT}/results"

os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)

print("Project directories ready ✅")


In [None]:
peak_heights = properties["peak_heights"]

scr_features = {
    "scr_peak_count": len(peaks),
    "scr_mean_amplitude": np.mean(peak_heights) if len(peak_heights) > 0 else 0,
    "scr_max_amplitude": np.max(peak_heights) if len(peak_heights) > 0 else 0,
}
scr_features

In [None]:
from scipy.signal import find_peaks
import numpy as np
eda_norm = (eda_filtered - eda_filtered.min()) / (eda_filtered.max() - eda_filtered.min())

peaks, properties = find_peaks(eda_norm, height=0.1, distance=10)

print("Number of SCR Peaks:", len(peaks))

In [None]:
from scipy.signal import savgol_filter
tonic = savgol_filter(eda_filtered, window_length=301, polyorder=3)
phasic = eda_filtered - tonic

In [None]:
eda_slope = np.gradient(eda_filtered)

eda_features = {
    "tonic_mean": np.mean(tonic),
    "phasic_std": np.std(phasic),
    "eda_slope_mean": np.mean(eda_slope),
    "eda_slope_std": np.std(eda_slope),
}

eda_features


In [None]:
signals = data['signal']['wrist']
bvp = signals['BVP']
fs_bvp = 64

print(bvp.shape)

In [None]:
import numpy as np

bvp_1d = np.asarray(bvp).squeeze().astype(float)

# handle NaNs if any
mask = np.isfinite(bvp_1d)
bvp_1d = np.interp(
    np.arange(len(bvp_1d)),
    np.where(mask)[0],
    bvp_1d[mask]
)

In [None]:
from scipy.signal import butter, filtfilt

def bandpass_filter(data, lowcut=0.67, highcut=3.5, fs=64, order=3):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, data)

bvp_filt = bandpass_filter(bvp_1d)

In [None]:
rr_intervals = np.diff(peaks) / fs_bvp

hr = 60 / rr_intervals

print("HR sample:", hr[:10])

In [None]:
sdnn = np.std(rr_intervals)
rmssd = np.sqrt(np.mean(np.square(np.diff(rr_intervals))))
pnn50 = np.sum(np.abs(np.diff(rr_intervals)) > 0.05) / len(rr_intervals)

hrv_features = {
    "SDNN": sdnn,
    "RMSSD": rmssd,
    "pNN50": pnn50
}

hrv_features

In [None]:
import numpy as np

def sliding_windows(signal, fs, window_sec=60, step_sec=30):
    win = int(window_sec * fs)
    step = int(step_sec * fs)
    windows = []
    for start in range(0, len(signal) - win, step):
        windows.append(signal[start:start + win])
    return windows

In [None]:
eda_windows = sliding_windows(eda_filtered, fs=4)
print("EDA windows:", len(eda_windows))

In [None]:
from scipy.signal import find_peaks

def eda_stress_score(eda_win):
    eda_norm = (eda_win - eda_win.min()) / (eda_win.max() - eda_win.min() + 1e-8)
    peaks, _ = find_peaks(eda_norm, height=0.1, distance=5)
    return len(peaks), np.mean(eda_norm[peaks]) if len(peaks)>0 else 0


In [None]:
labels = data['label']

def window_labels(labels, fs_label=700, window_sec=60, step_sec=30):
    win = int(window_sec * fs_label)
    step = int(step_sec * fs_label)
    y = []
    for start in range(0, len(labels) - win, step):
        window = labels[start:start+win]

        y.append(np.bincount(window).argmax())
    return y


In [None]:
y = window_labels(labels)

In [None]:
import numpy as np
import pandas as pd

eda_filtered = np.load(f"{DATA_DIR}/eda_filtered.npy")
bvp_filtered = np.load(f"{DATA_DIR}/bvp_filtered.npy")

print("✅ Loaded EDA & BVP")


In [None]:
df_ml.to_csv(f"{DATA_DIR}/stress_heart_ml_dataset.csv", index=False)
print("✅ Saved ML dataset")
