In [1]:
!pip install -q mediapipe

In [5]:
!curl -o ../posemodels/pose_landmarker_heavy.task https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
 13 29.2M   13 4016k    0     0  4382k      0  0:00:06 --:--:--  0:00:06 4393k
 74 29.2M   74 21.6M    0     0  11.2M      0  0:00:02  0:00:01  0:00:01 11.3M
100 29.2M  100 29.2M    0     0  12.6M      0  0:00:02  0:00:02 --:--:-- 12.6M


In [7]:
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np
import cv2
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import json
import csv
import os

In [9]:
def draw_landmarks_on_image(rgb_image, detection_result):
    pose_landmarks_list = detection_result.pose_landmarks
    annotated_image = np.copy(rgb_image)

    for idx in range(len(pose_landmarks_list)):
        pose_landmarks = pose_landmarks_list[idx]
        pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        pose_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in pose_landmarks
        ])
        solutions.drawing_utils.draw_landmarks(
            annotated_image,
            pose_landmarks_proto,
            solutions.pose.POSE_CONNECTIONS,
            solutions.drawing_styles.get_default_pose_landmarks_style())
    return annotated_image

In [15]:
def extract_landmarks(detection_result):
    landmarks = []
    for pose_landmarks in detection_result.pose_landmarks:
        for landmark in pose_landmarks:
            landmarks.append({
                'x': landmark.x,
                'y': landmark.y,
                'z': landmark.z,
                'visibility': landmark.visibility
            })
    return landmarks

In [17]:
base_options = python.BaseOptions(model_asset_path='../posemodels/pose_landmarker_heavy.task')
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    output_segmentation_masks=True)
detector = vision.PoseLandmarker.create_from_options(options)

# Ruta base donde están las carpetas de datos
base_path = '../data/raw'
processed_path = '../data/processed'

# Iterar sobre cada categoría de movimiento en la carpeta base
for category in os.listdir(base_path):
    category_path = os.path.join(base_path, category)
    
    # Revisar que sea una carpeta
    if not os.path.isdir(category_path):
        continue
    
    processed_category_path = os.path.join(processed_path, category)
    output_json_path = os.path.join(processed_category_path, 'json')
    output_csv_path = os.path.join(processed_category_path, 'csv')
    os.makedirs(output_json_path, exist_ok=True)
    os.makedirs(output_csv_path, exist_ok=True)
    
    # Procesar cada video en la categoría
    for video_file in os.listdir(category_path):
        if not video_file.endswith('.mp4'):
            continue
        
        video_path = os.path.join(category_path, video_file)
        print(video_path)
        cap = cv2.VideoCapture(video_path)
        all_landmarks = []

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
            detection_result = detector.detect(mp_image)

            landmarks = extract_landmarks(detection_result)
            all_landmarks.append(landmarks)

        cap.release()

        # Guardar los datos en formato JSON
        json_output_path = os.path.join(output_json_path, f"{video_file.split('.')[0]}.json")
        with open(json_output_path, 'w') as json_file:
            json.dump(all_landmarks, json_file, indent=4)

        # Guardar los datos en formato CSV
        csv_output_path = os.path.join(output_csv_path, f"{video_file.split('.')[0]}.csv")
        csv_columns = ['frame', 'landmark_index', 'x', 'y', 'z', 'visibility']
        with open(csv_output_path, 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
            writer.writeheader()
            for frame_index, frame_landmarks in enumerate(all_landmarks):
                for landmark_index, landmark in enumerate(frame_landmarks):
                    row = {
                        'frame': frame_index,
                        'landmark_index': landmark_index,
                        'x': landmark['x'],
                        'y': landmark['y'],
                        'z': landmark['z'],
                        'visibility': landmark['visibility']
                    }
                    writer.writerow(row)


../data/raw\caminar hacia atras\1.mp4
../data/raw\caminar hacia atras\2.mp4
../data/raw\caminar hacia atras\3.mp4
../data/raw\caminar hacia atras\4.mp4
../data/raw\caminar hacia atras\5.mp4
../data/raw\caminar hacia atras\6.mp4
../data/raw\caminar hacia atras\7.mp4
../data/raw\caminar hacia atras\8.mp4
../data/raw\caminar hacia atras\9.mp4
../data/raw\caminar hacia atras\VID-20241028-WA0019.mp4
../data/raw\caminar hacia atras\VID-20241028-WA0020.mp4
../data/raw\caminar hacia atras\VID-20241028-WA0021.mp4
../data/raw\caminar hacia atras\VID-20241028-WA0022.mp4
../data/raw\caminar hacia delante\1.mp4
../data/raw\caminar hacia delante\2.mp4
../data/raw\caminar hacia delante\3.mp4
../data/raw\caminar hacia delante\4.mp4
../data/raw\caminar hacia delante\5.mp4
../data/raw\caminar hacia delante\6.mp4
../data/raw\caminar hacia delante\7.mp4
../data/raw\caminar hacia delante\8.mp4
../data/raw\caminar hacia delante\VID-20241028-WA0013.mp4
../data/raw\caminar hacia delante\VID-20241028-WA0014.mp