# OST – Gait Data Preprocessing Notebook

**Author:** Bakhtiyor Sohibnazarov  
**Module:** Final Year Project  

This notebook:

1. Loads raw 3D joint data captured by the OST system.
2. Handles timestamps and derives a clean time base.
3. Runs data quality diagnostics:
   - Missing data per joint
   - Bone-length stability
   - Basic velocity sanity check
4. Interpolates short gaps and smooths joint trajectories using a One Euro filter.
5. Reduces the dataset to key biomechanical joints and exports:
   - A filtered full-joint CSV
   - A reduced preprocessed CSV for analysis notebook


In [6]:
# Install necessary libraries
!pip -q install pandas numpy

In [7]:
# ============================================================
# 1. IMPORTS & GLOBAL CONFIGURATION
# ============================================================

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

pd.set_option("display.precision", 4)
pd.set_option("display.width", 140)

# ------------------------------------------------------------
# File paths (adjust as needed)
# ------------------------------------------------------------
INPUT_CSV_RAW       = "data/28.11_raw.csv"
OUTPUT_CSV_FILTERED = "output/28.11_filtered.csv"
OUTPUT_CSV_REDUCED  = "output/28.11_preprocessed.csv"

# ------------------------------------------------------------
# Capture / timing
# ------------------------------------------------------------
TIMESTAMP_COLUMN    = "timestamp"   # set to None if no timestamp column

# ------------------------------------------------------------
# Interpolation settings
# ------------------------------------------------------------
MAX_INTERP_GAP      = 3   # max consecutive NaN frames to interpolate

# ------------------------------------------------------------
# One Euro filter parameters
# ------------------------------------------------------------
MIN_CUTOFF          = 0.5    # base cutoff
BETA                = 0.02   # speed sensitivity
DCUTOFF             = 1.0    # derivative cutoff

# ------------------------------------------------------------
# Joint indices (BlazePose-style)
# ------------------------------------------------------------
LEFT_HEEL_INDEX     = 29
RIGHT_HEEL_INDEX    = 30
LEFT_HIP_INDEX      = 23
RIGHT_HIP_INDEX     = 24

## 2. Utility functions

We define helpers for:
- One Euro filter (for smooth but responsive coordinates)
- Interpolation of short gaps
- Joint extraction
- Small reporting helpers


In [8]:
# ============================================================
# 2. UTILITY FUNCTIONS
# ============================================================

def lowpass(prev, curr, alpha):
    return alpha * curr + (1 - alpha) * prev

def compute_alpha(cutoff, dt):
    if cutoff <= 0:
        return 1.0
    tau = 1.0 / (2 * np.pi * cutoff)
    return 1.0 / (1.0 + tau / dt)

class OneEuroFilter:
    """
    Simple One Euro filter implementation for 1D signals.
    """
    def __init__(self, min_cutoff=1.0, beta=0.0, d_cutoff=1.0):
        self.min_cutoff = float(min_cutoff)
        self.beta = float(beta)
        self.d_cutoff = float(d_cutoff)
        self.x_prev = None
        self.dx_prev = 0.0
        self.t_prev = None

    def __call__(self, t, x):
        # First sample → initialise
        if self.t_prev is None:
            self.t_prev = t
            self.x_prev = x
            self.dx_prev = 0.0
            return x

        dt = t - self.t_prev
        if dt <= 0:
            # Non-positive dt → keep previous value
            return self.x_prev

        # Raw derivative
        dx = (x - self.x_prev) / dt

        # Filter derivative
        alpha_d = compute_alpha(self.d_cutoff, dt)
        dx_hat = lowpass(self.dx_prev, dx, alpha_d)

        # Dynamic cutoff based on derivative magnitude
        cutoff = self.min_cutoff + self.beta * abs(dx_hat)
        alpha = compute_alpha(cutoff, dt)

        # Filter signal
        x_hat = lowpass(self.x_prev, x, alpha)

        # Update state
        self.x_prev = x_hat
        self.dx_prev = dx_hat
        self.t_prev = t

        return x_hat

def interpolate_short_gaps(arr: np.ndarray, max_gap: int) -> np.ndarray:
    """
    Linearly interpolates NaN gaps up to 'max_gap' in length (in samples).
    Longer gaps are left as NaN.
    Works on 1D numpy arrays.
    """
    arr = arr.copy()
    n = len(arr)

    nan_idx = np.where(np.isnan(arr))[0]
    if len(nan_idx) == 0:
        return arr

    def fill_gap(start, end):
        gap_len = end - start + 1
        if gap_len > max_gap:
            return
        left = start - 1
        right = end + 1
        if 0 <= left < n and 0 <= right < n and not np.isnan(arr[left]) and not np.isnan(arr[right]):
            arr[start:end+1] = np.interp(
                np.arange(start, end+1),
                [left, right],
                [arr[left], arr[right]]
            )

    # Walk segments
    start = nan_idx[0]
    for i in range(1, len(nan_idx)):
        if nan_idx[i] != nan_idx[i-1] + 1:
            end = nan_idx[i-1]
            fill_gap(start, end)
            start = nan_idx[i]
    # last segment
    end = nan_idx[-1]
    fill_gap(start, end)

    return arr

def get_joint_array(df: pd.DataFrame, j_idx: int) -> np.ndarray:
    cols = [f"joint_{j_idx}_x", f"joint_{j_idx}_y", f"joint_{j_idx}_z"]
    return df[cols].to_numpy()


## 3. Load raw data and derive time base

We:
- Load the raw CSV
- Derive `time` using timestamp if present, otherwise from frame index
- Estimate effective FPS


In [9]:
# ============================================================
# 3. LOAD RAW DATA & TIME BASE
# ============================================================
df_raw = pd.read_csv(INPUT_CSV_RAW)
print(f"Loaded: {INPUT_CSV_RAW}")
print(f"Shape: {df_raw.shape[0]} frames x {df_raw.shape[1]} columns")

# Detect joint columns and IDs
joint_cols = sorted(
    [c for c in df_raw.columns if c.startswith("joint_")],
    key=lambda x: (int(x.split("_")[1]), x.split("_")[2])
)
joint_ids = sorted({int(c.split("_")[1]) for c in joint_cols})

# --- Time handling ---
if TIMESTAMP_COLUMN is not None and TIMESTAMP_COLUMN in df_raw.columns:
    print("Python datetime timestamps detected → parsing")

    t_raw = pd.to_datetime(df_raw[TIMESTAMP_COLUMN], errors="coerce")

    if t_raw.isna().any():
        print("Warning: malformed timestamps found → repairing via ffill/bfill")
        t_raw = t_raw.ffill().bfill()

    t_sec = (t_raw - t_raw.iloc[0]).dt.total_seconds().to_numpy()

    dt = np.diff(t_sec, prepend=t_sec[0])
    bad = dt <= 0
    if np.any(bad):
        print(f"Warning: {bad.sum()} non-increasing timestamps → repairing (assuming 30 FPS)")
        nominal_dt = 1.0 / 30.0
        for i in range(1, len(t_sec)):
            if t_sec[i] <= t_sec[i-1]:
                t_sec[i] = t_sec[i-1] + nominal_dt

    time = t_sec
else:
    print("No timestamp column → using synthetic time @ 30 FPS")
    fps_guess = 30.0
    time = np.arange(len(df_raw)) / fps_guess

df_raw["time"] = time

dt = np.diff(time)
median_dt = np.nanmedian(dt)
fps_effective = 1.0 / median_dt

print(f"Median Δt: {median_dt:.6f} s")
print(f"Effective FPS: {fps_effective:.3f} Hz")


Loaded: data/28.11_raw.csv
Shape: 25410 frames x 100 columns
Python datetime timestamps detected → parsing
Median Δt: 0.066509 s
Effective FPS: 15.036 Hz


## 5. Preprocessing: interpolate short gaps & apply One Euro filter

We clean the data **in place**:
- Interpolate short NaN gaps (up to `MAX_INTERP_GAP` frames) for each joint coordinate.
- Apply One Euro filter to each coordinate time-series.
- Construct a `valid` mask (frames without remaining NaNs).
- Save a filtered CSV.


In [13]:
# ============================================================
# FAST PREPROCESSING BLOCK (NO FRAGMENTATION)
# ============================================================
df = df_raw.copy()
t = df["time"].to_numpy()

# Store filtered columns in a dict first (fast)
filtered_cols = {}

for j in joint_ids:
    # Create filters for each axis
    f_x = OneEuroFilter(min_cutoff=MIN_CUTOFF, beta=BETA, d_cutoff=DCUTOFF)
    f_y = OneEuroFilter(min_cutoff=MIN_CUTOFF, beta=BETA, d_cutoff=DCUTOFF)
    f_z = OneEuroFilter(min_cutoff=MIN_CUTOFF, beta=BETA, d_cutoff=DCUTOFF)

    for axis, f in zip(["x", "y", "z"], [f_x, f_y, f_z]):
        col = f"joint_{j}_{axis}"
        if col not in df:
            continue

        arr = df[col].astype(float).to_numpy()

        # Interpolate short gaps
        arr_interp = interpolate_short_gaps(arr, MAX_INTERP_GAP)

        # Filter all frames
        arr_filt = np.empty_like(arr_interp)
        for idx, (ti, xi) in enumerate(zip(t, arr_interp)):
            arr_filt[idx] = np.nan if np.isnan(xi) else f(ti, xi)

        filtered_cols[col] = arr_filt

# Build the final filtered DataFrame in one shot → no fragmentation
filtered = pd.DataFrame(filtered_cols)
filtered.insert(0, "time", t)

if TIMESTAMP_COLUMN in df:
    filtered.insert(1, TIMESTAMP_COLUMN, df[TIMESTAMP_COLUMN])

# Valid frame mask
joint_cols_filt = [c for c in filtered.columns if c.startswith("joint_")]
valid_mask = ~filtered[joint_cols_filt].isna().any(axis=1)
filtered["valid"] = valid_mask

print(f"Frames valid after preprocessing: {valid_mask.mean()*100:.2f}%")

filtered.to_csv(OUTPUT_CSV_FILTERED, index=False)
print(f"Saved filtered data → {OUTPUT_CSV_FILTERED}")


Frames valid after preprocessing: 99.95%
Saved filtered data → output/28.11_filtered.csv


## 6. Reduce to key joints and export preprocessed dataset

To keep files compact and analysis-focused, we save only:
- Time, optional timestamp
- Selected biomechanically relevant joints
- Validity mask


In [12]:
# ============================================================
# 6. JOINT REDUCTION & EXPORT
# ============================================================

KEEP = [
    0,      # Head
    11,12,  # Shoulders
    13,14,  # Elbows
    15,16,  # Wrists
    23,24,  # Hips
    25,26,  # Knees
    27,28,  # Ankles
    29,30,  # Heels
]

keep_cols = []
for j in KEEP:
    for axis in ["x", "y", "z"]:
        col = f"joint_{j}_{axis}"
        if col in filtered.columns:
            keep_cols.append(col)
        else:
            print(f"Warning: missing joint column → {col}")

base_cols = ["time"]
if TIMESTAMP_COLUMN is not None and TIMESTAMP_COLUMN in filtered.columns:
    base_cols.append(TIMESTAMP_COLUMN)

final_reduced = filtered[base_cols + keep_cols + ["valid"]].copy()
final_reduced.to_csv(OUTPUT_CSV_REDUCED, index=False)

print(f"Saved reduced dataset → {OUTPUT_CSV_REDUCED}")
print(f"Shape: {final_reduced.shape}")
print(f"Valid frames in reduced set: {final_reduced['valid'].mean()*100:.2f}%")


Saved reduced dataset → output/28.11_preprocessed.csv
Shape: (25410, 48)
Valid frames in reduced set: 99.95%
