In [1]:
#@title 依存パッケージのインストール
# Cell purpose: install the notebook's dependencies into the kernel environment (quiet mode).
# Only mediapipe is pinned to an exact version; the other packages resolve to whatever is current.
# NOTE(review): pyarrow is installed here but is not imported by any visible cell — confirm it is needed.
%pip -q install mediapipe==0.10.14 opencv-python pandas numpy pyarrow tqdm


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.7/35.7 MB[0m [31m48.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m294.9/294.9 kB[0m [31m17.2 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
grpcio-status 1.71.2 requires protobuf<6.0dev,>=5.26.1, but you have protobuf 4.25.8 which is incompatible.
ydf 0.13.0 requires protobuf<7.0.0,>=5.29.1, but you have protobuf 4.25.8 which is incompatible.[0m[31m
[0m

In [2]:
#@title 解析対象の動画ファイルをアップロード（1ファイル）
# Cell purpose: upload exactly one video file and remember its file name in VIDEO_PATH.
from google.colab import files

# Opens the Colab file picker; choose a video (.mp4 etc.). Keys of the result are file names.
up = files.upload()
assert len(up) == 1, "1ファイルのみアップロードしてください。"
VIDEO_PATH = next(iter(up))
print("Uploaded:", VIDEO_PATH)


Saving WIN_20250829_12_11_22_Pro.mp4 to WIN_20250829_12_11_22_Pro.mp4
Uploaded: WIN_20250829_12_11_22_Pro.mp4


In [3]:
#@title 解析実行セル（実行するだけでOK）
import os
import json
import math
import csv
from pathlib import Path
from typing import List, Tuple, Dict, Optional

import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm

import mediapipe as mp

# ---- Parameters (tune as needed) ----
PROCESS_MAX_SIDE = 1280   # max side length of the inference input (frames are downscaled for speed); None keeps the original size
MIN_DET_CONF = 0.5        # detection confidence threshold
MIN_TRK_CONF = 0.5        # tracking confidence threshold
DRAW_SCALE = 1.0          # scale factor for the overlay line thickness and point radius
FLUSH_INTERVAL = 300      # number of frames between streamed CSV flushes (keeps memory use down)
DETAILED_INDEX_LABEL = True  # draw every keypoint's index in the detailed video

# ---- Output locations ----
# All artefacts go to /content/output_<video stem>/ and share the video stem as file-name prefix.
base = Path(VIDEO_PATH).stem
out_dir = Path("/content") / f"output_{base}"
out_dir.mkdir(parents=True, exist_ok=True)

# The paths are kept as plain strings because they are handed to OpenCV / pandas / open().
overlay_detailed_path = str(out_dir.joinpath(base + "_overlay_detailed.mp4"))
overlay_simple_path = str(out_dir.joinpath(base + "_overlay_simple.mp4"))
long_csv_path = str(out_dir.joinpath(base + "_landmarks_long.csv"))
summary_csv_path = str(out_dir.joinpath(base + "_summary_metrics.csv"))
schema_json_path = str(out_dir.joinpath(base + "_schema_and_legend.json"))

# ---- MediaPipe landmark index -> name tables (used for post-analysis and visualisation) ----
# The list position of each name is its landmark index.
POSE_NAMES = dict(enumerate([
    "nose",
    "left_eye_inner", "left_eye", "left_eye_outer",
    "right_eye_inner", "right_eye", "right_eye_outer",
    "left_ear", "right_ear",
    "mouth_left", "mouth_right",
    "left_shoulder", "right_shoulder",
    "left_elbow", "right_elbow",
    "left_wrist", "right_wrist",
    "left_pinky", "right_pinky",
    "left_index", "right_index",
    "left_thumb", "right_thumb",
    "left_hip", "right_hip",
    "left_knee", "right_knee",
    "left_ankle", "right_ankle",
    "left_heel", "right_heel",
    "left_foot_index", "right_foot_index",
]))
HAND_NAMES = dict(enumerate([
    "wrist",
    "thumb_cmc", "thumb_mcp", "thumb_ip", "thumb_tip",
    "index_mcp", "index_pip", "index_dip", "index_tip",
    "middle_mcp", "middle_pip", "middle_dip", "middle_tip",
    "ring_mcp", "ring_pip", "ring_dip", "ring_tip",
    "pinky_mcp", "pinky_pip", "pinky_dip", "pinky_tip",
]))
# Lip contours as ordered rings of MediaPipe FaceMesh landmark indices.
# Each ring starts with the lower-lip arc from one mouth corner to the other (the first
# 11 entries, so list positions 0-10 keep their meaning) and then runs back along the
# upper-lip arc. With only the lower arc, closing the polyline drew a straight chord
# across the mouth instead of outlining the lip.
LIPS_OUTER = [61,146,91,181,84,17,314,405,321,375,291,
              409,270,269,267,0,37,39,40,185]
LIPS_INNER = [78,95,88,178,87,14,317,402,318,324,308,
              415,310,311,312,13,82,81,80,191]

# ---- Colour definitions (BGR channel order, as used by OpenCV drawing calls) ----
COLOR_POSE = (0, 255, 255)        # body (B=0, G=255, R=255 is yellow in BGR, not cyan)
COLOR_LH   = (0, 255, 0)          # left hand (green)
COLOR_RH   = (0, 0, 255)          # right hand (red)
COLOR_LIPS_OUT = (255, 0, 255)    # outer lip contour (magenta)
COLOR_LIPS_IN  = (180, 0, 180)    # inner lip contour (dark magenta)
COLOR_TEXT_BG  = (0, 0, 0)        # default label background (black)
COLOR_TEXT_FG  = (255, 255, 255)  # default label text (white)

# ---- Connections (skeleton edges as pairs of landmark indices) ----
POSE_CONNECTIONS = list(mp.solutions.pose.POSE_CONNECTIONS)
HAND_CONNECTIONS = list(mp.solutions.hands.HAND_CONNECTIONS)

# ---- ユーティリティ ----
def put_text_with_bg(img, text, org, scale=0.4, thickness=1, fg=COLOR_TEXT_FG, bg=COLOR_TEXT_BG):
    """Draw `text` on `img` (in place) on top of a filled background box.

    `org` is the (x, y) anchor passed to cv2.putText; the box is sized from
    cv2.getTextSize for the same font, `scale` and `thickness`. `fg` / `bg` are
    the text and box colours.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    (text_w, text_h), baseline = cv2.getTextSize(text, font, scale, thickness)
    left, anchor_y = org
    box_top_left = (left, anchor_y - text_h - baseline)
    box_bottom_right = (left + text_w, anchor_y + baseline // 2)
    cv2.rectangle(img, box_top_left, box_bottom_right, bg, -1)  # -1 = filled
    cv2.putText(img, text, (left, anchor_y), font, scale, fg, thickness, cv2.LINE_AA)

def clamp01(x):
    """Limit `x` to the closed interval [0.0, 1.0]."""
    capped = min(1.0, x)
    return max(0.0, capped)

def norm_to_pix(x_norm, y_norm, w, h, flip_back=True):
    """Map normalised coordinates to integer pixel coordinates of a w-by-h image.

    The analysis runs on a horizontally flipped frame; with `flip_back=True` the
    x coordinate is un-mirrored (x -> 1 - x) so the result is in the original
    orientation. Both coordinates are limited to [0, 1] before scaling, so the
    returned (x_px, y_px) always lies inside the image.
    """
    if flip_back:
        x_unit = 1.0 - x_norm
    else:
        x_unit = x_norm
    x_unit = max(0.0, min(1.0, x_unit))
    y_unit = max(0.0, min(1.0, y_norm))
    col = int(round(x_unit * (w - 1)))
    row = int(round(y_unit * (h - 1)))
    return col, row

def angle_deg(a, b, c):
    """Return the angle at vertex `b` of the polyline a-b-c, in degrees.

    Only the first two components (x, y) of each point are used. When either arm
    (b->a or b->c) is shorter than 1e-6 the angle is undefined and NaN is returned.
    """
    def arm(tip):
        # Vector from the vertex b to `tip`.
        return np.array([tip[0] - b[0], tip[1] - b[1]], dtype=float)

    to_a = arm(a)
    to_c = arm(c)
    len_a = np.linalg.norm(to_a)
    len_c = np.linalg.norm(to_c)
    if len_a < 1e-6 or len_c < 1e-6:
        return np.nan
    cosine = np.dot(to_a, to_c) / (len_a * len_c)
    # Rounding can push the cosine slightly outside [-1, 1], which acos rejects.
    cosine = max(-1.0, min(1.0, cosine))
    return math.degrees(math.acos(cosine))

def distance(p, q):
    """Return the Euclidean distance between points `p` and `q` as a Python float."""
    offset = np.array(p, dtype=float) - np.array(q, dtype=float)
    return float(np.linalg.norm(offset))

def ensure_fps(fps):
    """Return `fps` when it is a usable frame rate, otherwise fall back to 30.0.

    Some videos report fps=0; any falsy value or a value not greater than 1e-3
    is replaced by the fallback.
    """
    if not fps:
        return 30.0
    if fps > 1e-3:
        return fps
    return 30.0

# ---- Video I/O setup ----
cap = cv2.VideoCapture(VIDEO_PATH)
assert cap.isOpened(), "動画を開けませんでした。ファイル形式をご確認ください。"

W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = ensure_fps(cap.get(cv2.CAP_PROP_FPS))
# The container may not report a usable frame count; None means "unknown".
reported_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
N = int(reported_frames) if reported_frames > 0 else None

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
vw_detail = cv2.VideoWriter(overlay_detailed_path, fourcc, FPS, (W, H))
vw_simple = cv2.VideoWriter(overlay_simple_path,   fourcc, FPS, (W, H))
# A VideoWriter that failed to open drops every frame without raising, which would only be
# noticed after the whole (slow) analysis has run. Check now and fail fast instead.
if not (vw_detail.isOpened() and vw_simple.isOpened()):
    cap.release()
    vw_detail.release()
    vw_simple.release()
    raise RuntimeError("出力動画を作成できませんでした。コーデック(mp4v)または出力先をご確認ください。")

# ---- Long-format CSV (streamed to disk): column layout and bookkeeping ----
long_header = [
    "frame", "time_ms",
    "part", "side", "landmark_index", "name",
    "x", "y", "z_norm", "visibility",
    "world_x", "world_y", "world_z",
]
first_chunk_written = False  # becomes True once the first chunk (with the header row) is on disk

# Summary CSV: one row of metrics per frame, collected in memory and saved after the loop
summary_rows = []
summary_header = [
    "frame",
    "time_ms",
    "mouth_open_px",
    "mouth_width_px",
    "mouth_open_ratio",
    "left_hand_open",
    "right_hand_open",
    "left_elbow_deg",
    "right_elbow_deg",
]

# Schema / legend saved next to the outputs: index -> name tables, lip indices, colours, notes.
# When serialised to JSON the integer dict keys become strings and the colour tuples become lists.
# NOTE(review): the "coordinates" note promises world_* values for Pose/Hands — verify it
# against what the frame loop actually writes into the world_x / world_y / world_z columns.
schema = dict(
    pose_names=POSE_NAMES,
    hand_names=HAND_NAMES,
    lips_indices=dict(outer=LIPS_OUTER, inner=LIPS_INNER),
    colors_bgr=dict(
        pose=COLOR_POSE,
        left_hand=COLOR_LH,
        right_hand=COLOR_RH,
        lips_outer=COLOR_LIPS_OUT,
        lips_inner=COLOR_LIPS_IN,
    ),
    notes=dict(
        orientation="解析は鏡像で実施し、保存する座標・動画は元動画と同じ向き（非反転）に戻しています。",
        coordinates="x, y はピクセル座標（左上原点、右向きx・下向きy）。z_norm はMediaPipeの正規化zです。world_* はPose/Handsの世界座標（xは鏡像補正で符号反転）。",
    ),
)
# ensure_ascii=False keeps the Japanese notes readable in the file.
schema_text = json.dumps(schema, ensure_ascii=False, indent=2)
with open(schema_json_path, "w", encoding="utf-8") as f:
    f.write(schema_text)

# ---- MediaPipe initialisation (Holistic runs pose, hands and face in one inference call) ----
# NOTE(review): mp_drawing and mp_dspec are not referenced anywhere in the visible code.
mp_drawing = mp.solutions.drawing_utils
mp_dspec = mp.solutions.drawing_styles
# model_complexity=2 selects the heavy pose model (the cell output shows
# pose_landmark_heavy.tflite being downloaded). Segmentation masks are not requested.
# The confidence thresholds come from the parameter section at the top of the cell.
holistic = mp.solutions.holistic.Holistic(
    static_image_mode=False,
    model_complexity=2,
    refine_face_landmarks=True,
    enable_segmentation=False,
    min_detection_confidence=MIN_DET_CONF,
    min_tracking_confidence=MIN_TRK_CONF,
)

# ---- Main loop ----
# Every frame is analysed as a mirror image (horizontal flip), so each result is converted
# back to the orientation of the source video before it is stored or drawn:
#   * x is flipped back (x -> 1 - x) inside norm_to_pix,
#   * the hands reported as left/right are exchanged,
#   * the pose landmarks reported as left/right are exchanged as well (POSE_LR_SWAP).
#     Without this the pose rows and the elbow angles carry the opposite side label
#     from the hand rows of the same frame.

# Pose landmark index -> index of its left/right counterpart (the nose maps to itself).
POSE_LR_SWAP = {0: 0, 1: 4, 2: 5, 3: 6, 4: 1, 5: 2, 6: 3}
for _left_idx in range(7, 33, 2):  # from the ears (7/8) onwards: odd = left, even = right
    POSE_LR_SWAP[_left_idx] = _left_idx + 1
    POSE_LR_SWAP[_left_idx + 1] = _left_idx

# Overlay stroke widths and point radius; they depend only on DRAW_SCALE, not on the frame.
thk = max(1, int(2 * DRAW_SCALE))
rad = max(1, int(2 * DRAW_SCALE))
thk_bold = max(2, int(4 * DRAW_SCALE))


def swap_pose_sides(values):
    """Return a copy of a per-pose-landmark list with every left/right pair exchanged."""
    count = len(values)
    swapped = []
    for i in range(count):
        j = POSE_LR_SWAP.get(i, i)
        swapped.append(values[j] if j < count else values[i])
    return swapped


def to_pix_list(landmarks, frame_w, frame_h, world_landmarks=None):
    """Convert landmarks detected on the mirrored frame to original-orientation tuples.

    Returns (pix, world), two lists of equal length:
      pix   - (x_px, y_px, z_norm) per landmark, x already un-mirrored.
      world - (world_x, world_y, world_z) per landmark with x negated to undo the
              mirroring, or (None, None, None) where no world landmark is available.
    Returns (None, None) when `landmarks` is None.
    """
    if landmarks is None:
        return None, None
    world_pts = list(world_landmarks.landmark) if world_landmarks is not None else []
    pix = []
    world = []
    for i, lm in enumerate(landmarks.landmark):
        x_px, y_px = norm_to_pix(lm.x, lm.y, frame_w, frame_h, flip_back=True)
        pix.append((x_px, y_px, lm.z))
        if i < len(world_pts):
            wp = world_pts[i]
            world.append((-wp.x, wp.y, wp.z))
        else:
            world.append((None, None, None))
    return pix, world


def hand_open(hand_pix):
    """Hand openness: mean wrist-to-fingertip distance divided by the wrist-to-middle-MCP length.

    Returns NaN when the hand is missing or the reference length is degenerate.
    """
    if not hand_pix:
        return np.nan
    wrist = hand_pix[0][:2]
    # The middle-finger MCP (index 9) provides the reference length.
    ref_len = distance(wrist, hand_pix[9][:2]) if len(hand_pix) > 9 else 0.0
    tip_dists = [distance(wrist, hand_pix[i][:2]) for i in (4, 8, 12, 16, 20) if len(hand_pix) > i]
    if ref_len < 1e-6 or not tip_dists:
        return np.nan
    return float(np.mean(tip_dists) / ref_len)


def draw_poly(points, img_detail, img_simple, color, label_prefix):
    """Draw `points` as a closed contour on both overlays; the detailed one also gets dots and indices."""
    if len(points) < 2:
        return
    ring = np.array([(p[0], p[1]) for p in points], dtype=np.int32)
    cv2.polylines(img_detail, [ring], isClosed=True, color=color, thickness=thk)
    cv2.polylines(img_simple, [ring], isClosed=True, color=color, thickness=thk_bold)
    for k, (x, y, _) in enumerate(points):
        cv2.circle(img_detail, (x, y), rad, color, -1)
        if DETAILED_INDEX_LABEL:
            put_text_with_bg(img_detail, f"{label_prefix}{k}", (x+2, y-2), scale=0.45, thickness=1)


# total=None (unknown frame count) still shows a running frame counter.
pbar = tqdm(total=N, desc="Processing")
rows_buf = []

frame_idx = 0
while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Source frame (not mirrored); all outputs are expressed in its orientation.
    orig_bgr = frame
    h, w = orig_bgr.shape[:2]

    # Analysis input: mirrored and, if requested, downscaled.
    anal_bgr = cv2.flip(orig_bgr, 1)
    scale = 1.0 if PROCESS_MAX_SIDE is None else min(1.0, PROCESS_MAX_SIDE / max(h, w))
    if scale < 1.0:
        anal_bgr = cv2.resize(anal_bgr, (int(w*scale), int(h*scale)), interpolation=cv2.INTER_AREA)

    # MediaPipe expects RGB input.
    anal_rgb = cv2.cvtColor(anal_bgr, cv2.COLOR_BGR2RGB)
    res = holistic.process(anal_rgb)

    # ---- Convert results to original-orientation pixel coordinates ----
    # Pose: un-mirror x, then exchange left/right indices so names match the original view.
    pose_pix = pose_world = pose_vis = None
    if res.pose_landmarks:
        pose_pix, pose_world = to_pix_list(
            res.pose_landmarks, w, h,
            world_landmarks=getattr(res, "pose_world_landmarks", None),
        )
        pose_vis = [lm.visibility for lm in res.pose_landmarks.landmark]
        pose_pix = swap_pose_sides(pose_pix)
        pose_world = swap_pose_sides(pose_world)
        pose_vis = swap_pose_sides(pose_vis)

    # Hands: what MediaPipe calls "left" on the mirrored frame is stored as Right, and vice versa.
    lh_pix = rh_pix = None
    if res.left_hand_landmarks:
        rh_pix, _ = to_pix_list(res.left_hand_landmarks, w, h)
    if res.right_hand_landmarks:
        lh_pix, _ = to_pix_list(res.right_hand_landmarks, w, h)

    # Face: only the outer / inner lip contours are extracted.
    fl = res.face_landmarks.landmark if res.face_landmarks else None
    lips_outer_pts = []
    lips_inner_pts = []
    if fl is not None:
        lips_outer_pts = [
            norm_to_pix(fl[idx].x, fl[idx].y, w, h, flip_back=True) + (fl[idx].z,)
            for idx in LIPS_OUTER
        ]
        lips_inner_pts = [
            norm_to_pix(fl[idx].x, fl[idx].y, w, h, flip_back=True) + (fl[idx].z,)
            for idx in LIPS_INNER
        ]

    # ---- Records for the long-format CSV ----
    t_ms = int(round(1000.0 * frame_idx / FPS))
    if pose_pix is not None:
        for i, ((x, y, z), vis, (wx, wy, wz)) in enumerate(zip(pose_pix, pose_vis, pose_world)):
            rows_buf.append([frame_idx, t_ms, "pose", None, i, POSE_NAMES.get(i, ""), x, y, z, vis, wx, wy, wz])
    # Hand and lip rows have no visibility or world values.
    for side, hand_pix in (("Left", lh_pix), ("Right", rh_pix)):
        if hand_pix is None:
            continue
        for i, (x, y, z) in enumerate(hand_pix):
            rows_buf.append([frame_idx, t_ms, "hand", side, i, HAND_NAMES.get(i, ""), x, y, z, None, None, None, None])
    for part, pts in (("lips_outer", lips_outer_pts), ("lips_inner", lips_inner_pts)):
        for j, (x, y, z) in enumerate(pts):
            rows_buf.append([frame_idx, t_ms, part, None, j, part, x, y, z, None, None, None, None])

    # ---- Summary metrics ----
    # Mouth opening: inner-lip top/bottom (FaceMesh 13, 14) and mouth corners (61, 291).
    mouth_open_px = mouth_width_px = mouth_ratio = np.nan
    if fl is not None:
        upper, lower, leftc, rightc = (
            norm_to_pix(fl[idx].x, fl[idx].y, w, h, flip_back=True) for idx in (13, 14, 61, 291)
        )
        mouth_open_px  = distance(upper, lower)
        mouth_width_px = distance(leftc, rightc)
        if mouth_width_px > 1e-6:
            mouth_ratio = mouth_open_px / mouth_width_px

    left_hand_open  = hand_open(lh_pix)
    right_hand_open = hand_open(rh_pix)

    # Elbow angles (left: 11-13-15, right: 12-14-16) on the side-corrected pose list.
    left_elbow_deg = right_elbow_deg = np.nan
    if pose_pix is not None and len(pose_pix) >= 17:
        left_elbow_deg  = angle_deg(pose_pix[11][:2], pose_pix[13][:2], pose_pix[15][:2])
        right_elbow_deg = angle_deg(pose_pix[12][:2], pose_pix[14][:2], pose_pix[16][:2])

    summary_rows.append([
        frame_idx, t_ms,
        mouth_open_px, mouth_width_px, mouth_ratio,
        left_hand_open, right_hand_open,
        left_elbow_deg, right_elbow_deg
    ])

    # ---- Overlay drawing (on copies of the un-mirrored frame) ----
    detailed = orig_bgr.copy()
    simple   = orig_bgr.copy()

    # Body: skeleton lines on both overlays, dots and indices on the detailed one only.
    if pose_pix is not None:
        for (i, j) in POSE_CONNECTIONS:
            if i < len(pose_pix) and j < len(pose_pix):
                pi = (pose_pix[i][0], pose_pix[i][1])
                pj = (pose_pix[j][0], pose_pix[j][1])
                cv2.line(detailed, pi, pj, COLOR_POSE, thk)
                cv2.line(simple,   pi, pj, COLOR_POSE, thk_bold)
        for i, (x, y, _) in enumerate(pose_pix):
            cv2.circle(detailed, (x, y), rad, COLOR_POSE, -1)
            if DETAILED_INDEX_LABEL:
                put_text_with_bg(detailed, f"P{i}", (x+4, y-4), scale=0.45, thickness=1)

    # Hands: left first, then right, each with its own line colour and label colours.
    for hand_pix, color, prefix, label_fg, label_bg in (
        (lh_pix, COLOR_LH, "L", (0, 0, 0), (180, 255, 180)),
        (rh_pix, COLOR_RH, "R", (255, 255, 255), (80, 80, 180)),
    ):
        if hand_pix is None:
            continue
        for (i, j) in HAND_CONNECTIONS:
            if i < len(hand_pix) and j < len(hand_pix):
                pi = (hand_pix[i][0], hand_pix[i][1])
                pj = (hand_pix[j][0], hand_pix[j][1])
                cv2.line(detailed, pi, pj, color, thk)
                cv2.line(simple,   pi, pj, color, thk_bold)
        for i, (x, y, _) in enumerate(hand_pix):
            cv2.circle(detailed, (x, y), rad, color, -1)
            if DETAILED_INDEX_LABEL:
                put_text_with_bg(detailed, f"{prefix}{i}", (x+3, y-3), scale=0.45, thickness=1, fg=label_fg, bg=label_bg)

    # Lips (outer and inner contour).
    if lips_outer_pts:
        draw_poly(lips_outer_pts, detailed, simple, COLOR_LIPS_OUT, "Lo")
    if lips_inner_pts:
        draw_poly(lips_inner_pts, detailed, simple, COLOR_LIPS_IN,  "Li")

    # Append the frame to both overlay videos.
    vw_detail.write(detailed)
    vw_simple.write(simple)

    # Long-format CSV: flush the buffer every FLUSH_INTERVAL frames to bound memory use.
    if (frame_idx + 1) % FLUSH_INTERVAL == 0:
        pd.DataFrame(rows_buf, columns=long_header).to_csv(
            long_csv_path,
            index=False,
            mode="a" if first_chunk_written else "w",
            header=not first_chunk_written,
            encoding="utf-8",
        )
        rows_buf = []
        first_chunk_written = True

    frame_idx += 1
    pbar.update(1)
# Final flush after the loop.
# This also runs when no chunk has been written yet, even with an empty buffer, so the
# long-format CSV always exists (at least its header row). Otherwise a short clip with no
# detections would leave the printed path missing, or pointing at a stale file from an
# earlier run on a video with the same name.
if rows_buf or not first_chunk_written:
    df_chunk = pd.DataFrame(rows_buf, columns=long_header)
    mode = "w" if not first_chunk_written else "a"
    header = not first_chunk_written
    df_chunk.to_csv(long_csv_path, index=False, mode=mode, header=header, encoding="utf-8")
    rows_buf = []
    first_chunk_written = True

# Release the video handles, the MediaPipe graph and the progress bar.
cap.release()
vw_detail.release()
vw_simple.release()
holistic.close()
pbar.close()

# Save the per-frame summary metrics.
pd.DataFrame(summary_rows, columns=summary_header).to_csv(summary_csv_path, index=False, encoding="utf-8")

print("== 完了 ==")
print("出力フォルダ:", out_dir)
print("詳細オーバーレイ: ", overlay_detailed_path)
print("簡易オーバーレイ: ", overlay_simple_path)
print("長い形式CSV:      ", long_csv_path)
print("要約CSV:          ", summary_csv_path)
print("スキーマJSON:     ", schema_json_path)


Downloading model to /usr/local/lib/python3.12/dist-packages/mediapipe/modules/pose_landmark/pose_landmark_heavy.tflite


Processing: 100%|██████████| 482/482 [02:13<00:00,  3.60it/s]

== 完了 ==
出力フォルダ: /content/output_WIN_20250829_12_11_22_Pro
詳細オーバーレイ:  /content/output_WIN_20250829_12_11_22_Pro/WIN_20250829_12_11_22_Pro_overlay_detailed.mp4
簡易オーバーレイ:  /content/output_WIN_20250829_12_11_22_Pro/WIN_20250829_12_11_22_Pro_overlay_simple.mp4
長い形式CSV:       /content/output_WIN_20250829_12_11_22_Pro/WIN_20250829_12_11_22_Pro_landmarks_long.csv
要約CSV:           /content/output_WIN_20250829_12_11_22_Pro/WIN_20250829_12_11_22_Pro_summary_metrics.csv
スキーマJSON:      /content/output_WIN_20250829_12_11_22_Pro/WIN_20250829_12_11_22_Pro_schema_and_legend.json



