In [33]:
if False:
    # Disabled one-off preparation step for the smoothness recordings. The
    # `if False` guard keeps this cell from executing; it renames files on
    # disk, so re-enable it only deliberately.
    # NOTE(review): presumably disabled because the files were already
    # renamed — confirm before switching it back on.

    from pathlib import Path
    import os
    import json

    from utils import read_custom_tsv

    # Every smoothness recording found below the current working directory.
    base_dir = Path(os.getcwd())
    test_files = sorted(base_dir.rglob("*smooth_*.tsv"))

    # Marker names per body-part group: Wrist/Forearm/Triceps crossed with the
    # "", "In" and "Out" suffixes, prefixed with "R" or "L".
    arm_markers = [name + pos for name in ["Wrist", "Forearm", "Triceps"] for pos in ["", "In", "Out"]]
    body_parts = {
        "RArm": ["R" + name for name in arm_markers],
        "LArm": ["L" + name for name in arm_markers],
    }
    body_parts["BothArms"] = body_parts["RArm"] + body_parts["LArm"]

    # Prepare annotation mapping: "sample_<i>" -> ground truth for that file.
    annotations = {}
    for i, file_test in enumerate(test_files):
        # Capture the original name once; it is needed after the rename too.
        original_name = file_test.name

        # A recording counts as smooth unless its name contains "NOsmooth".
        smoothness = "NOsmooth" not in original_name

        # Keep only the file's markers that belong to the body-part group
        # named at the end of the filename (e.g. "..._BothArms.tsv").
        metadata, df = read_custom_tsv(file_test)
        marker_names = metadata["marker_names"]
        allowed_keys = [bp for bp in body_parts if bp + ".tsv" in original_name]
        target_markers = [marker for marker in marker_names if any(marker in body_parts[bp] for bp in allowed_keys)]

        # New filename
        new_name = f"sample_{i}.tsv"
        print(f"Renaming {original_name} to {new_name}")

        # Rename the companion movie first, then the recording itself.
        movie_file = file_test.parent.joinpath(original_name.split('.')[0] + ".mp4")
        new_movie_name = f"sample_{i}.mp4"
        movie_file.rename(movie_file.with_name(new_movie_name))
        file_test.rename(file_test.with_name(new_name))

        # Add to annotations
        annotations[new_name.split('.')[0]] = {"markers": target_markers, "smoothness": smoothness, "original_mocap": str(original_name.split('_')[0])}

    # Save annotations next to the last matched file. Without any match there
    # is no target directory (indexing test_files[-1] would raise IndexError),
    # so report it instead of crashing.
    if test_files:
        with open(test_files[-1].parent / "annotations.json", "w") as f:
            json.dump(annotations, f, indent=4)
    else:
        print("No files matching '*smooth_*.tsv' found; annotations.json was not written.")

In [34]:
from pathlib import Path
import os
import json

from utils import read_custom_tsv

# Every synchronization recording found below the current working directory.
base_dir = Path(os.getcwd())
test_files = sorted(base_dir.rglob("*sync_*.tsv"))

# Marker names per body-part group: Wrist/Forearm/Triceps crossed with the
# "", "In" and "Out" suffixes, prefixed with "R" or "L".
arm_markers = [name + pos for name in ["Wrist", "Forearm", "Triceps"] for pos in ["", "In", "Out"]]
body_parts = {
    "RArm": ["R" + name for name in arm_markers],
    "LArm": ["L" + name for name in arm_markers],
}
body_parts["BothArms"] = body_parts["RArm"] + body_parts["LArm"]

# Prepare annotation mapping: "sample_<i>" -> ground truth for that file.
annotations = {}
for i, file_test in enumerate(test_files):
    # Capture the original name once; it is needed after the rename too.
    original_name = file_test.name

    # Determine synchronization. The glob above already requires "sync_" in
    # the name, so testing `"sync" in name` could never be False. Mirror the
    # "NOsmooth" check used for the smoothness recordings instead.
    # NOTE(review): assumes unsynchronized recordings are tagged "NOsync" —
    # confirm against the actual file naming.
    synch = "NOsync" not in original_name

    # Keep only the file's markers that belong to the body-part group named
    # at the end of the filename (e.g. "..._BothArms.tsv").
    metadata, df = read_custom_tsv(file_test)
    marker_names = metadata["marker_names"]
    allowed_keys = [bp for bp in body_parts if bp + ".tsv" in original_name]
    target_markers = [marker for marker in marker_names if any(marker in body_parts[bp] for bp in allowed_keys)]

    # New filename
    new_name = f"sample_{i}.tsv"
    print(f"Renaming {original_name} to {new_name}")

    # Rename the companion movie first, then the recording itself.
    movie_file = file_test.parent.joinpath(original_name.split('.')[0] + ".mp4")
    new_movie_name = f"sample_{i}.mp4"
    movie_file.rename(movie_file.with_name(new_movie_name))
    file_test.rename(file_test.with_name(new_name))

    # Add to annotations
    annotations[new_name.split('.')[0]] = {"markers": target_markers, "synchronization": synch, "original_mocap": str(original_name.split('_')[0])}

# Save annotations next to the last matched file. Once the files have been
# renamed they no longer match the glob, so a re-run finds nothing; indexing
# test_files[-1] would then raise IndexError. Report it instead of crashing.
if test_files:
    with open(test_files[-1].parent / "annotations.json", "w") as f:
        json.dump(annotations, f, indent=4)
else:
    print("No files matching '*sync_*.tsv' found; annotations.json was not written.")

Renaming Ele0003_sample1_sync_BothArms.tsv to sample_0.tsv
Renaming Ele0003_sample2_sync_BothArms.tsv to sample_1.tsv


In [None]:
import os
from pathlib import Path
import sys
sys.path.append(str(Path("../../pyeyesweb/").resolve()))
from pyeyesweb.data_models.sliding_window import SlidingWindow
from pyeyesweb.low_level.smoothness import Smoothness
from pyeyesweb.analysis_primitives.synchronization import Synchronization
import numpy as np
from pyeyesweb.utils.signal_processing import apply_savgol_filter
from utils import read_custom_tsv
import json
import sys



class FeatureBenchmarker:
    """Shared configuration for the feature benchmark runners.

    Attributes:
        sliding_window_lengths: list of sliding-window lengths to benchmark.
        features_implementations: maps a feature name ("smoothness",
            "synchronization") to the class implementing it.
    """

    def __init__(self, sliding_window_lengths=(50, 100, 200)):
        """Store the window lengths and the feature-name -> class mapping.

        Args:
            sliding_window_lengths: iterable of window lengths. It is copied
                into a fresh list so that instances never share (or mutate)
                the default value or the caller's own list.
        """
        self.sliding_window_lengths = list(sliding_window_lengths)
        self.features_implementations = {"smoothness": Smoothness,
                                         "synchronization": Synchronization}


class SmoothnessBenchmarker(FeatureBenchmarker):
    """Runs the smoothness feature over annotated recordings and saves results.

    For every sample listed in ``data/low_level/smoothness/annotations.json``
    (relative to the current working directory) whose ``.tsv`` file exists, the
    feature is evaluated frame by frame for each configured window length and
    each annotated marker, and the values are written to
    ``<sample>_results.json`` in the same directory.
    """

    def __init__(self, sliding_window_lengths=(50, 100, 200)):
        """Forward the window lengths to the base class.

        Args:
            sliding_window_lengths: iterable of window lengths; a fresh list is
                passed on so no default or caller-owned list is shared.
        """
        super().__init__(sliding_window_lengths=list(sliding_window_lengths))

    @staticmethod
    def _marker_speed(df, marker, rate_hz):
        """Return the filtered per-frame displacement magnitude of one marker.

        The marker's X/Y/Z columns are filtered, the Euclidean distance between
        consecutive frames is taken (not divided by the frame period), a
        leading 0 is inserted so the result has one value per frame, and the
        result is filtered again.
        """
        cols = [f"{marker} X", f"{marker} Y", f"{marker} Z"]
        coords = apply_savgol_filter(df[cols].values, rate_hz=rate_hz)
        vel = ((coords[1:] - coords[:-1]) ** 2).sum(axis=1) ** 0.5
        return apply_savgol_filter(np.insert(vel, 0, 0), rate_hz=rate_hz)

    @staticmethod
    def _run_sliding_window(featureInstance, speed, length):
        """Feed ``speed`` sample by sample into a new window and collect outputs.

        Returns:
            ``(sparc_values, jerk_values)``: two plain lists with one entry per
            input sample, taken from the "sparc" and "jerk_rms" entries of each
            feature result.
        """
        window = SlidingWindow(max_length=length, n_columns=1)
        sparc_values, jerk_values = [], []
        for v in speed:
            window.append([v])
            sm = featureInstance(window)
            sparc_values.append(sm["sparc"])
            jerk_values.append(sm["jerk_rms"])
        # Round-trip through numpy so the stored values are plain Python
        # numbers that json can serialise.
        return np.array(sparc_values).tolist(), np.array(jerk_values).tolist()

    def benchmark(self):
        """Evaluate smoothness for every annotated sample and write the results.

        Samples whose ``.tsv`` file is missing are skipped silently.

        Raises:
            ValueError: if an annotated marker is not among the file's
                ``marker_names``.
        """
        FeatureClass = self.features_implementations["smoothness"]

        base_dir = Path(os.getcwd()) / "data" / "low_level" / "smoothness"
        with open(base_dir / "annotations.json", "r") as f:
            annotations = json.load(f)

        for sample_file_name, annotation in annotations.items():
            test_file = base_dir / (sample_file_name + ".tsv")
            if not test_file.exists():
                continue
            out_file = base_dir / (sample_file_name + "_results.json")
            metadata, df = read_custom_tsv(test_file)
            rate_hz = metadata["frequency"]
            featureInstance = FeatureClass(rate_hz=rate_hz)
            target_markers = annotation["markers"]
            if any(marker not in metadata["marker_names"] for marker in target_markers):
                raise ValueError(f"Markers {target_markers} not found in file {test_file}. Available markers: {metadata['marker_names']}")

            # The filtered signal does not depend on the window length, so it
            # is computed once per marker instead of once per (length, marker).
            # NOTE(review): assumes apply_savgol_filter has no side effects.
            speeds = {marker: self._marker_speed(df, marker, rate_hz) for marker in target_markers}

            out = {"features": [{"name": "smoothness",
                                 "data": [],
                                 "ground truth": annotation["smoothness"]
                                 }]}
            for length in self.sliding_window_lengths:
                print(f"Benchmarking {sample_file_name} with sliding window length {length} ...")
                sliding_len_results = {"sliding_window_max_length": length,
                                       "sparc": {},
                                       "jerk_rms": {}}
                for marker in target_markers:
                    sparc_values, jerk_values = self._run_sliding_window(featureInstance, speeds[marker], length)
                    sliding_len_results["sparc"][marker] = sparc_values
                    sliding_len_results["jerk_rms"][marker] = jerk_values
                out["features"][0]["data"].append(sliding_len_results)

            with open(out_file, "w") as f:
                json.dump(out, f, indent=4)
        
class SynchronizationBenchmarker(FeatureBenchmarker):
    """Placeholder runner for the synchronization feature (not implemented)."""

    def __init__(self, sliding_window_lengths=(50, 100, 200)):
        """Forward the window lengths to the base class.

        Args:
            sliding_window_lengths: iterable of window lengths; a fresh list is
                passed on so no default or caller-owned list is shared.
        """
        super().__init__(sliding_window_lengths=list(sliding_window_lengths))

    def benchmark(self):
        """Look up the synchronization feature class; currently does nothing else.

        Returns None without reading or writing any file.
        """
        FeatureClass = self.features_implementations["synchronization"]
        # TODO: implement; expected to be similar to SmoothnessBenchmarker.
        
# Run the smoothness benchmark for the three window lengths under test.
smoothness_benchmarker = SmoothnessBenchmarker(sliding_window_lengths=[50, 100, 200])
smoothness_benchmarker.benchmark()

Benchmarking sample_0 with sliding window length 50 ...
Benchmarking sample_0 with sliding window length 100 ...
Benchmarking sample_0 with sliding window length 200 ...
Benchmarking sample_1 with sliding window length 50 ...
Benchmarking sample_1 with sliding window length 100 ...
Benchmarking sample_1 with sliding window length 200 ...
Benchmarking sample_2 with sliding window length 50 ...
Benchmarking sample_2 with sliding window length 100 ...
Benchmarking sample_2 with sliding window length 200 ...
Benchmarking sample_3 with sliding window length 50 ...
Benchmarking sample_3 with sliding window length 100 ...
Benchmarking sample_3 with sliding window length 200 ...
Benchmarking sample_4 with sliding window length 50 ...
Benchmarking sample_4 with sliding window length 100 ...
Benchmarking sample_4 with sliding window length 200 ...
Benchmarking sample_5 with sliding window length 50 ...
Benchmarking sample_5 with sliding window length 100 ...
Benchmarking sample_5 with sliding wi