# U5: Ejemplo de detección de estrés con señales ECG

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/husseinlopez/cdsi2026/blob/main/U5_example-ecg.ipynb)

In [None]:
#!pip install heartpy scipy scikit-learn pandas numpy matplotlib seaborn

In [None]:
# system
import os
import warnings
warnings.filterwarnings('ignore')

# data
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# signal processing
import heartpy as hp
from scipy.signal import welch
from scipy.interpolate import interp1d

# machine learning
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import LeaveOneGroupOut
from sklearn.metrics import f1_score, confusion_matrix, ConfusionMatrixDisplay

### WESAD dataset

WESAD (Wearable Stress and Affect Detection) es un dataset multimodal para la detección de estrés y afecto. Contiene señales ECG de 15 sujetos recolectadas con un dispositivo RespiBAN Professional a 700 Hz.

Las etiquetas de condición son:
- **1**: baseline (reposo)
- **2**: stress (tarea de aritmética mental / Trier Social Stress Test)
- **3**: amusement (clips de video divertidos)

Dataset en Kaggle (requiere cuenta): https://www.kaggle.com/datasets/qiriro/wesad

Para descargarlo directamente desde Colab:
1. En Kaggle → Account → Create New Token → descarga `kaggle.json`
2. Sube el archivo cuando la celda siguiente lo solicite

In [None]:
import os
from google.colab import files

# upload kaggle.json when prompted
os.makedirs('/root/.kaggle', exist_ok=True)
uploaded = files.upload()  # select kaggle.json here

with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(uploaded['kaggle.json'])
os.chmod('/root/.kaggle/kaggle.json', 0o600)

# download and unzip (~2 GB, may take a few minutes)
!pip install -q kaggle
!kaggle datasets download -d qiriro/wesad -p /content --unzip

WESAD_PATH = '/content/WESAD'

# subjects available
subjects = ['S2', 'S3', 'S4', 'S5', 'S6', 'S7', 'S8', 'S9', 'S10', 'S11',
            'S13', 'S14', 'S15', 'S16', 'S17']

SAMPLE_RATE = 700  # Hz, chest ECG

### Loading a single subject

In [None]:
import pickle

def load_subject(subject_id):
    path = os.path.join(WESAD_PATH, subject_id, f'{subject_id}.pkl')
    with open(path, 'rb') as f:
        data = pickle.load(f, encoding='latin1')
    
    ecg  = data['signal']['chest']['ECG'].flatten()
    label = data['label']
    
    return ecg, label

ecg_raw, labels = load_subject('S2')

print(f'ECG samples: {len(ecg_raw)}')
print(f'Duration: {len(ecg_raw) / SAMPLE_RATE / 60:.1f} min')
print(f'Label values: {np.unique(labels)}')

## Exploratory analysis

In [None]:
# show a 10-second segment per condition
conditions = {1: 'baseline', 2: 'stress', 3: 'amusement'}
colors     = {1: 'steelblue', 2: 'tomato', 3: 'mediumseagreen'}

fig, axes = plt.subplots(3, 1, figsize=(14, 6), sharex=True)

for ax, (cond, name) in zip(axes, conditions.items()):
    # find first occurrence of this condition
    idx = np.where(labels == cond)[0][0]
    segment = ecg_raw[idx : idx + 10 * SAMPLE_RATE]
    ax.plot(segment, color=colors[cond], linewidth=0.8)
    ax.set_ylabel(name, fontsize=11)
    ax.set_xlim(0, len(segment))
    sns.despine(ax=ax)

axes[-1].set_xlabel('samples')
plt.suptitle('ECG raw signal — S2', fontsize=13)
plt.tight_layout()
plt.show()

In [None]:
# label distribution
label_series = pd.Series(labels)
counts = label_series[label_series.isin([1, 2, 3])].map(conditions).value_counts()

plt.figure(figsize=(6, 3))
counts.plot(kind='bar', color=['steelblue', 'tomato', 'mediumseagreen'])
plt.ylabel('# samples (700 Hz)')
plt.xlabel('condition')
plt.xticks(rotation=0)
sns.despine()
plt.tight_layout()
plt.show()

## Preprocessing

Before extracting features, the raw ECG needs to be filtered and the R-peaks detected. We use HeartPy for this — it handles both bandpass filtering and adaptive peak detection.

In [None]:
# bandpass filter: keep cardiac frequencies (0.5 – 40 Hz)
ecg_filtered = hp.filter_signal(ecg_raw,
                                cutoff=[0.5, 40],
                                sample_rate=SAMPLE_RATE,
                                filtertype='bandpass')

# show effect of filter on a 5-second window
t = np.arange(5 * SAMPLE_RATE)

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 5), sharex=True)
ax1.plot(t, ecg_raw[1000:1000 + 5 * SAMPLE_RATE], linewidth=0.8, color='gray')
ax1.set_title('raw', fontsize=11)
ax2.plot(t, ecg_filtered[1000:1000 + 5 * SAMPLE_RATE], linewidth=0.8, color='steelblue')
ax2.set_title('filtered (bandpass 0.5–40 Hz)', fontsize=11)
for ax in [ax1, ax2]:
    ax.set_ylabel('mV')
    sns.despine(ax=ax)
ax2.set_xlabel('samples')
plt.tight_layout()
plt.show()

In [None]:
# run peak detection on a short segment to visualize
segment = ecg_filtered[0 : 20 * SAMPLE_RATE]

wd, m = hp.process(hp.scale_data(segment), sample_rate=SAMPLE_RATE)

plt.figure(figsize=(14, 4))
hp.plotter(wd, m, title='R-peak detection — S2 (20 s)')
plt.show()

print('\nMeasures from this segment:')
for k, v in m.items():
    print(f'  {k}: {v:.4f}')

## Windowing

A continuous ECG recording cannot be fed directly into a classifier. Instead, we divide the signal into fixed-length windows and extract one feature vector per window. 

Two parameters control this:
- **window size**: how many seconds of signal to use — long enough to compute HRV reliably (at least 30 s), short enough to capture changes over time
- **overlap**: how much consecutive windows share — here we use 50% overlap to increase the number of samples without losing temporal resolution

In [None]:
WINDOW_SEC  = 60     # window length in seconds
OVERLAP     = 0.5    # 50% overlap
WINDOW_SAMP = WINDOW_SEC * SAMPLE_RATE
STEP_SAMP   = int(WINDOW_SAMP * (1 - OVERLAP))

def get_windows(ecg, labels, sample_rate=SAMPLE_RATE,
                window_samp=WINDOW_SAMP, step_samp=STEP_SAMP,
                valid_labels=(1, 2)):
    """Slide a window over the ECG and return segments with their majority label."""
    windows, window_labels = [], []
    for start in range(0, len(ecg) - window_samp, step_samp):
        end = start + window_samp
        seg_label = labels[start:end]
        # keep only baseline and stress windows with consistent labeling
        unique, counts = np.unique(seg_label, return_counts=True)
        majority = unique[np.argmax(counts)]
        purity   = counts.max() / len(seg_label)
        if majority in valid_labels and purity >= 0.9:
            windows.append(ecg[start:end])
            window_labels.append(majority)
    return windows, window_labels

# test on S2
windows, window_labels = get_windows(ecg_filtered, labels)
print(f'Total windows: {len(windows)}')
print(f'Label distribution: {pd.Series(window_labels).map({1: "baseline", 2: "stress"}).value_counts().to_dict()}')

In [None]:
# visualize windowing scheme on the label timeline
fig, ax = plt.subplots(figsize=(14, 2.5))

time_axis = np.arange(len(labels)) / SAMPLE_RATE / 60
label_mapped = pd.Series(labels).map({1: 1, 2: 2, 3: 3}).fillna(0)

ax.fill_between(time_axis, label_mapped, alpha=0.4, step='pre',
                color='gray', label='condition')
ax.set_yticks([1, 2, 3])
ax.set_yticklabels(['baseline', 'stress', 'amusement'])
ax.set_xlabel('time (min)')
ax.set_title('Label timeline — S2')
sns.despine(ax=ax)
plt.tight_layout()
plt.show()

## Feature extraction

We extract features from three domains, matching what was covered in the slides.

### Time-domain features

Time-domain HRV metrics are computed directly from the RR intervals (distances between consecutive R-peaks).

In [None]:
def extract_time_domain(rr_intervals):
    """
    rr_intervals: array of RR intervals in milliseconds
    Returns a dict with SDNN, RMSSD, pNN50, mean HR.
    """
    if len(rr_intervals) < 5:
        return None
    
    rr   = np.array(rr_intervals)
    diff = np.diff(rr)
    
    features = {
        'mean_rr'  : np.mean(rr),
        'sdnn'     : np.std(rr),
        'rmssd'    : np.sqrt(np.mean(diff**2)),
        'pnn50'    : np.sum(np.abs(diff) > 50) / len(diff) * 100,
        'mean_hr'  : 60000 / np.mean(rr),
    }
    return features

### Frequency-domain features

The RR interval series is interpolated to a uniform grid and then the Power Spectral Density (PSD) is estimated with Welch's method. We integrate the PSD in three bands:
- **VLF**: 0.003 – 0.04 Hz  
- **LF**: 0.04 – 0.15 Hz (sympathetic + parasympathetic activity)
- **HF**: 0.15 – 0.4 Hz (parasympathetic / respiratory sinus arrhythmia)  
- **LF/HF ratio**: classic autonomic balance index

In [None]:
def extract_frequency_domain(rr_intervals, fs_interp=4.0):
    """
    Interpolates the RR series to uniform sampling and computes PSD bands.
    fs_interp: resampling frequency in Hz (4 Hz is standard for HRV).
    """
    if len(rr_intervals) < 10:
        return None
    
    rr = np.array(rr_intervals)
    
    # cumulative time axis (in seconds)
    t_rr = np.cumsum(rr) / 1000.0
    t_rr -= t_rr[0]
    
    # interpolate to uniform grid
    t_uniform = np.arange(t_rr[0], t_rr[-1], 1.0 / fs_interp)
    if len(t_uniform) < 16:
        return None
    
    interp_fn = interp1d(t_rr, rr, kind='cubic', fill_value='extrapolate')
    rr_uniform = interp_fn(t_uniform)
    
    # Welch PSD
    freqs, psd = welch(rr_uniform, fs=fs_interp, nperseg=min(256, len(rr_uniform)))
    
    def band_power(f_low, f_high):
        idx = (freqs >= f_low) & (freqs < f_high)
        return np.trapz(psd[idx], freqs[idx])
    
    vlf = band_power(0.003, 0.04)
    lf  = band_power(0.04,  0.15)
    hf  = band_power(0.15,  0.40)
    total = vlf + lf + hf
    
    features = {
        'lf_power'   : lf,
        'hf_power'   : hf,
        'lf_hf_ratio': lf / hf if hf > 0 else 0,
        'lf_norm'    : lf / (total - vlf) * 100 if (total - vlf) > 0 else 0,
        'hf_norm'    : hf / (total - vlf) * 100 if (total - vlf) > 0 else 0,
    }
    return features

### Non-linear features (Poincaré plot)

The Poincaré plot maps each RR interval against the next one (RR_n vs RR_{n+1}). SD1 captures short-term variability (parasympathetic), SD2 captures long-term variability.

In [None]:
def extract_nonlinear(rr_intervals):
    """SD1, SD2 and SD2/SD1 from Poincaré plot."""
    if len(rr_intervals) < 5:
        return None
    
    rr   = np.array(rr_intervals)
    rr1  = rr[:-1]
    rr2  = rr[1:]
    
    sd1 = np.std((rr2 - rr1) / np.sqrt(2))
    sd2 = np.std((rr2 + rr1) / np.sqrt(2))
    
    features = {
        'sd1'      : sd1,
        'sd2'      : sd2,
        'sd2_sd1'  : sd2 / sd1 if sd1 > 0 else 0,
    }
    return features

In [None]:
# visualize Poincare plot for baseline vs stress
fig, axes = plt.subplots(1, 2, figsize=(10, 4))

for ax, (cond, name) in zip(axes, {1: 'baseline', 2: 'stress'}.items()):
    # grab first valid window of this condition
    idx = [i for i, l in enumerate(window_labels) if l == cond][0]
    seg = windows[idx]
    try:
        wd, m = hp.process(hp.scale_data(seg), sample_rate=SAMPLE_RATE)
        rr = np.array(wd['RR_list_cor'])
        ax.scatter(rr[:-1], rr[1:], alpha=0.5, s=15, color=colors[cond])
        ax.set_title(f'Poincaré — {name}', fontsize=11)
        ax.set_xlabel('RR_n (ms)')
        ax.set_ylabel('RR_{n+1} (ms)')
        ax.set_aspect('equal')
        sns.despine(ax=ax)
    except:
        ax.set_title(f'{name} — processing error')

plt.tight_layout()
plt.show()

### Building the feature matrix

In [None]:
def extract_all_features(ecg_segment, sample_rate=SAMPLE_RATE):
    """Run HeartPy on one window and extract all three feature domains."""
    try:
        wd, m = hp.process(hp.scale_data(ecg_segment),
                           sample_rate=sample_rate,
                           clean_rr=True)
        rr = np.array(wd['RR_list_cor'])
        if len(rr) < 10:
            return None
        
        td = extract_time_domain(rr)
        fd = extract_frequency_domain(rr)
        nl = extract_nonlinear(rr)
        
        if td is None or fd is None or nl is None:
            return None
        
        return {**td, **fd, **nl}
    except:
        return None

In [None]:
# extract features for all windows of S2
records = []
for win, lbl in zip(windows, window_labels):
    feats = extract_all_features(win)
    if feats is not None:
        feats['label']   = lbl
        feats['subject'] = 'S2'
        records.append(feats)

df_s2 = pd.DataFrame(records)
print(df_s2.shape)
df_s2.head()

In [None]:
# compare features between conditions
feature_cols = [c for c in df_s2.columns if c not in ['label', 'subject']]
label_names  = {1: 'baseline', 2: 'stress'}
df_s2['condition'] = df_s2['label'].map(label_names)

fig, axes = plt.subplots(3, 4, figsize=(16, 9))
axes = axes.flatten()

for ax, feat in zip(axes, feature_cols):
    sns.boxplot(data=df_s2, x='condition', y=feat,
                palette={'baseline': 'steelblue', 'stress': 'tomato'},
                ax=ax, width=0.5)
    ax.set_title(feat, fontsize=10)
    ax.set_xlabel('')
    sns.despine(ax=ax)

for ax in axes[len(feature_cols):]:
    ax.set_visible(False)

plt.suptitle('Feature distributions — S2', fontsize=13)
plt.tight_layout()
plt.show()

## Multi-subject feature matrix

We repeat the same pipeline for all subjects to build a dataset suitable for Leave-One-Subject-Out evaluation.

In [None]:
all_records = []

for subj in subjects:
    try:
        ecg, lbl = load_subject(subj)
        ecg_f = hp.filter_signal(ecg, cutoff=[0.5, 40],
                                  sample_rate=SAMPLE_RATE,
                                  filtertype='bandpass')
        wins, win_lbls = get_windows(ecg_f, lbl)
        
        n_ok = 0
        for win, wlbl in zip(wins, win_lbls):
            feats = extract_all_features(win)
            if feats is not None:
                feats['label']   = wlbl
                feats['subject'] = subj
                all_records.append(feats)
                n_ok += 1
        
        print(f'{subj}: {n_ok} windows')
    except Exception as e:
        print(f'{subj}: skipped ({e})')

df_all = pd.DataFrame(all_records)
df_all['condition'] = df_all['label'].map({1: 'baseline', 2: 'stress'})
print(f'\nTotal: {len(df_all)} windows — {df_all["subject"].nunique()} subjects')
df_all.head()

In [None]:
# class balance per subject
pivot = df_all.groupby(['subject', 'condition']).size().unstack(fill_value=0)
pivot.plot(kind='bar', figsize=(12, 4),
           color=['steelblue', 'tomato'],
           width=0.7)
plt.ylabel('# windows')
plt.xlabel('subject')
plt.xticks(rotation=45)
plt.legend(title='condition')
sns.despine()
plt.tight_layout()
plt.show()

## Classification

### Leave-One-Subject-Out (LOSO) evaluation

In previous units we split data randomly into train/test. For physiological data this is problematic: the same subject appearing in both sets inflates accuracy because the model learns individual physiology rather than generalizable patterns.

Leave-One-Subject-Out (LOSO) is a stricter protocol: on each fold, one subject is held out completely as the test set, and the remaining subjects are used for training. This simulates deploying the model on a new, unseen person.

In [None]:
feature_cols = [c for c in df_all.columns if c not in ['label', 'subject', 'condition']]

X = df_all[feature_cols].values
y = df_all['label'].values
groups = df_all['subject'].values

logo = LeaveOneGroupOut()

classifiers = {
    'Decision Tree' : DecisionTreeClassifier(random_state=42),
    'kNN'           : KNeighborsClassifier(n_neighbors=5),
    'Random Forest' : RandomForestClassifier(n_estimators=100, random_state=42),
}

results = {name: {'y_true': [], 'y_pred': []} for name in classifiers}

for train_idx, test_idx in logo.split(X, y, groups):
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]
    
    scaler  = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test  = scaler.transform(X_test)
    
    for name, clf in classifiers.items():
        clf.fit(X_train, y_train)
        pred = clf.predict(X_test)
        results[name]['y_true'].extend(y_test)
        results[name]['y_pred'].extend(pred)

# print scores
for name in classifiers:
    y_true = results[name]['y_true']
    y_pred = results[name]['y_pred']
    f1 = f1_score(y_true, y_pred, average='macro')
    acc = np.mean(np.array(y_true) == np.array(y_pred))
    print(f'{name:20s}  accuracy: {acc:.3f}   F1 (macro): {f1:.3f}')

### Confusion matrices

In [None]:
fig, axes = plt.subplots(1, 3, figsize=(14, 4))
class_names = ['baseline', 'stress']

for ax, name in zip(axes, classifiers):
    cm = confusion_matrix(results[name]['y_true'],
                          results[name]['y_pred'],
                          labels=[1, 2])
    cm_norm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100
    
    disp = ConfusionMatrixDisplay(confusion_matrix=cm_norm,
                                   display_labels=class_names)
    disp.plot(ax=ax, colorbar=False, cmap='Blues')
    ax.set_title(name, fontsize=11)

plt.suptitle('Confusion matrices — LOSO (% per true class)', fontsize=12)
plt.tight_layout()
plt.show()

### Feature importance (Random Forest)

In [None]:
rf = classifiers['Random Forest']

# refit on all data to get stable importances
scaler_full = StandardScaler()
X_scaled = scaler_full.fit_transform(X)
rf.fit(X_scaled, y)

importances = pd.Series(rf.feature_importances_, index=feature_cols)
importances = importances.sort_values(ascending=True)

plt.figure(figsize=(8, 5))
importances.plot(kind='barh', color='steelblue')
plt.xlabel('importance')
plt.title('Feature importance — Random Forest')
sns.despine()
plt.tight_layout()
plt.show()