
# EEG Beta ERD Pipeline ‚Äî Annotated Walkthrough

This notebook walks through the provided Python script line by line and explains:

- How EEG samples are acquired via **Lab Streaming Layer (LSL)**.
- How we filter the signal and compute beta-band power (13‚Äì30 Hz).
- How we estimate **MRBD**.
- Practical considerations: buffers, timestamps, artifact rejection, and markers.



## Imports

- **numpy**: numeric arrays and math.
- **time**: timing for trial loops.
- **pylsl** (`StreamInlet`, `resolve_streams`, `StreamInfo`, `StreamOutlet`, `local_clock`):
  - Discover LSL streams, pull samples, and create/send marker streams.
- **psychopy.sound**: plays beeps to cue movement/rest.
- **scipy.signal** (`welch`, `butter`, `lfilter`): PSD estimation and bandpass filtering.


In [None]:
import numpy as np
import time
from pylsl import StreamInlet, resolve_streams, StreamInfo, StreamOutlet, local_clock
from psychopy import sound
from scipy.signal import welch, butter, lfilter


## Stream discovery & markers

- The script searches for an LSL stream named **`UnicornRecorderLSLStream`** and opens it.
- It also creates its own **marker outlet** called `BetaMarkers`. The helper `send_marker(val)`
  pushes markers (`1` for **movement**, `0` for **rest**) with the LSL `local_clock()` timestamp.


In [None]:
# ---------- Setup ----------
print("üîç Resolving EEG stream...")
eeg_streams = [s for s in resolve_streams() if s.name() == "UnicornRecorderLSLStream"]
if not eeg_streams:
    raise RuntimeError("‚ùå No EEG stream found.")
print(f"‚úÖ Connected to: {eeg_streams[0].name()}")
inlet = StreamInlet(eeg_streams[0])

# Marker outlet (we emit our own markers)
marker_info = StreamInfo(name='BetaMarkers', type='Markers', channel_count=1,
                         channel_format='string', source_id='beta_marker_001')
marker_outlet = StreamOutlet(marker_info)
def send_marker(val: int) -> float:
    ts = local_clock()
    marker_outlet.push_sample([str(val)], timestamp=ts)
    print(f"üìç Marker@{ts:.6f}: {val}")
    return ts


## Parameters & task design

- `fs = 250.0` Hz: sampling rate expected from the stream (used in PSD and filtering).
- `ch_idx = 1`: channel to analyze (0-indexed) ==> Should be adapted based on the EEG setup you use. You want the index of the C3 electrode 
- `beta_band = (13, 30)` Hz.
- `artifact_ptp_uv = 200`: reject windows whose **peak-to-peak amplitude** exceeds this (post-filter).  
- Trial timing: **4 s movement** + **6 s rest**; `trials = 50`.
- A **beep** (440 Hz, 0.2 s) cues start and end of the movement period.


In [None]:
fs = 250.0
ch_idx = 1                # 0-indexed channel to analyze
beta_band = (13.0, 30.0)
artifact_ptp_uv = 200.0   # reject if peak-to-peak after filtering exceeds this

movement_s = 4.0
rest_s     = 6.0
trials     = 50

beep = sound.Sound(value=440, secs=0.2)


## Filtering & beta power

- `bandpass_butter`: a 4th-order Butterworth bandpass (0.5‚Äì40 Hz) applied with `lfilter`.
- `beta_power_1s`: Welch PSD on a **1 s** segment (`nperseg = fs`) and mean power in 13‚Äì30 Hz.
- `mean_beta_nonoverlap_1s`: divides an interval into **non-overlapping** 1 s windows and averages their beta power.


In [None]:
# ---------- Filter ----------
def bandpass_butter(x, fs, low=0.5, high=40.0, order=4):
    nyq = fs * 0.5
    b, a = butter(order, [low/nyq, high/nyq], btype='band')
    return lfilter(b, a, x)

# ---------- Beta power helpers ----------
def beta_power_1s(seg_1s, fs):
    """Beta power from a 1 s segment using Welch with nperseg=1 s."""
    if len(seg_1s) < int(0.9*fs):
        return np.nan
    freqs, psd = welch(seg_1s, fs=fs, nperseg=int(fs), noverlap=0)
    m = (freqs >= beta_band[0]) & (freqs <= beta_band[1])
    return float(np.mean(psd[m])) if np.any(m) else np.nan

def mean_beta_nonoverlap_1s(x, fs):
    """Mean beta over non-overlapping 1 s windows fully inside x."""
    n = int(fs)
    if len(x) < n:
        return np.nan
    # use only full seconds
    full = (len(x) // n) * n
    x = x[:full]
    betas = []
    for i in range(0, full, n):
        b = beta_power_1s(x[i:i+n], fs)
        if np.isfinite(b):
            betas.append(b)
    return float(np.mean(betas)) if betas else np.nan


## Rolling buffers & slicing

The script keeps a small rolling buffer of timestamps (`ts_buf`) and samples (`x_buf`) for **one channel**:

- `trim(now_ts)`: drops samples older than 60 s relative to `now_ts` to keep memory bounded.
- `slice_by_time(t0, t1)`: returns samples with timestamps in **[t0, t1)**.
- `slice_last_seconds(end_ts, seconds)`: returns up to `seconds*fs` samples **immediately before** `end_ts`,
  handling timestamp jitter with a fallback index-based approach.


In [None]:
# ---------- Small rolling buffer ----------
# We‚Äôll keep the last ~60 s of raw samples for one channel + timestamps.
MAX_KEEP_S = 60.0
ts_buf = []
x_buf  = []

def trim(now_ts):
    cutoff = now_ts - MAX_KEEP_S
    # drop from start while older than cutoff
    i = 0
    while i < len(ts_buf) and ts_buf[i] < cutoff:
        i += 1
    if i > 0:
        del ts_buf[:i]
        del x_buf[:i]

def slice_by_time(t0, t1):
    """Return samples with ts in [t0, t1)."""
    # simple linear pass (buffers are short)
    out = [x for (t, x) in zip(ts_buf, x_buf) if t0 <= t < t1]
    return np.asarray(out, dtype=float)

def slice_last_seconds(end_ts, seconds):
    """Return up to exactly seconds*fs samples immediately before end_ts."""
    n_needed = int(seconds * fs)
    # First try time-based
    x = [x for (t, x) in zip(ts_buf, x_buf) if end_ts - seconds <= t <= end_ts]
    if len(x) >= n_needed:
        return np.asarray(x[-n_needed:], dtype=float)
    # Fallback: index-based from tail (in case of timestamp jitter)
    # Take samples whose timestamps are < end_ts, from the end backwards.
    xs = []
    for t, x in zip(reversed(ts_buf), reversed(x_buf)):
        if t < end_ts:
            xs.append(x)
            if len(xs) == n_needed:
                break
    return np.asarray(list(reversed(xs)), dtype=float)


## Main loop logic

For each trial:

1. **Rest** starts
   - Beep, send marker **0** (`rest_ts`).
2. **Movement** starts
   - Beep, send marker **1** (`onset_ts`).
   - Collect samples for `movement_s` seconds into the rolling buffers.
3. **Baseline** beta:
   - Take exactly `[onset_ts-1 s, onset_ts)`; filter; artifact-check; compute beta power.
4. **Task** beta:
   - Take samples fully inside `[onset_ts, move_end_ts)`; filter; artifact-check; compute mean beta across non-overlap 1 s windows.
5. **ERD%**:
   - If both baseline and task beta are finite (and baseline non-zero), compute MRBD
   - Otherwise, report **NaN** with context.


In [None]:
# ---------- Main loop (REST ‚Üí MOVEMENT) ----------
print("üöÄ Starting trials...")
for trial in range(1, trials + 1):

    # ----- REST (pre-movement baseline period) -----
    beep.play()
    ts = local_clock()
    rest_ts = send_marker(0)
    t_start = time.time()
    # collect during REST
    while time.time() - t_start < rest_s:
        s, t = inlet.pull_sample(timeout=0.1)
        if s is None:
            continue
        ts_buf.append(t if t is not None else local_clock())
        x_buf.append(s[ch_idx])
        trim(ts_buf[-1])

    # ----- MOVEMENT (task) -----
    beep.play()
    onset_ts = send_marker(1)  # MOVEMENT starts
    t_start = time.time()
    # collect during MOVEMENT
    while time.time() - t_start < movement_s:
        s, t = inlet.pull_sample(timeout=0.1)
        if s is None:
            continue
        ts_buf.append(t if t is not None else local_clock())
        x_buf.append(s[ch_idx])
        trim(ts_buf[-1])
    
    # Mark end of movement and the start of the next REST window
    move_end_ts = local_clock()  # End movement timestamp

    # ---------- Metrics ----------
    # Baseline: exactly the 1 s immediately before movement onset
    # (fix: correct argument order for slice_last_seconds)
    base_raw = slice_last_seconds(onset_ts, 1.0)
    base_flt = bandpass_butter(base_raw, fs) if base_raw.size else base_raw
    if base_flt.size == 0 or np.ptp(base_flt) > artifact_ptp_uv:
        baseline_beta = np.nan
    else:
        baseline_beta = beta_power_1s(base_flt, fs)

    # Task: only data fully inside [onset_ts, move_end_ts)
    task_raw = slice_by_time(onset_ts, move_end_ts)
    task_flt = bandpass_butter(task_raw, fs) if task_raw.size else task_raw
    if task_flt.size == 0 or np.ptp(task_flt) > artifact_ptp_uv:
        task_beta = np.nan
    else:
        task_beta = mean_beta_nonoverlap_1s(task_flt, fs)

    # ERD%
    if np.isfinite(baseline_beta) and np.isfinite(task_beta) and baseline_beta != 0:
        erd = ((task_beta - baseline_beta) / baseline_beta) * 100.0
        print(f"üèÅ Trial {trial:02d} | BaselineŒ≤ {baseline_beta:.4f} | TaskŒ≤ {task_beta:.4f} | ERD% {erd:.2f}")
    else:
        print(f"üèÅ Trial {trial:02d} | ERD% NaN "
              f"(baselineŒ≤={baseline_beta}, taskŒ≤={task_beta}, n_task={task_flt.size})")

print("‚úÖ Done.")


## How to adapt this to your hardware

- Change the stream name match from `"UnicornRecorderLSLStream"` to your device's stream name.
- Confirm the sampling rate `fs`. If you can't guarantee it, dynamically estimate from timestamps (rolling average of inter-sample intervals).
- Add preprocessing filters (notch and band pass)
- Make sure to correctly index **C3 electrode** in parameters **ch_idx**.



### Notes & best practices 

- **Artifact handling**: Peak-to-peak rejection is simple. For better robustness, add:
  - line noise removal (e.g., notch at 50/60 Hz),
  - eye-blink/EMG detection (if multi-channel).
- **Markers & alignment**: Prefer **hardware timestamps** if available, even **EMG signal**. Keep an eye on **timestamp jitter**.
- **Beta band choice**: Many studies use 13‚Äì30 Hz; you can parameterize this per subject.
- **Windowing**: Non-overlapping 1 s windows are easy to interpret. Overlapping windows can smooth estimates if needed.
- **Troubleshooting**: Log `len(task_flt)`, computed `nperseg`, and PSD peak locations; plot PSDs to confirm beta suppression.
