<a href="https://colab.research.google.com/github/LeoDinga/DL_Project/blob/main/extract_keypoints.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install mediapipe opencv-python tqdm --upgrade
import os
# os.kill(os.getpid(), 9)  # Reinicia o runtime para aplicar as mudanças



In [2]:

!git clone https://github.com/LeoDinga/DL_Project.git
%cd DL_Project


fatal: destination path 'DL_Project' already exists and is not an empty directory.
/content/DL_Project


In [4]:
import mediapipe as mp
import cv2
import numpy as np
from tqdm import tqdm
def convert_video_to_npy(video_path, resize_shape=(224, 224)):
    cap = cv2.VideoCapture(video_path)
    frames = []
    if not cap.isOpened():
        raise ValueError(f"Error opening video file: {video_path}")

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_resized = cv2.resize(frame, resize_shape)
        frames.append(frame_resized)

    cap.release()
    return np.array(frames)

def create_npy_from_videos_flat(src_dir, npy_dir):
    os.makedirs(npy_dir, exist_ok=True)

    for video_file in os.listdir(src_dir):
        if not video_file.lower().endswith((".mp4", ".avi")):
            continue

        video_path = os.path.join(src_dir, video_file)
        output_path = os.path.join(npy_dir, video_file.replace(".mp4", ".npy").replace(".avi", ".npy"))

        try:
            print(f"Processing: {video_file}")
            frames_array = convert_video_to_npy(video_path)
            np.save(output_path, frames_array)
        except Exception as e:
            print(f"Error processing {video_file}: {e}")


def pad_or_truncate_keypoints(keypoints, target_length=120):
    num_frames = keypoints.shape[0]
    if num_frames < target_length:
        padding = np.zeros((target_length - num_frames, keypoints.shape[1], keypoints.shape[2]))
        return np.concatenate((keypoints, padding), axis=0)
    else:
        return keypoints[:target_length]

def create_npy_from_videos(src_dir, npy_dir):
    os.makedirs(npy_dir, exist_ok=True)
    for action in os.listdir(src_dir):
        action_path = os.path.join(src_dir, action)
        if not os.path.isdir(action_path):
            continue
        dest_action_path = os.path.join(npy_dir, action)
        os.makedirs(dest_action_path, exist_ok=True)
        for video_file in os.listdir(action_path):
            if file.endswith((".avi", ".mp4")):
                video_path = os.path.join(action_path, video_file)
                output_path = os.path.join(dest_action_path, video_file.replace(".avi", ".npy"))
                try:
                    frames_array = convert_video_to_npy(video_path)
                    np.save(output_path, frames_array)
                except Exception as e:
                    print(f"Error processing {video_file}: {e}")

def extract_keypoints_from_npy(npy_dir, save_path="our_keypoints.npz"):
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(static_image_mode=True)
    all_keypoints = {}

    for action in tqdm(os.listdir(npy_dir), desc="Processing actions"):
        action_path = os.path.join(npy_dir, action)
        if not os.path.isdir(action_path):
            continue

        all_keypoints[action] = {}
        for video_file in os.listdir(action_path):
            if not video_file.endswith(".npy"):
                continue
            video_path = os.path.join(action_path, video_file)
            try:
                sample = np.load(video_path)
                if sample.ndim != 4 or sample.shape[-1] != 3:
                    continue
                sample = sample.astype(np.uint8)

                video_keypoints = []
                for frame in sample[:]:
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    results = pose.process(frame_rgb)
                    if results.pose_landmarks:
                        keypoints = [[lm.x, lm.y, lm.z] for lm in results.pose_landmarks.landmark]
                    else:
                        keypoints = np.zeros((33, 3)).tolist()
                    video_keypoints.append(keypoints)

                if video_keypoints:
                    kp_array = np.array(video_keypoints)
                    kp_array = pad_or_truncate_keypoints(kp_array, target_length=120)
                    all_keypoints[action][video_file] = kp_array
            except Exception as e:
                print(f"Error with {video_file}: {e}")

    # Transformar em dicionário plano para salvar com np.savez_compressed
    flat_dict = {}
    for action, videos in all_keypoints.items():
        for video_file, arr in videos.items():
            key = f"{action}__{video_file.replace('.npy', '')}"
            flat_dict[key] = arr

    np.savez_compressed(save_path, **flat_dict)
    print(f"Keypoints saved to {save_path}")

def extract_keypoints_from_npy_flat(npy_dir, save_path="our_keypoints.npz"):

  mp_pose = mp.solutions.pose
  pose = mp_pose.Pose(
      static_image_mode=False,
      min_detection_confidence=0.3,
      min_tracking_confidence=0.3
  )

  all_keypoints = {}

  for video_file in tqdm(os.listdir(npy_dir), desc="Processing videos"):
      if not video_file.endswith(".npy"):
          continue
      video_path = os.path.join(npy_dir, video_file)
      try:
          sample = np.load(video_path)
          if sample.ndim != 4 or sample.shape[-1] != 3:
              print(f"Skipping {video_file}, unexpected shape {sample.shape}")
              continue
          sample = sample.astype(np.uint8)

          video_keypoints = []
          for frame in sample:
              frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
              results = pose.process(frame_rgb)
              if results.pose_landmarks:
                  keypoints = [[lm.x, lm.y, lm.z] for lm in results.pose_landmarks.landmark]
              else:
                  keypoints = np.zeros((33, 3)).tolist()
              video_keypoints.append(keypoints)

          if video_keypoints:
              kp_array = np.array(video_keypoints)
              kp_array = pad_or_truncate_keypoints(kp_array, target_length=120)
              kp_array = np.expand_dims(kp_array, axis=1)
              key = video_file.replace(".npy", "")
              all_keypoints[key] = kp_array

      except Exception as e:
          print(f"Error processing {video_file}: {e}")

  np.savez_compressed(save_path, **all_keypoints)
  print(f"Keypoints saved to {save_path}")


# if __name__ == "__main__":
#     # Clone dataset se necessário
#     if not os.path.exists("dataset"):
#         os.system("git clone --filter=blob:none --no-checkout https://github.com/THETIS-dataset/dataset.git")
#         os.chdir("dataset")
#         os.system("git sparse-checkout init --cone")
#         os.system("git sparse-checkout set VIDEO_RGB")
#         os.system("git checkout")
#         os.chdir("..")

src_dir = "/content/DL_Project/DL_Project/our_videos"
npy_dir = "npy_videos"

create_npy_from_videos_flat(src_dir, npy_dir)
extract_keypoints_from_npy_flat(npy_dir, save_path="our_keypoints.npz")




Processing: VID-20250526-WA0005.mp4
Processing: VID-20250526-WA0001.mp4
Processing: VID-20250526-WA0002.mp4
Processing: VID-20250526-WA0009.mp4
Processing: VID-20250526-WA0008.mp4
Processing: VID-20250526-WA0012.mp4
Processing: VID-20250526-WA0007.mp4
Processing: VID-20250526-WA0010.mp4
Processing: VID-20250526-WA0011.mp4
Processing: VID-20250526-WA0006.mp4
Processing: VID-20250526-WA0004.mp4
Processing: VID-20250526-WA0003.mp4


Processing videos: 100%|██████████| 36/36 [01:55<00:00,  3.22s/it]


Keypoints saved to our_keypoints.npz


In [5]:
video_root = "/content/DL_Project/DL_Project/our_videos"
print("Contents of root video folder:")
print(os.listdir(video_root))

Contents of root video folder:
['VID-20250526-WA0005.mp4', 'VID-20250526-WA0001.mp4', 'VID-20250526-WA0002.mp4', 'VID-20250526-WA0009.mp4', 'VID-20250526-WA0008.mp4', 'VID-20250526-WA0012.mp4', 'VID-20250526-WA0007.mp4', 'VID-20250526-WA0010.mp4', 'VID-20250526-WA0011.mp4', 'VID-20250526-WA0006.mp4', 'VID-20250526-WA0004.mp4', 'VID-20250526-WA0003.mp4']


In [8]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# Load keypoints
keypoints_path = "/content/DL_Project/our_keypoints.npz"
data = np.load(keypoints_path)

# Rebuild dictionary
all_keypoints = {}
for key in data.files:
    action, video_file = key.split("__", 1)
    if action not in all_keypoints:
        all_keypoints[action] = {}
    all_keypoints[action][video_file] = data[key]

# Select one example
first_action = list(all_keypoints.keys())[0]
first_video = list(all_keypoints[first_action].keys())[0]
frame_idx = 0

points = all_keypoints[first_action][first_video][frame_idx]  # shape: (num_joints, 3)

# === Define anatomical connections and colors ===
anatomical_connections = {
    'head': [
        (0, 1), (1, 2), (2, 3),
        (0, 4), (4, 5), (5, 6),
        (3, 7), (6, 8),
        (0, 9), (9, 10)
    ],
    'left_arm': [(11, 13), (13, 15), (15, 17), (15, 19), (15, 21)],
    'right_arm': [(12, 14), (14, 16), (16, 18), (16, 20), (16, 22)],
    'torso': [(11, 12), (23, 24), (11, 23), (12, 24)],
    'left_leg': [(23, 25), (25, 27), (27, 29), (29, 31)],
    'right_leg': [(24, 26), (26, 28), (28, 30), (30, 32)],
}

colors = {
    'head': 'gray',
    'left_arm': 'red',
    'right_arm': 'blue',
    'torso': 'orange',
    'left_leg': 'green',
    'right_leg': 'purple',
}

# === Plotting ===
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')
ax.scatter(points[:, 0], points[:, 1], points[:, 2], c='black', s=20)

for part, connections in anatomical_connections.items():
    for i, j in connections:
        if i < len(points) and j < len(points):  # ensure valid index
            ax.plot(
                [points[i, 0], points[j, 0]],
                [points[i, 1], points[j, 1]],
                [points[i, 2], points[j, 2]],
                color=colors[part], linewidth=2
            )

ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_title(f"3D Skeleton - Frame {frame_idx} - {first_video}")
ax.view_init(elev=20, azim=-70)
plt.tight_layout()
plt.show()

ValueError: not enough values to unpack (expected 2, got 1)

In [9]:
import numpy as np
import pickle

ntu_joints_in_mediapipe = [
    0, 11, 12, 13, 14, 15, 16,
    23, 24, 25, 26, 27, 28,
    5, 2, 7, 8,
    17, 18, 19, 20, 21, 22,
    29, 30
]

def filter_to_ntu_joints(keypoints): # keypoints (T, 33, C)
    return keypoints[:, ntu_joints_in_mediapipe, :]

def prepare_stgcn_data(dataset, label_map):
    data_list = []
    for i, (keypoints, action_name) in enumerate(dataset):
        keypoints = np.array(keypoints)  # (T, 33, C)
        keypoints = filter_to_ntu_joints(keypoints)  # (T, 25, C)
        num_frames, num_joints, channels = keypoints.shape
        keypoints = keypoints[np.newaxis, ...]  # add person dim: (M=1, T, V, C)

        sample = {
            'frame_dir': f'sample_{i}',
            'label': label_map[action_name],
            'img_shape': None,
            'total_frames': num_frames,
            'keypoint': keypoints,
        }
        data_list.append(sample)
    return data_list

# Load your data
data_npz = np.load("our_keypoints.npz")
dataset = []

for key in data_npz.files:
    keypoints = data_npz[key]
    if keypoints.ndim == 4 and keypoints.shape[1] == 1:
        keypoints = np.squeeze(keypoints, axis=1)
    dataset.append((keypoints, "unknown"))

label_map = {"unknown": 0}
test_data = prepare_stgcn_data(dataset, label_map)

with open("our_test_labels.pkl", "wb") as f:
    pickle.dump(test_data, f)

print(f"Created our_test_labels.pkl with {len(test_data)} samples.")


Created our_test_labels.pkl with 36 samples.
