In [4]:
import numpy as np
import pandas as pd

EPS = 1e-9

def _robust_dt(t: np.ndarray) -> float:
    """Median dt, ignoring non-increasing timestamps."""
    t = np.asarray(t, dtype=float)
    dt = np.diff(t)
    dt = dt[dt > 0]
    return float(np.median(dt)) if dt.size else np.nan

def _mag(xyz: np.ndarray) -> np.ndarray:
    return np.linalg.norm(xyz, axis=1)

def _basic_stats(prefix: str, x: np.ndarray) -> dict:
    x = np.asarray(x, dtype=float)
    x = x[np.isfinite(x)]
    if x.size == 0:
        return {f"{prefix}_{k}": np.nan for k in ["mean","std","median","p95","max","min"]}
    return {
        f"{prefix}_mean": float(np.mean(x)),
        f"{prefix}_std": float(np.std(x, ddof=0)),
        f"{prefix}_median": float(np.median(x)),
        f"{prefix}_p95": float(np.percentile(x, 95)),
        f"{prefix}_max": float(np.max(x)),
        f"{prefix}_min": float(np.min(x)),
    }

def _time_above(prefix: str, x: np.ndarray, thresholds: list[float]) -> dict:
    x = np.asarray(x, dtype=float)
    x = x[np.isfinite(x)]
    out = {}
    if x.size == 0:
        for thr in thresholds:
            out[f"{prefix}_pct_gt_{thr}"] = np.nan
        return out
    for thr in thresholds:
        out[f"{prefix}_pct_gt_{thr}"] = float(np.mean(x > thr) * 100.0)
    return out

def _jerk_from_signal(xyz: np.ndarray, dt: float) -> np.ndarray:
    """
    Finite-diff derivative magnitude (jerk-like):
      if xyz is acceleration -> jerk
      if xyz is angular acceleration -> angular jerk
    """
    if not np.isfinite(dt) or dt <= 0 or xyz is None:
        return np.full((len(xyz),), np.nan) if xyz is not None else np.array([np.nan])
    d = np.vstack([np.zeros(3), np.diff(xyz, axis=0)])
    j = d / dt
    return _mag(j)

def _derive_acc_from_vel(v_xyz: np.ndarray, dt: float) -> np.ndarray:
    """Acceleration from velocity via finite differences."""
    if not np.isfinite(dt) or dt <= 0:
        return np.full_like(v_xyz, np.nan)
    dv = np.vstack([np.zeros(3), np.diff(v_xyz, axis=0)])
    return dv / dt

def extract_quest_round_features(
    df: pd.DataFrame,
    time_col: str = "timestamp",              # set to your Quest time column (e.g., "lsl_timestamp")
    vel_cols: tuple[str, str, str] = ("vx", "vy", "vz"),
    angvel_cols: tuple[str, str, str] = ("wx", "wy", "wz"),  # or ("ang_vx","ang_vy","ang_vz")
    t_start: float | None = None,
    t_end: float | None = None,
    # thresholds (pick values that match your units)
    speed_thresholds: list[float] = (0.3, 0.5, 1.0),          # m/s (if your v is m/s)
    acc_thresholds: list[float] = (1.0, 2.0, 5.0),            # m/s^2
    angspeed_thresholds: list[float] = (0.5, 1.0, 2.0),       # rad/s  (if your w is rad/s)
    angacc_thresholds: list[float] = (1.0, 2.0, 5.0),         # rad/s^2
    dropna: bool = True,
) -> dict:
    """
    Per-round features from Quest kinematics:
      - speed (|v|), linear acceleration (|a|), linear jerk (|da/dt|)
      - angular speed (|w|), angular acceleration (|alpha|), angular jerk (|dalpha/dt|)
      - component ranges/stds (optional but useful)
      - time-above-threshold percentages

    Returns a dict of scalar features for the selected time window.
    """
    d = df.copy()

    # time slice
    if t_start is not None:
        d = d[d[time_col] >= t_start]
    if t_end is not None:
        d = d[d[time_col] <= t_end]
    d = d.sort_values(time_col)

    out = {"n_samples": int(len(d))}

    # not enough samples
    if len(d) < 3:
        # keep keys stable so you can build a dataframe later
        base_keys = [
            "duration_s","dt_median_s",
            "speed_mean","speed_p95","acc_mean","acc_p95","jerk_p95",
            "angspeed_mean","angspeed_p95","angacc_mean","angacc_p95","angjerk_p95",
        ]
        out.update({k: np.nan for k in base_keys})
        return out

    # numeric arrays
    t = pd.to_numeric(d[time_col], errors="coerce").to_numpy(dtype=float)
    dt = _robust_dt(t)
    out["dt_median_s"] = dt
    out["duration_s"] = float(t[-1] - t[0]) if np.isfinite(t[-1] - t[0]) else np.nan

    v = d.loc[:, list(vel_cols)].apply(pd.to_numeric, errors="coerce").to_numpy(dtype=float)
    w = d.loc[:, list(angvel_cols)].apply(pd.to_numeric, errors="coerce").to_numpy(dtype=float)

    if dropna:
        # drop rows where any required signal is NaN
        mask = np.isfinite(t) & np.isfinite(v).all(axis=1) & np.isfinite(w).all(axis=1)
        t, v, w = t[mask], v[mask], w[mask]
        out["n_samples_after_dropna"] = int(len(t))
        if len(t) < 3:
            out.update({k: np.nan for k in [
                "duration_s","dt_median_s",
                "speed_mean","speed_p95","acc_mean","acc_p95","jerk_p95",
                "angspeed_mean","angspeed_p95","angacc_mean","angacc_p95","angjerk_p95",
            ]})
            return out
        dt = _robust_dt(t)
        out["dt_median_s"] = dt
        out["duration_s"] = float(t[-1] - t[0]) if np.isfinite(t[-1] - t[0]) else np.nan

    # magnitudes
    speed = _mag(v)
    angspeed = _mag(w)

    # derived accelerations
    a = _derive_acc_from_vel(v, dt)
    alpha = _derive_acc_from_vel(w, dt)  # angular acceleration

    accmag = _mag(a)
    angaccmag = _mag(alpha)

    # jerks
    jerk = _jerk_from_signal(a, dt)
    angjerk = _jerk_from_signal(alpha, dt)

    # stats
    out.update(_basic_stats("speed", speed))
    out.update(_basic_stats("acc", accmag))
    out.update(_basic_stats("jerk", jerk))

    out.update(_basic_stats("angspeed", angspeed))
    out.update(_basic_stats("angacc", angaccmag))
    out.update(_basic_stats("angjerk", angjerk))

    # time above thresholds
    out.update(_time_above("speed", speed, list(speed_thresholds)))
    out.update(_time_above("acc", accmag, list(acc_thresholds)))
    out.update(_time_above("angspeed", angspeed, list(angspeed_thresholds)))
    out.update(_time_above("angacc", angaccmag, list(angacc_thresholds)))

    # component variability (often useful for “movement style”)
    for name, arr in [("v", v), ("w", w), ("a", a), ("alpha", alpha)]:
        out[f"{name}_std_x"] = float(np.nanstd(arr[:, 0]))
        out[f"{name}_std_y"] = float(np.nanstd(arr[:, 1]))
        out[f"{name}_std_z"] = float(np.nanstd(arr[:, 2]))
        out[f"{name}_range_x"] = float(np.nanmax(arr[:, 0]) - np.nanmin(arr[:, 0]))
        out[f"{name}_range_y"] = float(np.nanmax(arr[:, 1]) - np.nanmin(arr[:, 1]))
        out[f"{name}_range_z"] = float(np.nanmax(arr[:, 2]) - np.nanmin(arr[:, 2]))

    return out


def extract_features_per_round(
    df: pd.DataFrame,
    round_col: str = "round_id",
    **kwargs
) -> pd.DataFrame:
    """
    Convenience wrapper:
    groups by round_col and returns one feature-row per round.
    """
    rows = []
    for rid, g in df.groupby(round_col, sort=True):
        feats = extract_quest_round_features(g, **kwargs)
        feats[round_col] = rid
        rows.append(feats)
    return pd.DataFrame(rows).set_index(round_col, drop=False)

In [None]:
# Example: your df has columns:
# lsl_timestamp, vx, vy, vz, ang_vx, ang_vy, ang_vz, round_id

feat_df = extract_features_per_round(
    df,
    round_col="round_id",
    time_col="lsl_timestamp",
    vel_cols=("vx","vy","vz"),
    angvel_cols=("ang_vx","ang_vy","ang_vz"),
    # thresholds matching your units (edit as needed)
    speed_thresholds=[0.3, 0.5, 1.0],
    angspeed_thresholds=[0.5, 1.0, 2.0],  # rad/s
)