In [1]:
# -*- coding: utf-8 -*-
"""
gait_cycle_frames.py
DeepLabCut(3-level header) の CSV から、犬の歩行周期を「つま先が初期位置に戻る」瞬間で
検出し、周期長をフレーム数で推定します（likelihood は使用しません）。
- 既定は tail_set を原点にした相対座標で距離 d(t)=||paw(t) - paw(0)|| を作り、局所極小を
  閾値かつ不応期（最小間隔）でフィルタして周期境界とみなす
- フォールバックとして自己相関法でも推定
"""

import os, re, argparse
import numpy as np
import pandas as pd

# =============== ユーティリティ ===============
def _norm_name(s: str) -> str:
    """小文字化 + 空白/アンダースコア/ハイフン除去で正規化"""
    return "".join(ch for ch in s.lower() if ch not in " _-")

def _resolve_first_present(all_bodyparts, candidates):
    """
    DLC の bodyparts（実名リスト）から、candidates のうち最初に見つかったもの（実名）を返す。
    無ければ ValueError
    """
    norm2orig = {}
    for bp in all_bodyparts:
        k = _norm_name(bp)
        if k not in norm2orig:
            norm2orig[k] = bp
    for cand in candidates:
        k = _norm_name(cand)
        if k in norm2orig:
            return norm2orig[k]
    raise ValueError(f"指定キーポイント候補が見つかりません: {candidates}\n利用可能: {sorted(set(all_bodyparts))}")

def _read_xy(csv_path: str, bodypart: str) -> np.ndarray:
    """
    DLC CSV(3段ヘッダ)から bodypart の (x,y) を (T,2) で返す（likelihood 未使用）。
    欠損はそのまま（本スクリプトでは欠損行は後で安全に除去）。
    """
    df = pd.read_csv(csv_path, header=[0,1,2], index_col=0)
    arr_x = df.xs((bodypart, "x"), axis=1, level=[1,2]).values.reshape(-1, 1)
    arr_y = df.xs((bodypart, "y"), axis=1, level=[1,2]).values.reshape(-1, 1)
    arr = np.concatenate([arr_x, arr_y], axis=1).astype(np.float32)
    return arr

def _moving_average(x: np.ndarray, k: int) -> np.ndarray:
    """単純移動平均（端はパディング反映のため 'same' 仕様に近い）"""
    if k <= 1:
        return x
    k = int(k)
    pad = k // 2
    xpad = np.pad(x, (pad, pad), mode='edge')
    kernel = np.ones(k, dtype=np.float32) / k
    return np.convolve(xpad, kernel, mode='valid')

def _local_minima_with_threshold(d: np.ndarray, min_separation: int, thr: float):
    """
    離散信号 d から、局所極小かつ d[i] < thr のインデックスを返す。
    さらに最小間隔 min_separation フレームの不応期を適用。
    """
    n = len(d)
    if n < 3:
        return np.array([], dtype=int)
    # 素朴な極小条件
    cand = [i for i in range(1, n-1) if (d[i] <= d[i-1] and d[i] <= d[i+1] and d[i] < thr)]
    if not cand:
        return np.array([], dtype=int)
    # 不応期フィルタ
    picks = []
    last = -10**9
    for i in cand:
        if i - last >= min_separation:
            picks.append(i)
            last = i
        elif d[i] < d[last]:  # より深い方を残す（optional）
            picks[-1] = i
            last = i
    return np.array(picks, dtype=int)

def _autocorr_period(x: np.ndarray, min_lag: int, max_lag: int) -> int:
    """
    自己相関から周期候補（ラグ）を返す。x は1次元信号。min_lag..max_lag の範囲で最大ピーク。
    見つからなければ 0。
    """
    x = np.asarray(x, dtype=np.float32)
    x = x - x.mean()
    if len(x) < max(10, max_lag+1):
        return 0
    ac = np.correlate(x, x, mode='full')
    ac = ac[len(x)-1:]  # 非負ラグ側
    # 正規化（不要なら省略可）
    if ac[0] > 0:
        ac = ac / (ac[0] + 1e-8)
    lo = max(1, min_lag)
    hi = min(len(ac)-1, max_lag)
    if hi <= lo:
        return 0
    lag = lo + np.argmax(ac[lo:hi+1])
    return int(lag)

# =============== 周期推定 ===============
def estimate_cycle_frames_from_csv(
    csv_path: str,
    paw_candidates = ("right_paw","right back paw","right_front_paw","right front paw",
                      "left_paw","left back paw","left_front_paw","left front paw"),
    ref_candidates = ("tail_set","tail root","tail_base","tail base"),
    smooth = 7,
    eps_ratio = 0.15,
    min_separation = 10,
    min_lag = 8,
    max_lag = 90,
):
    """
    戻り値: dict
      {
        'method': 'minima' or 'acf',
        'cycle_frames': int（推定周期; フレーム）,
        'minima_indices': ndarray（極小インデックス列; 無い場合は空）,
        'diffs': ndarray（連続極小の差; 周期サンプル）,
        'acf_frames': int（自己相関での候補; 無い場合0）,
        'fps': None（引数では与えない; 呼び出し側で秒変換したい場合は別途）
      }
    """
    # DLC 読み込み（候補から実際のボディパーツ名を決定）
    df = pd.read_csv(csv_path, header=[0,1,2], index_col=0)
    bodyparts = list({bp for (_, bp, _) in df.columns})
    paw = _resolve_first_present(bodyparts, paw_candidates)
    ref = _resolve_first_present(bodyparts, ref_candidates)

    paw_xy = _read_xy(csv_path, paw)  # (T,2)
    ref_xy = _read_xy(csv_path, ref)  # (T,2)

    # 欠損がある行は落とす（likelihood 未使用・補間もしない仕様）
    keep = np.isfinite(paw_xy).all(axis=1) & np.isfinite(ref_xy).all(axis=1)
    paw_xy = paw_xy[keep]
    ref_xy = ref_xy[keep]
    T = len(paw_xy)
    if T < 5:
        raise RuntimeError(f"有効フレームが少なすぎます（{T}フレーム）。欠損が多い/キー名が違う可能性。")

    # 相対座標（tail_set を原点）
    rel = paw_xy - ref_xy  # (T,2)

    # 初期基準点 p0 を安定化（最初の K=5 フレームの中央値）
    K = min(5, T)
    p0 = np.median(rel[:K, :], axis=0)

    # 距離 d(t) = ||rel(t) - p0||
    d = np.linalg.norm(rel - p0, axis=1)

    # 平滑化（移動平均）
    d_s = _moving_average(d, smooth)

    # 閾値 ε = eps_ratio * 尺度（ロバスト; IQR でもOK。ここでは中央値からの偏差中央値を利用）
    med = np.median(d_s)
    mad = np.median(np.abs(d_s - med)) + 1e-8
    scale = mad if mad > 1e-6 else (np.percentile(d_s, 75) - np.percentile(d_s, 25) + 1e-8)
    thr = med + eps_ratio * scale  # “初期に戻る”ので、med 付近より少し下（or上）を閾に…はデータ依存。
    # 実務的には絶対閾値より “初期に近い”を直接見る方が直感的：
    # thr = eps_ratio * np.median(np.linalg.norm(rel, axis=1))
    # などに差し替えてもOK

    # 極小検出
    mins = _local_minima_with_threshold(d_s, min_separation=min_separation, thr=thr)

    # 周期候補
    diffs = np.diff(mins) if len(mins) >= 2 else np.array([], dtype=int)
    if len(diffs) >= 1:
        # 安定性重視で中央値
        cycle_frames_minima = int(np.median(diffs))
    else:
        cycle_frames_minima = 0

    # フォールバック: 自己相関（x または y 片方でもOK。ここでは距離 d を使用）
    acf_frames = _autocorr_period(d_s, min_lag=min_lag, max_lag=max_lag)

    # 採択ロジック：極小が十分あれば極小法、無ければ ACF
    if cycle_frames_minima > 0:
        return {
            "method": "minima",
            "cycle_frames": cycle_frames_minima,
            "minima_indices": mins,
            "diffs": diffs,
            "acf_frames": acf_frames,
            "fps": None,
            "paw_used": paw,
            "ref_used": ref,
        }
    else:
        return {
            "method": "acf",
            "cycle_frames": int(acf_frames) if acf_frames > 0 else 0,
            "minima_indices": mins,
            "diffs": diffs,
            "acf_frames": acf_frames,
            "fps": None,
            "paw_used": paw,
            "ref_used": ref,
        }

# =============== CLI ===============
def main():
    ap = argparse.ArgumentParser(description="DLC CSV から歩行周期（フレーム）を推定")
    ap.add_argument("--csv", required=True, help="入力CSV（DLC 3-level header）")
    ap.add_argument("--paw", default="right_paw,right back paw,right_front_paw,right front paw,left_paw,left back paw",
                    help="つま先キーポイント候補（カンマ区切り）。最初に見つかったものを使用")
    ap.add_argument("--ref", default="tail_set,tail root,tail_base,tail base",
                    help="原点にする参照キーポイント候補（カンマ区切り）")
    ap.add_argument("--smooth", type=int, default=7, help="距離信号の移動平均窓幅")
    ap.add_argument("--eps-ratio", type=float, default=0.15, help="初期位置近傍の判定に用いる比率（閾値スケーリング）")
    ap.add_argument("--min-sep", type=int, default=10, help="極小検出の最小間隔（不応期; フレーム）")
    ap.add_argument("--min-lag", type=int, default=8, help="ACFの最小ラグ（フレーム）")
    ap.add_argument("--max-lag", type=int, default=90, help="ACFの最大ラグ（フレーム）")
    ap.add_argument("--fps", type=float, default=0.0, help="任意: FPS を渡すと秒換算も表示")
    args = ap.parse_args()

    paw_candidates = [s.strip() for s in args.paw.split(",") if s.strip()]
    ref_candidates = [s.strip() for s in args.ref.split(",") if s.strip()]

    res = estimate_cycle_frames_from_csv(
        csv_path=args.csv,
        paw_candidates=paw_candidates,
        ref_candidates=ref_candidates,
        smooth=args.smooth,
        eps_ratio=args.eps_ratio,
        min_separation=args.min_sep,
        min_lag=args.min_lag,
        max_lag=args.max_lag,
    )

    print("\n=== 推定結果 ===")
    print("ファイル:", args.csv)
    print("使用つま先:", res["paw_used"])
    print("参照(原点):", res["ref_used"])
    print("方式:", res["method"])
    print("推定周期(フレーム):", res["cycle_frames"])

    if args.fps and res["cycle_frames"] > 0:
        sec = res["cycle_frames"] / args.fps
        print(f"推定周期(秒) @ {args.fps:.3f} FPS:", f"{sec:.3f} s")

    if res["method"] == "minima":
        diffs = res["diffs"]
        print("検出した極小フレーム数:", len(res["minima_indices"]))
        if len(diffs) > 0:
            print("周期サンプル（連続極小差）:", diffs.tolist())
    else:
        print("ACF候補(フレーム):", res["acf_frames"])

if __name__ == "__main__":
    main()


usage: ipykernel_launcher.py [-h] --csv CSV [--paw PAW] [--ref REF]
                             [--smooth SMOOTH] [--eps-ratio EPS_RATIO]
                             [--min-sep MIN_SEP] [--min-lag MIN_LAG]
                             [--max-lag MAX_LAG] [--fps FPS]
ipykernel_launcher.py: error: argument --fps: invalid float value: 'c:\\Users\\菅野皓太\\AppData\\Roaming\\jupyter\\runtime\\kernel-v390605f14e727627fdff3bf6f7052576c9295f029.json'


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
