In [5]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.signal import butter, filtfilt, find_peaks
from scipy.stats import kurtosis

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

import joblib
# //import tensorflow as tf
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import LSTM, Dense, Dropout

import os
import json
import random

SEED = 42
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)


NameError: name 'tf' is not defined

In [None]:
def synth_ppg(duration_s=30, fs=100, hr_bpm=75, motion_events=None, noise_level=0.02):
    t = np.arange(0, duration_s, 1/fs)
    f_hr = hr_bpm / 60.0
    ppg_clean = 0.6*np.sin(2*np.pi*f_hr*t) + 0.05*np.sin(2*np.pi*2*f_hr*t)
    ppg_clean += 0.02*np.sin(2*np.pi*0.25*t)
    ppg_noisy = ppg_clean + noise_level*np.random.randn(len(t))

    artifact = np.zeros_like(t)
    if motion_events:
        for (s,e,amp,freq) in motion_events:
            si, ei = int(s*fs), int(e*fs)
            artifact[si:ei] += amp * np.sin(2*np.pi*freq*t[si:ei])

    ppg = ppg_noisy + artifact
    return t, ppg

def bandpass(signal, fs, low=0.5, high=5.0, order=3):
    b,a = butter(order, [low/(fs/2), high/(fs/2)], btype='band')
    return filtfilt(b,a,signal)

def extract_peaks_rr(ppg_f, fs):
    peaks, props = find_peaks(ppg_f, distance=fs*0.35, height=np.mean(ppg_f)*0.3)
    if len(peaks) < 2:
        return None, None
    rr_intervals = np.diff(peaks) / fs
    return peaks, rr_intervals

def compute_hr_hrv(rr_intervals):
    hr = 60.0 / np.mean(rr_intervals)
    rmssd = np.sqrt(np.mean(np.square(np.diff(rr_intervals))))
    sdnn = np.std(rr_intervals)
    return hr, rmssd, sdnn

def compute_motion_variance(acc_signal, fs, window_s=1):
    w = int(window_s * fs)
    return np.array([np.var(acc_signal[i:i+w]) for i in range(0, len(acc_signal)-w)])

def compute_sqi(ppg_segment, peaks, motion_var):
    score = 1.0
    if peaks is None or len(peaks) < 8:
        score -= 0.4
    k = kurtosis(ppg_segment)
    if k < 1 or k > 10:
        score -= 0.25
    if np.mean(motion_var) > 0.5:
        score -= 0.4
    return max(0.0, score)


In [None]:
def make_synthetic_dataset(n_windows=600, fs=100, window_s=10):
    rows = []
    labels = []
    
    for i in range(n_windows):
        r = np.random.rand()
        if r < 0.6:
            hr_bpm = np.random.normal(75, 5)
            rmssd_v = np.random.uniform(30, 60)
            gsr_base = np.random.uniform(0.18, 0.32)
            motion_amp = np.random.uniform(0, 0.4)
            label = "Normal"
            motion_events = None if motion_amp < 0.15 else [(2, 4, motion_amp, 0.8)]
        else:
            hr_bpm = np.random.normal(95, 6)
            rmssd_v = np.random.uniform(5, 20)
            gsr_base = np.random.uniform(0.38, 0.65)
            motion_amp = np.random.uniform(0, 0.5)
            label = "Stress"
            motion_events = None if motion_amp < 0.15 else [(2, 5, motion_amp, 0.8)]

        t, ppg = synth_ppg(window_s, fs, int(hr_bpm), motion_events)
        ppg_f = bandpass(ppg, fs)
        peaks, rr = extract_peaks_rr(ppg_f, fs)

        acc = 0.02*np.random.randn(len(t))
        if motion_events:
            for (s,e,amp,freq) in motion_events:
                si, ei = int(s*fs), int(e*fs)
                acc[si:ei] += amp*np.sin(2*np.pi*freq*t[si:ei])

        motion_var = compute_motion_variance(acc, fs)
        sqi = compute_sqi(ppg_f, peaks, motion_var)

        if rr is None or len(rr) < 2:
            hr, rmssd_val, sdnn_val = 0, 0, 0
        else:
            hr, rmssd_val, sdnn_val = compute_hr_hrv(rr)

        rows.append({
            "hr": float(hr),
            "rmssd": float(rmssd_val),
            "gsr": float(gsr_base),
            "motion_var": float(np.mean(motion_var)),
            "sqi": float(sqi)
        })
        labels.append(label)

    df = pd.DataFrame(rows)
    df["label"] = labels
    return df

print("Generating dataset...")
df = make_synthetic_dataset(800)
print(df.head())


In [None]:
df_clean = df[df.sqi > 0.2].reset_index(drop=True)
print(df_clean.shape)
df_clean.head()


In [None]:
features = ["hr", "rmssd", "gsr", "motion_var"]
X = df_clean[features]
y = df_clean["label"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.28, random_state=SEED, stratify=y
)

rf = RandomForestClassifier(n_estimators=150, random_state=SEED)
rf.fit(X_train, y_train)

preds = rf.predict(X_test)
print("Accuracy:", accuracy_score(y_test, preds))
print(classification_report(y_test, preds))
print(confusion_matrix(y_test, preds))

joblib.dump(rf, "rf_stress_model.joblib")


In [None]:
def predict_from_features(hr, rmssd, gsr, motion_var):
    model = joblib.load("rf_stress_model.joblib")
    X_in = np.array([[hr, rmssd, gsr, motion_var]])
    pred = model.predict(X_in)[0]
    prob = model.predict_proba(X_in)[0]
    return pred, dict(zip(model.classes_, prob))

tests = [
    {"hr":110, "rmssd":8, "gsr":0.52, "motion_var":0.12},
    {"hr":82, "rmssd":45, "gsr":0.26, "motion_var":0.08},
]

for d in tests:
    pred, prob = predict_from_features(**d)
    print(d, "â†’", pred, prob)


In [None]:

X_seq = []
y_seq = []

for i in range(400):
    label = 0 if np.random.rand() < 0.6 else 1
    base_hr = np.random.normal(75,5) if label==0 else np.random.normal(95,6)
    
    seq_hr = base_hr + np.random.randn(30)
    seq_rmssd = np.random.uniform(35,60,30) if label==0 else np.random.uniform(8,20,30)
    seq_gsr = np.random.uniform(0.18,0.32,30) if label==0 else np.random.uniform(0.4,0.6,30)
    
    seq = np.vstack([seq_hr, seq_rmssd, seq_gsr]).T
    X_seq.append(seq)
    y_seq.append(label)

X_seq = np.array(X_seq)
y_seq = np.array(y_seq)

split = int(0.7*len(X_seq))
X_train, X_test = X_seq[:split], X_seq[split:]
y_train, y_test = y_seq[:split], y_seq[split:]

model = Sequential([
    LSTM(32, input_shape=(30,3)),
    Dense(16, activation='relu'),
    Dense(1, activation='sigmoid')
])

model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=6, batch_size=16)

acc = model.evaluate(X_test, y_test)[1]
print("LSTM Accuracy:", acc)
