In [None]:
!pip install mediapipe

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import os
import json
from concurrent.futures import ThreadPoolExecutor

# Mediapipe 초기화
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(
    static_image_mode=False,
    model_complexity=2,
    smooth_landmarks=False,
    enable_segmentation=False,
    min_detection_confidence=0.85,
    min_tracking_confidence=0.85
)

# 주요 Keypoint 인덱스
KEYPOINTS_OF_INTEREST = [
    11, 12,  # Shoulders
    13, 14, 15, 16,
    23, 24,  # Hips
    25, 26,  # Knees
    27, 28   # Ankles
]

def extract_keypoints(frame):
    """주요 키포인트 추출"""
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)

    if results.pose_landmarks:
        keypoints = []
        for i in KEYPOINTS_OF_INTEREST:
            landmark = results.pose_landmarks.landmark[i]
            keypoints.extend([landmark.x, landmark.y, landmark.z])
        return keypoints
    return None

def process_frames(frame_batch):
    """프레임 배치를 처리하여 키포인트를 추출"""
    data = []
    for frame in frame_batch:
        keypoints = extract_keypoints(frame)
        if keypoints is not None:
            data.append(keypoints)
    return data

def process_video_in_batches(video_path, action, batch_size=100, num_threads=10):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Unable to open video {video_path}")
        return

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Total frames in video: {total_frames}")

    frame_batches = []  # 배치 저장
    batch = []
    frame_count = 0

    # 프레임을 읽어 배치로 나눔
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        batch.append(frame)
        frame_count += 1

        if len(batch) == batch_size or frame_count == total_frames:
            frame_batches.append(batch)
            batch = []

    cap.release()

    # ThreadPoolExecutor로 병렬 처리
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        results = executor.map(process_frames, frame_batches)

    # 모든 결과를 병합
    all_data = []
    for batch_data in results:
        all_data.extend(batch_data)


    save_path = os.path.join('/content/NewDataset', action)
    # 데이터 저장
    os.makedirs(save_path, exist_ok=True)
    file_name = os.path.join(save_path, f'{action}.json')

    # 기존 데이터 읽기
    if os.path.exists(file_name):
        with open(file_name, 'r') as json_file:
            try:
                existing_data = json.load(json_file)
            except json.JSONDecodeError:
                print(f"Warning: Failed to decode JSON from {file_name}. Starting with empty data.")
                existing_data = []
    else:
        existing_data = []

    # 기존 데이터와 병합
    combined_data = existing_data + all_data

    # 최종 데이터 저장
    with open(file_name, 'w') as json_file:
        json.dump(combined_data, json_file)

    print(f"Data saved at {save_path}")
    print(f"Total keypoints extracted: {len(combined_data)}")

In [None]:
video_path = '/content/walk.mp4'
action = "walk"
process_video_in_batches(video_path, action, batch_size=64, num_threads=5)

In [None]:
import json
import os
import numpy as np

def sample_data(data, target_size=5000):
    indices = np.random.choice(len(data), target_size, replace=False)
    return [data[i] for i in indices]

def adjust_dataset_size_and_label(input_file, output_directory, target_size=5000):
    with open(input_file, 'r') as f:
        data = json.load(f)

    label = os.path.splitext(os.path.basename(input_file))[0]
    labeled_data = [{'features': item, 'label': label} for item in data]

    adjusted_data = sample_data(labeled_data, target_size)

    output_file = os.path.join(output_directory, os.path.basename(input_file))
    with open(output_file, 'w') as f:
        json.dump(adjusted_data, f, indent=2)

    print(f"Processed {input_file} and saved adjusted data to {output_file}")

# 경로 설정
input_file = '/content/Datasets/bow.json'  # 수정할 JSON 파일 경로
output_directory = '/content/Datasets/'

# 출력 디렉토리가 없으면 생성
os.makedirs(output_directory, exist_ok=True)

# 데이터셋 크기 조정 및 라벨 추가 후 새로운 디렉토리에 저장
adjust_dataset_size_and_label(input_file, output_directory)


Processed /content/Datasets/bow.json and saved adjusted data to /content/Datasets/bow.json


In [None]:
import json

# JSON 파일에서 쌍의 개수를 세는 함수
def count_pairs_in_json_file(file_path):
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
        if isinstance(data, list):
            pair_count = len(data)
            print(f"JSON 파일의 feature, label 쌍의 총 개수: {pair_count}")
        else:
            print("JSON 데이터의 형식이 올바르지 않습니다. 리스트 형태여야 합니다.")
    except json.JSONDecodeError as e:
        print(f"JSONDecodeError: {e}")
    except Exception as e:
        print(f"파일을 읽는 중 오류 발생: {e}")

# 파일 경로 설정
file_path = '/content/Datasets/bow.json'
count_pairs_in_json_file(file_path)


JSON 파일의 feature, label 쌍의 총 개수: 5000


In [None]:
import json

# JSON 파일을 단계별로 읽고 리스트 크기를 확인하는 함수입니다.
def inspect_json_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = f.read()  # 파일 내용을 문자열로 읽기
        print(f"File content (first 1000 characters):\n{data[:1000]}")  # 파일의 처음 1000자 출력

        # 문자열을 JSON으로 파싱합니다.
        json_data = json.loads(data)
        print("Successfully parsed JSON data.")

        # 최상위 리스트의 크기를 확인합니다.
        if isinstance(json_data, list):
            print(f"Top-level list size: {len(json_data)}")
        else:
            print("The top-level element is not a list.")

    except json.JSONDecodeError as e:
        print(f"JSONDecodeError: {e}")
    except Exception as e:
        print(f"Other error: {e}")

# 파일 경로 설정
file_path = '/content/bow.json'
inspect_json_file(file_path)


In [None]:
import json

# JSON 파일을 단계별로 읽어 봅니다.
def inspect_json_file(file_path):
    try:
        with open(file_path, 'r') as f:
            data = f.read()  # 파일 내용을 문자열로 읽기
        print(f"File content (first 1000 characters):\n{data[:1000]}")  # 파일의 처음 1000자 출력
        json_data = json.loads(data)  # 문자열을 JSON으로 파싱
        print("Successfully parsed JSON data.")
    except json.JSONDecodeError as e:
        print(f"JSONDecodeError: {e}")
    except Exception as e:
        print(f"Other error: {e}")

# 파일 경로 설정
file_path = '/content/Datasets/bow.json'
inspect_json_file(file_path)
