In [6]:
import pandas as pd
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

In [7]:
# Fight dataset: video names come from pre-built CSV lists ('names' column);
# label file names are read from the label folder ("02.라벨링데이터") of each split.
fight_datapath = './indoor/'
fight_train_video_names = pd.read_csv('./fight_train_video_names_list.csv')['names'].to_list()
fight_val_video_names = pd.read_csv('./fight_val_video_names_list.csv')['names'].to_list()
# os.listdir returns entries in arbitrary order; sort so that the lists (and any
# processing order derived from them) are reproducible across runs and machines.
fight_train_xml_names = sorted(os.listdir(fight_datapath+'Training'+'/02.라벨링데이터'))
fight_val_xml_names = sorted(os.listdir(fight_datapath+'Validation'+'/02.라벨링데이터'))

# Normal dataset: video names are read directly from the source-data folder
# ("01.원천데이터") of each split, sorted for the same reproducibility reason.
normal_datapath = './indoor_normal/'
normal_train_video_names = sorted(os.listdir(normal_datapath+'Training'+'/01.원천데이터'))
normal_val_video_names = sorted(os.listdir(normal_datapath+'Validation'+'/01.원천데이터'))

In [8]:
def _prefix_names(directory, names):
    """Return `names` with `directory` prepended, skipping names that already start with it.

    Skipping already-prefixed entries keeps this cell idempotent: re-running it
    does not produce doubled paths.
    """
    return [name if name.startswith(directory) else directory + name for name in names]


# Turn bare file names into paths relative to the notebook.
# Fix: the validation lists used to be prefixed with the *Training* directory.
# normal_val_video_names is listed from the Validation folder, so it must point there.
# NOTE(review): fight_val_video_names comes from a CSV list; it is assumed to name
# files under ./indoor/Validation (matching fight_val_xml_names) - confirm.
fight_train_video_names = _prefix_names('./indoor/Training/01.원천데이터/', fight_train_video_names)
fight_val_video_names = _prefix_names('./indoor/Validation/01.원천데이터/', fight_val_video_names)
normal_train_video_names = _prefix_names('./indoor_normal/Training/01.원천데이터/', normal_train_video_names)
normal_val_video_names = _prefix_names('./indoor_normal/Validation/01.원천데이터/', normal_val_video_names)

In [9]:
# Paths
video_path = fight_train_video_names[0]  # <- path of the video to process
model_path = "pose_landmarker_full.task"  # <- the .task model file must be downloaded beforehand

# Build the landmarker options (VIDEO running mode: frames are passed with timestamps)
options = vision.PoseLandmarkerOptions(
    base_options=python.BaseOptions(model_asset_path=model_path),
    running_mode=vision.RunningMode.VIDEO
)
landmarker = vision.PoseLandmarker.create_from_options(options)

# Open the video
cap = cv2.VideoCapture(video_path)
fps = 3  # target sampling rate (frames per second)
source_fps = cap.get(cv2.CAP_PROP_FPS)
# Keep every `frame_interval`-th frame. Clamp to at least 1: a source slower than
# `fps` (or one that reports no FPS) would otherwise give 0 and make the modulo
# below raise ZeroDivisionError.
frame_interval = max(1, int(source_fps // fps)) if source_fps > 0 else 1
frame_index = 0
pose_results = []

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_index % frame_interval == 0:
            # Pre-processing: fixed-size RGB image wrapped for MediaPipe
            frame = cv2.resize(frame, (640, 360))
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

            # Timestamp (ms) of this frame in the source video. frame_index counts
            # source frames, so it is divided by the source FPS, not the target one.
            if source_fps > 0:
                timestamp_ms = int(frame_index * 1000 / source_fps)
            else:
                # Source FPS unknown: fall back to evenly spaced, still increasing stamps.
                timestamp_ms = int(frame_index * (1000 / fps))

            # Inference
            result = landmarker.detect_for_video(mp_image, timestamp_ms)

            # Extraction: only the first detected pose is kept; all zeros when none is found
            frame_landmarks = []
            if result.pose_landmarks:
                for lm in result.pose_landmarks[0]:
                    frame_landmarks.extend([lm.x, lm.y, lm.visibility])
            else:
                frame_landmarks = [0.0] * (33 * 3)

            pose_results.append([frame_index] + frame_landmarks)

        frame_index += 1
finally:
    # Release the capture and the landmarker even if a frame fails to process.
    cap.release()
    landmarker.close()

# Column names: frame index followed by x/y/visibility for each of the 33 landmarks
columns = ['frame']
for i in range(33):
    columns.extend([f"x_{i}", f"y_{i}", f"vis_{i}"])

# Save
df = pd.DataFrame(pose_results, columns=columns)
df.to_csv("pose_results_3fps.csv", index=False)


I0000 00:00:1752110478.286128  804359 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1752110478.307963  804707 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 NVIDIA 550.127.05), renderer: NVIDIA RTX A6000/PCIe/SSE2
W0000 00:00:1752110478.384686  804720 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1752110478.433520  804729 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [10]:
# Read back the keypoint table saved as CSV for inspection.
pose_csv_path = './pose_results_3fps.csv'
marks = pd.read_csv(pose_csv_path)

In [11]:
# # 1. 시각화 유틸
# mp_pose = mp.solutions.pose
# connections = mp_pose.POSE_CONNECTIONS  # 스켈레톤 연결 정보
# landmark_names = [lm.name for lm in mp_pose.PoseLandmark]

# # 원본 프레임 불러오기
# cap = cv2.VideoCapture(video_path)
# ret, frame = cap.read()
# h, w, _ = frame.shape

# # 3. 시각화 수행
# for i, lm in enumerate(marks):
#     cx, cy = int(lm['x'] * w), int(lm['y'] * h)
#     cv2.circle(frame, (cx, cy), 4, (0, 255, 0), -1)
#     cv2.putText(frame, landmark_names[i], (cx + 4, cy - 4),
#                 cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)

# # 연결선 그리기
# for start_idx, end_idx in connections:
#     pt1 = marks[start_idx]
#     pt2 = marks[end_idx]
#     x1, y1 = int(pt1['x'] * w), int(pt1['y'] * h)
#     x2, y2 = int(pt2['x'] * w), int(pt2['y'] * h)
#     cv2.line(frame, (x1, y1), (x2, y2), (0, 255, 255), 2)

# # 4. 결과 보기 (Jupyter에서는 matplotlib 사용)
# frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# plt.figure(figsize=(8, 6))
# plt.imshow(frame_rgb)
# plt.axis('off')
# plt.title("Skeleton Only Visualization")
# plt.show()


In [12]:
def keyPointstoCSV(video_path, save_path, model_path="pose_landmarker_full.task", fps=3):
    """Sample a video at roughly `fps` frames per second and save pose keypoints as CSV.

    Every sampled frame is resized to 640x360, converted to RGB and passed to a
    MediaPipe PoseLandmarker running in VIDEO mode. Only the first detected pose
    is kept; a frame with no detection is stored as all zeros.

    Args:
        video_path: Path of the video to read. Paths are split on '/' to derive
            the output file name.
        save_path: Directory for the CSV. It is created if it does not exist.
        model_path: Path of the PoseLandmarker `.task` model file (must be
            downloaded beforehand).
        fps: Target sampling rate in frames per second; must be positive.

    Output:
        Writes `{save_path}/{video file name}.csv` (the video extension is kept,
        e.g. `clip.mp4.csv`) with columns `frame`, then `x_i`, `y_i`, `vis_i`
        for i in 0..32. `frame` is the index of the frame in the source video.
        A video that yields no frames produces a header-only CSV.

    Raises:
        ValueError: If `fps` is not positive.
    """
    if fps <= 0:
        raise ValueError("fps must be positive")

    # Landmarker options
    options = vision.PoseLandmarkerOptions(
        base_options=python.BaseOptions(model_asset_path=model_path),
        running_mode=vision.RunningMode.VIDEO
    )

    pose_results = []
    cap = cv2.VideoCapture(video_path)
    try:
        source_fps = cap.get(cv2.CAP_PROP_FPS)
        # Keep every `frame_interval`-th frame. Clamp to at least 1: a source slower
        # than `fps` (or one that reports no FPS) would otherwise give 0 and make
        # the modulo below raise ZeroDivisionError.
        frame_interval = max(1, int(source_fps // fps)) if source_fps > 0 else 1
        frame_index = 0

        # The context manager closes the landmarker when done, so a batch run
        # does not accumulate one open landmarker per video.
        with vision.PoseLandmarker.create_from_options(options) as landmarker:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_index % frame_interval == 0:
                    # Pre-processing
                    frame = cv2.resize(frame, (640, 360))
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

                    # Timestamp (ms) of this frame in the source video. frame_index
                    # counts source frames, so divide by the source FPS, not `fps`.
                    if source_fps > 0:
                        timestamp_ms = int(frame_index * 1000 / source_fps)
                    else:
                        # Source FPS unknown: evenly spaced, still increasing stamps.
                        timestamp_ms = int(frame_index * (1000 / fps))

                    # Inference
                    result = landmarker.detect_for_video(mp_image, timestamp_ms)

                    # Extraction: first detected pose only, zeros when nothing is found
                    if result.pose_landmarks:
                        frame_landmarks = []
                        for lm in result.pose_landmarks[0]:
                            frame_landmarks.extend([lm.x, lm.y, lm.visibility])
                    else:
                        frame_landmarks = [0.0] * (33 * 3)

                    pose_results.append([frame_index] + frame_landmarks)

                frame_index += 1
    finally:
        # Release the capture even if inference fails part-way through.
        cap.release()

    # Column names: frame index followed by x/y/visibility per landmark
    columns = ['frame']
    for i in range(33):
        columns.extend([f"x_{i}", f"y_{i}", f"vis_{i}"])

    # Save; create the output directory first so to_csv cannot fail on a missing folder
    df = pd.DataFrame(pose_results, columns=columns)
    filename = video_path.split('/')[-1]
    if save_path:
        os.makedirs(save_path, exist_ok=True)
    df.to_csv(f"{save_path}/{filename}.csv", index=False)

In [None]:
# Write one keypoint CSV per normal-class training clip; entries that are not .mp4 are skipped.
normal_train_clips = [path for path in normal_train_video_names if path.endswith('.mp4')]
for clip_path in normal_train_clips:
    keyPointstoCSV(clip_path, './pointsCSV/train/normal')

I0000 00:00:1752110489.478163  804359 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1752110489.498725  804897 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 NVIDIA 550.127.05), renderer: NVIDIA RTX A6000/PCIe/SSE2
W0000 00:00:1752110489.562249  804910 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1752110489.602260  804912 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
I0000 00:00:1752110532.384996  804359 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1752110532.407483  805073 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 NVIDIA 550.127.05), renderer: NVIDIA RTX A6000/PCIe/SSE2
W0000 00:00:1752110532.480636  805086 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signatu

In [None]:
# Write one keypoint CSV per normal-class validation clip; entries that are not .mp4 are skipped.
normal_val_clips = [path for path in normal_val_video_names if path.endswith('.mp4')]
for clip_path in normal_val_clips:
    keyPointstoCSV(clip_path, './pointsCSV/val/normal')

In [None]:
# Write one keypoint CSV per fight-class training clip; entries that are not .mp4 are skipped.
fight_train_clips = [path for path in fight_train_video_names if path.endswith('.mp4')]
for clip_path in fight_train_clips:
    keyPointstoCSV(clip_path, './pointsCSV/train/fight')

In [None]:
# Write one keypoint CSV per fight-class validation clip; entries that are not .mp4 are skipped.
fight_val_clips = [path for path in fight_val_video_names if path.endswith('.mp4')]
for clip_path in fight_val_clips:
    keyPointstoCSV(clip_path, './pointsCSV/val/fight')