In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks, welch


In [2]:
base_path_video = "../BVPs"

failed_masks = [
    [2, "Q1_1"],
    [52, "Q7_2"],
    [53, "Q4_2"]
]

class Timestamps:
    Q1_1 = [[9, 14],[14, 19]]
    Q1_2 = [[24, 29]]

    Q2_1 = [[1, 6],[6, 11]]
    Q2_2 = [[7, 12], [12, 17]]

    Q3_1 = [[14, 19], [19, 24]]
    Q3_2 = [[34, 39], [40, 44], [45, 49]]

    Q4_1 = [[9, 14], [16, 21]]
    Q4_2 = [[10, 15], [16, 21]]

    Q5_1 = [[18, 23], [10, 15]]
    Q5_2 = [[13, 18], [5, 10]]

    Q6_1 = [[80, 85], [85, 90]]
    Q6_2 = [[10, 15], [18, 23]]

    Q7_1 = [[43, 48], [30, 35]]
    Q7_2 = [[36, 41], [41, 46]]

    Q8_1 = [[12, 17], [17, 22]]
    Q8_2 = [[7, 12], [12, 17]]

    Q9_1 = [[15, 20], [25, 30]]
    Q9_2 = [[13, 18], [19, 24]]

paths = [
    "Q1_1",
    "Q1_2",
    # "Q2_1",
    # "Q2_2",
    "Q3_1",
    "Q3_2",
    # "Q4_1",
    # "Q4_2",
    #"Q5_1",
    #"Q5_2",
    # "Q6_1",
    # "Q6_2",
    "Q7_1",
    "Q7_2",
    # "Q8_1",
    # "Q8_2",
    "Q9_1",
    "Q9_2"
]

patients = list(range(1, 62))
patients.remove(23)

#patients = expressive

In [3]:
class BVP:
    def __init__(self, patient, path, signal, features, id):
        self.patient = patient
        self.path = path
        self.signal = signal
        self.features = features
        self.id = id

In [4]:
def cut_bvp(bvp, t_start, t_end, fs = 60):

    n_start = int(t_start * fs)
    n_end   = int(t_end * fs) if t_end is not None else len(bvp)
    return bvp[n_start:n_end]

In [5]:
fs = 60

BVPs = []

for patient in patients:

    for path in paths:

        if [patient, path] in failed_masks:
            print(f"Skipping Patient_{patient}, {path}")
            continue

        data = np.load(f"{base_path_video}/Patient_{patient}/{path}.npy")

        for t_start, t_end in getattr(Timestamps, path):

            data_cut = cut_bvp(data, t_start, t_end, fs)

            id = f"{patient}{path}"

            bvp = BVP(patient, path, data_cut, [], id)

            BVPs.append(bvp)

            #print(f"Patient_{patient}, {path}: {data.shape}")

print(f"Loaded {len(BVPs)} BVP signals")

Skipping Patient_2, Q1_1
Skipping Patient_52, Q7_2
Loaded 956 BVP signals


In [6]:
print(len(BVPs[0].signal))

300


In [7]:
import numpy as np
from scipy.signal import find_peaks, czt
from scipy.stats import linregress, skew, kurtosis
from itertools import permutations


def compute_bvp_short_window_features(
    bvp,
    fs,
    fmin=0.66,
    fmax=3.0,
    n_czt_bins=512
):
    """
    Robust short-window (≈3s) BVP features for arousal classification.
    Designed for small manually labeled emotional segments.

    Parameters
    ----------
    bvp : np.ndarray
        1D BVP signal
    fs : float
        Sampling frequency (Hz)
    """

    features = {}
    bvp = np.asarray(bvp, dtype=float)

    if len(bvp) < fs * 2:  # too short
        return {k: np.nan for k in [
            "mean_hr", "hr_slope",
            "dom_freq", "peak_power_ratio",
            "spec_entropy", "freq_variance", "hr_snr",
            "amp_mean", "amp_std",
            "signal_energy", "signal_std",
            "skewness", "kurtosis",
            "sample_entropy", "perm_entropy",
            "peak_success_ratio"
        ]}

    # Remove DC
    bvp = bvp - np.mean(bvp)

    # --------------------------------------------------
    # 1. Peak detection
    # --------------------------------------------------
    min_dist = int(0.4 * fs)
    peaks, _ = find_peaks(bvp, distance=min_dist)

    if len(peaks) >= 2:
        ibi = np.diff(peaks) / fs
        hr = 60.0 / ibi
        features["mean_hr"] = np.mean(hr)

        t = np.arange(len(hr))
        features["hr_slope"] = linregress(t, hr).slope
    else:
        features["mean_hr"] = np.nan
        features["hr_slope"] = np.nan

    # --------------------------------------------------
    # 2. Spectral features (CZT)
    # --------------------------------------------------
    w = np.exp(-1j * 2 * np.pi * (fmax - fmin) / (n_czt_bins * fs))
    a = np.exp(1j * 2 * np.pi * fmin / fs)

    spectrum = czt(bvp, n_czt_bins, w, a)
    power = np.abs(spectrum) ** 2
    freqs = np.linspace(fmin, fmax, n_czt_bins)

    total_power = np.sum(power)

    if total_power > 0:
        idx_peak = np.argmax(power)
        dom_freq = freqs[idx_peak]

        features["dom_freq"] = dom_freq
        features["peak_power_ratio"] = power[idx_peak] / total_power

        p_norm = power / total_power
        features["spec_entropy"] = -np.sum(
            p_norm * np.log2(p_norm + 1e-12)
        )

        features["freq_variance"] = np.sum(
            power * (freqs - dom_freq) ** 2
        ) / total_power

        features["hr_snr"] = np.max(power) / (np.mean(power) + 1e-12)

    else:
        features["dom_freq"] = np.nan
        features["peak_power_ratio"] = np.nan
        features["spec_entropy"] = np.nan
        features["freq_variance"] = np.nan
        features["hr_snr"] = np.nan

    # --------------------------------------------------
    # 3. Pulse amplitude features
    # --------------------------------------------------
    troughs, _ = find_peaks(-bvp, distance=min_dist)
    n_beats = min(len(peaks), len(troughs))

    if n_beats > 0:
        amp = bvp[peaks[:n_beats]] - bvp[troughs[:n_beats]]
        features["amp_mean"] = np.mean(amp)
        features["amp_std"] = np.std(amp)
    else:
        features["amp_mean"] = np.nan
        features["amp_std"] = np.nan

    # --------------------------------------------------
    # 4. Signal statistics
    # --------------------------------------------------
    features["signal_energy"] = np.sum(bvp ** 2)
    features["signal_std"] = np.std(bvp)
    features["skewness"] = skew(bvp)
    features["kurtosis"] = kurtosis(bvp)

    # --------------------------------------------------
    # 5. Entropy features
    # --------------------------------------------------
    def sample_entropy(x, m=2, r=0.2):
        x = np.asarray(x)
        r *= np.std(x)
        N = len(x)

        if N < m + 2:
            return np.nan

        def _phi(m):
            x_m = np.array([x[i:i + m] for i in range(N - m)])
            C = np.sum(
                np.max(
                    np.abs(x_m[:, None] - x_m[None, :]), axis=2
                ) <= r,
                axis=0
            ) - 1
            return np.sum(C) / ((N - m) * (N - m - 1))

        return -np.log(_phi(m + 1) / _phi(m))

    try:
        features["sample_entropy"] = sample_entropy(bvp)
    except Exception:
        features["sample_entropy"] = np.nan

    def permutation_entropy(x, order=3, delay=1):
        x = np.asarray(x)
        perms = list(permutations(range(order)))
        counts = np.zeros(len(perms))

        for i in range(len(x) - delay * (order - 1)):
            pattern = x[i:i + delay * order:delay]
            idx = perms.index(tuple(np.argsort(pattern)))
            counts[idx] += 1

        p = counts / np.sum(counts)
        return -np.sum(p * np.log2(p + 1e-12))

    try:
        features["perm_entropy"] = permutation_entropy(bvp)
    except Exception:
        features["perm_entropy"] = np.nan

    # --------------------------------------------------
    # 6. Signal quality
    # --------------------------------------------------
    if not np.isnan(features["mean_hr"]):
        expected_beats = len(bvp) / fs * (features["mean_hr"] / 60.0)
        features["peak_success_ratio"] = (
            len(peaks) / expected_beats
            if expected_beats > 0 else np.nan
        )
    else:
        features["peak_success_ratio"] = np.nan

    return features


In [8]:
fs = 60

valid = []

failed = []

failed_masks = [
    [2, "Q1_1"],
    [52, "Q7_2"],
    [53, "Q4_2"]
]

bvp = BVPs[0]
try:
    
    feats = compute_bvp_short_window_features(bvp.signal, fs)

    if feats is None or []:
        # Do nothing
        print(f"Failed: Patient_{bvp.patient}, {bvp.path}")
        BVPs.remove(bvp)

    else:

        bvp.features = feats

        valid.append(f"Patient_{bvp.patient}, {bvp.path}")

except Exception as e:
    print(e)

print(f"Extracted features for {len(valid)} videos")

print(f"{len(feats)} Feats: {feats}")


Extracted features for 1 videos
16 Feats: {'mean_hr': np.float64(111.44981065285458), 'hr_slope': np.float64(-10.533509727906129), 'dom_freq': np.float64(1.1179256360078278), 'peak_power_ratio': np.float64(0.014472147296813044), 'spec_entropy': np.float64(7.76061405476444), 'freq_variance': np.float64(0.3761858775543272), 'hr_snr': np.float64(7.409739415968071), 'amp_mean': np.float64(0.26862234354099823), 'amp_std': np.float64(0.08823250705071595), 'signal_energy': np.float64(2.942719042171681), 'signal_std': np.float64(0.09904071624289479), 'skewness': np.float64(0.479878882144499), 'kurtosis': np.float64(-0.44220686560316524), 'sample_entropy': np.float64(0.4270695671693521), 'perm_entropy': np.float64(1.4099888308980844), 'peak_success_ratio': np.float64(0.861374276345988)}


In [9]:
fs = 60  # sampling rate

valid = []
failed = []

for bvp in BVPs:  # use a copy to safely remove items

    print(f"Computing Features for Patient_{bvp.patient}...", end="\r", flush=True)
    
    try:
        # Use the new windowed feature extraction
        feats = compute_bvp_short_window_features(bvp.signal, fs)

        if feats is None or feats == []:
            print(f"Failed: Patient_{bvp.patient}, {bvp.path}")
            failed.append(f"Patient_{bvp.patient}, {bvp.path}")
            BVPs.remove(bvp)  # remove problematic signal
        else:
            bvp.features = feats
            valid.append(f"Patient_{bvp.patient}, {bvp.path}")

    except Exception as e:
        print(f"Error for Patient_{bvp.patient}, {bvp.path}: {e}")
        failed.append(f"Patient_{bvp.patient}, {bvp.path}")
        BVPs.remove(bvp)

print(f"Extracted features for {len(valid)} videos")
print(f"Failed: {failed}")

Extracted features for 956 videos...
Failed: []


In [10]:
print("Example of features: ", BVPs[0].features.keys())
print("Features per video: ", len(BVPs[0].features))

Example of features:  dict_keys(['mean_hr', 'hr_slope', 'dom_freq', 'peak_power_ratio', 'spec_entropy', 'freq_variance', 'hr_snr', 'amp_mean', 'amp_std', 'signal_energy', 'signal_std', 'skewness', 'kurtosis', 'sample_entropy', 'perm_entropy', 'peak_success_ratio'])
Features per video:  16


In [11]:
label_map = {
    "Q1": "Q1: ↑Arousal ↓Val",
    "Q2": "Q2: ↑Arousal -Val",
    "Q3": "Q3: ↑Arousal ↑Val",
    "Q4": "Q4: -Arousal ↓Val",
    "Q5": "Q5: -Arousal -Val",
    "Q6": "Q6: -Arousal ↑Val",
    "Q7": "Q7: ↓Arousal ↓Val",
    "Q8": "Q8: ↓Arousal -Val",
    "Q9": "Q9: ↓Arousal ↑Val",
}

def get_label(path):
    q = path.split("_")[0]  # "Q3_2" → "Q3"
    return label_map[q]



LEARNING

In [12]:
# # Random Split

# from sklearn.model_selection import train_test_split

# X_train, X_test, y_train, y_test = train_test_split(
#     X, y,
#     test_size=0.3,
#     stratify=y,
# )

# print(X_train.shape, y_train.shape)
# print(X_test.shape, y_test.shape)

# print("Example of data: ", X_train[0], y_train[0])


In [13]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", RandomForestClassifier(
        n_estimators=200,
        random_state=42,
        class_weight="balanced"
    ))
])

In [14]:
import numpy as np

X = []
y = []
groups = []

for bvp in BVPs:
    if bvp is None or bvp.features is []:
        print("Error: Patient", bvp.patient, bvp.path)
        continue

    feat_values = list(bvp.features.values())
    X.append(feat_values)
    y.append(get_label(bvp.path))
    groups.append(bvp.id)  
    

X = np.array(X)
y = np.array(y)

print(X.shape, y.shape)
print(np.unique(y, return_counts=True))
print("Example of data: ", X[0])


(956, 16) (956,)
(array(['Q1: ↑Arousal ↓Val', 'Q3: ↑Arousal ↑Val', 'Q7: ↓Arousal ↓Val',
       'Q9: ↓Arousal ↑Val'], dtype='<U17'), array([178, 300, 238, 240]))
Example of data:  [ 1.11449811e+02 -1.05335097e+01  1.11792564e+00  1.44721473e-02
  7.76061405e+00  3.76185878e-01  7.40973942e+00  2.68622344e-01
  8.82325071e-02  2.94271904e+00  9.90407162e-02  4.79878882e-01
 -4.42206866e-01  4.27069567e-01  1.40998883e+00  8.61374276e-01]


In [15]:
from sklearn.model_selection import StratifiedGroupKFold
from sklearn.metrics import classification_report, confusion_matrix

cv = StratifiedGroupKFold(n_splits=5)

for fold, (train_idx, test_idx) in enumerate(cv.split(X, y, groups)):
    print(f"Fold {fold}")

    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]

    pipe.fit(X_train, y_train)
    score = pipe.score(X_test, y_test)

    print("Score:", score)

    y_pred = pipe.predict(X_test)

    print(classification_report(y_test, y_pred))
    print(confusion_matrix(y_test, y_pred))
    print()
    print("-"*50)



Fold 0
Score: 0.3489583333333333
                   precision    recall  f1-score   support

Q1: ↑Arousal ↓Val       0.36      0.14      0.20        36
Q3: ↑Arousal ↑Val       0.48      0.70      0.57        60
Q7: ↓Arousal ↓Val       0.24      0.25      0.25        48
Q9: ↓Arousal ↑Val       0.20      0.17      0.18        48

         accuracy                           0.35       192
        macro avg       0.32      0.31      0.30       192
     weighted avg       0.33      0.35      0.32       192

[[ 5 12 13  6]
 [ 2 42  7  9]
 [ 3 15 12 18]
 [ 4 19 17  8]]

--------------------------------------------------
Fold 1
Score: 0.3645833333333333
                   precision    recall  f1-score   support

Q1: ↑Arousal ↓Val       0.50      0.22      0.31        36
Q3: ↑Arousal ↑Val       0.42      0.53      0.47        60
Q7: ↓Arousal ↓Val       0.32      0.40      0.35        48
Q9: ↓Arousal ↑Val       0.28      0.23      0.25        48

         accuracy                           0.36 

In [16]:
import joblib

joblib.dump(pipe, "pipe_5s.joblib")

# import joblib

# pipe = joblib.load("bvp_emotion_pipeline.joblib")

# pipe.predict(X)
# pipe.predict_proba(X)
# pipe.classes_

['pipe_5s.joblib']

In [17]:
import pandas as pd

# feature names from BVP objects (windowed)
feature_names = list(BVPs[0].features.keys())

# extract RF importances from pipeline
importances = pipe.named_steps['clf'].feature_importances_

# build DataFrame
imp_df = pd.DataFrame({
    "feature": feature_names,
    "importance": importances
}).sort_values(by="importance", ascending=False)

print(imp_df)


               feature  importance
4         spec_entropy    0.072606
3     peak_power_ratio    0.071037
1             hr_slope    0.066742
11            skewness    0.066288
6               hr_snr    0.066081
12            kurtosis    0.065698
13      sample_entropy    0.064353
8              amp_std    0.063014
5        freq_variance    0.062587
14        perm_entropy    0.060540
15  peak_success_ratio    0.059552
0              mean_hr    0.059460
2             dom_freq    0.058691
7             amp_mean    0.058336
9        signal_energy    0.053684
10          signal_std    0.051331
