# Kinematics QA: Dual-Track Velocities (Step 06 QA)

**Purpose:** Compute and compare two velocity tracks for QA when T-pose may be flawed.

- **Track A (Raw Relative):** Root-relative linear and angular velocities — independent of reference. Valid even if T-pose calibration is bad.
- **Track B (Zeroed Relative):** Reference-dependent angular velocities — normalized for inter-subject comparison.

**Input:** Step 04 filtered parquet (quaternions + positions), Step 05 reference_map and kinematics_map.  
**Output:** `{RUN_ID}__kinematics_qa.parquet` with both velocity sets + Plotly comparison.

In [1]:
import os
import sys
import json
import numpy as np
import pandas as pd
from pathlib import Path
from scipy.spatial.transform import Rotation as Rot
from scipy.signal import savgol_filter

if os.path.basename(os.getcwd()) == 'notebooks':
    PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), ".."))
else:
    PROJECT_ROOT = os.path.abspath(os.getcwd())
SRC_PATH = os.path.join(PROJECT_ROOT, "src")
if SRC_PATH not in sys.path:
    sys.path.insert(0, SRC_PATH)

from pipeline_config import CONFIG
from angular_velocity import quaternion_log_angular_velocity

DERIV_FILTERED = os.path.join(PROJECT_ROOT, CONFIG['derivatives_dir'], "step_04_filtering")
DERIV_REF = os.path.join(PROJECT_ROOT, CONFIG['derivatives_dir'], "step_05_reference")
DERIV_KIN = os.path.join(PROJECT_ROOT, CONFIG['derivatives_dir'], "step_06_kinematics")
os.makedirs(DERIV_KIN, exist_ok=True)

RUN_ID = Path(CONFIG['current_csv']).stem
FS = float(CONFIG.get('FS_TARGET', 120.0))
SG_WINDOW_SEC = CONFIG.get('SG_WINDOW_SEC', 0.175)
SG_POLYORDER = CONFIG.get('SG_POLYORDER', 3)

INPUT_PARQUET = Path(DERIV_FILTERED) / f"{RUN_ID}__filtered.parquet"
if not INPUT_PARQUET.exists():
    raise FileNotFoundError(f"Step 04 output not found: {INPUT_PARQUET}")

df_in = pd.read_parquet(INPUT_PARQUET)

map_path = Path(DERIV_REF) / f"{RUN_ID}__kinematics_map.json"
if not map_path.exists():
    map_path = Path(PROJECT_ROOT) / CONFIG['derivatives_dir'] / "step_03_resample" / f"{RUN_ID}__kinematics_map.json"
with open(map_path, 'r', encoding='utf-8') as f:
    kinematics_map = json.load(f)

ref_path = Path(DERIV_REF) / f"{RUN_ID}__reference_map.json"
if not ref_path.exists():
    raise FileNotFoundError(f"Reference map not found: {ref_path}")
with open(ref_path, 'r', encoding='utf-8') as f:
    ref_pose = json.load(f)

ROOT_SEGMENT = 'Pelvis' if 'Pelvis' in kinematics_map else 'Hips'
print(f"RUN_ID: {RUN_ID}, FS: {FS}, Root: {ROOT_SEGMENT}")
print(f"SG: window={SG_WINDOW_SEC}s, polyorder={SG_POLYORDER}")
print(f"Loaded {len(df_in)} frames, {len(kinematics_map)} joints.")

RUN_ID: 763_T2_P2_R2_Take_2025-12-25 10.51.23 AM_005, FS: 120.0, Root: Hips
SG: window=0.175s, polyorder=3
Loaded 17262 frames, 19 joints.


## Helper: Quaternion continuity (shortest path) + SavGol derivative params

In [2]:
def enforce_quat_continuity(q):
    """Ensure quaternions follow shortest path (q and -q same rotation)."""
    q = np.asarray(q, dtype=float)
    if q.ndim == 1:
        q = q.reshape(1, -1)
    out = q.copy()
    for i in range(1, len(out)):
        if np.dot(out[i], out[i-1]) < 0:
            out[i] *= -1
    return out

def savgol_window_len(fs, w_sec, polyorder):
    w_len = int(round(w_sec * fs))
    if w_len % 2 == 0:
        w_len += 1
    w_len = max(5, w_len, polyorder + 2)
    if w_len % 2 == 0:
        w_len += 1
    return w_len

dt = 1.0 / FS
w_len = savgol_window_len(FS, SG_WINDOW_SEC, SG_POLYORDER)
print(f"SavGol window length: {w_len} frames")

SavGol window length: 21 frames


## Track A: Raw relative (reference-independent)

- **Linear:** Position relative to root (Pelvis/Hips) → SavGol derivative → `vel_linear_relative_raw`.
- **Angular:** inv(parent)*child quaternions (no zeroing) → quat_log (child_body) → `vel_angular_relative_raw`.

In [3]:
result = {'time_s': df_in['time_s'].values}

# --- Track A: Linear velocity (root-relative) ---
pos_cols = [c for c in df_in.columns if c.endswith(('__px','__py','__pz')) and df_in[c].notna().all()]
root_pos_cols = [f"{ROOT_SEGMENT}__px", f"{ROOT_SEGMENT}__py", f"{ROOT_SEGMENT}__pz"]
axis_idx = {'__px': 0, '__py': 1, '__pz': 2}
if all(c in df_in.columns for c in root_pos_cols):
    root_pos = df_in[root_pos_cols].values
    for col in pos_cols:
        seg, suffix = col.split('__')[0], '__' + col.split('__')[1]
        idx = axis_idx.get(suffix, 0)
        pos_rel = df_in[col].values - root_pos[:, idx]
        vel = savgol_filter(pos_rel, window_length=w_len, polyorder=SG_POLYORDER, deriv=1, delta=dt)
        result[f"{seg}__vel_linear_relative_raw_{col[-1]}"] = vel
    print("Track A linear: vel_linear_relative_raw computed.")
else:
    print("Track A linear: root position columns missing; skipping.")

# --- Track A: Angular velocity (raw relative, no zeroing) ---
for joint_name, info in kinematics_map.items():
    parent_name = info['parent']
    q_c_cols = [f"{joint_name}__qx", f"{joint_name}__qy", f"{joint_name}__qz", f"{joint_name}__qw"]
    if not all(c in df_in.columns for c in q_c_cols):
        continue
    q_c = df_in[q_c_cols].values
    q_c = enforce_quat_continuity(q_c)
    if parent_name is not None:
        q_p_cols = [f"{parent_name}__qx", f"{parent_name}__qy", f"{parent_name}__qz", f"{parent_name}__qw"]
        if not all(c in df_in.columns for c in q_p_cols):
            continue
        q_p = enforce_quat_continuity(df_in[q_p_cols].values)
        rot_p = Rot.from_quat(q_p)
        rot_c = Rot.from_quat(q_c)
        rot_rel = rot_p.inv() * rot_c
        q_rel = rot_rel.as_quat()
    else:
        q_rel = q_c
    q_rel = enforce_quat_continuity(q_rel)
    omega = quaternion_log_angular_velocity(q_rel, FS, frame='local')
    omega_deg = np.degrees(omega)
    result[f"{joint_name}__vel_angular_relative_raw_x"] = omega_deg[:, 0]
    result[f"{joint_name}__vel_angular_relative_raw_y"] = omega_deg[:, 1]
    result[f"{joint_name}__vel_angular_relative_raw_z"] = omega_deg[:, 2]
    # Angles (for validation): Euler XYZ degrees from same relative rotation
    euler_rel = Rot.from_quat(q_rel).as_euler('XYZ', degrees=True)
    result[f"{joint_name}__angle_relative_raw_x"] = euler_rel[:, 0]
    result[f"{joint_name}__angle_relative_raw_y"] = euler_rel[:, 1]
    result[f"{joint_name}__angle_relative_raw_z"] = euler_rel[:, 2]

print("Track A angular: vel_angular_relative_raw + angle_relative_raw computed.")

Track A linear: vel_linear_relative_raw computed.
Track A angular: vel_angular_relative_raw + angle_relative_raw computed.


## Track B: Zeroed relative (reference-dependent)

Apply reference_map: `inv(reference) * current` relative quaternion → quat_log (child_body) → `vel_angular_zeroed`.

In [4]:
# --- Track B: Angular velocity (zeroed to T-pose) ---
for joint_name, info in kinematics_map.items():
    parent_name = info['parent']
    q_c_cols = [f"{joint_name}__qx", f"{joint_name}__qy", f"{joint_name}__qz", f"{joint_name}__qw"]
    if not all(c in df_in.columns for c in q_c_cols):
        continue
    q_c = enforce_quat_continuity(df_in[q_c_cols].values)
    q_c_ref = np.array([ref_pose[f"{joint_name}__qx"], ref_pose[f"{joint_name}__qy"], ref_pose[f"{joint_name}__qz"], ref_pose[f"{joint_name}__qw"]])
    rot_c_ref = Rot.from_quat(q_c_ref)
    if parent_name is not None:
        q_p_cols = [f"{parent_name}__qx", f"{parent_name}__qy", f"{parent_name}__qz", f"{parent_name}__qw"]
        if not all(c in df_in.columns for c in q_p_cols):
            continue
        q_p = enforce_quat_continuity(df_in[q_p_cols].values)
        q_p_ref = np.array([ref_pose[f"{parent_name}__qx"], ref_pose[f"{parent_name}__qy"], ref_pose[f"{parent_name}__qz"], ref_pose[f"{parent_name}__qw"]])
        rot_p = Rot.from_quat(q_p)
        rot_p_ref = Rot.from_quat(q_p_ref)
        rot_rel = rot_p.inv() * Rot.from_quat(q_c)
        rot_rel_ref = rot_p_ref.inv() * rot_c_ref
    else:
        rot_rel = Rot.from_quat(q_c)
        rot_rel_ref = rot_c_ref
    rot_final = rot_rel_ref.inv() * rot_rel
    q_final = rot_final.as_quat()
    q_final = enforce_quat_continuity(q_final)
    omega = quaternion_log_angular_velocity(q_final, FS, frame='local')
    omega_deg = np.degrees(omega)
    result[f"{joint_name}__vel_angular_zeroed_x"] = omega_deg[:, 0]
    result[f"{joint_name}__vel_angular_zeroed_y"] = omega_deg[:, 1]
    result[f"{joint_name}__vel_angular_zeroed_z"] = omega_deg[:, 2]
    # Angles (for validation): Euler XYZ degrees from zeroed rotation
    euler_zeroed = rot_final.as_euler('XYZ', degrees=True)
    result[f"{joint_name}__angle_zeroed_x"] = euler_zeroed[:, 0]
    result[f"{joint_name}__angle_zeroed_y"] = euler_zeroed[:, 1]
    result[f"{joint_name}__angle_zeroed_z"] = euler_zeroed[:, 2]

print("Track B angular: vel_angular_zeroed + angle_zeroed computed.")

Track B angular: vel_angular_zeroed + angle_zeroed computed.


In [5]:
# Build DataFrame and save
df_qa = pd.DataFrame(result)
out_path = Path(DERIV_KIN) / f"{RUN_ID}__kinematics_qa.parquet"
df_qa.to_parquet(out_path, index=False)
print(f"Saved: {out_path}")
print(f"Columns: {len(df_qa.columns)} (time_s + Track A linear/angular + Track B angular)")

Saved: c:\Users\drorh\OneDrive - Mobileye\Desktop\gaga\derivatives\step_06_kinematics\763_T2_P2_R2_Take_2025-12-25 10.51.23 AM_005__kinematics_qa.parquet
Columns: 286 (time_s + Track A linear/angular + Track B angular)


## Visualization: Raw vs Zeroed angular velocity (QA)

Compare `vel_angular_relative_raw` vs `vel_angular_zeroed` for a representative joint (e.g. RightForearm) to see the impact of T-pose calibration.

In [6]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Representative joint: RightForeArm if present, else first available
rep_joint = None
for name in ['RightForeArm', 'LeftForeArm', 'RightArm', 'LeftArm']:
    if name in kinematics_map and f"{name}__vel_angular_relative_raw_x" in df_qa.columns:
        rep_joint = name
        break
if rep_joint is None:
    rep_joint = next((j for j in kinematics_map if f"{j}__vel_angular_relative_raw_x" in df_qa.columns), None)

if rep_joint is None:
    print("No joint with both raw and zeroed angular velocity found; skip plot.")
else:
    t = df_qa['time_s'].values
    raw_x = df_qa[f"{rep_joint}__vel_angular_relative_raw_x"].values
    raw_y = df_qa[f"{rep_joint}__vel_angular_relative_raw_y"].values
    raw_z = df_qa[f"{rep_joint}__vel_angular_relative_raw_z"].values
    raw_mag = np.sqrt(raw_x**2 + raw_y**2 + raw_z**2)
    zero_x = df_qa[f"{rep_joint}__vel_angular_zeroed_x"].values
    zero_y = df_qa[f"{rep_joint}__vel_angular_zeroed_y"].values
    zero_z = df_qa[f"{rep_joint}__vel_angular_zeroed_z"].values
    zero_mag = np.sqrt(zero_x**2 + zero_y**2 + zero_z**2)

    fig = make_subplots(rows=2, cols=1, subplot_titles=(f'{rep_joint}: magnitude (°/s)', 'X, Y, Z components (°/s)'), vertical_spacing=0.12)
    fig.add_trace(go.Scatter(x=t, y=raw_mag, name='vel_angular_relative_raw (mag)', line=dict(color='blue')), row=1, col=1)
    fig.add_trace(go.Scatter(x=t, y=zero_mag, name='vel_angular_zeroed (mag)', line=dict(color='orange')), row=1, col=1)
    fig.add_trace(go.Scatter(x=t, y=raw_x, name='raw X', line=dict(color='blue', dash='dot')), row=2, col=1)
    fig.add_trace(go.Scatter(x=t, y=raw_y, name='raw Y', line=dict(color='green', dash='dot')), row=2, col=1)
    fig.add_trace(go.Scatter(x=t, y=raw_z, name='raw Z', line=dict(color='red', dash='dot')), row=2, col=1)
    fig.add_trace(go.Scatter(x=t, y=zero_x, name='zeroed X', line=dict(color='blue')), row=2, col=1)
    fig.add_trace(go.Scatter(x=t, y=zero_y, name='zeroed Y', line=dict(color='green')), row=2, col=1)
    fig.add_trace(go.Scatter(x=t, y=zero_z, name='zeroed Z', line=dict(color='red')), row=2, col=1)
    fig.update_layout(height=500, title_text=f'QA: Raw vs Zeroed angular velocity — {rep_joint}')
    fig.update_xaxes(title_text='Time (s)', row=2, col=1)
    fig.update_yaxes(title_text='°/s', row=1, col=1)
    fig.update_yaxes(title_text='°/s', row=2, col=1)
    fig.show()

## Validation: Alignment across entire dataset

Hard proof that zeroed data aligns with raw relative data:
- **Velocity:** magnitude similarity (raw vs zeroed) → expect ~100%.
- **Stability (quaternion):** Geodesic distance between relative_raw and zeroed quaternions; its std across the recording should be near zero (constant T-pose → constant rotation offset).

In [7]:
# Velocity Magnitude Alignment: similarity (raw vs zeroed) per joint → mean similarity %
# Stability Proof (quaternion): Geodesic distance d(q_rel, q_zeroed) = 2*arccos(|<q1,q2>|) in °; std across frames → near zero
from scipy.stats import pearsonr

def geodesic_deg(q1, q2):
    """Geodesic distance in degrees: theta = 2*arccos(|<q1,q2>|)."""
    dot = np.abs((q1 * q2).sum(axis=1))
    dot = np.clip(dot, 0.0, 1.0)
    return np.degrees(2 * np.arccos(dot))

rows = []
for joint_name in kinematics_map:
    # Velocity similarity (from df_qa)
    raw_x = df_qa.get(f"{joint_name}__vel_angular_relative_raw_x")
    raw_y = df_qa.get(f"{joint_name}__vel_angular_relative_raw_y")
    raw_z = df_qa.get(f"{joint_name}__vel_angular_relative_raw_z")
    zero_x = df_qa.get(f"{joint_name}__vel_angular_zeroed_x")
    zero_y = df_qa.get(f"{joint_name}__vel_angular_zeroed_y")
    zero_z = df_qa.get(f"{joint_name}__vel_angular_zeroed_z")
    if raw_x is None or zero_x is None:
        continue
    mag_raw = np.sqrt(raw_x**2 + raw_y**2 + raw_z**2)
    mag_zeroed = np.sqrt(zero_x**2 + zero_y**2 + zero_z**2)
    r, _ = pearsonr(mag_raw, mag_zeroed)
    similarity_pct = 100.0 * r if np.isfinite(r) else 0.0

    # Quaternion-based stability: recompute q_rel and q_final, then geodesic std
    parent_name = kinematics_map[joint_name].get("parent")
    q_c_cols = [f"{joint_name}__qx", f"{joint_name}__qy", f"{joint_name}__qz", f"{joint_name}__qw"]
    geodesic_std_deg = np.nan
    if not all(c in df_in.columns for c in q_c_cols):
        pass
    else:
        q_c = enforce_quat_continuity(df_in[q_c_cols].values)
        have_q_rel = False
        if parent_name is not None:
            q_p_cols = [f"{parent_name}__qx", f"{parent_name}__qy", f"{parent_name}__qz", f"{parent_name}__qw"]
            if all(c in df_in.columns for c in q_p_cols):
                q_p = enforce_quat_continuity(df_in[q_p_cols].values)
                q_rel = (Rot.from_quat(q_p).inv() * Rot.from_quat(q_c)).as_quat()
                have_q_rel = True
        else:
            q_rel = q_c
            have_q_rel = True
        if have_q_rel:
            q_rel = enforce_quat_continuity(q_rel)
            q_c_ref = np.array([ref_pose[f"{joint_name}__qx"], ref_pose[f"{joint_name}__qy"], ref_pose[f"{joint_name}__qz"], ref_pose[f"{joint_name}__qw"]])
            rot_c_ref = Rot.from_quat(q_c_ref)
            if parent_name is not None:
                q_p_ref = np.array([ref_pose[f"{parent_name}__qx"], ref_pose[f"{parent_name}__qy"], ref_pose[f"{parent_name}__qz"], ref_pose[f"{parent_name}__qw"]])
                rot_rel_ref = Rot.from_quat(q_p_ref).inv() * rot_c_ref
            else:
                rot_rel_ref = rot_c_ref
            rot_final = rot_rel_ref.inv() * Rot.from_quat(q_rel)
            q_final = enforce_quat_continuity(rot_final.as_quat())
            theta_deg = geodesic_deg(q_rel, q_final)
            geodesic_std_deg = float(np.std(theta_deg))

    rows.append({
        "joint": joint_name,
        "velocity_similarity_pct": round(similarity_pct, 2),
        "geodesic_std_deg": round(geodesic_std_deg, 6),
    })

validation_table = pd.DataFrame(rows)
mean_similarity_pct = validation_table["velocity_similarity_pct"].mean()
print("Velocity Magnitude Alignment: mean similarity (raw vs zeroed) = {:.2f}% (expect close to 100%).".format(mean_similarity_pct))
max_geodesic_std = validation_table["geodesic_std_deg"].max()
print("Stability Proof (quaternion): max Std of geodesic distance over joints = {:.6f}° (expect near zero; constant T-pose → constant rotation).".format(max_geodesic_std))
print()
print("Validation Summary Table (quaternion-based proof for proceeding with zeroed data):")
display(validation_table)
if mean_similarity_pct >= 99.0 and np.isfinite(max_geodesic_std) and max_geodesic_std < 0.1:
    print("Conclusion: T-pose is stable; zeroed data aligns with raw. Proceed using zeroed data.")
else:
    print("Conclusion: Review T-pose or per-joint metrics above before relying solely on zeroed data.")

Velocity Magnitude Alignment: mean similarity (raw vs zeroed) = 100.00% (expect close to 100%).
Stability Proof (quaternion): max Std of geodesic distance over joints = 0.000000° (expect near zero; constant T-pose → constant rotation).

Validation Summary Table (quaternion-based proof for proceeding with zeroed data):


Unnamed: 0,joint,velocity_similarity_pct,geodesic_std_deg
0,Hips,100.0,0.0
1,Spine,100.0,0.0
2,Spine1,100.0,0.0
3,Neck,100.0,0.0
4,Head,100.0,0.0
5,LeftUpLeg,100.0,0.0
6,LeftLeg,100.0,0.0
7,LeftFoot,100.0,0.0
8,RightUpLeg,100.0,0.0
9,RightLeg,100.0,0.0


Conclusion: T-pose is stable; zeroed data aligns with raw. Proceed using zeroed data.
