# 개요

## function

### Path

In [1]:
import sys
sys.path.append("/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/ASAN_01_Repeatition_Counter")

from pathlib import Path

# --- 설정된 경로 변수 ---
TXT_PATH = Path('/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/Won_Kim_research_at_Bosanjin/M02/M02_VISIT2/segment_frames.txt')
FRAME_PATH = Path('/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/1_FRAME/Won_Kim_research_at_Bosanjin/M02_VISIT2')
KEYPOINTS_PATH = Path('/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/2_KEYPOINTS/Won_Kim_research_at_Bosanjin/M02_VISIT2')
OUTPUT_DIR = Path('/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/ASAN_01_Repeatition_Counter/')

### TxT parsing

In [2]:
from pathlib import Path
from typing import Optional, Tuple

# 반환 타입이 확장되어 (레이블, 시작 프레임, 종료 프레임, 촬영 각도, 동작 이름)이 됩니다.
def parse_nth_segment_info_from_file(txt_path: Path, num: int = 1) -> Optional[Tuple[str, int, int, str, str]]:
    # 순서 번호(num) 유효성 검사
    if num <= 0:
        print("[ERROR] 순서 번호(num)는 1 이상이어야 합니다.")
        return None
        
    data_line = "" # ValueError에서 참조할 수 있도록 초기화
    
    try:
        # 1. 파일 내용 읽기
        with open(txt_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        
        valid_lines = []
        # 2. 주석 (#) 라인과 빈 라인을 제외한 유효한 데이터 라인만 수집
        for line in lines:
            line = line.strip()
            if line and not line.startswith('#'):
                valid_lines.append(line)
        
        # 요청된 세그먼트가 존재하는지 확인
        if len(valid_lines) < num:
            print(f"[ERROR] 파일에 유효한 세그먼트 데이터가 {len(valid_lines)}개 있습니다. {num}번째 세그먼트는 존재하지 않습니다.")
            return None

        # 3. N번째 데이터 라인 파싱 (리스트 인덱스는 num - 1)
        data_line = valid_lines[num - 1]
        
        # 쉼표 또는 공백으로 구분된 데이터를 파싱
        # 예시: "frontal__biceps_curl__1 390 1860"
        parts = data_line.replace(',', ' ').split() 
        
        if len(parts) < 3:
            print(f"[ERROR] 데이터 라인 형식이 'label, start, end'가 아닙니다: {data_line}")
            return None

        # 4. 프레임 정보 추출 및 타입 변환
        full_label = parts[0].strip()
        start_frame = int(parts[1].strip())
        end_frame = int(parts[2].strip())
        
        # 5. 레이블 세분화 (추가된 부분)
        # full_label: "frontal__biceps_curl__1"을 "__" 기준으로 분리
        label_parts = full_label.split('__')
        
        if len(label_parts) >= 3:
            # 촬영 각도(예: frontal)
            view_angle = label_parts[0].strip()
            # 동작 이름(예: biceps_curl)
            action_name = label_parts[1].strip()
            # 세트 번호(예: 1) - 현재는 사용하지 않으므로 포함하지 않아도 되지만, 
            # 일관성을 위해 label_parts[2]는 무시하고 촬영각도와 동작 이름만 반환합니다.
        elif len(label_parts) == 2:
             # 구분자가 부족할 경우 예외 처리
            view_angle = label_parts[0].strip()
            action_name = label_parts[1].strip()
            print(f"[WARN] 세트 번호(__1)가 없는 레이블 형식입니다. ({full_label})")
        else:
            # 레이블 형식이 'A__B__C'가 아닌 경우
            print(f"[WARN] 레이블 형식(A__B__C)이 예상과 다릅니다. 전체 레이블 사용: {full_label}")
            view_angle = "UNKNOWN"
            action_name = full_label
            
        # 전체 레이블, 시작, 종료, 촬영각도, 동작이름을 반환
        return full_label, start_frame, end_frame, view_angle, action_name
            
    except FileNotFoundError:
        print(f"[FATAL] TXT 파일 찾을 수 없음: {txt_path}")
        return None
    except ValueError as e:
        # 프레임 번호 변환 오류 시 data_line을 표시
        print(f"[ERROR] 프레임 번호 변환 오류. 데이터 라인({data_line})을 확인하세요. 오류: {e}")
        return None
    except Exception as e:
        print(f"[ERROR] 예기치 않은 오류 발생: {e}")
        return None

#### test

In [3]:
# --- 사용 예시 ---
# 전역 변수 설정 (사용자 정의 경로)
TXT_PATH = Path('/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/Won_Kim_research_at_Bosanjin/M02/M02_VISIT2/segment_frames.txt')
parse_nth_segment_info_from_file(TXT_PATH,1)

('frontal__biceps_curl__1', 390, 1860, 'frontal', 'biceps_curl')

### create video function

In [4]:
import cv2
import json
import numpy as np
from pathlib import Path
from tqdm import tqdm
from typing import Optional, List, Any
from functions.constants_skeleton.registry import load_skeleton_constants

def render_skeleton_video(
    frame_dir: str,
    json_dir: str,
    out_mp4: str,
    start_frame: Optional[int] = None, # <--- 구간 시작 번호
    end_frame: Optional[int] = None,   # <--- 구간 종료 번호
    fps: int = 30,
    kp_radius: int = 4,
    line_thickness: int = 2,
    model_type: str = "coco17",
    flip_horizontal: bool = True
):
    """
    프레임 + keypoints JSON → skeleton overlay mp4 생성
    - start_frame, end_frame이 지정되면 해당 구간만 처리합니다.
    - 프레임 파일명은 6자리 0-패딩 (예: 000000.jpg)을 가정합니다.
    - model_type: 'coco17', 'yolo12' 등 constants_skeleton에서 로드
    - flip_horizontal: 좌우 반전 여부 (기본 False)
    """
    frame_dir_path = Path(frame_dir)
    json_dir_path = Path(json_dir)
    out_mp4_path = Path(out_mp4)
    out_mp4_path.parent.mkdir(parents=True, exist_ok=True)
    
    frame_files: List[Path] = []
    
    # 1. 처리할 프레임 파일 목록 결정
    if start_frame is not None and end_frame is not None and start_frame <= end_frame:
        # Case 1: 시작/종료 번호가 지정된 경우
        frame_numbers = range(start_frame, end_frame + 1)
        # 6자리 0-패딩 파일명으로 경로를 생성합니다.
        frame_files = [frame_dir_path / f"{fn:06d}.jpg" for fn in frame_numbers]
        print(f"[INFO] Rendering segment: Frames {start_frame} to {end_frame} ({len(frame_files)} expected).")
    else:
        # Case 2: 전체 프레임 처리 (기존 glob 동작)
        frame_files = sorted(frame_dir_path.glob("*.jpg"))
        
        if frame_files:
            first = frame_files[0].stem
            last = frame_files[-1].stem
            print(f"[INFO] Rendering all frames ({len(frame_files)} found). Range: {first} to {last}.")
        else:
            print(f"[WARN] No frames found in {frame_dir_path} directory.")
            return

    # 2. 상수 로드
    const = load_skeleton_constants(model_type)
    COLOR_SK = const.COLOR_SK
    COLOR_L = const.COLOR_L
    COLOR_R = const.COLOR_R
    COLOR_NEUTRAL = const.COLOR_NEUTRAL
    LEFT_POINTS = const.LEFT_POINTS
    RIGHT_POINTS = const.RIGHT_POINTS
    EXCLUDE_POINTS = getattr(const, "EXCLUDE_POINTS", [])
    SKELETON_LINKS = getattr(const, "SKELETON_LINKS", [])

    # 3. 해상도 확인 및 VideoWriter 설정
    # 실제로 존재하는 프레임만 필터링하여 첫 번째 프레임으로 사용
    valid_frame_paths = [p for p in frame_files if p.exists()]
    if not valid_frame_paths:
         print("[WARN] No actual frame files exist in the specified range.")
         return
         
    sample = cv2.imread(str(valid_frame_paths[0]))
    # ⚠️ 사용자의 요청에 따라 첫 번째 프레임 로드 실패 시 오류 처리 코드를 제거했습니다.
    # if sample is None:
    #     print(f"[ERROR] Could not read first valid frame: {valid_frame_paths[0]}")
    #     return
        
    h, w = sample.shape[:2]
    
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(str(out_mp4_path), fourcc, fps, (w, h))
    
    if not writer.isOpened():
        print(f"[ERROR] VideoWriter failed to open. Check codec/path: {out_mp4_path}")
        return

    # 4. 프레임 순회 및 렌더링
    for frame_path in tqdm(frame_files, total=len(frame_files),
                           desc=f"Rendering to {out_mp4_path.name}", unit="frame"):
                           
        if not frame_path.exists():
            continue # 해당 프레임이 지정된 구간에 없으면 건너뛰기

        frame = cv2.imread(str(frame_path))
        if frame is None:
            continue
            
        json_path = json_dir_path / (frame_path.stem + ".json")

        if not json_path.exists():
            writer.write(frame)
            continue

        # JSON 로드 및 렌더링
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            if "instance_info" in data and len(data["instance_info"]) > 0:
                for person in data["instance_info"]:
                    kpts = np.array(person.get("keypoints", []))

                    if kpts.ndim != 2 or kpts.shape[0] == 0:
                        continue

                    # Skeleton 라인
                    for i, j in SKELETON_LINKS:
                        if i >= len(kpts) or j >= len(kpts) or i in EXCLUDE_POINTS or j in EXCLUDE_POINTS:
                            continue
                        
                        # 좌표 유효성 검사 (0보다 큰 값)
                        pt1_x, pt1_y = kpts[i]
                        pt2_x, pt2_y = kpts[j]
                        if pt1_x <= 0 or pt1_y <= 0 or pt2_x <= 0 or pt2_y <= 0:
                             continue
                             
                        pt1, pt2 = tuple(map(int, kpts[i])), tuple(map(int, kpts[j]))
                        cv2.line(frame, pt1, pt2, COLOR_SK, line_thickness)

                    # Keypoints 점
                    for idx, (x, y) in enumerate(kpts):
                        if idx in EXCLUDE_POINTS or x <= 0 or y <= 0:
                            continue
                        if idx in LEFT_POINTS:
                            color = COLOR_L
                        elif idx in RIGHT_POINTS:
                            color = COLOR_R
                        else:
                            color = COLOR_NEUTRAL
                        cv2.circle(frame, (int(x), int(y)), kp_radius, color, -1)
        except Exception as e:
            print(f"[WARN] JSON processing error for {frame_path.stem}: {e}")
            # 오류 발생 시 스켈레톤 없이 원본 프레임만 기록
            pass 

        # 안내 문구 및 프레임 번호 표시
        frame_num_int = int(frame_path.stem)
        legend_text = f"L: Blue | R: Red | Frame: {frame_num_int}"
        cv2.putText(frame, legend_text, (20, h - 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2, cv2.LINE_AA)

        # 좌우 반전 (선택)
        if flip_horizontal:
            frame = cv2.flip(frame, 1)

        writer.write(frame)

    writer.release()
    print(f"✅ Skeleton overlay 완료 → {out_mp4_path}")

#### test

In [5]:


# # ----------------------------------------------------------------------
# # 예시 실행 코드 (경로 및 실행 설정)
# # ----------------------------------------------------------------------

# # ⚠️ 이 경로는 사용자의 환경에 맞춰 실제 데이터 경로로 변경되어야 합니다.
# FRAME_DIR = '/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/1_FRAME/Won_Kim_research_at_Bosanjin/M02_VISIT2'
# JSON_DIR = '/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/2_KEYPOINTS/Won_Kim_research_at_Bosanjin/M02_VISIT2'
# OUTPUT_DIR = '/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/ASAN_01_Repeatition_Counter/'
# OUTPUT_DIR_PATH = Path(OUTPUT_DIR)
# OUTPUT_DIR_PATH.mkdir(exist_ok=True)

# # --- 실행 1: 특정 구간만 렌더링 (start/end num 지정) ---
# # 예시: 프레임 100부터 150까지 렌더링
# OUT_FILE_SEGMENT = OUTPUT_DIR_PATH / "segment_100_150.mp4"
# print("\n--- [실행 1] 특정 구간 렌더링 (프레임 100~150) ---")

# render_skeleton_video(
#     frame_dir=FRAME_DIR,
#     json_dir=JSON_DIR,
#     out_mp4=str(OUT_FILE_SEGMENT),
#     start_frame=100,
#     end_frame=150,
#     flip_horizontal=False 
# )

### load_kpt

In [6]:
import json
import numpy as np
from pathlib import Path
from typing import List, Tuple, Dict, Any

def load_kpt(
    json_dir: str,
    start_frame: int,
    end_frame: int
) -> Dict[str, Any]:
    """
    지정된 디렉토리에서 시작 프레임부터 끝 프레임까지의 키포인트 JSON 파일을 로드하고,
    'keypoint_id2name', 'skeleton_links'는 메타 정보로 한 번만 추출하며,
    'instance_info'는 프레임별로 추출하여 단일 딕셔너리로 반환합니다.
    """
    
    # 최종 결과 딕셔너리 초기화
    result_data = {
        'meta_info': {},  # keypoint_id2name, skeleton_links 저장
        'frame_data': []  # (프레임 번호, instance_info) 리스트 저장
    }
    
    base_path = Path(json_dir)
    meta_loaded = False
    
    # 1. 프레임 순회 및 파일 경로 조합
    for frame_num in range(start_frame, end_frame + 1): 
        
        filename = f"{frame_num:06d}.json" 
        json_path = base_path / filename

        # 2. JSON 로드 및 데이터 필터링
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            
            # 메타 정보는 첫 번째 프레임에서 한 번만 추출하여 저장
            if not meta_loaded:
                meta_info = data.get('meta_info', {})
                result_data['meta_info'] = {
                    'keypoint_id2name': meta_info.get('keypoint_id2name', {}),
                    'skeleton_links': meta_info.get('skeleton_links', []),
                }
                meta_loaded = True
            
            # 인스턴스 ID 할당
            instance_info = data.get('instance_info', [])
            processed_instances = []
            for i, instance in enumerate(instance_info):
                # 인스턴스 리스트 인덱스를 1부터 시작하는 ID로 할당
                # (프레임 내에서 '1번 사람', '2번 사람'으로 구분)
                instance['instance_id'] = i + 1 
                processed_instances.append(instance)

            # (프레임 번호, 인스턴스 정보 리스트) 형태로 저장
            result_data['frame_data'].append((frame_num, processed_instances))
        except json.JSONDecodeError:
            print(f"Error: JSON decoding failed for {json_path}")
        except Exception as e:
            print(f"An unexpected error occurred while loading {json_path}: {e}")
            
    return result_data

#### test

In [7]:
print(type(load_kpt(KEYPOINTS_PATH,5,6)))
load_kpt(KEYPOINTS_PATH,5,6)

An unexpected error occurred while loading /workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/2_KEYPOINTS/Won_Kim_research_at_Bosanjin/M02_VISIT2/000005.json: [Errno 2] No such file or directory: '/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/2_KEYPOINTS/Won_Kim_research_at_Bosanjin/M02_VISIT2/000005.json'
An unexpected error occurred while loading /workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/2_KEYPOINTS/Won_Kim_research_at_Bosanjin/M02_VISIT2/000006.json: [Errno 2] No such file or directory: '/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/2_KEYPOINTS/Won_Kim_research_at_Bosanjin/M02_VISIT2/000006.json'
<class 'dict'>
An unexpected error occurred while loading /workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/2_KEYPOINTS/Won_Kim_research_at_Bosanjin/M02_VISIT2/000005.json: [Errno 2] No such file or directory: '/w

{'meta_info': {}, 'frame_data': []}

### Calculate Angle

In [8]:
def calculate_angle(a, b, c):
    a = np.array(a) # 첫 번째 점을 numpy 배열로 변환
    b = np.array(b) # 중간 점을 numpy 배열로 변환
    c = np.array(c) # 세 번째 점을 numpy 배열로 변환

    radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0]) # 아크탄젠트를 이용해 라디안 각도 계산
    angle = np.abs(radians*180.0/np.pi) # 라디안을 도(degree) 단위로 변환하고 절대값 취함

    if angle > 180.0: # 각도가 180도를 넘어가면
        angle = 360 - angle # 360도에서 뺀 값을 사용하여 내각을 구함

    return angle # 계산된 각도 반환

### track_keypoints_with_heuristics

In [9]:
import numpy as np
from typing import Dict, Any, List, Tuple
from scipy.spatial.distance import euclidean

# --- 헬퍼 함수 1: IOU 계산 ---
def calculate_iou(boxA: List[float], boxB: List[float]) -> float:

    # 좌표 추출
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    # 교차 영역 계산 (겹치지 않으면 0)
    interArea = max(0, xB - xA) * max(0, yB - yA)
    
    # 두 박스의 개별 영역 계산
    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

    # IOU 계산: 교차 영역 / 합집합 영역
    iou = interArea / float(boxAArea + boxBArea - interArea)
    return iou

# Pose 유사도 검사
def calculate_pose_distance(kpts_t: List[List[float]], kpts_t_plus_1: List[List[float]]) -> float:
       
    # 1. 키포인트 벡터 평탄화 (Flatten)
    vec_t = np.array(kpts_t).flatten()
    vec_t_plus_1 = np.array(kpts_t_plus_1).flatten()

    # 2. 정규화 (예시: 모든 키포인트를 평균 위치를 기준으로 이동)
    # 실제 Pose 트래킹에서는 BBox나 특정 관절(예: 엉덩이 중앙)을 기준으로 스케일 및 위치를 정규화해야 정확함.
    # 여기서는 간단히 전체 평균을 이용해 위치만 정규화합니다.
    
    if len(vec_t) != len(vec_t_plus_1) or len(vec_t) == 0:
        return float('inf')

    # 중심점 이동 (Center Shift Normalization)
    center_t = np.mean(vec_t.reshape(-1, 2), axis=0)
    center_t_plus_1 = np.mean(vec_t_plus_1.reshape(-1, 2), axis=0)
    
    # 정규화된 벡터 생성 (중심점 위치 이동)
    norm_vec_t = vec_t.reshape(-1, 2) - center_t
    norm_vec_t_plus_1 = vec_t_plus_1.reshape(-1, 2) - center_t_plus_1
    
    # 3. L2 거리 (유클리드 거리) 계산
    # 거리가 작을수록 자세가 유사함
    return euclidean(norm_vec_t.flatten(), norm_vec_t_plus_1.flatten())

# --- 메인 트래킹 함수 ---
def track_keypoints_with_heuristics(
    data: Dict[str, Any],
    iou_threshold: float = 0.5,
    max_center_distance: float = 150.0 # 픽셀 단위 최대 이동 거리 가정
) -> Dict[str, Any]:
    
    tracked_data = data.copy()
    
    # 1. 트래커 상태 초기화
    current_tid_counter = 1000  # TID는 1000부터 시작 (재활용되지 않음)
    # {TID: {'center': (x, y), 'bbox': [x1, y1, x2, y2], 'kpts': [[x, y], ...], 'lost_count': 0}}
    active_trackers = {} 

    # 2. BBox 형식 변환 헬퍼 함수
    def get_bbox_coord(instance):
        # instance['bbox']가 [[x1, y1, x2, y2]] 형태이므로 첫 번째 요소를 사용
        return instance['bbox'][0]
    
    def get_bbox_center(box):
        return ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2)

    # 3. 프레임 순회하며 트래킹 수행
    new_frame_data = []
    for frame_num, current_instances in tracked_data['frame_data']:
        
        # 현재 프레임에 TID를 부여할 딕셔너리 리스트
        instances_with_tid = [] 
        
        # 현재 프레임의 인스턴스 중 아직 매칭되지 않은 인스턴스의 인덱스 리스트
        unmatched_new_indices = list(range(len(current_instances)))
        
        # 다음 프레임 추적을 위한 트래커 업데이트 딕셔너리
        next_active_trackers = {}
        
        # --- A. 계층적 매칭 단계 (현재 TID를 새로운 인스턴스와 매칭) ---
        
        for tid, tracker_state in active_trackers.items():
            
            best_match_index = -1
            best_iou = 0.0
            min_center_dist = float('inf')
            min_pose_dist = float('inf')
            
            matched_by_iou = False
            matched_by_dist = False

            # 매칭되지 않은 새로운 인스턴스들만 확인
            for i in unmatched_new_indices:
                new_instance = current_instances[i]
                new_bbox = get_bbox_coord(new_instance)
                
                # 1순위: IOU 기반 매칭
                iou = calculate_iou(tracker_state['bbox'], new_bbox)
                if iou >= iou_threshold and iou > best_iou:
                    best_iou = iou
                    best_match_index = i
                    matched_by_iou = True
                
                # 2순위: 중심점 거리 기반 매칭 (IOU 매칭이 없거나 약할 경우 대비)
                new_center = get_bbox_center(new_bbox)
                center_dist = euclidean(tracker_state['center'], new_center)
                if center_dist <= max_center_distance and center_dist < min_center_dist:
                    min_center_dist = center_dist
                    # IOU 매칭이 안됐거나, 현재 거리 매칭이 IOU 매칭보다 더 나은 대안일 경우를 고려
                    if not matched_by_iou and best_iou < iou_threshold:
                         best_match_index = i
                         matched_by_dist = True
            
            # --- B. 매칭 결과 처리 ---
            
            # 1. IOU나 거리로 확실한 매칭이 된 경우
            if best_match_index != -1 and (best_iou >= iou_threshold or min_center_dist < max_center_distance):
                matched_instance = current_instances[best_match_index]
                
                # Pose 유사도 검증 (선택적: 유사도가 너무 낮으면 매칭을 거부할 수도 있음)
                pose_dist = calculate_pose_distance(tracker_state['kpts'], matched_instance['keypoints'])
                # if pose_dist > 500: # 예시 임계값
                #    continue # Pose가 너무 다르면 매칭 거부
                
                # 매칭된 인스턴스에 TID 할당 및 업데이트
                matched_instance['track_id'] = tid
                instances_with_tid.append(matched_instance)
                unmatched_new_indices.remove(best_match_index)
                
                # 트래커 상태 업데이트
                next_active_trackers[tid] = {
                    'center': get_bbox_center(get_bbox_coord(matched_instance)),
                    'bbox': get_bbox_coord(matched_instance),
                    'kpts': matched_instance['keypoints'],
                    'lost_count': 0
                }
            
            # 2. 매칭 실패 (LOST 상태)
            else:
                new_lost_count = tracker_state['lost_count'] + 1
                if new_lost_count <= 5: # 5프레임 동안 못 찾으면 트랙 종료
                    # 상태만 업데이트하여 다음 프레임에서 재시도
                    tracker_state['lost_count'] = new_lost_count
                    next_active_trackers[tid] = tracker_state
                # else: 5프레임 이상이면 트랙 종료 (next_active_trackers에 추가 안함)

        # --- C. 신규 인스턴스 (New Track Initialization) ---
        
        for i in unmatched_new_indices:
            new_instance = current_instances[i]
            
            # 신규 TID 할당
            current_tid_counter += 1
            new_tid = current_tid_counter
            
            new_instance['track_id'] = new_tid
            instances_with_tid.append(new_instance)

            # 새 트래커 상태 초기화
            next_active_trackers[new_tid] = {
                'center': get_bbox_center(get_bbox_coord(new_instance)),
                'bbox': get_bbox_coord(new_instance),
                'kpts': new_instance['keypoints'],
                'lost_count': 0
            }
            
        # 4. 트래커 상태 업데이트 및 결과 저장
        active_trackers = next_active_trackers
        new_frame_data.append((frame_num, instances_with_tid))

    # 5. 최종 데이터 반환
    tracked_data['frame_data'] = new_frame_data
    return tracked_data


### create_skeleton_video

In [10]:
import cv2
import numpy as np
from typing import Dict, Any, List, Tuple
from pathlib import Path

# --- 전역 상수 ---
TRACK_COLORS = [(0, 0, 255), (0, 255, 0), (255, 0, 0), (0, 255, 255), (255, 0, 255), (255, 255, 0), (128, 0, 0), (0, 128, 0), (0, 0, 128), (128, 128, 0), (128, 0, 128), (0, 128, 128)]
# 비디오 프레임 크기 (원본 이미지 크기에 맞게 설정해야 합니다.)
FRAME_WIDTH = 1280 
FRAME_HEIGHT = 720 

# --- 핵심 시각화 함수 (검은 배경 사용) ---
def draw_skeleton_on_black(
    frame_data_list: List[Dict[str, Any]], 
    skeleton_links: List[Tuple[int, int]]
) -> np.ndarray:
    """
    검은색 배경에 스켈레톤, BBox, Track ID를 그립니다.
    """
    # 1. 검은색 캔버스 생성 (FRAME_HEIGHT x FRAME_WIDTH)
    img = np.zeros((FRAME_HEIGHT, FRAME_WIDTH, 3), dtype=np.uint8)

    # 2. 인스턴스 정보 순회 (Track ID 기반 시각화)
    for instance in frame_data_list:
        track_id = instance.get('track_id')
        kpts = np.array(instance['keypoints'], dtype=np.int32)
        bbox = instance['bbox'][0]  # [[x1, y1, x2, y2]] 형태 가정

        if track_id is None:
            continue
            
        # Track ID에 따른 고유 색상 설정
        color_idx = track_id % len(TRACK_COLORS)
        color = TRACK_COLORS[color_idx]
        
        # A. Bounding Box 및 Track ID 그리기
        x1, y1, x2, y2 = map(int, bbox)
        
        # BBox 그리기 (선 두께 2)
        cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
        
        # Track ID 텍스트 오버레이
        text = f"ID: {track_id}"
        cv2.putText(img, text, (x1, y1 - 10), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

        # B. Skeleton (골격) 그리기
        for start_idx, end_idx in skeleton_links:
            if start_idx < len(kpts) and end_idx < len(kpts):
                start_point = tuple(kpts[start_idx])
                end_point = tuple(kpts[end_idx])
                # 스켈레톤 선 그리기
                cv2.line(img, start_point, end_point, color, 2)

        # C. Keypoints (관절점) 그리기
        for pt in kpts:
            cv2.circle(img, tuple(pt), 4, (255, 255, 255), -1) 
            cv2.circle(img, tuple(pt), 2, color, -1)

    return img

# --- 비디오 생성 로직 (이미지 로드 제거) ---
def create_skeleton_video(
    tracking_data: Dict[str, Any], 
    output_video_path: str,
    fps: int = 30
):
    """
    추적 결과를 기반으로 검은색 배경의 스켈레톤 비디오 파일을 생성합니다.
    """
    frame_data_list = tracking_data['frame_data']
    meta_links = tracking_data['meta_info'].get('skeleton_links', [])
    
    if not frame_data_list or not meta_links:
        print("프레임 데이터 또는 골격 연결 정보가 없습니다.")
        return

    # 비디오 라이터 설정 (전역 변수 FRAME_WIDTH, FRAME_HEIGHT 사용)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (FRAME_WIDTH, FRAME_HEIGHT))

    print(f"검은 배경 비디오 생성 시작: {output_video_path}...")
    
    # 모든 프레임을 순회하며 시각화 및 저장
    for frame_num, instances in frame_data_list:
        
        # 스켈레톤만 그리는 함수 호출
        visualized_img = draw_skeleton_on_black(
            instances, 
            meta_links
        )
        
        if visualized_img is not None:
            out.write(visualized_img)
        
    out.release()
    print("비디오 생성 완료.")

### extract_main_patient_data
중앙에 위치하면서 BBox가 가장 큰 환자의 TID 를 찾고 patient_data로 저장

In [11]:
import numpy as np
from typing import Dict, Any, Optional, List

# 전역 상수 (화질 및 ROI 설정)
FRAME_WIDTH = 1280
FRAME_HEIGHT = 720
ROI_CENTER = (FRAME_WIDTH // 2, FRAME_HEIGHT // 2)
ROI_TOLERANCE = 200 # 중앙에서 200픽셀 내의 영역

def extract_main_patient_data(tracking_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    트래킹 데이터에서 중앙에 위치하며 가장 큰 BBox를 가진 환자를 식별하고,
    해당 환자의 데이터만 필터링하여 반환합니다.
    
    Returns:
        필터링된 tracking_data (메인 환자만 포함) 또는 None
    """
    frame_data_list = tracking_data.get('frame_data', [])
    if not frame_data_list:
        return None

    # 분석할 프레임 수 (초반 5프레임 분석)
    ANALYSIS_FRAMES = 5
    
    # {TID: [총 BBox 면적 합, 횟수]}
    tid_metrics = {}
    
    # 1. 초기 프레임 분석하여 메인 환자 TID 찾기
    for i, (frame_num, instances) in enumerate(frame_data_list):
        if i >= ANALYSIS_FRAMES:
            break

        for instance in instances:
            tid = instance.get('track_id')
            if tid is None:
                continue

            bbox = instance['bbox'][0] # [x1, y1, x2, y2]
            
            # 중앙 근접성 확인
            x1, y1, x2, y2 = bbox
            center_x = (x1 + x2) / 2
            center_y = (y1 + y2) / 2
            
            # 중앙 ROI 기준 충족 여부
            is_central = (
                abs(center_x - ROI_CENTER[0]) < ROI_TOLERANCE and
                abs(center_y - ROI_CENTER[1]) < ROI_TOLERANCE
            )
            
            if is_central:
                # BBox 면적 계산
                bbox_area = (x2 - x1) * (y2 - y1)
                
                if tid not in tid_metrics:
                    tid_metrics[tid] = [0, 0] # [면적 합, 횟수]
                
                tid_metrics[tid][0] += bbox_area
                tid_metrics[tid][1] += 1

    if not tid_metrics:
        print("경고: 중앙 ROI 내에 적합한 인스턴스가 없습니다. (TID를 찾을 수 없음)")
        return None

    # 2. 평균 면적이 가장 큰 TID 선택
    best_tid = None
    max_avg_area = -1

    for tid, (area_sum, count) in tid_metrics.items():
        if count > 0:
            avg_area = area_sum / count
            
            if avg_area > max_avg_area:
                max_avg_area = avg_area
                best_tid = tid

    # 3. 해당 TID 데이터만 필터링
    filtered_data = tracking_data.copy()
    new_frame_data = []

    for frame_num, instances in tracking_data['frame_data']:
        # 해당 TID만 찾아서 리스트에 저장
        patient_instance = [inst for inst in instances if inst.get('track_id') == best_tid]
        
        if patient_instance:
            new_frame_data.append((frame_num, patient_instance))

    filtered_data['frame_data'] = new_frame_data
    
    return filtered_data

### New fuction

In [None]:
from abc import ABC, abstractmethod
import numpy as np
from typing import Dict, List, Tuple, Optional

class BaseRepetitionCounter(ABC):
   
    def __init__(self, view_angle: str):
        self.view_angle = view_angle
        self.count = 0
        self.state = "neutral"
        self.previous_value = None
        
    def normalize_skeleton(self, skeleton: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        """
        공통 Normalization 로직
        - Hip center를 원점으로
        - Torso length로 스케일 정규화
        """
        hip = skeleton.get('hip_center') or skeleton.get('pelvis')
        normalized = {}
        
        for joint_name, joint_pos in skeleton.items():
            normalized[joint_name] = joint_pos - hip
        
        # Torso length 계산
        neck = normalized.get('neck') or normalized.get('head')
        pelvis = normalized.get('pelvis') or normalized.get('hip_center')
        torso_length = np.linalg.norm(neck - pelvis)
        
        for joint_name in normalized:
            normalized[joint_name] = normalized[joint_name] / torso_length
        
        return normalized
    
    @abstractmethod
    def get_key_joints(self) -> List[str]:
        """각 운동에서 추적할 주요 관절 반환"""
        pass
    
    @abstractmethod
    def get_thresholds(self) -> Dict[str, float]:
        """View angle에 따른 threshold 반환"""
        pass
    
    @abstractmethod
    def update(self, skeleton: Dict[str, np.ndarray]) -> bool:
        """
        프레임별로 호출되어 상태 업데이트 및 카운팅
        Returns: 카운트가 증가했으면 True
        """
        pass
    
    def reset(self):
        """카운터 리셋"""
        self.count = 0
        self.state = "neutral"
        self.previous_value = None

# Pipeline

In [12]:
from pathlib import Path

TARGET_VIDEO_DIR = 'M02'
TARGET_VIDEO = 'M02_VISIT2'

TXT_DIR = Path("/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/Won_Kim_research_at_Bosanjin/")
FRAME_DIR = Path("/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/1_FRAME/Won_Kim_research_at_Bosanjin/")
KEYPOINTS_DIR = Path("/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/3_project_HCCmove/data/2_KEYPOINTS/Won_Kim_research_at_Bosanjin/")
OUTPUT_DIR = Path('/workspace/nas203/ds_RehabilitationMedicineData/IDs/tojihoo/ASAN_01_Repeatition_Counter/')

TXT_PATH = TXT_DIR / TARGET_VIDEO_DIR / TARGET_VIDEO / "segment_frames.txt"
FRAME_PATH = FRAME_DIR / TARGET_VIDEO_DIR / TARGET_VIDEO
KEYPOINTS_PATH = KEYPOINTS_DIR / TARGET_VIDEO_DIR / TARGET_VIDEO 


In [13]:
full_label, start_frame, end_frame, view_angle, action_name = parse_nth_segment_info_from_file(TXT_PATH,1)
print(parse_nth_segment_info_from_file(TXT_PATH,1))

('frontal__biceps_curl__1', 390, 1860, 'frontal', 'biceps_curl')


In [14]:
data = load_kpt(KEYPOINTS_PATH,start_frame,end_frame)
print(data['meta_info'])

{'keypoint_id2name': {'0': 'nose', '1': 'left_eye', '2': 'right_eye', '3': 'left_ear', '4': 'right_ear', '5': 'left_shoulder', '6': 'right_shoulder', '7': 'left_elbow', '8': 'right_elbow', '9': 'left_wrist', '10': 'right_wrist', '11': 'left_hip', '12': 'right_hip', '13': 'left_knee', '14': 'right_knee', '15': 'left_ankle', '16': 'right_ankle'}, 'skeleton_links': [[15, 13], [13, 11], [16, 14], [14, 12], [11, 12], [5, 11], [6, 12], [5, 6], [5, 7], [6, 8], [7, 9], [8, 10], [1, 2], [0, 1], [0, 2], [1, 3], [2, 4], [3, 5], [4, 6]]}


In [15]:
data = track_keypoints_with_heuristics(data)
print(data['frame_data'][0])

(390, [{'keypoints': [[623.116569494809, 276.58871374999944], [632.0300657182678, 273.26429975043493], [617.213324244178, 275.89828321752134], [652.723818741729, 286.16960966286035], [608.6365766619523, 290.1093789921881], [687.9399624990729, 337.98545105249127], [583.0273832350395, 338.896641384624], [709.1141864235723, 396.6670181435643], [560.640438566923, 407.75919326930034], [723.8945584481885, 433.93101430077877], [539.0682291890646, 444.8229061857852], [660.440203605054, 447.5955943565432], [590.9682451102794, 445.6246015097727], [679.5426441589224, 503.388014915476], [575.1900484220305, 499.49022418594745], [684.0827803298957, 620.3137314261102], [562.2892200664218, 618.7605232335299]], 'keypoint_scores': [0.04641585797071457, 0.04142773523926735, 0.03796147555112839, 0.04945866018533707, 0.029720377177000046, 0.11248690634965897, 0.08959366381168365, 0.12298426777124405, 0.09696049988269806, 0.1587880253791809, 0.12082652747631073, 0.08149617910385132, 0.07353484630584717, 0.0

In [17]:
patient_data = extract_main_patient_data(data)
print(data['frame_data'][0])

(390, [{'keypoints': [[623.116569494809, 276.58871374999944], [632.0300657182678, 273.26429975043493], [617.213324244178, 275.89828321752134], [652.723818741729, 286.16960966286035], [608.6365766619523, 290.1093789921881], [687.9399624990729, 337.98545105249127], [583.0273832350395, 338.896641384624], [709.1141864235723, 396.6670181435643], [560.640438566923, 407.75919326930034], [723.8945584481885, 433.93101430077877], [539.0682291890646, 444.8229061857852], [660.440203605054, 447.5955943565432], [590.9682451102794, 445.6246015097727], [679.5426441589224, 503.388014915476], [575.1900484220305, 499.49022418594745], [684.0827803298957, 620.3137314261102], [562.2892200664218, 618.7605232335299]], 'keypoint_scores': [0.04641585797071457, 0.04142773523926735, 0.03796147555112839, 0.04945866018533707, 0.029720377177000046, 0.11248690634965897, 0.08959366381168365, 0.12298426777124405, 0.09696049988269806, 0.1587880253791809, 0.12082652747631073, 0.08149617910385132, 0.07353484630584717, 0.0