In [8]:
import mediapipe as mp
import cv2
import numpy as np
import os
from tqdm import tqdm

In [9]:
# Short alias for MediaPipe's Holistic solution module; the keypoint extractor
# below builds its landmark detector from it.
mp_holistic = mp.solutions.holistic

def extract_keypoints_from_video(video_path, max_frames=30):
    """Sample frames from a video and extract holistic keypoints for each one.

    ``max_frames`` frame positions are spread evenly over the whole video
    (positions repeat when the video has fewer frames than ``max_frames``).
    Each sampled frame is run through MediaPipe Holistic and the pose (33),
    left-hand (21) and right-hand (21) landmarks are stacked, in that order,
    into 75 ``[x, y, z]`` rows.

    Args:
        video_path: Path of the video file, passed to ``cv2.VideoCapture``.
        max_frames: Number of frames to sample; also the length of the result.

    Returns:
        ``np.ndarray`` of shape ``(max_frames, 75, 3)`` and dtype ``float32``.
        Rows stay zero for body parts that were not detected and for frames
        that could not be read. If the video reports no frames at all, a
        warning is printed and the whole array is zero.
    """
    num_pose_points, num_hand_points = 33, 21
    num_points = num_pose_points + 2 * num_hand_points  # 75 landmarks per frame

    # Pre-filled with zeros, so unreadable frames / missing detections need no
    # explicit padding later on.
    keypoints = np.zeros((max_frames, num_points, 3), dtype=np.float32)

    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Treat a non-positive count as an unreadable video; a negative value
        # would otherwise turn into negative seek positions below.
        if total_frames <= 0:
            print(f"⚠️ Video {video_path} bị lỗi hoặc không đọc được.")
            return keypoints

        # Evenly spaced frame positions across the whole video.
        frame_indices = np.linspace(0, total_frames - 1, max_frames, dtype=int)

        # NOTE(review): static_image_mode=False enables tracking between
        # successive process() calls, but the frames fed here are sampled
        # sparsely rather than consecutively -- confirm this is intended.
        with mp_holistic.Holistic(static_image_mode=False,
                                  model_complexity=1,
                                  min_detection_confidence=0.5,
                                  min_tracking_confidence=0.5) as holistic:
            for slot, idx in enumerate(frame_indices):
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
                ret, frame = cap.read()
                if not ret:
                    # Frame could not be read: leave this slot zero-filled.
                    continue

                # OpenCV decodes to BGR; the detector is fed RGB.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = holistic.process(frame_rgb)

                keypoints[slot] = np.concatenate([
                    _landmarks_to_array(results.pose_landmarks, num_pose_points),
                    _landmarks_to_array(results.left_hand_landmarks, num_hand_points),
                    _landmarks_to_array(results.right_hand_landmarks, num_hand_points),
                ], axis=0)
    finally:
        # Always release the capture handle, including on the early return
        # above and when detection raises.
        cap.release()

    return keypoints


def _landmarks_to_array(landmark_list, num_points):
    """Copy a MediaPipe landmark list into a ``(num_points, 3)`` float32 array.

    Each row is ``[x, y, z]`` of one landmark. When ``landmark_list`` is falsy
    (nothing detected for that body part) the array is all zeros.
    """
    points = np.zeros((num_points, 3), dtype=np.float32)
    if landmark_list:
        for i, lm in enumerate(landmark_list.landmark):
            points[i] = [lm.x, lm.y, lm.z]
    return points

In [10]:
# def augment_skeleton_sequence(skeleton_sequence,
#                               rotation_range=(-15, 15),
#                               scale_range=(0.9, 1.1),
#                               translation_range=(-0.05, 0.05),
#                               noise_std=0.005,
#                               temporal_jitter=3):
#     """
#     Tăng cường dữ liệu cho chuỗi skeleton (frames, joints, 3)

#     Thực hiện:
#         - Xoay (rotation)
#         - Phóng to / thu nhỏ (scaling)
#         - Tịnh tiến (translation)
#         - Thêm nhiễu Gaussian (noise)
#         - Temporal jitter (dịch khung thời gian)

#     Trả về:
#         augmented_sequence: ndarray cùng shape (frames, joints, 3)
#     """
#     seq = skeleton_sequence.copy()
#     num_frames, num_joints, _ = seq.shape

#     # --- 1. Rotation ---
#     angle = np.deg2rad(np.random.uniform(*rotation_range))
#     rotation_matrix = np.array([
#         [np.cos(angle), -np.sin(angle), 0],
#         [np.sin(angle),  np.cos(angle), 0],
#         [0, 0, 1]
#     ])
#     seq = np.einsum('ij,tkj->tki', rotation_matrix, seq)

#     # --- 2. Scaling ---
#     scale = np.random.uniform(*scale_range)
#     seq *= scale

#     # --- 3. Translation ---
#     tx = np.random.uniform(*translation_range)
#     ty = np.random.uniform(*translation_range)
#     tz = np.random.uniform(*translation_range)
#     translation = np.array([tx, ty, tz])
#     seq += translation

#     # --- 4. Noise injection ---
#     noise = np.random.normal(0, noise_std, seq.shape)
#     seq += noise

#     # --- 5. Temporal jitter ---
#     # Dịch chỉ số frame lên/xuống một lượng nhỏ ngẫu nhiên
#     shift = np.random.randint(-temporal_jitter, temporal_jitter + 1)
#     if shift > 0:
#         seq = np.concatenate([seq[shift:], np.tile(seq[-1], (shift, 1, 1))], axis=0)
#     elif shift < 0:
#         seq = np.concatenate([np.tile(seq[0], (-shift, 1, 1)), seq[:shift]], axis=0)

#     return seq

In [11]:
def rotate_sequence(seq, rotation_range=(-15, 15)):
    """Rotate a skeleton sequence by one random angle around the z-axis.

    The rotation is applied in the x-y plane about the coordinate origin; the
    z component of every point is left unchanged. A single angle is drawn per
    call and shared by all frames and joints.

    Args:
        seq: Array-like of shape ``(frames, joints, 3)``.
        rotation_range: ``(low, high)`` bounds of the angle, in degrees.

    Returns:
        Rotated array with the same shape. A floating-point input keeps its
        dtype (e.g. float32 stays float32); other inputs yield float64.
    """
    seq = np.asarray(seq)
    # Build the matrix in the input's float dtype so einsum does not silently
    # promote float32 data to float64.
    dtype = seq.dtype if np.issubdtype(seq.dtype, np.floating) else np.float64

    angle = np.deg2rad(np.random.uniform(*rotation_range))
    cos_a, sin_a = np.cos(angle), np.sin(angle)
    R = np.array([[cos_a, -sin_a, 0],
                  [sin_a,  cos_a, 0],
                  [0, 0, 1]], dtype=dtype)
    # out[t, k, i] = sum_j R[i, j] * seq[t, k, j]
    return np.einsum('ij,tkj->tki', R, seq)

def scale_sequence(seq, scale_range=(0.9, 1.1)):
    """Multiply the whole sequence by one random factor.

    A single factor is drawn uniformly from ``scale_range`` (``(low, high)``)
    and applied to every coordinate of every frame.
    """
    return seq * np.random.uniform(*scale_range)

def translate_sequence(seq, translation_range=(-0.05, 0.05)):
    """Shift the whole sequence by one random ``(dx, dy, dz)`` offset.

    Each of the three offset components is drawn uniformly from
    ``translation_range`` (``(low, high)``); the same offset is added to every
    joint in every frame.

    Args:
        seq: Array-like of shape ``(frames, joints, 3)``.
        translation_range: ``(low, high)`` bounds for each offset component.

    Returns:
        Translated array with the same shape. A floating-point input keeps its
        dtype instead of being promoted to float64.
    """
    seq = np.asarray(seq)
    t = np.random.uniform(*translation_range, size=(1, 1, 3))
    if np.issubdtype(seq.dtype, np.floating):
        # Match the input dtype so float32 data is not upcast by the addition.
        t = t.astype(seq.dtype, copy=False)
    return seq + t

def add_noise(seq, noise_std=0.003):
    """Add independent zero-mean Gaussian noise to every coordinate.

    Args:
        seq: Array-like skeleton sequence.
        noise_std: Standard deviation of the noise.

    Returns:
        Noisy array with the same shape. A floating-point input keeps its
        dtype instead of being promoted to float64.
    """
    seq = np.asarray(seq)
    noise = np.random.normal(0, noise_std, seq.shape)
    if np.issubdtype(seq.dtype, np.floating):
        # Match the input dtype so float32 data is not upcast by the addition.
        noise = noise.astype(seq.dtype, copy=False)
    return seq + noise

def temporal_jitter(seq, jitter=3):
    """Shift the sequence in time by a random number of frames.

    A shift is drawn uniformly from ``[-jitter, jitter]``. A positive shift
    drops that many leading frames and repeats the last frame at the end; a
    negative shift repeats the first frame at the start and drops trailing
    frames. The number of frames is always preserved.

    Args:
        seq: Sequence of frames, indexed along the first axis
            (``(frames, joints, 3)``).
        jitter: Maximum absolute shift, in frames.

    Returns:
        The shifted sequence, or ``seq`` itself when the shift is zero or the
        sequence is empty.
    """
    shift = np.random.randint(-jitter, jitter + 1)

    num_frames = len(seq)
    if num_frames == 0:
        # Nothing to repeat; indexing seq[-1] / seq[0] below would fail.
        return seq

    # A shift larger than the sequence would make the output longer than the
    # input. Shifting by num_frames - 1 already fills every slot with the
    # edge frame, so clamping there loses nothing.
    limit = num_frames - 1
    shift = max(-limit, min(shift, limit))

    if shift > 0:
        seq = np.concatenate([seq[shift:], np.tile(seq[-1], (shift, 1, 1))], axis=0)
    elif shift < 0:
        seq = np.concatenate([np.tile(seq[0], (-shift, 1, 1)), seq[:shift]], axis=0)
    return seq


In [12]:
def augment_skeleton_sequence(seq, min_aug=1, max_aug=3, funcs=None, probs=None):
    """Apply a random subset of augmentation functions to a skeleton sequence.

    Every function is first selected independently with its own probability.
    If more than ``max_aug`` were selected, a random ``max_aug`` of them are
    kept (in random order). If fewer than ``min_aug`` were selected, distinct
    not-yet-selected functions are added until ``min_aug`` is reached (or no
    functions are left). The selected functions are then applied in sequence.

    Args:
        seq: Skeleton sequence handed to the augmentation functions.
        min_aug: Minimum number of distinct augmentations to apply.
        max_aug: Maximum number kept from the probabilistic selection.
        funcs: Optional list of callables ``f(seq) -> seq``. Defaults to
            rotation, scaling, translation, noise and temporal jitter.
        probs: Optional selection probability per function. Defaults to
            ``[0.6, 0.7, 0.5, 0.5, 0.6]`` for the default functions and to
            0.5 each for a custom ``funcs`` list.

    Returns:
        The augmented sequence. When ``seq`` is a floating-point ndarray the
        result is cast back to its dtype, so augmented samples have the same
        dtype as the sample they were derived from.

    Raises:
        ValueError: If ``funcs`` and ``probs`` differ in length.
    """
    if funcs is None:
        funcs = [rotate_sequence, scale_sequence, translate_sequence, add_noise, temporal_jitter]
        default_probs = [0.6, 0.7, 0.5, 0.5, 0.6]
    else:
        funcs = list(funcs)
        default_probs = [0.5] * len(funcs)
    if probs is None:
        probs = default_probs
    if len(probs) != len(funcs):
        raise ValueError("funcs and probs must have the same length")

    # Work with indices so the RNG never has to handle the callables.
    chosen = [i for i, p in enumerate(probs) if np.random.rand() < p]

    if len(chosen) > max_aug:
        chosen = [int(i) for i in np.random.choice(chosen, max_aug, replace=False)]

    # Top up with *distinct* unused functions until min_aug is satisfied;
    # appending a single random pick could fall short or duplicate a choice.
    missing = min(min_aug, len(funcs)) - len(chosen)
    if missing > 0:
        unused = [i for i in range(len(funcs)) if i not in chosen]
        chosen.extend(int(i) for i in np.random.choice(unused, missing, replace=False))

    in_dtype = seq.dtype if isinstance(seq, np.ndarray) else None

    for i in chosen:
        seq = funcs[i](seq)

    # Some transforms compute in float64; restore the caller's float dtype.
    if (in_dtype is not None and np.issubdtype(in_dtype, np.floating)
            and isinstance(seq, np.ndarray) and seq.dtype != in_dtype):
        seq = seq.astype(in_dtype)
    return seq


In [13]:
def process_dataset(video_root, output_dir, num_augmentations=9):
    """Extract keypoints for every video and save raw plus augmented samples.

    Expected layout: ``video_root/<label_name>/<video>.mp4`` where
    ``label_name`` ends in ``_<integer>``; that integer is stored as the label.
    For each video, ``output_dir/<label_name>/<label_name>_<video>_00.npy``
    holds the extracted keypoints and ``..._01.npy`` onwards hold randomly
    augmented copies.

    Each file stores a dict ``{'keypoints': ndarray, 'label': int}`` via
    ``np.save``; because that is a pickled object, readers must call
    ``np.load(path, allow_pickle=True).item()``.

    Args:
        video_root: Directory containing one sub-directory per label.
        output_dir: Directory the ``.npy`` files are written to (created if
            missing).
        num_augmentations: Number of augmented copies written per video.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Sorted for a reproducible processing order (os.listdir order is arbitrary).
    for label_name in sorted(os.listdir(video_root)):
        label_path = os.path.join(video_root, label_name)
        if not os.path.isdir(label_path):
            continue

        # Stray directories (e.g. ``.ipynb_checkpoints``) have no integer
        # suffix; skip them instead of aborting the whole run.
        try:
            label_id = int(label_name.split('_')[-1])
        except ValueError:
            print(f"Skipping {label_path}: directory name does not end in '_<integer label>'.")
            continue

        out_label_dir = os.path.join(output_dir, label_name)
        os.makedirs(out_label_dir, exist_ok=True)

        for vid in tqdm(sorted(os.listdir(label_path)), desc=f"Processing {label_name}"):
            # Strip only the extension; str.replace would also rewrite any
            # earlier '.mp4' occurrence inside the file name.
            stem, ext = os.path.splitext(vid)
            if ext != ".mp4":
                continue

            vid_path = os.path.join(label_path, vid)
            data = extract_keypoints_from_video(vid_path)
            # NOTE(review): an unreadable video yields an all-zero array that
            # is still saved and augmented below -- confirm that is wanted.
            out_file = os.path.join(out_label_dir, f"{label_name}_{stem}_00.npy")
            np.save(out_file, {'keypoints': data, 'label': label_id})

            # Data augmentation: index 00 is the raw sample, 01.. are augmented.
            for i in range(1, num_augmentations + 1):
                augmented = augment_skeleton_sequence(data)
                out_file_aug = os.path.join(out_label_dir, f"{label_name}_{stem}_{i:02d}.npy")
                np.save(out_file_aug, {'keypoints': augmented, 'label': label_id})

In [14]:
# Seed NumPy's global RNG so the random augmentations are reproducible when the
# notebook is re-run from a fresh kernel.
np.random.seed(42)
process_dataset("vid", "data_03")

Processing sign_058: 100%|██████████| 8/8 [00:20<00:00,  2.61s/it]
