In [3]:
import os
import sys
import numpy as np
import imageio.v3 as iio
import mediapipe as mp
from datasets import load_dataset
from tqdm import tqdm

if sys.platform == 'win32':
    os.environ["PATH"] += os.pathsep + r"C:\Users\Admin\AppData\Local\Microsoft\WinGet\Packages\Gyan.FFmpeg.Shared_Microsoft.Winget.Source_8wekyb3d8bbwe\ffmpeg-6.1.1-full_build-shared\bin"
    from torchaudio._extension.utils import _init_dll_path
    _init_dll_path()  # Đảm bảo load DLL từ PATH
import torchcodec
print(torchcodec.__version__)

# --- 1. Khởi tạo các công cụ MediaPipe ---
print("Khởi tạo MediaPipe Holistic...")
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# *** ĐỊNH NGHĨA Kiểu VẼ MÀU XANH LÁ ***
# Đây là mấu chốt: chúng ta sẽ vẽ keypoint MỚI bằng màu xanh
# BGR color: (0, 255, 0) là màu xanh lá
drawing_spec_green = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1)
drawing_spec_green_connect = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1)


# --- 2. Tải video mẫu từ dataset ---
print("Đang tải dataset (sử dụng cache nếu có)...")
ds = load_dataset("asthalochan/American_Sign_Language")
video_item = ds['train'][1] # Lấy video đầu tiên để test
video_tensor = video_item['video'][:]

print("Đã lấy video mẫu.")

# --- 3. Chuẩn bị video (Tensor -> NumPy) ---
# Chuyển (T, C, H, W) sang (T, H, W, C)
video_np = video_tensor.numpy().transpose(0, 2, 3, 1)

# Đảm bảo video là uint8 (0-255)
if video_np.dtype != np.uint8:
    print("Chuyển đổi video sang uint8...")
    video_np_uint8 = (video_np * 255).astype(np.uint8)
else:
    video_np_uint8 = video_np

print(f"Video sẵn sàng để xử lý, shape: {video_np_uint8.shape}")

# --- 4. Xử lý video và VẼ ĐÈ keypoint MỚI (màu xanh) ---
output_frames = [] # Nơi lưu các frame đã được vẽ

print("Bắt đầu xử lý video và vẽ keypoint MỚI (màu xanh lá)...")
with mp_holistic.Holistic(
    static_image_mode=False,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as holistic:

    # Dùng tqdm để xem tiến trình
    for frame in tqdm(video_np_uint8, desc="Đang xử lý frames"):

        # MediaPipe cần frame ở dạng C-contiguous và read-only
        # (frame.copy() giải quyết cả hai)
        # Đồng thời, chúng ta cần một bản sao để vẽ lên
        annotated_frame = frame.copy()

        # Xử lý frame (MediaPipe nhận RGB, video của bạn đã là RGB)
        results = holistic.process(frame)

        # BẮT ĐẦU VẼ ĐÈ (MÀU XANH)

        # 1. Vẽ Pose (thân)
        mp_drawing.draw_landmarks(
            image=annotated_frame,
            landmark_list=results.pose_landmarks,
            connections=mp_holistic.POSE_CONNECTIONS,
            landmark_drawing_spec=drawing_spec_green,
            connection_drawing_spec=drawing_spec_green_connect)

        # 2. Vẽ Face (mặt)
        # Chúng ta bỏ qua phần lưới (tesselation) cho đỡ rối
        mp_drawing.draw_landmarks(
            image=annotated_frame,
            landmark_list=results.face_landmarks,
            connections=mp_holistic.FACEMESH_CONTOURS, # Chỉ vẽ viền
            landmark_drawing_spec=drawing_spec_green,
            connection_drawing_spec=drawing_spec_green_connect)

        # 3. Vẽ Tay Trái
        mp_drawing.draw_landmarks(
            image=annotated_frame,
            landmark_list=results.left_hand_landmarks,
            connections=mp_holistic.HAND_CONNECTIONS,
            landmark_drawing_spec=drawing_spec_green,
            connection_drawing_spec=drawing_spec_green_connect)

        # 4. Vẽ Tay Phải
        mp_drawing.draw_landmarks(
            image=annotated_frame,
            landmark_list=results.right_hand_landmarks,
            connections=mp_holistic.HAND_CONNECTIONS,
            landmark_drawing_spec=drawing_spec_green,
            connection_drawing_spec=drawing_spec_green_connect)

        # Thêm frame đã được vẽ vào danh sách
        output_frames.append(annotated_frame)

print("Xử lý hoàn tất!")

# --- 5. Lưu video kết quả ---
output_filename = "TEST_mediapipe_vs_data.mp4"
print(f"Đang lưu video kết quả vào file: {output_filename}...")

iio.imwrite(
    output_filename,
    output_frames,
    fps=25, # Giả định 25 FPS
    codec='libx264'
)

print(f"Đã lưu thành công! Hãy mở file '{output_filename}' để kiểm tra.")

0.8.0
Khởi tạo MediaPipe Holistic...
Đang tải dataset (sử dụng cache nếu có)...
Đã lấy video mẫu.
Video sẵn sàng để xử lý, shape: (76, 480, 640, 3)
Bắt đầu xử lý video và vẽ keypoint MỚI (màu xanh lá)...


Đang xử lý frames: 100%|██████████| 76/76 [00:03<00:00, 24.82it/s]


Xử lý hoàn tất!
Đang lưu video kết quả vào file: TEST_mediapipe_vs_data.mp4...
Đã lưu thành công! Hãy mở file 'TEST_mediapipe_vs_data.mp4' để kiểm tra.


In [7]:
import cv2
import mediapipe as mp
import numpy as np

# --- 1. Khởi tạo các công cụ MediaPipe ---
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Định nghĩa kiểu vẽ (màu xanh lá)
drawing_spec_green = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1)
drawing_spec_green_connect = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1)

# --- 2. Định nghĩa đường dẫn file ---
video_path = "C:\\Users\\Admin\\Documents\\HUST\\2025.1\\Project3\\Code\\Video_Internet\\hello.mp4"
output_video_path = "hello_skeleton.mp4"

print(f"Bắt đầu trực quan hóa cho video: {video_path}")

# --- 3. Đọc video đầu vào ---
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    print(f"Lỗi: Không thể mở file video: {video_path}")
else:
    # Lấy thông số video (chiều rộng, cao, fps) để ghi file mới
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # --- 4. Tạo đối tượng Ghi Video (VideoWriter) ---
    fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Bộ mã hóa cho .mp4
    writer = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    print(f"Đang xử lý và lưu kết quả vào: {output_video_path}")

    # --- 5. Vòng lặp xử lý từng frame ---
    with mp_holistic.Holistic(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5) as holistic:

        while True:
            ret, frame = cap.read() # Đọc 1 frame (dạng BGR)
            if not ret:
                break # Hết video

            # 1. Chuyển BGR -> RGB cho MediaPipe
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # 2. Xử lý
            results = holistic.process(frame_rgb)

            # 3. Vẽ skeleton
            # Chúng ta vẽ trên frame BGR gốc (frame)
            annotated_frame = frame.copy()

            # Vẽ Pose
            mp_drawing.draw_landmarks(
                image=annotated_frame,
                landmark_list=results.pose_landmarks,
                connections=mp_holistic.POSE_CONNECTIONS,
                landmark_drawing_spec=drawing_spec_green,
                connection_drawing_spec=drawing_spec_green_connect)

            # Vẽ Face (chỉ viền)
            mp_drawing.draw_landmarks(
                image=annotated_frame,
                landmark_list=results.face_landmarks,
                connections=mp_holistic.FACEMESH_CONTOURS,
                landmark_drawing_spec=drawing_spec_green,
                connection_drawing_spec=drawing_spec_green_connect)

            # Vẽ Tay Trái
            mp_drawing.draw_landmarks(
                image=annotated_frame,
                landmark_list=results.left_hand_landmarks,
                connections=mp_holistic.HAND_CONNECTIONS,
                landmark_drawing_spec=drawing_spec_green,
                connection_drawing_spec=drawing_spec_green_connect)

            # Vẽ Tay Phải
            mp_drawing.draw_landmarks(
                image=annotated_frame,
                landmark_list=results.right_hand_landmarks,
                connections=mp_holistic.HAND_CONNECTIONS,
                landmark_drawing_spec=drawing_spec_green,
                connection_drawing_spec=drawing_spec_green_connect)

            # 4. Ghi frame đã vẽ vào file video mới
            writer.write(annotated_frame)

    # Giải phóng tài nguyên
    cap.release()
    writer.release()
    cv2.destroyAllWindows()
    print("\nHoàn tất trực quan hóa!")

Bắt đầu trực quan hóa cho video: C:\Users\Admin\Documents\HUST\2025.1\Project3\Code\Video_Internet\hello.mp4
Đang xử lý và lưu kết quả vào: hello_skeleton.mp4

Hoàn tất trực quan hóa!


In [8]:
import os
import numpy as np
import mediapipe as mp
from datasets import load_dataset
from tqdm import tqdm
import warnings

# --- 1. HÀM TRÍCH XUẤT KEYPOINT (Giống như trước, đã tối ưu) ---
mp_holistic = mp.solutions.holistic

NUM_POSE_LANDMARKS = 33
NUM_FACE_LANDMARKS = 468
NUM_HAND_LANDMARKS = 21
TOTAL_FEATURES = (NUM_POSE_LANDMARKS * 4) + (NUM_FACE_LANDMARKS * 3) + (NUM_HAND_LANDMARKS * 3 * 2) # = 1662

# Mảng zero cố định để dùng khi không phát hiện
ZERO_POSE = np.zeros(NUM_POSE_LANDMARKS * 4)
ZERO_FACE = np.zeros(NUM_FACE_LANDMARKS * 3)
ZERO_HAND = np.zeros(NUM_HAND_LANDMARKS * 3)
ZERO_FRAME_KEYPOINTS = np.concatenate([ZERO_POSE, ZERO_FACE, ZERO_HAND, ZERO_HAND])

def extract_holistic_keypoints(video_frames_rgb_list):
    """
    Hàm này nhận vào một list các frame RGB (H, W, C)
    và trả về một mảng keypoints (T, 1662).
    """
    all_frame_keypoints = []

    with mp_holistic.Holistic(
        static_image_mode=False,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5) as holistic:

        for frame in video_frames_rgb_list:
            # Xử lý frame và lấy kết quả
            results = holistic.process(frame)

            # Trích xuất với kiểm tra 'None'
            pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else ZERO_POSE
            face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else ZERO_FACE
            lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else ZERO_HAND
            rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else ZERO_HAND

            # Gộp tất cả (1662 features)
            frame_keypoints = np.concatenate([pose, face, lh, rh])
            all_frame_keypoints.append(frame_keypoints)

    return np.array(all_frame_keypoints)

# --- 2. HÀM CHUẨN BỊ VIDEO TỪ TENSOR ---
def prepare_video_frames(video_tensor):
    """
    Chuyển video tensor từ (T, C, H, W) sang list các frame (H, W, C)
    và đảm bảo là uint8.
    """
    # (T, C, H, W) -> (T, H, W, C)
    video_np = video_tensor.numpy().transpose(0, 2, 3, 1)

    # Đảm bảo là uint8 (0-255)
    if video_np.dtype != np.uint8:
        if np.max(video_np) <= 1.01: # Check nếu là float 0-1
            video_np_uint8 = (video_np * 255).astype(np.uint8)
        else:
            video_np_uint8 = video_np.astype(np.uint8) # Chuyển đổi trực tiếp
    else:
        video_np_uint8 = video_np

    # MediaPipe xử lý tốt nhất với list các frame
    return [frame for frame in video_np_uint8]

# --- 3. SCRIPT CHÍNH ĐỂ XỬ LÝ DATASET ---
def process_huggingface_dataset(ds, output_dir):
    """
    Lặp qua dataset, trích xuất keypoints, và lưu theo định dạng {label}_{index}.npy
    """

    # Tạo thư mục đầu ra
    os.makedirs(output_dir, exist_ok=True)

    # Lấy dataset 'train'
    train_ds = ds['train']

    # Lấy danh sách tên nhãn (ví dụ: 'A', 'B', 'milk', 'hello'...)
    label_names = train_ds.features['label'].names
    print(f"Đã tìm thấy {len(label_names)} nhãn.")

    print(f"Bắt đầu xử lý {len(train_ds)} video...")

    # Lặp qua toàn bộ dataset
    for i in tqdm(range(len(train_ds)), desc="Đang xử lý dataset"):
        try:
            # 1. Lấy thông tin
            item = train_ds[i]
            label_id = item['label']
            label_name = label_names[label_id]
            video_index = i # Sử dụng index 'i'

            # 2. Tạo tên file và đường dẫn
            output_filename = f"{label_name}_{video_index}.npy"
            output_path = os.path.join(output_dir, output_filename)

            # 3. Bỏ qua nếu đã xử lý
            if os.path.exists(output_path):
                continue

            # 4. Lấy video tensor
            video_tensor = item['video'][:]

            # 5. Chuẩn bị frames (Tensor -> List[np.array])
            frames_list = prepare_video_frames(video_tensor)
            if not frames_list:
                warnings.warn(f"Video {i} (nhãn {label_name}) bị rỗng.")
                continue

            # 6. Trích xuất keypoints
            keypoints_data = extract_holistic_keypoints(frames_list)

            # 7. Lưu file .npy
            np.save(output_path, keypoints_data)

        except Exception as e:
            print(f"\n--- LỖI ---")
            print(f"Không thể xử lý video tại index: {i}")
            print(f"Lỗi chi tiết: {e}")
            print(f"-------------\n")

    print("\n=====================================")
    print("HOÀN TẤT XỬ LÝ TOÀN BỘ DỮ LIỆU!")
    print(f"Dữ liệu keypoint đã được lưu tại: {output_dir}")
    print("=====================================")

# --- 4. CHẠY SCRIPT ---
if __name__ == "__main__":
    warnings.simplefilter("always")

    # Thư mục bạn muốn lưu file .npy vào
    KEYPOINT_DATA_ROOT = "Keypoint_Dataset_HF"

    # Tải dataset
    print("Đang tải dataset (sẽ dùng cache nếu có)...")
    raw_ds = load_dataset("asthalochan/American_Sign_Language")

    # Chạy xử lý
    process_huggingface_dataset(raw_ds, KEYPOINT_DATA_ROOT)

Đang tải dataset (sẽ dùng cache nếu có)...
Đã tìm thấy 47 nhãn.
Bắt đầu xử lý 3590 video...


Đang xử lý dataset:   1%|▏         | 47/3590 [02:30<3:09:05,  3.20s/it]

KeyboardInterrupt

