# 중간 산출물 존재하는 방식
Frame, json, vis, output_mp4 총 4개의 산출물을 만들며 돌아가는 코드

해당 코드에서 Frame, vis 같은 Frame 당 output을 만드는데 시간이 꽤나 오래걸림.

추후, 수정 예정

In [1]:
import os, sys, torch
import mmpose, mmdet, mmengine, mmcv
from pathlib import Path

print("CUDA available:", torch.cuda.is_available())
print("GPU count:", torch.cuda.device_count())
print("mmpose:", mmpose.__version__, "| mmdet:", mmdet.__version__, "| mmengine:", mmengine.__version__, "| mmcv:", mmcv.__version__)

mp4_name = "M01_VISIT2_상지"  # 파일명만

paths_tpl = {
    "MP4_PATH":    "../../../../data/김원 보산진 연구/{mp4_name}.MP4",
    "FRAME_DIR":   "../data/Patient_data/new_code/frames/{mp4_name}_frame",
    "JSON_DIR":    "../data/Patient_data/new_code/json/{mp4_name}_json",
    "VIS_DIR":     "../data/Patient_data/new_code/vis/{mp4_name}_vis",
    "OUTPUT_MP4":  "../data/Patient_data/new_code/output/{mp4_name}_output.mp4",
    "DET_CONFIG":  "../sapiens/pose/demo/mmdetection_cfg/rtmdet_m_640-8xb32_coco-person_no_nms.py",
    "DET_CKPT":    "../sapiens/pose/checkpoints/rtmdet_m_8xb32-100e_coco-obj365-person-235e8209.pth",
    "POSE_CONFIG": "../sapiens/pose/configs/sapiens_pose/coco/sapiens_0.3b-210e_coco-1024x768.py",
    "POSE_CKPT":   "../sapiens/pose/checkpoints/sapiens_0.3b/sapiens_0.3b_coco_best_coco_AP_796.pth",
}

# 1) {mp4_name} 치환
paths = {k: v.format(mp4_name=mp4_name) for k, v in paths_tpl.items()}

# 2) 디렉터리 생성 (치환 후에!)
for k in ("FRAME_DIR", "JSON_DIR", "VIS_DIR"):
    Path(paths[k]).mkdir(parents=True, exist_ok=True)

# (선택) OUTPUT_MP4는 파일이므로 부모 폴더만 보장
Path(Path(paths["OUTPUT_MP4"]).parent).mkdir(parents=True, exist_ok=True)

# 3) 경로 체크
print("\n[PATH CHECK]")
for k in ("FRAME_DIR", "JSON_DIR", "VIS_DIR",
          "MP4_PATH", "OUTPUT_MP4", "DET_CONFIG", "DET_CKPT", "POSE_CONFIG", "POSE_CKPT"):
    p = paths[k]
    exists = Path(p).exists()
    typ = "DIR " if k in ("FRAME_DIR", "JSON_DIR", "VIS_DIR") else "FILE"
    print(f"{k:12s} | {typ} | {'OK' if exists else 'MISSING'} | {p}")


CUDA available: True
GPU count: 1
mmpose: 1.3.2 | mmdet: 3.2.0 | mmengine: 0.10.7 | mmcv: 2.1.0

[PATH CHECK]
FRAME_DIR    | DIR  | OK | ../data/Patient_data/new_code/frames/M01_VISIT2_상지_frame
JSON_DIR     | DIR  | OK | ../data/Patient_data/new_code/json/M01_VISIT2_상지_json
VIS_DIR      | DIR  | OK | ../data/Patient_data/new_code/vis/M01_VISIT2_상지_vis
MP4_PATH     | FILE | OK | ../../../../data/김원 보산진 연구/M01_VISIT2_상지.MP4
OUTPUT_MP4   | FILE | OK | ../data/Patient_data/new_code/output/M01_VISIT2_상지_output.mp4
DET_CONFIG   | FILE | OK | ../sapiens/pose/demo/mmdetection_cfg/rtmdet_m_640-8xb32_coco-person_no_nms.py
DET_CKPT     | FILE | OK | ../sapiens/pose/checkpoints/rtmdet_m_8xb32-100e_coco-obj365-person-235e8209.pth
POSE_CONFIG  | FILE | OK | ../sapiens/pose/configs/sapiens_pose/coco/sapiens_0.3b-210e_coco-1024x768.py
POSE_CKPT    | FILE | OK | ../sapiens/pose/checkpoints/sapiens_0.3b/sapiens_0.3b_coco_best_coco_AP_796.pth


In [2]:
# --- MP4_PATH → FRAME_DIR: 프레임 추출 저장 ---
import os
import cv2
from pathlib import Path
from tqdm import tqdm

# 기존 셀에서 만든 paths 사용
mp4_path   = Path(paths["MP4_PATH"])
frame_dir  = Path(paths["FRAME_DIR"])

# (옵션) 앞 N초만 저장하고 싶으면 숫자를 넣고, 전체 저장은 None
DURATION_SEC = None  # 예: 20

# 대소문자 확장자 이슈 대비(대안 경로 자동 탐색)
if not mp4_path.exists():
    alt = mp4_path.with_suffix(".mp4") if mp4_path.suffix != ".mp4" else mp4_path.with_suffix(".MP4")
    if alt.exists():
        print(f"[INFO] 입력 비디오 대안 경로 사용: {alt}")
        mp4_path = alt

print("[INFO] CWD:", os.getcwd())
print("[INFO] Video :", mp4_path.resolve())
print("[INFO] Output:", frame_dir.resolve())

cap = cv2.VideoCapture(str(mp4_path))
if not cap.isOpened():
    raise RuntimeError(f"비디오를 열 수 없습니다: {mp4_path}")

fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
total_prop = cap.get(cv2.CAP_PROP_FRAME_COUNT)
total_frames = int(total_prop) if total_prop and total_prop > 0 else -1

# 저장할 목표 프레임 수 계산(전체 또는 앞 N초)
if DURATION_SEC is not None and fps > 0:
    target_frames = int(round(DURATION_SEC * fps))
    if total_frames > 0:
        target_frames = min(target_frames, total_frames)
else:
    target_frames = total_frames if total_frames > 0 else None

print(f"FPS: {fps:.3f} | 총 프레임: {total_frames} | 저장 예정: {target_frames if target_frames is not None else '전체(시간 기준)'}")

saved = 0
idx = 0
pbar_total = target_frames if target_frames is not None else (int(fps * (DURATION_SEC or 10)) or 100)
pbar = tqdm(total=pbar_total, desc="Extracting frames", unit="frame")

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # 시간 제한 모드일 때( FPS를 못 읽는 경우 포함 )
    pos_msec = cap.get(cv2.CAP_PROP_POS_MSEC)

    if target_frames is not None:
        if idx >= target_frames:
            break
    elif DURATION_SEC is not None:
        if pos_msec >= DURATION_SEC * 1000:
            break

    out_path = frame_dir / f"frame_{idx:06d}.jpg"
    ok = cv2.imwrite(str(out_path), frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
    if ok:
        saved += 1
        if target_frames is not None:
            pbar.update(1)
        else:
            # 시간 기준일 때 대략 진행률
            pbar.update(1 if pbar.n < pbar.total else 0)
    idx += 1

pbar.close()
cap.release()
print(f"완료: {saved}장 저장 → {frame_dir}")


[INFO] CWD: /workspace/nas203/ds_RehabilitationMedicineData/IDs/Kimjihoo/3_project_HCCmove/ipynb
[INFO] Video : /workspace/nas203/ds_RehabilitationMedicineData/data/김원 보산진 연구/M01_VISIT2_상지.MP4
[INFO] Output: /workspace/nas203/ds_RehabilitationMedicineData/IDs/Kimjihoo/3_project_HCCmove/data/Patient_data/new_code/frames/M01_VISIT2_상지_frame
FPS: 29.970 | 총 프레임: 11370 | 저장 예정: 11370


Extracting frames:   0% 30/11370 [00:05<37:50,  4.99frame/s] 

KeyboardInterrupt: 

In [3]:
# 앞 셀에서 paths를 만든 상태여야 함
assert 'paths' in globals(), "앞 셀에서 paths를 먼저 생성하세요."

import mmpretrain  # VisionTransformer 등록

import mmcv
from pathlib import Path
from mmdet.apis import init_detector, inference_detector
from mmpose.apis import init_model as init_pose_estimator, inference_topdown
from mmpose.utils import adapt_mmdet_pipeline
from mmpose.registry import VISUALIZERS
from mmpose.evaluation.functional import nms
from mmpose.structures import merge_data_samples, split_instances

# paths 딕셔너리에서 경로 바인딩
DET_CONFIG  = paths["DET_CONFIG"]
DET_CKPT    = paths["DET_CKPT"]
POSE_CONFIG = paths["POSE_CONFIG"]
POSE_CKPT   = paths["POSE_CKPT"]

# (선택) 프레임 폴더도 같이 써야 한다면
DATA_DIR = paths["FRAME_DIR"]

# (권장) 존재 여부 체크
for k in ("DET_CONFIG", "DET_CKPT", "POSE_CONFIG", "POSE_CKPT"):
    p = Path(globals()[k])
    assert p.exists(), f"{k} 경로를 찾을 수 없습니다: {p}"

# detector
detector = init_detector(DET_CONFIG, DET_CKPT, device="cuda:0")
detector.cfg = adapt_mmdet_pipeline(detector.cfg)

# pose estimator (override_ckpt_meta 제거)
pose_estimator = init_pose_estimator(
    POSE_CONFIG, POSE_CKPT,
    device="cuda:0",
    cfg_options=dict(model=dict(test_cfg=dict(output_heatmaps=False)))
)

# 시각화기
visualizer = VISUALIZERS.build(pose_estimator.cfg.visualizer)
visualizer.set_dataset_meta(pose_estimator.dataset_meta, skeleton_style="mmpose")

print("모델 초기화 완료 ✅")


  from pkg_resources import DistributionNotFound, get_distribution
  _bootstrap._exec(spec, module)


Loads checkpoint by local backend from path: ../sapiens/pose/checkpoints/rtmdet_m_8xb32-100e_coco-obj365-person-235e8209.pth
Loads checkpoint by local backend from path: ../sapiens/pose/checkpoints/sapiens_0.3b/sapiens_0.3b_coco_best_coco_AP_796.pth
The model and loaded state dict do not match exactly

missing keys in source state_dict: head.deconv_layers.1.weight, head.deconv_layers.1.bias, head.deconv_layers.1.running_mean, head.deconv_layers.1.running_var, head.deconv_layers.4.weight, head.deconv_layers.4.bias, head.deconv_layers.4.running_mean, head.deconv_layers.4.running_var, head.conv_layers.1.weight, head.conv_layers.1.bias, head.conv_layers.1.running_mean, head.conv_layers.1.running_var, head.conv_layers.4.weight, head.conv_layers.4.bias, head.conv_layers.4.running_mean, head.conv_layers.4.running_var

모델 초기화 완료 ✅


In [4]:
# --- 30→15fps 다운샘플 + 주기적 검출 + 이전 포즈 bbox 재사용 ---
import os, re, glob, json, numpy as np, mmcv, cv2
from tqdm import tqdm
from mmdet.apis import inference_detector
from mmpose.apis import inference_topdown
from mmpose.evaluation.functional import nms
from mmpose.structures import merge_data_samples, split_instances

# paths 딕셔너리에서 경로 가져오기 (앞 셀에서 paths 생성되어 있어야 함)
assert 'paths' in globals(), "앞 셀에서 paths를 먼저 생성하세요."
FRAME_DIR = paths["FRAME_DIR"]
JSON_DIR  = paths["JSON_DIR"]
os.makedirs(JSON_DIR, exist_ok=True)  # JSON 저장 폴더 보장

def natural_key(s: str):
    base = os.path.basename(s)
    return [int(t) if t.isdigit() else t.lower() for t in re.split(r'(\d+)', base)]

all_frames = sorted(
    [p for p in glob.glob(os.path.join(FRAME_DIR, "*"))
     if p.lower().endswith((".jpg", ".png", ".jpeg"))],
    key=natural_key)
assert all_frames, f"이미지 없음: {FRAME_DIR}"
frames = all_frames[::2]  # 30fps → 15fps
print(f"전체 {len(all_frames)}장 → 15fps용 {len(frames)}장 선택")

def to_py(obj):
    import numpy as _np
    if isinstance(obj, _np.ndarray): return obj.tolist()
    if isinstance(obj, (_np.floating,)): return float(obj)
    if isinstance(obj, (_np.integer,)):  return int(obj)
    if isinstance(obj, dict):  return {k: to_py(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)): return [to_py(v) for v in obj]
    return obj

def clip_xyxy(xyxy, w, h):
    x1, y1, x2, y2 = xyxy
    return [max(0, x1), max(0, y1), min(w - 1, x2), min(h - 1, y2)]

def bbox_from_keypoints(kpts, scores, thr=0.3, pad_scale=1.25, img_wh=None):
    valid = scores >= thr
    if valid.sum() == 0:
        return None
    xy = kpts[valid]
    x1, y1 = xy.min(axis=0)
    x2, y2 = xy.max(axis=0)
    cx, cy = (x1 + x2) / 2, (y1 + y2) / 2
    w, h = max(2.0, x2 - x1), max(2.0, y2 - y1)
    w2, h2 = w * pad_scale / 2, h * pad_scale / 2
    bx = [cx - w2, cy - h2, cx + w2, cy + h2]
    if img_wh is not None:
        bx = clip_xyxy(bx, img_wh[0], img_wh[1])
    return bx

det_every = 5
det_thr   = 0.5
nms_thr   = 0.5
kpt_thr   = 0.3
pad_scale = 1.30
miss_max  = 2

prev_bbox = None
miss_cnt  = 0
ok, fail  = 0, 0

for i, img_path in enumerate(tqdm(frames)):
    try:
        img_bgr = cv2.imread(img_path)
        if img_bgr is None:
            continue
        H, W = img_bgr.shape[:2]
        img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

        need_det = (i % det_every == 0) or (prev_bbox is None) or (miss_cnt > miss_max)
        if need_det:
            det = inference_detector(detector, img_rgb)
            pred = det.pred_instances.cpu().numpy()
            if len(pred.bboxes):
                bbs = np.concatenate((pred.bboxes, pred.scores[:, None]), axis=1)
                keep = (pred.labels == 0) & (pred.scores > det_thr)
                bbs = bbs[keep]
                if len(bbs) > 0:
                    bbs = bbs[nms(bbs, nms_thr), :4]
                if len(bbs) > 0:
                    areas = (bbs[:, 2] - bbs[:, 0]) * (bbs[:, 3] - bbs[:, 1])
                    prev_bbox = bbs[np.argmax(areas)].tolist()
                    miss_cnt = 0
                else:
                    prev_bbox = None
            else:
                prev_bbox = None

        bboxes_np = (np.array([prev_bbox], dtype=np.float32)
                     if prev_bbox is not None else np.empty((0, 4), dtype=np.float32))

        pose_results = inference_topdown(pose_estimator, img_rgb, bboxes_np)
        data_sample  = merge_data_samples(pose_results)

        inst = data_sample.get('pred_instances', None)
        if inst is not None and len(inst.get('keypoints', [])) > 0:
            kpts_all    = inst['keypoints']
            kscores_all = inst['keypoint_scores']
            idx = int(np.argmax(kscores_all.mean(axis=1)))
            kpts, kscores = kpts_all[idx], kscores_all[idx]
            nb = bbox_from_keypoints(kpts, kscores, thr=kpt_thr, pad_scale=pad_scale, img_wh=(W, H))
            if nb is not None:
                prev_bbox = nb
                miss_cnt = 0
            else:
                miss_cnt += 1
        else:
            miss_cnt += 1

        # JSON 저장
        if inst is not None:
            inst_list = split_instances(inst)
            payload = dict(meta_info=pose_estimator.dataset_meta, instance_info=inst_list)
            base = os.path.splitext(os.path.basename(img_path))[0]
            json_path = os.path.join(JSON_DIR, base + ".json")
            with open(json_path, "w", encoding="utf-8") as f:
                json.dump(to_py(payload), f, ensure_ascii=False, indent=2)

        ok += 1

    except Exception as e:
        fail += 1
        print("에러:", os.path.basename(img_path), "->", e)

print(f"완료. 성공 {ok}, 실패 {fail}, 출력: {JSON_DIR}")


전체 31장 → 15fps용 16장 선택



  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]

  6% 1/16 [00:01<00:21,  1.43s/it][A
 12% 2/16 [00:02<00:13,  1.07it/s][A
 19% 3/16 [00:02<00:10,  1.27it/s][A
 25% 4/16 [00:03<00:08,  1.36it/s][A
 31% 5/16 [00:03<00:07,  1.48it/s][A
 38% 6/16 [00:04<00:06,  1.53it/s][A
 44% 7/16 [00:05<00:05,  1.58it/s][A
 50% 8/16 [00:05<00:04,  1.62it/s][A
 56% 9/16 [00:06<00:04,  1.63it/s][A
 62% 10/16 [00:06<00:03,  1.66it/s][A
 69% 11/16 [00:07<00:03,  1.66it/s][A
 75% 12/16 [00:08<00:02,  1.65it/s][A
 81% 13/16 [00:08<00:01,  1.61it/s][A
 88% 14/16 [00:09<00:01,  1.61it/s][A
 94% 15/16 [00:09<00:00,  1.64it/s][A
100% 16/16 [00:10<00:00,  1.52it/s][A

완료. 성공 16, 실패 0, 출력: ../data/Patient_data/new_code/json/M01_VISIT2_상지_json





In [5]:
# --- 15fps 예측(JSON) → 30fps 업샘플: JSON 저장 없이 '오버레이 이미지'만 저장 ---
import os, re, glob, json            # 경로/정렬/파일검색/JSON
import cv2                           # OpenCV (이미지 로드/저장, 그리기)
import numpy as np                   # 수치 계산
from tqdm import tqdm                # 진행률 표시

VIS_DIR=  paths["VIS_DIR"]

# ===== 자연 정렬 =====
def natural_key(s: str):
    base = os.path.basename(s)
    return [int(t) if t.isdigit() else t.lower() for t in re.split(r'(\d+)', base)]

# ===== 30fps 전체 프레임 목록 =====
all_frames = sorted(
    [p for p in glob.glob(os.path.join(FRAME_DIR, "*")) if p.lower().endswith((".jpg",".png",".jpeg"))],
    key=natural_key
)
assert all_frames, f"원본 프레임이 없습니다: {FRAME_DIR}"

# ===== 15fps JSON 맵 =====
src_json_files = sorted(glob.glob(os.path.join(JSON_DIR, "*.json")), key=natural_key)
assert src_json_files, f"15fps JSON이 없습니다: {JSON_DIR}"
src_json_map = {os.path.splitext(os.path.basename(p))[0]: p for p in src_json_files}

# ===== 시각화 파라미터 (단일 색) =====
KPT_THR         = 0.05                # 키포인트 신뢰도 임계값 (필요시 0.2~0.3으로 올려 노이즈 제거)
KEYPOINT_COLOR  = (0, 255, 0)         # 점 색 (BGR)
SKELETON_COLOR  = (255, 128, 0)       # 선 색 (BGR)
RADIUS          = 4                   # 점 반지름
THICKNESS       = 2                   # 선 두께
ANTI_ALIAS      = cv2.LINE_AA         # 안티앨리어싱

# ===== 스켈레톤 링크 얻기 =====
def get_links(meta: dict):
    if "skeleton_links" in meta:
        return meta["skeleton_links"]
    if "skeleton" in meta:
        return meta["skeleton"]
    return []

# ===== 한 인스턴스 그리기 (단일 색, bbox X) =====
def draw_instance_uniform(img_bgr, instance: dict, links, kpt_thr=0.05):
    kpts = np.array(instance["keypoints"], dtype=np.float32)
    ksc  = np.array(instance["keypoint_scores"], dtype=np.float32)
    # 키포인트(점)
    for xy, sc in zip(kpts, ksc):
        if sc < kpt_thr:
            continue
        x, y = int(xy[0]), int(xy[1])
        cv2.circle(img_bgr, (x, y), RADIUS, KEYPOINT_COLOR, -1, lineType=ANTI_ALIAS)
    # 스켈레톤(선)
    for link in links:
        if isinstance(link, dict) and "link" in link:
            i, j = link["link"]
        elif isinstance(link, (list, tuple)) and len(link) == 2:
            i, j = link
        else:
            continue
        if i >= len(kpts) or j >= len(kpts):
            continue
        if ksc[i] < kpt_thr or ksc[j] < kpt_thr:
            continue
        p1 = (int(kpts[i][0]), int(kpts[i][1]))
        p2 = (int(kpts[j][0]), int(kpts[j][1]))
        cv2.line(img_bgr, p1, p2, SKELETON_COLOR, THICKNESS, lineType=ANTI_ALIAS)

# ===== 메인: 30fps 모든 프레임에 대해 오버레이 이미지 저장 (JSON은 저장하지 않음) =====
last_json_data = None
written_img, skipped = 0, 0

for fpath in tqdm(all_frames, desc="Overlay 30fps", unit="frame"):
    base = os.path.splitext(os.path.basename(fpath))[0]           # 예: frame_000123
    # 15fps JSON 있으면 로드, 없으면 직전 JSON을 사용(Zero-Order Hold)
    src_json_path = src_json_map.get(base, None)
    if src_json_path is not None:
        with open(src_json_path, "r", encoding="utf-8") as f:
            json_data = json.load(f)
        last_json_data = json_data
    else:
        if last_json_data is None:
            # (초반부 예외 처리) 앞으로 보이는 첫 JSON 1회용 사용
            next_json_path = None
            for nb in sorted(src_json_map.keys(), key=natural_key):
                if nb > base:
                    next_json_path = src_json_map[nb]
                    break
            if next_json_path is None:
                skipped += 1
                continue
            with open(next_json_path, "r", encoding="utf-8") as f:
                json_data = json.load(f)
        else:
            json_data = last_json_data

    # 원본 프레임 로드
    img_bgr = cv2.imread(fpath)
    if img_bgr is None:
        skipped += 1
        continue

    # 스켈레톤/키포인트 그리기
    meta      = json_data.get("meta_info", {})
    instances = json_data.get("instance_info", [])
    links     = get_links(meta)
    for inst in instances:
        draw_instance_uniform(img_bgr, inst, links, kpt_thr=KPT_THR)

    # 오버레이 이미지 저장
    out_img_path = os.path.join(VIS_DIR, base + ".jpg")
    if cv2.imwrite(out_img_path, img_bgr):
        written_img += 1
    else:
        skipped += 1

print(f"완료: 오버레이 이미지 {written_img}개 저장, 스킵 {skipped}개")


Overlay 30fps:   0% 11/11370 [00:02<44:51,  4.22frame/s]


KeyboardInterrupt: 

In [None]:
# --- 업샘플된 30fps 시각화 프레임 → 30fps MP4 (tqdm 진행률, 지정 경로 저장) ---
import os, re, glob, cv2
from pathlib import Path
from tqdm import tqdm

assert 'paths' in globals(), "앞 셀에서 paths를 먼저 생성하세요."

# 프레임 폴더(VIS_DIR)와 출력 파일 경로(OUTPUT_MP4)를 paths에서 사용
VIS_DIR  = paths["VIS_DIR"]
SAVE_MP4 = paths["OUTPUT_MP4"]
Path(Path(SAVE_MP4).parent).mkdir(parents=True, exist_ok=True)  # 출력 폴더 보장

def natural_key(s: str):
    base = os.path.basename(s)
    return [int(t) if t.isdigit() else t.lower() for t in re.split(r'(\d+)', base)]

# 프레임 수집 (자연 정렬)
frames = sorted(
    [p for p in glob.glob(os.path.join(VIS_DIR, "*")) if p.lower().endswith((".jpg", ".png", ".jpeg"))],
    key=natural_key
)
assert frames, f"프레임 이미지가 없습니다: {VIS_DIR}"

# 비디오 라이터 초기화
first = cv2.imread(frames[0])
assert first is not None, f"첫 프레임 로드 실패: {frames[0]}"
h, w = first.shape[:2]
fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # 호환성 좋은 코덱
writer = cv2.VideoWriter(SAVE_MP4, fourcc, 30, (w, h))

# 인코딩 루프 (tqdm 진행률)
for f in tqdm(frames, desc="Encoding 30fps MP4", unit="frame"):
    img = cv2.imread(f)
    if img is None:
        print(f"[경고] 프레임 로드 실패: {f}")
        continue
    if img.shape[:2] != (h, w):
        img = cv2.resize(img, (w, h), interpolation=cv2.INTER_AREA)
    writer.write(img)

writer.release()
print("30fps 비디오 저장 완료:", SAVE_MP4)


In [1]:
# --- 15fps 예측(JSON) → 30fps 업샘플: '오버레이 이미지' 저장 옵션 + tqdm 진행률로 바로 MP4 인코딩 ---
import os, re, glob, json                              # 경로/정렬/파일검색/JSON 파싱을 위한 표준 라이브러리를 임포트합니다.
import cv2                                             # 이미지 로드/그리기/비디오 인코딩을 위해 OpenCV를 임포트합니다.
import numpy as np                                     # 수치 연산을 위해 NumPy를 임포트합니다.
from tqdm import tqdm                                   # 진행률 표시를 위해 tqdm을 임포트합니다.
from pathlib import Path                                # 출력 디렉터리 보장을 위해 Path를 임포트합니다.
import time                                            # 처리 속도(FPS) 측정을 위해 time 모듈을 임포트합니다.

assert 'paths' in globals(), "앞 셀에서 paths를 먼저 생성하세요."  # 상위 셀에서 경로 딕셔너리(paths)가 정의되었는지 확인합니다.
FRAME_DIR = paths["FRAME_DIR"]                         # 원본 30fps 프레임 폴더 경로를 가져옵니다.
JSON_DIR  = paths["JSON_DIR"]                          # 15fps JSON 폴더 경로를 가져옵니다.
VIS_DIR   = paths["VIS_DIR"]                           # (선택) 오버레이 이미지를 저장할 폴더 경로를 가져옵니다.
SAVE_MP4  = paths["OUTPUT_MP4"]                        # 최종 출력 MP4 파일 경로를 가져옵니다.

SAVE_OVERLAY_IMAGES = False                            # 오버레이 이미지를 파일로도 저장할지 여부(False: 저장 안 함, True: 저장)를 설정합니다.
FPS_OUT = 30                                           # 출력 비디오 FPS를 설정합니다.

# ===== 자연 정렬 유틸 =====
def natural_key(s: str):                               # 파일명을 사람 친화적으로 정렬하기 위한 키를 생성하는 함수를 정의합니다.
    base = os.path.basename(s)                         # 경로에서 파일명만 분리합니다.
    return [int(t) if t.isdigit() else t.lower()       # 숫자는 정수로 변환, 나머지는 소문자 문자열로 변환하여
            for t in re.split(r'(\d+)', base)]         # 숫자/문자 경계 기준으로 분리된 토큰 리스트를 반환합니다.

# ===== 스켈레톤 링크 추출 =====
def get_links(meta: dict):                             # JSON 메타 정보에서 스켈레톤 연결 정보를 얻는 함수를 정의합니다.
    if "skeleton_links" in meta:                       # 새 키 이름(skeleton_links)이 존재하면
        return meta["skeleton_links"]                  # 해당 값을 반환합니다.
    if "skeleton" in meta:                             # 구 키 이름(skeleton)이 존재하면
        return meta["skeleton"]                        # 해당 값을 반환합니다.
    return []                                          # 없으면 빈 리스트를 반환합니다.

# ===== 시각화 파라미터 (단일 색) =====
KPT_THR        = 0.05                                  # 키포인트 신뢰도 임계값(낮은 점 필터링)을 설정합니다.
KEYPOINT_COLOR = (0, 255, 0)                           # 키포인트 점 색상(BGR, 녹색)을 지정합니다.
SKELETON_COLOR = (255, 128, 0)                         # 스켈레톤 선 색상(BGR, 주황)을 지정합니다.
RADIUS         = 4                                     # 키포인트 점 반지름을 지정합니다.
THICKNESS      = 2                                     # 스켈레톤 선 두께를 지정합니다.
ANTI_ALIAS     = cv2.LINE_AA                           # 안티앨리어싱 라인 타입을 사용합니다.

# ===== 단일 인스턴스 오버레이 그리기 =====
def draw_instance_uniform(img_bgr, instance: dict, links, kpt_thr=0.05):  # 한 인스턴스의 키포인트/스켈레톤을 그리는 함수를 정의합니다.
    kpts = np.array(instance["keypoints"], dtype=np.float32)               # 키포인트 좌표 배열을 float32로 변환합니다.
    ksc  = np.array(instance["keypoint_scores"], dtype=np.float32)         # 키포인트 신뢰도 배열을 float32로 변환합니다.
    for xy, sc in zip(kpts, ksc):                                          # 각 키포인트 좌표와 신뢰도를 순회합니다.
        if sc < kpt_thr:                                                   # 신뢰도가 임계값보다 낮으면
            continue                                                       # 그리기를 건너뜁니다.
        x, y = int(xy[0]), int(xy[1])                                      # 좌표를 정수 픽셀 값으로 변환합니다.
        cv2.circle(img_bgr, (x, y), RADIUS, KEYPOINT_COLOR, -1, lineType=ANTI_ALIAS)  # 채워진 원으로 점을 그립니다.
    for link in links:                                                     # 링크 정의를 순회하며 선을 그립니다.
        if isinstance(link, dict) and "link" in link:                      # 딕셔너리 형태 { "link": [i, j] }를 처리합니다.
            i, j = link["link"]                                            # 인덱스 쌍을 추출합니다.
        elif isinstance(link, (list, tuple)) and len(link) == 2:           # [i, j] 또는 (i, j) 형태를 처리합니다.
            i, j = link                                                    # 인덱스 쌍을 가져옵니다.
        else:                                                              # 그 외 형식은
            continue                                                       # 무시합니다.
        if i >= len(kpts) or j >= len(kpts):                               # 인덱스가 범위를 벗어나면
            continue                                                       # 그리기를 건너뜁니다.
        if ksc[i] < kpt_thr or ksc[j] < kpt_thr:                           # 연결된 점 중 하나라도 임계값 미만이면
            continue                                                       # 선을 그리지 않습니다.
        p1 = (int(kpts[i][0]), int(kpts[i][1]))                            # 첫 번째 점 좌표를 정수로 준비합니다.
        p2 = (int(kpts[j][0]), int(kpts[j][1]))                            # 두 번째 점 좌표를 정수로 준비합니다.
        cv2.line(img_bgr, p1, p2, SKELETON_COLOR, THICKNESS, lineType=ANTI_ALIAS)  # 두 점을 선으로 연결해 그립니다.

# ===== 30fps 전체 프레임 수집 =====
all_frames = sorted(                                                       # 모든 입력 프레임을 자연 정렬로 수집합니다.
    [p for p in glob.glob(os.path.join(FRAME_DIR, "*"))                    # 폴더 내 모든 항목 중에서
     if p.lower().endswith((".jpg", ".png", ".jpeg"))],                    # 이미지 확장자만 필터링합니다.
    key=natural_key                                                        # 자연 정렬 키를 사용합니다.
)
assert all_frames, f"원본 프레임이 없습니다: {FRAME_DIR}"                   # 프레임이 없으면 실행을 중단합니다.

# ===== 15fps JSON 맵 구성 =====
src_json_files = sorted(                                                   # 15fps JSON 파일 목록을 자연 정렬로 수집합니다.
    glob.glob(os.path.join(JSON_DIR, "*.json")),                           # JSON 폴더에서 *.json 파일을 찾습니다.
    key=natural_key                                                        # 자연 정렬 키를 사용합니다.
)
assert src_json_files, f"15fps JSON이 없습니다: {JSON_DIR}"                 # JSON이 없으면 실행을 중단합니다.
src_json_map = {os.path.splitext(os.path.basename(p))[0]: p                # 파일명(확장자 제외)을 키로 하고
                for p in src_json_files}                                   # 전체 경로를 값으로 하는 맵을 생성합니다.
sorted_json_keys = sorted(src_json_map.keys(), key=natural_key)            # 자연 정렬된 JSON 키 리스트를 미리 만들어 둡니다.

# ===== 비디오 라이터 초기화 =====
first_img = cv2.imread(all_frames[0])                                      # 첫 프레임을 로드해 해상도를 파악합니다.
assert first_img is not None, f"첫 프레임 로드 실패: {all_frames[0]}"     # 첫 프레임 로드 실패 시 중단합니다.
H, W = first_img.shape[:2]                                                 # 프레임의 높이/너비를 얻습니다.
Path(Path(SAVE_MP4).parent).mkdir(parents=True, exist_ok=True)             # 출력 비디오 폴더를 생성(이미 있으면 무시)합니다.
fourcc = cv2.VideoWriter_fourcc(*"mp4v")                                   # 호환성 좋은 mp4v 코덱을 사용합니다.
writer = cv2.VideoWriter(SAVE_MP4, fourcc, FPS_OUT, (W, H))                # 설정한 FPS/해상도로 비디오 라이터를 생성합니다.
assert writer.isOpened(), f"비디오 라이터 초기화 실패: {SAVE_MP4}"          # 라이터가 정상 오픈되었는지 확인합니다.

# ===== 선택적 오버레이 이미지 저장 준비 =====
if SAVE_OVERLAY_IMAGES:                                                    # 오버레이 이미지 파일 저장 옵션이 True인 경우
    Path(VIS_DIR).mkdir(parents=True, exist_ok=True)                       # 저장 디렉터리를 생성합니다.

# ===== 메인 루프: tqdm 진행률과 함께 오버레이 → 즉시 인코딩 =====
last_json_data = None                                                      # 직전(JSON 홀드)을 저장할 변수를 초기화합니다.
written_frames, skipped = 0, 0                                             # 기록된 프레임 수와 스킵 수를 초기화합니다.
t_start = time.time()                                                      # 처리 시작 시각을 기록합니다.

with tqdm(total=len(all_frames),                                           # 전체 프레임 수를 지정해 진행률 바를 생성합니다.
          desc="Overlay→Encode 30fps MP4",                                 # 진행률 바 설명을 설정합니다.
          unit="frame",                                                    # 단위를 frame으로 표시합니다.
          dynamic_ncols=True,                                              # 터미널 너비에 따라 칼럼을 동적으로 조정합니다.
          leave=True) as pbar:                                             # 완료 후에도 진행률 바를 남겨둡니다.
    for fpath in all_frames:                                               # 모든 프레임을 순회하며 처리합니다.
        base = os.path.splitext(os.path.basename(fpath))[0]                # 현재 프레임의 파일명(확장자 제외)을 구합니다.
        src_json_path = src_json_map.get(base, None)                       # 같은 이름의 15fps JSON이 있는지 조회합니다.

        if src_json_path is not None:                                      # 일치하는 JSON이 있는 경우
            with open(src_json_path, "r", encoding="utf-8") as f:          # JSON 파일을 텍스트 모드로 엽니다.
                json_data = json.load(f)                                   # JSON을 파싱하여 딕셔너리로 읽습니다.
            last_json_data = json_data                                     # 최근 JSON 캐시를 갱신합니다.
        else:                                                              # 일치 JSON이 없는 경우
            if last_json_data is None:                                     # 아직 어떤 JSON도 캐시되지 않은 초기 상태라면
                json_data = None                                           # 임시 변수 초기값을 None으로 둡니다.
                for k in sorted_json_keys:                                 # 자연 정렬된 키들을 순회합니다.
                    if natural_key(k) > natural_key(base):                 # 현재 프레임명보다 "이후"에 오는 JSON을 찾습니다.
                        with open(src_json_map[k], "r", encoding="utf-8") as f:  # 해당 JSON 파일을 엽니다.
                            json_data = json.load(f)                        # JSON을 파싱합니다.
                        last_json_data = json_data                          # 캐시를 업데이트합니다.
                        break                                              # 첫 후보를 찾았으므로 탐색을 중지합니다.
                if json_data is None:                                      # 끝까지 못 찾은 경우(비정상 시나리오)
                    skipped += 1                                           # 스킵 카운터를 증가시킵니다.
                    pbar.update(1)                                         # 진행률 바를 한 칸 진행합니다.
                    pbar.set_postfix(written=written_frames, skipped=skipped)  # 현재 누적 통계를 표기합니다.
                    continue                                               # 다음 프레임으로 넘어갑니다.
            else:                                                          # 직전 JSON이 있는 경우
                json_data = last_json_data                                 # Zero-Order Hold 방식으로 재사용합니다.

        img_bgr = cv2.imread(fpath)                                        # 현재 프레임 이미지를 로드합니다.
        if img_bgr is None:                                                # 로드 실패 시
            skipped += 1                                                   # 스킵 카운터를 증가시킵니다.
            pbar.update(1)                                                 # 진행률 바를 한 칸 진행합니다.
            pbar.set_postfix(written=written_frames, skipped=skipped)      # 현재 누적 통계를 표기합니다.
            continue                                                       # 다음 프레임으로 넘어갑니다.

        if img_bgr.shape[:2] != (H, W):                                    # 프레임 해상도가 기준과 다르면
            img_bgr = cv2.resize(img_bgr, (W, H), interpolation=cv2.INTER_AREA)  # 비디오 해상도에 맞게 리사이즈합니다.

        meta      = json_data.get("meta_info", {})                         # 메타 정보를 안전하게 가져옵니다.
        instances = json_data.get("instance_info", [])                     # 인스턴스 리스트를 안전하게 가져옵니다.
        links     = get_links(meta)                                        # 스켈레톤 링크 정보를 얻습니다.
        for inst in instances:                                             # 각 인스턴스를 순회하며
            draw_instance_uniform(img_bgr, inst, links, kpt_thr=KPT_THR)   # 키포인트와 스켈레톤을 단일 색으로 그립니다.

        writer.write(img_bgr)                                              # 그려진 프레임을 즉시 비디오에 기록합니다.
        written_frames += 1                                                # 기록된 프레임 수를 갱신합니다.

        if SAVE_OVERLAY_IMAGES:                                            # 이미지 저장 옵션이 활성화되었다면
            out_img_path = os.path.join(VIS_DIR, base + ".jpg")            # 저장 경로를 구성합니다.
            _ = cv2.imwrite(out_img_path, img_bgr)                         # 오버레이 이미지를 파일로 저장합니다.

        elapsed = time.time() - t_start                                    # 경과 시간을 계산합니다.
        fps_now = written_frames / max(elapsed, 1e-6)                      # 평균 처리 FPS를 계산합니다.
        pbar.update(1)                                                     # 진행률 바를 한 칸 진행합니다.
        pbar.set_postfix(written=written_frames, skipped=skipped, fps=f"{fps_now:0.2f}")  # 통계(FPS 포함)를 표시합니다.

writer.release()                                                            # 비디오 라이터를 닫아 파일을 완료합니다.
total_sec = max(time.time() - t_start, 1e-6)                                # 총 경과 시간을 계산합니다.
avg_fps = written_frames / total_sec                                        # 전체 평균 FPS를 계산합니다.
print(f"완료: 비디오 저장 {SAVE_MP4}, 프레임 {written_frames}개 기록, 스킵 {skipped}개, 평균 FPS {avg_fps:0.2f}")  # 처리 요약을 출력합니다.


  from .autonotebook import tqdm as notebook_tqdm
  from pkg_resources import DistributionNotFound, get_distribution


📂 Paths
MP4_PATH     : ../../../../data/김원 보산진 연구/M01_VISIT2_상지.MP4  ✅
FRAME_DIR    : ../data/Patient_data/new_code/frames/M01_VISIT2_상지_frame  ✅
JSON_DIR     : ../data/Patient_data/new_code/json/M01_VISIT2_상지_json  ✅
VIS_DIR      : ../data/Patient_data/new_code/vis/M01_VISIT2_상지_vis  ✅
OUTPUT_MP4   : ../data/Patient_data/new_code/output/M01_VISIT2_상지_output.mp4  ✅


Pass-1→FixedBBox:   6% 102/1798 [00:03<00:54, 30.89frame/s]


KeyboardInterrupt: 