In [None]:
!pip install -r requirements.txt

In [None]:
# mediapipe 라이브러리를 사용하여 기본 스켈레톤 추출
# YOLO 라이브러리를 사용하여, 다중 객체 탐지 (Top_down 방식)
# 칼만 필터(Kalman Filter)를 사용하여, 각 관절의 다음 위치를 예측하고 보정하여 성능 개선
# 상태 예측 및 보간: 추적을 잠시 놓친 스켈레톤의 위치를 예측하여, 시각적 끊김 없이 보이도록 처리
# 슬롯 기반 할당(Slot-Based Assignment)' 개념을 도입하여 성능 향상
# 헝가리안 알고리즘(scipy.linear_sum_assignment)을 사용:
# - 매 프레임마다 새로 탐지된 사람들과 기존 '댄서 슬롯'들의 위치를 비교하여, 전체적으로 가장 거리가 가까운 최적의 짝을 확인
# 3D 칼만 필터: 개별 댄서의 물리적 움직임을 3차원 공간에서 부드럽고 정확하게 예측

import cv2
import mediapipe as mp
from ultralytics import YOLO
import numpy as np
import random
import csv
from collections import Counter
from scipy.optimize import linear_sum_assignment

# ----------------------------------
# 1. 클래스 및 함수 정의
# ----------------------------------
class KalmanFilter3D:
    """A simple Kalman filter for 3D point tracking."""
    def __init__(self, dt=1, std_acc=1, x_std_meas=0.1, y_std_meas=0.1, z_std_meas=0.1):
        self.state = np.zeros((6, 1))   # 상태 변수를 6차원으로 확장: [x, y, z, vx, vy, vz]
        # 상태 전이 행렬을 6x6으로 확장
        self.F = np.array([[1,0,0,dt,0,0], [0,1,0,0,dt,0], [0,0,1,0,0,dt],
                           [0,0,0,1,0,0], [0,0,0,0,1,0], [0,0,0,0,0,1]])
        # 측정 행렬을 3x6으로 확장
        self.H = np.array([[1,0,0,0,0,0], [0,1,0,0,0,0], [0,0,1,0,0,0]])
        self.Q = np.eye(6)*std_acc**2
        self.R = np.diag([x_std_meas**2, y_std_meas**2, z_std_meas**2])
        self.P = np.eye(6)
    def predict(self):
        self.state = np.dot(self.F, self.state)
        self.P = np.dot(np.dot(self.F, self.P), self.F.T) + self.Q
        return self.state
    def update(self, z):
        # 입력 z는 이제 3D 벡터 [x, y, z]
        S = np.dot(self.H, np.dot(self.P, self.H.T)) + self.R
        K = np.dot(np.dot(self.P, self.H.T), np.linalg.inv(S))
        self.state += np.dot(K, (z - np.dot(self.H, self.state)))
        self.P -= np.dot(np.dot(K, self.H), self.P)
        return self.state

class DancerSlot:
    """영구적인 댄서 슬롯의 모든 데이터를 저장하는 클래스"""
    def __init__(self, slot_id, initial_bbox):
        self.id = slot_id
        self.kalman_filters = [KalmanFilter3D() for _ in range(33)]
        self.landmarks = np.zeros((33, 4))
        self.bbox = initial_bbox
        self.disappeared_frames = 0
        self.is_active = True
        self.color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))

    def reinitialize_kalman_filters(self):
        """칼만 필터를 초기 상태로 리셋합니다."""
        self.kalman_filters = [KalmanFilter3D() for _ in range(33)]

def mouse_callback(event, x, y, flags, param):
    """마우스 이벤트를 처리하여 스켈레톤 위치를 수정하는 함수"""
    global is_dragging, selected_landmark, dancer_slots, paused, swap_mode, selected_for_swap

    # ID 교체 로직
    if swap_mode and event == cv2.EVENT_LBUTTONDOWN:
        clicked_slot_id = None
        for slot_id, slot in dancer_slots.items():
            if not slot.is_active: continue
            x1, y1, x2, y2 = slot.bbox
            if x1 < x < x2 and y1 < y < y2:
                clicked_slot_id = slot_id
                break
        
        if clicked_slot_id is not None and clicked_slot_id not in selected_for_swap:
            selected_for_swap.append(clicked_slot_id)
            print(f"ID 교체를 위해 Slot {clicked_slot_id} 선택됨. ({len(selected_for_swap)}/2)")

            if len(selected_for_swap) == 2:
                id1, id2 = selected_for_swap
                dancer_slots[id1].id, dancer_slots[id2].id = dancer_slots[id2].id, dancer_slots[id1].id
                dancer_slots[id1].color, dancer_slots[id2].color = dancer_slots[id2].color, dancer_slots[id1].color
                print(f"!!! ID 교체 완료: Slot {id1} <-> Slot {id2}")
                selected_for_swap = []
                swap_mode = False
        return

    if not paused: return

    if event == cv2.EVENT_LBUTTONDOWN:
        min_dist = float('inf')
        for slot_id, slot in dancer_slots.items():
            if not slot.is_active: continue
            for i, landmark in enumerate(slot.landmarks):
                dist = np.linalg.norm(np.array([landmark[0], landmark[1]]) - np.array([x, y]))
                if dist < 10 and dist < min_dist:
                    min_dist = dist
                    selected_landmark['slot_id'] = slot_id
                    selected_landmark['landmark_idx'] = i
                    is_dragging = True
                    print(f"선택됨: Slot {slot_id}, Landmark {i}")
                    break

    elif event == cv2.EVENT_MOUSEMOVE:
        if is_dragging:
            slot_id = selected_landmark['slot_id']
            lm_idx = selected_landmark['landmark_idx']
            dancer_slots[slot_id].landmarks[lm_idx, 0] = x
            dancer_slots[slot_id].landmarks[lm_idx, 1] = y

    elif event == cv2.EVENT_LBUTTONUP:
        if is_dragging:
            slot_id = selected_landmark['slot_id']
            lm_idx = selected_landmark['landmark_idx']
            kf = dancer_slots[slot_id].kalman_filters[lm_idx]
            kf.state[0, 0] = dancer_slots[slot_id].landmarks[lm_idx, 0]
            kf.state[1, 0] = dancer_slots[slot_id].landmarks[lm_idx, 1]
            print(f"수정 완료: Slot {slot_id}, Landmark {lm_idx} 위치가 ({kf.state[0,0]:.1f}, {kf.state[1,0]:.1f})로 업데이트됨")
        is_dragging = False
        selected_landmark = {'slot_id': None, 'landmark_idx': None}

# ----------------------------------
# 2. 초기 설정
# ----------------------------------
TARGET_PERSON_COUNT = 4
yolo_model = YOLO('yolov8s.pt')
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=True, min_detection_confidence=0.5)
video_path = "aespa_test.mp4"
cap = cv2.VideoCapture(video_path)

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
output_video_path = video_path.replace('.mp4', 'test_output.mp4')
out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))
csv_output_path = video_path.replace('.mp4', 'test_skeletons_data.csv')
csv_file = open(csv_output_path, 'w', newline='', encoding='utf-8')
csv_writer = csv.writer(csv_file)
csv_writer.writerow(['frame', 'slot_id', 'landmark_id', 'x', 'y', 'z', 'visibility', 'bbox_x1', 'bbox_y1', 'bbox_x2', 'bbox_y2'])

dancer_slots = {}
MAX_DISAPPEARED_FRAMES = 15
frame_counter = 0

# 추가 변수
is_dragging = False
selected_landmark = {'slot_id': None, 'landmark_idx': None}
paused = False
swap_mode = False
selected_for_swap = []

print(f"3D 칼만 필터 기반 추적 시스템을 시작합니다. 목표 인원수: {TARGET_PERSON_COUNT}명")

# ----------------------------------
# 3. 메인 루프: 프레임별 처리
# ----------------------------------
while cap.isOpened():
    if not paused:

        success, frame = cap.read()
        if not success: break

        # STEP 1: YOLO로 모든 사람 '탐지' (track 대신 predict 사용)
        results = yolo_model.predict(frame, classes=[0], conf=0.4, verbose=False)
        detections = results[0].boxes.xyxy.cpu().numpy()

        # --- 슬롯 초기화 로직 ---
        if len(dancer_slots) < TARGET_PERSON_COUNT:
            for i, bbox in enumerate(detections):
                # 이미 존재하는 슬롯의 개수를 ID로 사용
                slot_id = len(dancer_slots)
                if slot_id < TARGET_PERSON_COUNT:
                    print(f"프레임 {frame_counter}: 댄서 슬롯 {slot_id} 초기화.")
                    dancer_slots[slot_id] = DancerSlot(slot_id, bbox)
                else:
                    break
        
        # --- 슬롯 할당 로직 (매 프레임 실행) ---
        matched_detection_indices = set()
        if detections.shape[0] > 0 and len(dancer_slots) > 0:
            # 1. 활성화된 슬롯과 모든 탐지 객체 간의 매칭
            active_slots = {sid: slot for sid, slot in dancer_slots.items() if slot.is_active}
            slot_keys = list(active_slots.keys())
            
            if len(slot_keys) > 0:
                # 비용 행렬 계산
                cost_matrix = np.zeros((len(slot_keys), len(detections)))
                for i, slot_id in enumerate(slot_keys):
                    slot_center = ((active_slots[slot_id].bbox[0] + active_slots[slot_id].bbox[2]) / 2, (active_slots[slot_id].bbox[1] + active_slots[slot_id].bbox[3]) / 2)
                    for j, det_box in enumerate(detections):
                        det_center = ((det_box[0] + det_box[2]) / 2, (det_box[1] + det_box[3]) / 2)
                        cost_matrix[i, j] = np.linalg.norm(np.array(slot_center) - np.array(det_center))
                
                # 헝가리안 알고리즘으로 최적 할당
                row_ind, col_ind = linear_sum_assignment(cost_matrix)
                
                # 할당된 슬롯 업데이트
                for r, c in zip(row_ind, col_ind):
                    if cost_matrix[r, c] < 250: # 너무 멀리 떨어진 매칭은 무시
                        slot_id = slot_keys[r]
                        person_slot = dancer_slots[slot_id]
                        person_slot.disappeared_frames = 0
                        person_slot.is_active = True
                        person_slot.bbox = detections[c]
                        matched_detection_indices.add(c)

        # 2. ID 부활 로직: 비활성화된 슬롯과 매칭되지 않은 탐지 객체 간의 매칭
        unmatched_detection_indices = list(set(range(len(detections))) - matched_detection_indices)
        inactive_slots = {sid: slot for sid, slot in dancer_slots.items() if not slot.is_active}

        if len(unmatched_detection_indices) > 0 and len(inactive_slots) > 0:
                unmatched_detections = detections[unmatched_detection_indices]
                inactive_slot_keys = list(inactive_slots.keys())

                # 비활성 슬롯과 남은 탐지 객체 간의 실제 거리로 비용 행렬 계산
                cost_matrix_revival = np.zeros((len(inactive_slot_keys), len(unmatched_detections)))
                for i, slot_id in enumerate(inactive_slot_keys):
                    # 비활성 슬롯의 마지막 예측 위치를 사용
                    slot_center = ((inactive_slots[slot_id].bbox[0] + inactive_slots[slot_id].bbox[2]) / 2, (inactive_slots[slot_id].bbox[1] + inactive_slots[slot_id].bbox[3]) / 2)
                    for j, det_box in enumerate(unmatched_detections):
                        det_center = ((det_box[0] + det_box[2]) / 2, (det_box[1] + det_box[3]) / 2)
                        cost_matrix_revival[i, j] = np.linalg.norm(np.array(slot_center) - np.array(det_center))

                row_ind_rev, col_ind_rev = linear_sum_assignment(cost_matrix_revival)
                for r, c in zip(row_ind_rev, col_ind_rev):
                            slot_id = inactive_slot_keys[r]
                            detection_idx = list(unmatched_detection_indices)[c]
                            
                            print(f"프레임 {frame_counter}: 슬롯 {slot_id} 부활!")
                            person_slot = dancer_slots[slot_id]
                            person_slot.is_active = True
                            person_slot.disappeared_frames = 0
                            person_slot.bbox = detections[detection_idx]
                            person_slot.reinitialize_kalman_filters() # 칼만 필터 초기화
                            matched_detection_indices.add(detection_idx)

        # --- 3. 포즈 추정 및 상태 업데이트/예측 ---
        for slot_id, person_slot in dancer_slots.items():
            if not person_slot.is_active: continue

            # 현재 프레임에서 업데이트된(매칭된) 슬롯인 경우
            if person_slot.disappeared_frames == 0:
                x1, y1, x2, y2 = [int(coord) for coord in person_slot.bbox]
                padding = 0.15; box_w, box_h = x2 - x1, y2 - y1
                x1_pad = max(0, int(x1 - box_w * padding)); y1_pad = max(0, int(y1 - box_h * padding))
                x2_pad = min(frame_width, int(x2 + box_w * padding)); y2_pad = min(frame_height, int(y2 + box_h * padding))
                person_crop = frame[y1_pad:y2_pad, x1_pad:x2_pad]

                if person_crop.shape[0] > 0 and person_crop.shape[1] > 0:
                    crop_rgb = cv2.cvtColor(person_crop, cv2.COLOR_BGR2RGB); pose_results = pose.process(crop_rgb)
                    if pose_results.pose_landmarks and len(pose_results.pose_landmarks.landmark) == 33:
                        crop_h, crop_w, _ = person_crop.shape
                        for i, landmark in enumerate(pose_results.pose_landmarks.landmark):
                            measured_x, measured_y = x1_pad + landmark.x * crop_w, y1_pad + landmark.y * crop_h
                            measured_z = landmark.z * crop_w 
                            kf = person_slot.kalman_filters[i]
                            if np.all(kf.state[0:3] == 0): kf.state[0], kf.state[1], kf.state[2] = measured_x, measured_y, measured_z
                            kf.predict(); updated_state = kf.update(np.array([[measured_x], [measured_y], [measured_z]]))
                            person_slot.landmarks[i] = [updated_state[0, 0], updated_state[1, 0], updated_state[2, 0], landmark.visibility]
            else: # 사라진 슬롯인 경우
                person_slot.disappeared_frames += 1
                if person_slot.disappeared_frames > MAX_DISAPPEARED_FRAMES:
                    print(f"프레임 {frame_counter}: 슬롯 {slot_id} 비활성화.")
                    person_slot.is_active = False
                    continue
                
                for i in range(33):
                    kf = person_slot.kalman_filters[i]; predicted_state = kf.predict()
                    person_slot.landmarks[i, 0:3] = predicted_state[0:3, 0].T
                
                center_x = np.mean(person_slot.landmarks[:, 0]); center_y = np.mean(person_slot.landmarks[:, 1])
                if not (np.isnan(center_x) or np.isnan(center_y)):
                    w, h = person_slot.bbox[2] - person_slot.bbox[0], person_slot.bbox[3] - person_slot.bbox[1]
                    person_slot.bbox = [center_x - w/2, center_y - h/2, center_x + w/2, center_y + h/2]

        # --- 4. 최종 시각화 및 데이터 저장 ---
        for slot_id, person_slot in dancer_slots.items():
            if not person_slot.is_active: continue

            color = person_slot.color; label = f"ID: {person_slot.id}"
            if person_slot.disappeared_frames > 0:
                color = tuple(c // 2 for c in color)
                label += " (Lost)"
            x1, y1, x2, y2 = [int(c) for c in person_slot.bbox]
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            
            landmarks_to_process = person_slot.landmarks
            bbox_for_csv = person_slot.bbox.tolist()    # 후처리를 위해 bbox 좌표도 함께 저장

            for i, landmark in enumerate(landmarks_to_process):
                csv_writer.writerow([frame_counter, person_slot.id, i, landmark[0], landmark[1], landmark[2], landmark[3]] + bbox_for_csv)
                if landmark[3] > 0.5:
                    cv2.circle(frame, (int(landmark[0]), int(landmark[1])), 3, color, -1)

            connections = mp_pose.POSE_CONNECTIONS
            for connection in connections:
                start_idx, end_idx = connection
                if landmarks_to_process[start_idx, 3] > 0.5 and landmarks_to_process[end_idx, 3] > 0.5:
                    start_point = (int(landmarks_to_process[start_idx, 0]), int(landmarks_to_process[start_idx, 1]))
                    end_point = (int(landmarks_to_process[end_idx, 0]), int(landmarks_to_process[end_idx, 1]))
                    cv2.line(frame, start_point, end_point, color, 2)

        out.write(frame)
        frame_counter += 1
    # cv2.imshow('Slot-Based High-Performance Tracking', frame)
    window_name = 'Slot-Based High-Performance Tracking'
    cv2.imshow(window_name, frame)
    cv2.setMouseCallback(window_name, mouse_callback)

    # 
    # if cv2.waitKey(1) & 0xFF == ord('q'): break
    key = cv2.waitKey(1) & 0xFF
    if key == ord('q'):
        break
    elif key == ord(' '):  # 스페이스바: 일시정지/재생
        paused = not paused
        if paused:
            print("--- 영상 일시정지 ---")
        else:
            print("--- 영상 재생 ---")
            swap_mode = False # 재생 시작 시 교체 모드 해제
            selected_for_swap = []
            
    elif key == ord('s'):  # 's' 키: ID 교체 모드
        if paused:
            swap_mode = True
            selected_for_swap = []
            print("--- ID 교체 모드 활성화: 교체할 두 사람을 클릭하세요. ---")
        else:
            print("ID를 교체하려면 먼저 영상을 일시정지(스페이스바)하세요.")

# ----------------------------------
# 4. 종료 처리
# ----------------------------------
csv_file.close(); cap.release(); out.release(); cv2.destroyAllWindows(); pose.close()
print(f"처리 완료! 최종 결과가 '{output_video_path}'와 '{csv_output_path}'에 저장되었습니다.")


PRO TIP  Replace 'model=yolov5s.pt' with new 'model=yolov5su.pt'.
YOLOv5 'u' models are trained with https://github.com/ultralytics/ultralytics and feature improved performance vs standard YOLOv5 models trained with https://github.com/ultralytics/yolov5.

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov5su.pt to 'yolov5su.pt'...


100%|██████████| 17.7M/17.7M [00:01<00:00, 11.8MB/s]


3D 칼만 필터 기반 추적 시스템을 시작합니다. 목표 인원수: 4명
프레임 1: 댄서 슬롯 0 초기화.
프레임 1: 댄서 슬롯 1 초기화.
프레임 1: 댄서 슬롯 2 초기화.
프레임 1: 댄서 슬롯 3 초기화.
처리 완료! 최종 결과가 'aespa_testtest_output.mp4'와 'aespa_testtest_skeletons_data.csv'에 저장되었습니다.


In [5]:
!pip install networkx

^C




In [6]:
# 후처리1
# 트랙릿 재연결 (ID 스위치 보정): slot_id의 이동 경로를 하나의 '트랙릿(Tracklet)'으로 간주
#    → 각 트랙릿의 특정 지점(예: 경로가 끊기기 직전, ID가 바뀐 직후)에서 **외모 특징(옷 색상)**과 **움직임 특징(위치, 속도)**을 추출
#    → 한 트랙릿이 끝나는 지점과 다른 트랙릿이 시작하는 지점의 특징들을 비교
#    → 만약 "A 트랙릿의 끝과 B 트랙릿의 시작이 동일 인물일 확률이 매우 높다"고 판단되면, 두 트랙릿을 하나로 합쳐 ID를 통일

import pandas as pd
import numpy as np
import cv2
from scipy.optimize import linear_sum_assignment
from collections import defaultdict
import time
import networkx as nx
from collections import Counter

# -- 설정 --
# 생성된 원본 데이터 파일
INPUT_CSV_PATH = 'aespa_testtest_skeletons_data.csv'

# 원본 비디오 파일 (외모 특징 추출에 필요)
VIDEO_PATH = 'aespa_test.mp4'

# 후처리가 완료된 데이터가 저장될 파일
OUTPUT_CSV_PATH = 'aespa_test_skeletons_postprocessed.csv'

# -- 파라미터 --
JUMP_THRESHOLD = 150  # ID 스위치를 의심할 위치 '점프' 픽셀 거리, 값이 작을 수록 더 꼼꼼하게 검사
MAX_LINK_FRAMES = 45  # 두 트랙릿을 연결할 최대 프레임 간격
APPEARANCE_WEIGHT = 0.7 # 외모 특징의 가중치 (0~1), 의상이 비슷하면 값을 낮추기
MOTION_WEIGHT = 0.3     # 움직임 특징의 가중치 (0~1)
COST_THRESHOLD = 0.65   # 연결을 위한 비용 임계값

DISTANCE_GATE_THRESHOLD = 100   # 물리적 거리 제한 추가


# --- 2. 헬퍼 함수 ---
def get_color_histogram(frame, bbox):
    """지정된 바운딩 박스 영역에서 컬러 히스토그램을 계산합니다."""
    x1, y1, x2, y2 = [int(c) for c in bbox]
    roi = frame[y1:y2, x1:x2]
    if roi.size == 0: return None
    hsv_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    hist = cv2.calcHist([hsv_roi], [0], None, [180], [0, 180])
    cv2.normalize(hist, hist, 0, 1, cv2.NORM_MINMAX)
    return hist.flatten()

# --- 3. 메인 후처리 로직 ---
def relink_switched_ids_final_corrected(csv_path, video_path, output_path):
    start_time = time.time()
    print("후처리 1단계 시작: 자동화된 ID 스위치 보정 (오류 수정 최종본)")
    df = pd.read_csv(csv_path)
    cap = cv2.VideoCapture(video_path)

    # 💡 [오류 수정] 1. 올바른 트랙릿 분리 로직
    print("  - 1/5: 트랙릿 분리 로직 수정 및 실행 중...")
    df['tracklet_id'] = -1
    tracklet_counter = 0
    
    # 원본 slot_id별로 순회
    for slot_id in df['slot_id'].unique():
        person_df = df[df['slot_id'] == slot_id].sort_values(by='frame')
        if person_df.empty: continue

        # 코(landmark_id=0)의 위치를 기준으로 점프가 일어나는 '프레임 번호'를 찾음
        nose_df = person_df[person_df['landmark_id'] == 0]
        if len(nose_df) < 2:
            # 트랙릿이 너무 짧으면 하나의 그룹으로 간주
            df.loc[person_df.index, 'tracklet_id'] = tracklet_counter
            tracklet_counter += 1
            continue
            
        positions = nose_df[['x', 'y']].values
        distances = np.linalg.norm(positions[1:] - positions[:-1], axis=1)
        # 점프가 일어난 행의 index가 아니라, '프레임 번호'를 저장
        jump_frames = nose_df.iloc[np.where(distances > JUMP_THRESHOLD)[0] + 1]['frame'].tolist()
        
        # 트랙릿의 시작과 끝 프레임 번호를 정의
        tracklet_start_frame = person_df['frame'].min()
        for jump_frame in jump_frames:
            # 이전 트랙릿에 ID 할당
            df.loc[(df['slot_id'] == slot_id) & (df['frame'] >= tracklet_start_frame) & (df['frame'] < jump_frame), 'tracklet_id'] = tracklet_counter
            tracklet_counter += 1
            tracklet_start_frame = jump_frame
        
        # 마지막 트랙릿에 ID 할당
        df.loc[(df['slot_id'] == slot_id) & (df['frame'] >= tracklet_start_frame), 'tracklet_id'] = tracklet_counter
        tracklet_counter += 1
        
    # 2. 각 트랙릿의 시작/끝 정보 수집 (이전과 동일)
    print("  - 2/5: 각 트랙릿의 시작/끝 정보 수집 중...")
    tracklet_info = defaultdict(dict)
    for tid, group in df.groupby('tracklet_id'):
        start_frame_info = group.sort_values(by='frame').iloc[0]
        end_frame_info = group.sort_values(by='frame').iloc[-1]
        tracklet_info[tid] = {
            'original_slot_id': start_frame_info['slot_id'], 'start_frame': start_frame_info['frame'], 'end_frame': end_frame_info['frame'],
            'start_motion': start_frame_info[['x', 'y']].values, 'end_motion': end_frame_info[['x', 'y']].values,
            'start_bbox': start_frame_info[['bbox_x1', 'bbox_y1', 'bbox_x2', 'bbox_y2']].values, 'end_bbox': end_frame_info[['bbox_x1', 'bbox_y1', 'bbox_x2', 'bbox_y2']].values,
            'start_appearance': None, 'end_appearance': None
        }

    # 3. 'Single-Pass'로 외모 특징 추출 (이전과 동일)
    print("  - 3/5: 비디오 단일 스캔으로 모든 외모 특징 추출 중...")
    features_to_extract = defaultdict(list)
    for tid, info in tracklet_info.items():
        if info['start_frame'] == info['end_frame']: # 단일 프레임 트랙릿 처리
             features_to_extract[info['start_frame']].append({'tid': tid, 'role': 'single', 'bbox': info['start_bbox']})
        else:
            features_to_extract[info['start_frame']].append({'tid': tid, 'role': 'start', 'bbox': info['start_bbox']})
            features_to_extract[info['end_frame']].append({'tid': tid, 'role': 'end', 'bbox': info['end_bbox']})
    frame_idx = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: break
        if frame_idx in features_to_extract:
            for item in features_to_extract[frame_idx]:
                hist = get_color_histogram(frame, item['bbox'])
                if hist is not None:
                    if item['role'] == 'single':
                        tracklet_info[item['tid']]['start_appearance'] = hist
                        tracklet_info[item['tid']]['end_appearance'] = hist
                    else:
                        tracklet_info[item['tid']][f"{item['role']}_appearance"] = hist
        frame_idx += 1
    cap.release()
    
    # 4. 비용 행렬 계산 및 연결 그래프 생성 (이전과 동일)
    print("  - 4/5: 트랙릿 간 연결 그래프 생성 중...")
    tids = list(tracklet_info.keys())
    cost_matrix = np.full((len(tids), len(tids)), np.inf)
    for i, tid1 in enumerate(tids):
        for j, tid2 in enumerate(tids):
            if i == j: continue
            end_info = tracklet_info[tid1]; start_info = tracklet_info[tid2]
            frame_gap = start_info['start_frame'] - end_info['end_frame']
            if 0 < frame_gap < MAX_LINK_FRAMES:
                if end_info['end_appearance'] is not None and start_info['start_appearance'] is not None:
                    motion_cost = np.linalg.norm(end_info['end_motion'] - start_info['start_motion'])
                    if motion_cost > DISTANCE_GATE_THRESHOLD: continue
                    appearance_cost = 1 - cv2.compareHist(end_info['end_appearance'], start_info['start_appearance'], cv2.HISTCMP_CORREL)
                    norm_motion_cost = min(motion_cost / DISTANCE_GATE_THRESHOLD, 1.0)
                    total_cost = (MOTION_WEIGHT * norm_motion_cost) + (APPEARANCE_WEIGHT * appearance_cost)
                    cost_matrix[i, j] = total_cost
    
    cost_matrix[np.isinf(cost_matrix)] = 1e6
    row_ind, col_ind = linear_sum_assignment(cost_matrix)

    # 5. ID 재할당 ('가장 긴 트랙릿' 기반)
    print("  - 5/5: 연결 그룹 분석 및 최종 ID 할당 중...")
    G = nx.Graph()
    for r, c in zip(row_ind, col_ind):
        if cost_matrix[r, c] < COST_THRESHOLD:
            G.add_edge(tids[r], tids[c])
            
    connected_components = list(nx.connected_components(G))
    final_id_map = {}
    
    for component in connected_components:
        longest_tracklet_id = -1; max_length = -1
        for tid in component:
            length = tracklet_info[tid]['end_frame'] - tracklet_info[tid]['start_frame']
            if length > max_length:
                max_length = length; longest_tracklet_id = tid
        if longest_tracklet_id != -1:
            final_id = tracklet_info[longest_tracklet_id]['original_slot_id']
            for tid in component: final_id_map[tid] = final_id

    for tid in tids:
        if tid not in final_id_map:
            final_id_map[tid] = tracklet_info[tid]['original_slot_id']
            
    df['slot_id'] = df['tracklet_id'].map(final_id_map)
    df.dropna(subset=['slot_id'], inplace=True)
    df['slot_id'] = df['slot_id'].astype(int)
    df.drop(columns=['tracklet_id'], inplace=True)
    
    df.to_csv(output_path, index=False)
    
    print(f"ID가 보정된 데이터가 '{output_path}'에 저장되었습니다.")

# --- 스크립트 실행 ---
if __name__ == "__main__":
    relink_switched_ids_final_corrected(INPUT_CSV_PATH, VIDEO_PATH, OUTPUT_CSV_PATH)

후처리 1단계 시작: 자동화된 ID 스위치 보정 (오류 수정 최종본)
  - 1/5: 트랙릿 분리 로직 수정 및 실행 중...
  - 2/5: 각 트랙릿의 시작/끝 정보 수집 중...
  - 3/5: 비디오 단일 스캔으로 모든 외모 특징 추출 중...
  - 4/5: 트랙릿 간 연결 그래프 생성 중...
  - 5/5: 연결 그룹 분석 및 최종 ID 할당 중...
ID가 보정된 데이터가 'aespa_test_skeletons_postprocessed.csv'에 저장되었습니다.


In [21]:
# 후처리 2-2
# 전역적 스무딩

import pandas as pd
from scipy.signal import savgol_filter
import time
import os

# --- 1. 설정 및 파라미터 ---
# 이전 단계('ID 재연결')에서 생성된 데이터 파일
INPUT_CSV_PATH = 'aespa_test_skeletons_postprocessed.csv'
# 최종적으로 스무딩된 데이터가 저장될 파일
OUTPUT_CSV_PATH = 'aespa_test_skeletons_smoothed.csv'

# Savitzky-Golay 필터 파라미터 (이 값들을 조정하여 스무딩 강도 조절 가능)
# 더 강력한 스무딩을 위해 윈도우 크기를 이전보다 늘림
WINDOW_LENGTH = 15 
POLYORDER = 3     

# --- 2. 메인 후처리 로직 ---
def apply_global_smoothing(csv_path, output_path):
    """
    ID가 보정된 데이터에 Savitzky-Golay 필터를 적용하여 미세한 떨림과 이상치를 제거합니다.
    """
    start_time = time.time()
    
    # 1. 파일 존재 여부 확인
    if not os.path.exists(csv_path):
        print(f"오류: 입력 파일 '{csv_path}'를 찾을 수 없습니다. ID 재연결 단계를 먼저 실행해주세요.")
        return
        
    print("후처리 2단계 시작: 전역적 스무딩 (최종 떨림 제거)")
    df = pd.read_csv(csv_path)
    print(f"  - 원본 데이터 로드 완료: {len(df)} 행")

    # 스무딩할 좌표 컬럼
    coords_to_smooth = ['x', 'y', 'z']
    
    # 2. 각 ID, 각 관절의 이동 경로에 개별적으로 필터 적용
    print(f"  - Savitzky-Golay 필터 적용 중 (window={WINDOW_LENGTH}, polyorder={POLYORDER})...")
    
    # 그룹별로 데이터를 처리할 때 데이터가 필터 윈도우보다 짧으면 오류가 발생할 수 있음
    # .transform()은 이러한 그룹별 처리를 안전하고 효율적으로 수행
    def smooth(series):
        # 데이터가 윈도우 길이보다 짧으면 스무딩을 적용할 수 없으므로 원본을 반환
        if len(series) < WINDOW_LENGTH:
            return series
        return savgol_filter(series, window_length=WINDOW_LENGTH, polyorder=POLYORDER, mode='interp')

    # slot_id와 landmark_id로 그룹화하여 각 시계열(x, y, z)에 스무딩 함수 적용
    df[coords_to_smooth] = df.groupby(['slot_id', 'landmark_id'])[coords_to_smooth].transform(smooth)
    
    # 3. 최종 데이터 저장
    df.to_csv(output_path, index=False)
    
    end_time = time.time()
    print(f"\n후처리 2단계 완료! (총 소요 시간: {end_time - start_time:.2f}초)")
    print(f"미세 떨림이 제거된 최종 데이터가 '{output_path}'에 저장되었습니다.")


# --- 3. 스크립트 실행 ---
if __name__ == "__main__":
    apply_global_smoothing(INPUT_CSV_PATH, OUTPUT_CSV_PATH)

후처리 2단계 시작: 전역적 스무딩 (최종 떨림 제거)
  - 원본 데이터 로드 완료: 717948 행
  - Savitzky-Golay 필터 적용 중 (window=15, polyorder=3)...

후처리 2단계 완료! (총 소요 시간: 10.76초)
미세 떨림이 제거된 최종 데이터가 'aespa_test_skeletons_smoothed.csv'에 저장되었습니다.


In [22]:
# 후처리 3-2
# 물리적 타당성 검증

import pandas as pd
import numpy as np
from collections import defaultdict
import time

# --- 1. 설정 ---
# 이전 단계('전역적 스무딩')에서 생성된 데이터 파일
INPUT_CSV_PATH = 'aespa_test_skeletons_smoothed.csv'
# 최종적으로 점수가 추가된 데이터가 저장될 파일
OUTPUT_CSV_PATH = 'aespa_test_data_final_with_scores.csv'

# --- 2. 데이터 정의 ---
BONE_MAPPING = {
    'LEFT_ARM': (11, 13), 'RIGHT_ARM': (12, 14), 'LEFT_FOREARM': (13, 15), 'RIGHT_FOREARM': (14, 16),
    'LEFT_UPPER_LEG': (23, 25), 'RIGHT_UPPER_LEG': (24, 26), 'LEFT_LOWER_LEG': (25, 27), 'RIGHT_LOWER_LEG': (26, 28)
}

# --- 3. 헬퍼 함수 ---
def get_3d_distance(p1, p2):
    return np.linalg.norm(p1 - p2)

# --- 4. 메인 후처리 로직 ---
def validate_kinematic_plausibility(csv_path, output_path):
    """
    스무딩된 데이터의 물리적 타당성을 검증하고, 오류 점수를 계산하여 추가합니다.
    """
    start_time = time.time()
    print("후처리 3단계 시작: 물리적 타당성 검증")
    df = pd.read_csv(csv_path)
    
    # 분석의 편의를 위해 피벗 테이블 생성
    df_pivot = df.pivot_table(index=['frame', 'slot_id'], columns='landmark_id', 
                              values=['x', 'y', 'z', 'visibility'])

    # 1. 각 사람의 평균 '신체 비율' 계산
    print("  - 1/3: 각 인물의 평균 신체 비율 계산 중...")
    avg_bone_ratios = defaultdict(dict)
    for slot_id in df['slot_id'].unique():
        person_df = df_pivot.loc[pd.IndexSlice[:, slot_id], :]
        ratios = defaultdict(list)
        for frame_idx in person_df.index.get_level_values('frame'):
            try:
                shoulder_l = person_df.loc[(frame_idx, slot_id), [('x', 11), ('y', 11), ('z', 11)]].values
                shoulder_r = person_df.loc[(frame_idx, slot_id), [('x', 12), ('y', 12), ('z', 12)]].values
                hip_l = person_df.loc[(frame_idx, slot_id), [('x', 23), ('y', 23), ('z', 23)]].values
                hip_r = person_df.loc[(frame_idx, slot_id), [('x', 24), ('y', 24), ('z', 24)]].values
                vis_shoulders = person_df.loc[(frame_idx, slot_id), [('visibility', 11), ('visibility', 12)]].values
                vis_hips = person_df.loc[(frame_idx, slot_id), [('visibility', 23), ('visibility', 24)]].values
                if np.all(vis_shoulders > 0.7) and np.all(vis_hips > 0.7):
                    torso_len = get_3d_distance((shoulder_l + shoulder_r) / 2, (hip_l + hip_r) / 2)
                    if torso_len < 10: continue
                    for bone_name, (start_idx, end_idx) in BONE_MAPPING.items():
                        p1_vis = person_df.loc[(frame_idx, slot_id), ('visibility', start_idx)]
                        p2_vis = person_df.loc[(frame_idx, slot_id), ('visibility', end_idx)]
                        if p1_vis > 0.7 and p2_vis > 0.7:
                            p1 = person_df.loc[(frame_idx, slot_id), [('x',start_idx),('y',start_idx),('z',start_idx)]].values
                            p2 = person_df.loc[(frame_idx, slot_id), [('x',end_idx),('y',end_idx),('z',end_idx)]].values
                            bone_len = get_3d_distance(p1, p2)
                            ratios[bone_name].append(bone_len / torso_len)
            except (KeyError, IndexError): continue
        for bone_name, ratio_list in ratios.items():
            if ratio_list: avg_bone_ratios[slot_id][bone_name] = np.mean(ratio_list)

    # 2. 각 프레임의 '운동학적 오류 점수' 계산
    print("  - 2/3: 각 프레임의 물리적 오류 점수 계산 중...")
    error_scores = []
    for (frame_idx, slot_id), row in df_pivot.iterrows():
        bone_errors = []
        try:
            shoulder_l = row[[('x', 11), ('y', 11), ('z', 11)]].values; shoulder_r = row[[('x', 12), ('y', 12), ('z', 12)]].values
            hip_l = row[[('x', 23), ('y', 23), ('z', 23)]].values; hip_r = row[[('x', 24), ('y', 24), ('z', 24)]].values
            current_torso_len = get_3d_distance((shoulder_l + shoulder_r) / 2, (hip_l + hip_r) / 2)
            if current_torso_len < 10: 
                error_scores.append({'frame': frame_idx, 'slot_id': slot_id, 'kinematic_error': 1.0}); continue
        except (KeyError, IndexError):
            error_scores.append({'frame': frame_idx, 'slot_id': slot_id, 'kinematic_error': 1.0}); continue

        for bone_name, (start_idx, end_idx) in BONE_MAPPING.items():
            if bone_name not in avg_bone_ratios.get(slot_id, {}): continue
            target_ratio = avg_bone_ratios[slot_id][bone_name]
            target_length = current_torso_len * target_ratio
            
            p_start = row[[('x',start_idx),('y',start_idx),('z',start_idx)]].values
            p_end = row[[('x',end_idx),('y',end_idx),('z',end_idx)]].values
            current_length = get_3d_distance(p_start, p_end)
            
            # 비율 오차 계산
            error = abs(current_length - target_length) / (target_length + 1e-6)
            bone_errors.append(error)
        
        # 해당 프레임의 모든 뼈 오차 중 가장 큰 값을 대표 오류 점수로 사용
        frame_error = max(bone_errors) if bone_errors else 0
        error_scores.append({'frame': frame_idx, 'slot_id': slot_id, 'kinematic_error': frame_error})

    # 3. 원본 데이터에 오류 점수 추가 및 저장
    print("  - 3/3: 원본 데이터에 오류 점수 추가 및 저장 중...")
    error_df = pd.DataFrame(error_scores)
    final_df = pd.merge(df, error_df, on=['frame', 'slot_id'])
    final_df.to_csv(output_path, index=False)
    
    end_time = time.time()
    print(f"\n후처리 3단계 완료! (총 소요 시간: {end_time - start_time:.2f}초)")
    print(f"물리적 타당성 점수가 추가된 최종 데이터가 '{output_path}'에 저장되었습니다.")

# --- 스크립트 실행 ---
if __name__ == "__main__":
    validate_kinematic_plausibility(INPUT_CSV_PATH, OUTPUT_CSV_PATH)

후처리 3단계 시작: 물리적 타당성 검증
  - 1/3: 각 인물의 평균 신체 비율 계산 중...
  - 2/3: 각 프레임의 물리적 오류 점수 계산 중...
  - 3/3: 원본 데이터에 오류 점수 추가 및 저장 중...

후처리 3단계 완료! (총 소요 시간: 1389.80초)
물리적 타당성 점수가 추가된 최종 데이터가 'aespa_test_data_final_with_scores.csv'에 저장되었습니다.


In [26]:
# 결과 확인용 시각화 스크립트
import cv2
import pandas as pd
import numpy as np
import mediapipe as mp
import os
import random

# --- 1. 설정 ---
# 최종 보정된 데이터 파일 경로
# ('인체 모델 제약 적용' 단계에서 생성된 최종 결과물)
FINAL_CSV_PATH = 'TWICE_ICANTSTOPME_final_smoothed_ver2.csv'

# 원본 비디오 파일 경로
VIDEO_PATH = 'aespa_test.mp4'

# 최종 결과 영상이 저장될 경로
OUTPUT_VIDEO_PATH = 'aespa_test_visualization_ver2.mp4'

# MediaPipe 관절 연결 정보 (뼈대를 그리기 위함)
POSE_CONNECTIONS = mp.solutions.pose.POSE_CONNECTIONS

# --- 2. 헬퍼 함수 ---
def get_color_for_id(slot_id):
    """ID별로 일관된 색상을 생성하기 위한 간단한 함수"""
    # ID 번호를 기반으로 색상 생성
    random.seed(slot_id)
    return (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))

# --- 3. 메인 시각화 로직 ---
def visualize_postprocessed_skeletons(csv_path, video_path, output_path):
    """
    최종 보정된 스켈레톤 데이터를 읽어 원본 영상 위에 그려,
    결과 동영상을 생성합니다.
    """
    # 1. 파일 존재 여부 확인
    if not os.path.exists(csv_path):
        print(f"오류: 데이터 파일 '{csv_path}'를 찾을 수 없습니다.")
        return
    if not os.path.exists(video_path):
        print(f"오류: 비디오 파일 '{video_path}'를 찾을 수 없습니다.")
        return

    print("시각화 시작: 최종 보정 데이터를 영상에 그립니다.")
    
    # 2. 데이터 로드 및 전처리
    # 프레임별로 데이터를 빠르게 조회하기 위해 'frame'을 인덱스로 설정
    df = pd.read_csv(csv_path)
    df.set_index('frame', inplace=True)
    
    # 3. 비디오 파일 및 결과 영상 설정
    cap = cv2.VideoCapture(video_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

    frame_counter = 0
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break

        # 4. 현재 프레임에 해당하는 스켈레톤 데이터 조회
        try:
            frame_data = df.loc[frame_counter]
        except KeyError:
            # 해당 프레임에 데이터가 없으면 원본 프레임만 저장
            out.write(frame)
            frame_counter += 1
            continue

        # 5. 각 ID별로 스켈레톤 그리기
        # slot_id별로 데이터를 그룹화
        for slot_id, group in frame_data.groupby('slot_id'):
            landmarks = group.sort_values(by='landmark_id')
            
            # 랜드마크 좌표와 가시성 추출
            points = landmarks[['x', 'y']].values
            visibility = landmarks['visibility'].values
            
            color = get_color_for_id(int(slot_id))
            
            # 바운딩 박스 그리기
            bbox = group.iloc[0][['bbox_x1', 'bbox_y1', 'bbox_x2', 'bbox_y2']].values
            x1, y1, x2, y2 = [int(c) for c in bbox]
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, f"ID: {int(slot_id)}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

            # 관절(점) 그리기
            for i, point in enumerate(points):
                if visibility[i] > 0.5:
                    cv2.circle(frame, (int(point[0]), int(point[1])), 5, color, -1)
            
            # 뼈대(선) 그리기
            for connection in POSE_CONNECTIONS:
                start_idx, end_idx = connection
                if visibility[start_idx] > 0.5 and visibility[end_idx] > 0.5:
                    start_point = (int(points[start_idx, 0]), int(points[start_idx, 1]))
                    end_point = (int(points[end_idx, 0]), int(points[end_idx, 1]))
                    cv2.line(frame, start_point, end_point, color, 2)

        out.write(frame)
        frame_counter += 1
        
        # 실시간으로 보기 (선택사항)
        # cv2.imshow('Final Visualization', frame)
        # if cv2.waitKey(1) & 0xFF == ord('q'):
        #     break

    # 6. 종료 처리
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    print(f"\n시각화 완료! 최종 결과 영상이 '{output_path}'에 저장되었습니다.")


# --- 4. 스크립트 실행 ---
if __name__ == "__main__":
    visualize_postprocessed_skeletons(FINAL_CSV_PATH, VIDEO_PATH, OUTPUT_VIDEO_PATH)

시각화 시작: 최종 보정 데이터를 영상에 그립니다.

시각화 완료! 최종 결과 영상이 'aespa_test_visualization_ver2.mp4'에 저장되었습니다.


In [4]:
import cv2  # OpenCV 라이브러리
import imageio
from PIL import Image
import numpy as np

def create_gif_with_opencv(video_path, output_path, start_sec, end_sec, fps=10, resize_factor=0.5):
    """
    OpenCV와 ImageIO를 사용해 동영상의 특정 구간을 GIF로 변환합니다.
    """
    print("🆕 새로운 방법(OpenCV + ImageIO)으로 변환을 시작합니다.")
    try:
        # 동영상 파일 열기
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"🚨 오류: '{video_path}' 동영상 파일을 열 수 없습니다.")
            return

        # 동영상의 원본 FPS와 프레임 번호 계산
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        start_frame = int(start_sec * original_fps)
        end_frame = int(end_sec * original_fps)

        # 변환할 프레임들을 저장할 리스트
        frames = []

        # 동영상의 특정 위치(시작 프레임)로 이동
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        
        print(f"🖼️ {start_frame}번 부터 {end_frame}번 프레임까지 읽는 중...")
        
        current_frame = start_frame
        while current_frame < end_frame:
            ret, frame = cap.read()
            if not ret:
                break # 프레임을 더 이상 읽을 수 없으면 중단

            # OpenCV는 색상 채널이 BGR 순서이므로, 일반적인 RGB 순서로 변경
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Pillow를 사용하여 이미지 크기 조절
            pil_img = Image.fromarray(frame_rgb)
            original_width, original_height = pil_img.size
            new_size = (int(original_width * resize_factor), int(original_height * resize_factor))
            
            # Pillow 10.0.0 이후 버전을 위한 Resampling.LANCZOS 사용
            resized_img = pil_img.resize(new_size, Image.Resampling.LANCZOS)

            frames.append(np.array(resized_img))
            current_frame += 1

        # 리소스 해제
        cap.release()
        
        if not frames:
            print("🚨 오류: 변환할 프레임을 읽지 못했습니다. 시간 설정을 확인하세요.")
            return

        print(f"✨ GIF 파일 생성 중... (FPS: {fps})")
        # ImageIO를 사용해 GIF 저장
        imageio.mimsave(output_path, frames, fps=fps)
        
        print(f"✅ 성공! GIF 파일이 다음 경로에 저장되었습니다: {output_path}")

    except Exception as e:
        print(f"🚨 예상치 못한 오류가 발생했습니다: {e}")

In [12]:
# 1. 변환할 동영상 경로 (파일이 노트북과 같은 폴더에 있다면 파일명만 적어도 됩니다)
video_path = "data/aespa_test_visualization_ver1.mp4"

# 2. 저장할 GIF 파일명
output_path = "data/result.gif"

# 3. 시작 시간 (초)
start_sec = 15

# 4. 종료 시간 (초)
end_sec = 18

# 5. (선택) 초당 프레임 수 (FPS)
fps = 8

# 6. (선택) 크기 조절 비율 (0.5 = 50% 크기)
resize_factor = 0.2

# --- 위 값들을 설정한 후 아래 함수를 실행하세요 ---
create_gif_with_opencv(video_path, output_path, start_sec, end_sec, fps, resize_factor)

🆕 새로운 방법(OpenCV + ImageIO)으로 변환을 시작합니다.
🖼️ 345번 부터 414번 프레임까지 읽는 중...
✨ GIF 파일 생성 중... (FPS: 8)
✅ 성공! GIF 파일이 다음 경로에 저장되었습니다: data/result.gif


In [3]:
!pip install opencv-python imageio Pillow

Collecting opencv-python
  Downloading opencv_python-4.12.0.88-cp37-abi3-win_amd64.whl.metadata (19 kB)
Collecting numpy<2.3.0,>=2 (from opencv-python)
  Using cached numpy-2.2.6-cp311-cp311-win_amd64.whl.metadata (60 kB)
Downloading opencv_python-4.12.0.88-cp37-abi3-win_amd64.whl (39.0 MB)
   ---------------------------------------- 0.0/39.0 MB ? eta -:--:--
   -- ------------------------------------- 2.4/39.0 MB 11.2 MB/s eta 0:00:04
   ---- ----------------------------------- 4.7/39.0 MB 11.4 MB/s eta 0:00:04
   ------- -------------------------------- 7.1/39.0 MB 11.5 MB/s eta 0:00:03
   --------- ------------------------------ 9.4/39.0 MB 11.5 MB/s eta 0:00:03
   ----------- ---------------------------- 10.7/39.0 MB 10.8 MB/s eta 0:00:03
   ------------- -------------------------- 12.8/39.0 MB 10.5 MB/s eta 0:00:03
   --------------- ------------------------ 15.5/39.0 MB 10.7 MB/s eta 0:00:03
   ------------------ --------------------- 17.8/39.0 MB 10.7 MB/s eta 0:00:02
   -------

  You can safely remove it manually.
  You can safely remove it manually.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
gensim 4.3.3 requires numpy<2.0,>=1.18.5, but you have numpy 2.2.6 which is incompatible.


In [1]:
import cv2
import numpy as np

def clip_video_to_mp4_h264(video_path, output_path, start_sec, end_sec, fps=30, resize_factor=1.0):
    """
    OpenCV를 사용해 동영상을 H.264 코덱의 MP4 파일로 저장합니다.
    """
    print("🆕 H.264 코덱 MP4 파일로 잘라내기를 시작합니다.")
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"🚨 오류: '{video_path}' 동영상 파일을 열 수 없습니다.")
            return

        original_fps = cap.get(cv2.CAP_PROP_FPS)
        original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        new_width = int(original_width * resize_factor)
        new_height = int(original_height * resize_factor)

        # ▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼ 이 부분 변경 ▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼
        # 가장 호환성이 좋은 H.264 코덱('avc1')을 사용하도록 설정합니다.
        fourcc = cv2.VideoWriter_fourcc(*'avc1')
        # ▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲

        out = cv2.VideoWriter(output_path, fourcc, fps, (new_width, new_height))

        start_frame = int(start_sec * original_fps)
        end_frame = int(end_sec * original_fps)
        
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        
        print(f"🎞️  {start_frame}번 부터 {end_frame}번 프레임까지 처리 중...")
        
        current_frame = start_frame
        while current_frame < end_frame:
            ret, frame = cap.read()
            if not ret:
                break

            if resize_factor != 1.0:
                resized_frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
            else:
                resized_frame = frame
            
            out.write(resized_frame)
            current_frame += 1

        cap.release()
        out.release()
        
        print(f"✅ 성공! H.264 MP4 파일이 다음 경로에 저장되었습니다: {output_path}")

    except Exception as e:
        print(f"🚨 예상치 못한 오류가 발생했습니다: {e}")

In [None]:
# 저장할 파일명을 .mp4로 지정합니다.
video_path = "data/aespa_test_output.mp4"
output_path = "data/result_clip.mp4"

# 10초부터 15초까지 5초 분량을 잘라냅니다.
start_sec = 8
end_sec = 12

# 출력 동영상의 FPS와 크기 조절
fps = 24
resize_factor = 0.5 # 원본의 80% 크기로 약간 줄이기

# MP4 생성 함수 호출
clip_video_to_mp4_h264(video_path, output_path, start_sec, end_sec, fps, resize_factor)

🆕 H.264 코덱 MP4 파일로 잘라내기를 시작합니다.
🎞️  184번 부터 276번 프레임까지 처리 중...
✅ 성공! H.264 MP4 파일이 다음 경로에 저장되었습니다: data/result_clip_2.mp4
