## Code to receive the .C3D file from QTM, and perform downsample from 100Hz to 40Hz and proper formating for better OpenSim interpretation 

In [34]:
import os, numpy as np, pandas as pd, c3d
from scipy.interpolate import interp1d

# ====================== USER CONFIG ======================
input_dir      = "/Users/afonsoamaral/Desktop/validation/Inputs3"
output_dir     = "/Users/afonsoamaral/Desktop/validation/Outputs2"
target_rate_hz = 40.0

# Short-gap fill for CRITICAL markers only (after resampling)
critical_markers  = ["SpineTop", "C7", "CLAV"]  # edit as you like
max_gap_frames    = 10                          # at 40 Hz ≈ 0.25 s

# Safety thresholds to decide if we must write a clipped file too
min_visible_markers       = 10
require_critical_present  = True
# =========================================================

os.makedirs(output_dir, exist_ok=True)

def load_c3d_points(c3d_path):
    """Return (points_xyz, labels, rate_hz), where points_xyz=(3, nMarkers, nFrames), meters."""
    with open(c3d_path, 'rb') as fh:
        reader = c3d.Reader(fh)
        rate   = float(reader.header.frame_rate)
        labels = [s.strip() for s in reader.point_labels]
        frames = []
        for _, pts, _ in reader.read_frames():  # pts: (nMarkers, 4) -> X,Y,Z,residual
            frames.append(pts[:, :3])
    data = np.stack(frames, axis=0)            # (nFrames, nMarkers, 3)
    data = np.transpose(data, (2,1,0))         # (3, nMarkers, nFrames)
    return data.astype(float), labels, rate

def resample_with_gaps(pts, rate_in, rate_out):
    """
    Resample to rate_out with linear interpolation, preserving gaps:
    original (0,0,0) frames are treated as missing (NaN) and not extrapolated.
    pts: (3, nMarkers, nFrames)
    """
    n_frames = pts.shape[2]
    t_in  = np.arange(n_frames) / rate_in
    t_out = np.arange(0, t_in[-1] + 1e-12, 1.0 / rate_out)

    # A sample is valid only if NOT all three axes are exactly zero
    valid = ~(np.isclose(pts[0], 0.0) & np.isclose(pts[1], 0.0) & np.isclose(pts[2], 0.0))  # (nMarkers, nFrames)

    pts_out = np.full((3, pts.shape[1], len(t_out)), np.nan, dtype=float)
    for m in range(pts.shape[1]):
        mask = valid[m]                 # (nFrames,)
        if mask.sum() < 2:
            continue  # not enough to interpolate
        t_valid = t_in[mask]
        for ax in range(3):
            y_valid = pts[ax, m, mask]
            f = interp1d(t_valid, y_valid, kind="linear", bounds_error=False, fill_value=np.nan)
            pts_out[ax, m, :] = f(t_out)
    return pts_out, t_out

def fill_short_internal_gaps_1d(y, max_len):
    """
    Fill ONLY internal NaN runs (bounded by finite ends) of length <= max_len with linear values.
    Leading/trailing NaNs are kept as NaN.
    """
    y = np.asarray(y, float).copy()
    isn = np.isnan(y)
    if not isn.any():
        return y
    n = len(y)
    i = 0
    while i < n:
        if not isn[i]:
            i += 1
            continue
        j = i
        while j < n and isn[j]:
            j += 1
        run_len = j - i
        left = i - 1
        right = j
        if run_len <= max_len and left >= 0 and right < n and np.isfinite(y[left]) and np.isfinite(y[right]):
            y[i:j] = np.linspace(y[left], y[right], run_len + 2)[1:-1]
        i = j
    return y

def reorder_markers_for_trailing_nonblank(pts, labels):
    """
    Reorder markers so the LAST marker is likely non-blank on frame 0
    (prevents OpenSim dropping the final empty column).
    """
    nM = pts.shape[1]
    # present at first frame?
    present0 = ~(np.isnan(pts[0,:,0]) & np.isnan(pts[1,:,0]) & np.isnan(pts[2,:,0]))
    # fewest NaNs overall
    nan_counts = np.sum(np.isnan(pts[0]) & np.isnan(pts[1]) & np.isnan(pts[2]), axis=1)
    # prefer present at frame 0, then fewer NaNs
    order = list(np.lexsort((nan_counts, ~present0)))
    # move best to the end
    best = order[0]
    final_order = [i for i in range(nM) if i != best] + [best]
    pts2 = pts[:, final_order, :]
    labels2 = [labels[i] for i in final_order]
    return pts2, labels2

def write_trc(out_path, labels, t, pts, data_rate_hz):
    """
    Manual TRC writer with strict column count and non-empty last field.
    pts: (3, nMarkers, nFrames), t: (nFrames,)
    """
    # Reorder once to avoid trailing blank
    pts, labels = reorder_markers_for_trailing_nonblank(pts, labels)

    nM = len(labels)
    nF = pts.shape[2]
    n_cols_expected = 2 + 3 * nM

    with open(out_path, "w") as f:
        # Line 1
        f.write(f"PathFileType\t4\t(X/Y/Z)\t{os.path.basename(out_path)}\n")
        # Line 2
        f.write("DataRate\tCameraRate\tNumFrames\tNumMarkers\tUnits\tOrigDataRate\tOrigDataStartFrame\tOrigNumFrames\n")
        f.write(f"{data_rate_hz:.2f}\t{data_rate_hz:.2f}\t{nF}\t{nM}\tm\t{data_rate_hz:.2f}\t1\t{nF}\n")
        # Line 3: marker names (each consumes 3 columns)
        f.write("Frame#\tTime\t")
        for lab in labels: f.write(f"{lab}\t\t\t")
        f.write("\n")
        # Line 4: X1 Y1 Z1 X2 Y2 Z2 ...
        f.write("\t\t")
        for k in range(1, nM+1): f.write(f"X{k}\tY{k}\tZ{k}\t")
        f.write("\n")

        # Data lines
        for i in range(nF):
            row = [str(i+1), f"{t[i]:.5f}"]
            for j in range(nM):
                x, y, z = pts[:, j, i]
                row += [("" if np.isnan(x) else f"{x:.5f}"),
                        ("" if np.isnan(y) else f"{y:.5f}"),
                        ("" if np.isnan(z) else f"{z:.5f}")]
            # enforce width & non-empty last
            if len(row) < n_cols_expected:
                row += [""] * (n_cols_expected - len(row))
            elif len(row) > n_cols_expected:
                row = row[:n_cols_expected]
            if row[-1] == "": row[-1] = " "
            f.write("\t".join(row) + "\n")

def validate_and_clip(labels, t, pts, marker_names,
                      min_visible=10, require_crit=True, crit_list=None):
    """
    Returns (all_good, last_good_idx).
    all_good True if every frame meets visibility; otherwise last_good_idx is last passing frame.
    """
    nF = pts.shape[2]
    # Count visible markers (any axis finite) per frame
    vis_counts = np.zeros(nF, dtype=int)
    for j in range(len(labels)):
        V = pts[:, j, :]
        vis = np.isfinite(V).any(axis=0)
        vis_counts += vis.astype(int)

    ok = vis_counts >= min_visible

    if require_crit and crit_list:
        for name in crit_list:
            if name in marker_names:
                j = labels.index(name)
                V = pts[:, j, :]
                ok &= np.isfinite(V).any(axis=0)

    if ok.all():
        return True, nF - 1
    else:
        last_good = np.where(ok)[0][-1]
        return False, int(last_good)

def convert_one(c3d_path, out_trc_path):
    pts, labels, rate = load_c3d_points(c3d_path)
    print(f"\n{os.path.basename(c3d_path)} @ {rate:.1f} Hz | markers={len(labels)}, frames={pts.shape[2]}")

    # 1) resample w/ gaps preserved
    pts_ds, t_out = resample_with_gaps(pts, rate_in=rate, rate_out=target_rate_hz)

    # 2) targeted short-gap fill for critical markers (each axis independently)
    crit_present = [m for m in critical_markers if m in labels]
    for m in crit_present:
        j = labels.index(m)
        for ax in range(3):
            pts_ds[ax, j, :] = fill_short_internal_gaps_1d(pts_ds[ax, j, :], max_gap_frames)

    # 3) write full-length TRC
    write_trc(out_trc_path, labels, t_out, pts_ds, target_rate_hz)
    print(f"  ✅ Saved TRC: {out_trc_path}")

    # 4) validate coverage; if unsafe, also write a clipped TRC
    all_good, last_idx = validate_and_clip(labels, t_out, pts_ds, labels,
                                           min_visible=min_visible_markers,
                                           require_crit=require_critical_present,
                                           crit_list=crit_present)
    if not all_good:
        t_end = t_out[last_idx]
        clipped_path = out_trc_path.replace(".trc", "_clipped.trc")
        write_trc(clipped_path, labels, t_out[:last_idx+1], pts_ds[:, :, :last_idx+1], target_rate_hz)
        print(f"  ⚠️ Coverage too low near the end. Wrote clipped TRC: {clipped_path}")
        print(f"     Suggested IK end time: {t_end:.5f} s "
              f"(≥{min_visible_markers} markers visible"
              + (", all critical present" if (require_critical_present and crit_present) else "") + ").")

# -------- batch over folder ----------
for fname in sorted(os.listdir(input_dir)):
    if fname.lower().endswith(".c3d"):
        in_path  = os.path.join(input_dir, fname)
        out_name = fname[:-4] + f"_{int(target_rate_hz)}Hz.trc"
        out_path = os.path.join(output_dir, out_name)
        convert_one(in_path, out_path)

print("\nAll done.")


S3_M10.c3d @ 100.0 Hz | markers=42, frames=2667
  ✅ Saved TRC: /Users/afonsoamaral/Desktop/validation/Outputs2/S3_M10_40Hz.trc
  ⚠️ Coverage too low near the end. Wrote clipped TRC: /Users/afonsoamaral/Desktop/validation/Outputs2/S3_M10_40Hz_clipped.trc
     Suggested IK end time: 26.65000 s (≥10 markers visible, all critical present).

S3_M11.c3d @ 100.0 Hz | markers=42, frames=2164
  ✅ Saved TRC: /Users/afonsoamaral/Desktop/validation/Outputs2/S3_M11_40Hz.trc
  ⚠️ Coverage too low near the end. Wrote clipped TRC: /Users/afonsoamaral/Desktop/validation/Outputs2/S3_M11_40Hz_clipped.trc
     Suggested IK end time: 15.87500 s (≥10 markers visible, all critical present).

S3_M3.c3d @ 100.0 Hz | markers=39, frames=4377
  ✅ Saved TRC: /Users/afonsoamaral/Desktop/validation/Outputs2/S3_M3_40Hz.trc

S3_M4.c3d @ 100.0 Hz | markers=41, frames=3639
  ✅ Saved TRC: /Users/afonsoamaral/Desktop/validation/Outputs2/S3_M4_40Hz.trc

S3_M5.c3d @ 100.0 Hz | markers=37, frames=2896
  ✅ Saved TRC: /Users/

In [35]:
import os, numpy as np, pandas as pd

# ---- CONFIG: point to the specific file you want to salvage ----
trc_in  = "/Users/afonsoamaral/Desktop/validation/Outputs2/S5_M9_40Hz.trc"
trc_out = "/Users/afonsoamaral/Desktop/validation/Outputs2/S5_M9_40Hz_fixed_alltorso.trc"

# Aggressively stabilized torso set (edit if your labels differ)
torso_markers = ["SpineTop", "C7", "CLAV", "STRN", "T10"]

def read_trc(trc_path):
    with open(trc_path, "r", encoding="latin1") as f:
        header = [next(f) for _ in range(5)]
        # try tabs first, fallback to whitespace
        df = pd.read_csv(f, sep="\t", header=None, engine="python")
    # build column names from header (lines 4–5)
    name_line = header[3].rstrip("\n").split("\t")
    if not name_line or not name_line[0].startswith("Frame"):
        name_line = header[3].split()
    names = [t.strip() for t in name_line[2:] if t.strip()]
    cols = ["Frame#", "Time"]
    for n in names:
        cols += [f"{n}_X", f"{n}_Y", f"{n}_Z"]
    if df.shape[1] != len(cols):
        with open(trc_path, "r", encoding="latin1") as f:
            for _ in range(5): next(f)
            df = pd.read_csv(f, delim_whitespace=True, header=None, engine="python")
    df.columns = cols
    # numeric (support comma decimals), blanks -> NaN
    df = df.apply(lambda s: pd.to_numeric(s.astype(str).str.replace(",", ".", regex=False)
                                          .replace({"": np.nan}), errors="coerce"))
    return header, names, df

def write_trc(trc_path, header, df, data_rate_hz=None):
    vals = header[2].strip().split()
    if len(vals) >= 3:
        if data_rate_hz is not None:
            vals[0] = f"{float(data_rate_hz):.2f}"
            vals[1] = f"{float(data_rate_hz):.2f}"
        vals[2] = str(len(df))
        header[2] = "\t".join(vals) + "\n"
    cols = list(df.columns)
    n_cols = len(cols)
    with open(trc_path, "w") as f:
        f.writelines(header)
        for i in range(len(df)):
            row = []
            for c in cols:
                v = df.iloc[i][c]
                if pd.isna(v):
                    row.append("")
                else:
                    if c == "Frame#":
                        row.append(str(int(v)))
                    else:
                        row.append(f"{float(v):.5f}")
            if len(row) < n_cols: row += [""] * (n_cols - len(row))
            elif len(row) > n_cols: row = row[:n_cols]
            if row[-1] == "": row[-1] = " "  # avoid trailing-empty parse drop
            f.write("\t".join(row) + "\n")

def interpolate_internal(y):
    """Fill all NaN between first and last finite with linear interpolation."""
    y = np.asarray(y, float).copy()
    idx = np.arange(len(y))
    m = np.isfinite(y)
    if m.sum() < 2:
        return y
    i0, i1 = idx[m][0], idx[m][-1]
    mid = np.arange(i0, i1+1)
    ym = y[mid]
    mm = np.isfinite(ym)
    ym[~mm] = np.interp(mid[~mm], mid[mm], ym[mm])
    y[i0:i1+1] = ym
    return y

def extend_ends_hold(y):
    """Forward/Backward fill to the ends (no limit), if finite seed exists."""
    y = np.asarray(y, float).copy()
    n = len(y)
    # forward fill from first finite to start
    i = 0
    while i < n and np.isnan(y[i]): i += 1
    if i < n and np.isfinite(y[i]):
        y[:i] = y[i]
    # back fill from last finite to end
    j = n-1
    while j >= 0 and np.isnan(y[j]): j -= 1
    if j >= 0 and np.isfinite(y[j]):
        y[j+1:] = y[j]
    return y

# ---- run once on this file ----
header, names, df = read_trc(trc_in)
df["Frame#"] = np.arange(1, len(df)+1, dtype=int)  # ensure sane frame numbers

present = [m for m in torso_markers if m in names]
if not present:
    print("⚠️ None of the specified torso markers found in this TRC; writing unchanged copy.")
else:
    for m in present:
        for ax in ("X","Y","Z"):
            c = f"{m}_{ax}"
            v = df[c].to_numpy()
            v = interpolate_internal(v)  # fill between first & last finite
            v = extend_ends_hold(v)      # extend to start & end
            df[c] = v

# Keep DataRate from header (line 3) if it’s already correct; otherwise pass 40.0
write_trc(trc_out, header[:], df, data_rate_hz=None)
print("✅ Wrote aggressively stabilized TRC:", trc_out)


✅ Wrote aggressively stabilized TRC: /Users/afonsoamaral/Desktop/validation/Outputs2/S5_M9_40Hz_fixed_alltorso.trc
