# Exploración de datos con MediaPipe Pose

Este cuaderno realiza una exploración inicial sobre videos con actividades humanas usando extracción de pose (landmarks) con MediaPipe.

## Objetivos
- Cargar un video de ejemplo (subido o por ruta en Colab).
- Extraer landmarks de pose (nariz como referencia de la cabeza, hombros, caderas, rodillas, tobillos y muñecas).
- Preprocesar: normalización por tamaño de torso y suavizado.
- Generar características: ángulos de rodilla y cadera, inclinación de tronco, velocidades.
- Visualizar series temporales y estadísticas básicas.
- Integrar anotaciones si se proporciona un CSV y analizar distribución por clases.


In [None]:
# Instalación de dependencias (si es necesario) y paquetes
import sys, subprocess, importlib

def ensure(pkg, import_name=None, version=None):
    """Import a module, installing its distribution with pip if the import fails.

    Args:
        pkg: Distribution name passed to ``pip install``.
        import_name: Module name to import when it differs from ``pkg``
            (e.g. ``opencv-python`` is imported as ``cv2``). Defaults to ``pkg``.
        version: Optional exact version. When given, ``pkg==version`` is what
            gets installed. It is only consulted if the first import fails; a
            module that already imports is returned without a version check.

    Returns:
        The imported module object.

    Raises:
        subprocess.CalledProcessError: If the pip invocation exits non-zero.
        Exception: Whatever the second import attempt raises, if it still fails.
    """
    module_name = import_name or pkg
    try:
        return importlib.import_module(module_name)
    except Exception:
        # Deliberately broad: any failure of the first import triggers one
        # install attempt followed by a single retry.
        requirement = pkg if version is None else f"{pkg}=={version}"
        subprocess.check_call([sys.executable, "-m", "pip", "install", requirement])
        # Files installed while the interpreter is running may be invisible to
        # the import system's cached directory listings until they are reset.
        importlib.invalidate_caches()
        return importlib.import_module(module_name)

# Resolve every dependency through ensure(), which installs on demand.
np = ensure("numpy")
pd = ensure("pandas")
cv2 = ensure("opencv-python", import_name="cv2")
mp = ensure("mediapipe", import_name="mediapipe", version="0.10.14")
matplotlib = ensure("matplotlib")
import matplotlib.pyplot as plt
scipy = ensure("scipy")
from scipy.signal import savgol_filter

# seaborn is optional: any failure to obtain it leaves `sns` as None.
try:
    sns = ensure("seaborn")
except Exception:
    sns = None

# Report the versions actually loaded in this session.
print(
    dict(
        numpy=np.__version__,
        pandas=pd.__version__,
        cv2=cv2.__version__,
        mediapipe=mp.__version__,
        matplotlib=matplotlib.__version__,
        scipy=scipy.__version__,
        seaborn=getattr(sns, "__version__", "not-installed"),
    )
)



{'numpy': '2.0.2', 'pandas': '2.2.2', 'cv2': '4.12.0', 'mediapipe': '0.10.14', 'matplotlib': '3.10.0', 'scipy': '1.16.3', 'seaborn': '0.13.2'}


In [None]:
# Entrada: ruta de video y (opcional) anotaciones (Colab)
from pathlib import Path
from typing import Optional
from google.colab import files  # type: ignore

# Optional: set a path here if the file already exists in the Colab environment
VIDEO_PATH: str = ""  # e.g. "/content/mi_video.mp4"
ANNOTATIONS_CSV: str = ""  # e.g. "/content/anotaciones.csv"

EXPECTED_ANNOTATION_FORMAT = "either-intervals-or-framewise"
# Supported formats:
# - Intervals: columns [start_s, end_s, label]
# - Framewise: columns [frame, label]


def choose_video_path() -> str:
    """Return the path of the video to analyse.

    ``VIDEO_PATH`` is used when it is non-empty and points to an existing
    path. Otherwise ``files.upload()`` is called and the first key of the
    mapping it returns is used as the path.

    Returns:
        The chosen path as a string.

    Raises:
        ValueError: If the upload returns nothing.
    """
    if VIDEO_PATH:
        candidate = Path(VIDEO_PATH)
        if candidate.exists():
            return str(candidate)
        print(f"No se encontró VIDEO_PATH: {VIDEO_PATH}. Se solicitará subida.")
    print("Sube un archivo de video (mp4/mov/avi)...")
    uploaded = files.upload()
    if not uploaded:
        # Explicit raise instead of `assert`, so the check survives `python -O`.
        raise ValueError("Debes subir un archivo de video.")
    # Only the first uploaded entry is used; any others are ignored.
    path = next(iter(uploaded))
    print(f"Video subido: {path}")
    return path

# Resolve the input video once; the extraction cell below reads `video_path`.
video_path = choose_video_path()
print("Usando video:", video_path)



In [None]:
# Extracción de pose con MediaPipe
from dataclasses import dataclass
import math

# Shorthand for MediaPipe's pose landmark enumeration.
POSE_LM = mp.solutions.pose.PoseLandmark
# Landmarks exported per frame: nose, shoulders, hips, knees, ankles and wrists.
SELECTED_LANDMARKS = [
    POSE_LM.NOSE,
    POSE_LM.LEFT_SHOULDER, POSE_LM.RIGHT_SHOULDER,
    POSE_LM.LEFT_HIP, POSE_LM.RIGHT_HIP,
    POSE_LM.LEFT_KNEE, POSE_LM.RIGHT_KNEE,
    POSE_LM.LEFT_ANKLE, POSE_LM.RIGHT_ANKLE,
    POSE_LM.LEFT_WRIST, POSE_LM.RIGHT_WRIST,
]

@dataclass
class VideoInfo:
    """Basic metadata of a video file, as filled in by ``read_video_info``."""

    # Frame rate read from cv2.CAP_PROP_FPS.
    fps: float
    # Frame width read from cv2.CAP_PROP_FRAME_WIDTH.
    width: int
    # Frame height read from cv2.CAP_PROP_FRAME_HEIGHT.
    height: int
    # Frame count read from cv2.CAP_PROP_FRAME_COUNT.
    frame_count: int
    # frame_count divided by fps.
    duration_s: float


def read_video_info(path: str) -> VideoInfo:
    """Read basic metadata of a video file through OpenCV.

    Args:
        path: Path of the video file to inspect.

    Returns:
        A ``VideoInfo`` built from the capture properties (fps, width,
        height, frame count) plus the derived duration in seconds.

    Raises:
        OSError: If OpenCV cannot open the file.
    """
    cap = cv2.VideoCapture(path)
    # try/finally so the capture is released even when the open check fails.
    try:
        if not cap.isOpened():
            # Explicit raise instead of `assert`, so the check survives `python -O`.
            raise OSError(f"No se pudo abrir el video: {path}")
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Treat an unusable rate (0, NaN, infinite or negative) as 30 fps so
        # the duration below is always well defined.
        if not math.isfinite(fps) or fps <= 0:
            fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Clamp so a negative reported count cannot yield a negative duration.
        frame_count = max(0, int(cap.get(cv2.CAP_PROP_FRAME_COUNT)))
    finally:
        cap.release()
    return VideoInfo(
        fps=fps,
        width=width,
        height=height,
        frame_count=frame_count,
        duration_s=frame_count / fps,
    )


def _pose_row(lm, frame_idx: int, time_s: float) -> dict:
    """Build one record of hip-centred, torso-scaled coordinates from a landmark list."""
    l_sh = lm[POSE_LM.LEFT_SHOULDER.value]
    r_sh = lm[POSE_LM.RIGHT_SHOULDER.value]
    l_hip = lm[POSE_LM.LEFT_HIP.value]
    r_hip = lm[POSE_LM.RIGHT_HIP.value]
    # Midpoints that define the torso segment
    shoulder_mid = ((l_sh.x + r_sh.x) / 2.0, (l_sh.y + r_sh.y) / 2.0)
    hip_mid = ((l_hip.x + r_hip.x) / 2.0, (l_hip.y + r_hip.y) / 2.0)
    torso_size = math.hypot(shoulder_mid[0] - hip_mid[0], shoulder_mid[1] - hip_mid[1])
    # A degenerate torso would blow up the division; fall back to no scaling.
    if torso_size <= 1e-6:
        torso_size = 1.0

    row = {"frame": frame_idx, "time_s": time_s}
    # Coordinates relative to the hip midpoint, scaled by torso size
    for landmark in SELECTED_LANDMARKS:
        point = lm[landmark.value]
        key = landmark.name.lower()
        row[f"{key}_x"] = (point.x - hip_mid[0]) / torso_size
        row[f"{key}_y"] = (point.y - hip_mid[1]) / torso_size
        row[f"{key}_v"] = point.visibility
    # Midpoints in the same frame of reference (the hip midpoint is the origin)
    row["shoulder_mid_x"] = (shoulder_mid[0] - hip_mid[0]) / torso_size
    row["shoulder_mid_y"] = (shoulder_mid[1] - hip_mid[1]) / torso_size
    row["hip_mid_x"], row["hip_mid_y"] = 0.0, 0.0
    return row


def extract_pose_dataframe(path: str, sample_every_n: int = 1) -> tuple[pd.DataFrame, VideoInfo]:
    """Run MediaPipe Pose over a video and collect normalised landmarks per frame.

    Only frames whose index is a multiple of ``sample_every_n`` are processed,
    and only frames where a pose is detected produce a row. Each row holds
    ``frame``, ``time_s`` (frame index divided by the video fps, or by 30 when
    the fps is not positive), ``<landmark>_x/_y/_v`` for every entry of
    ``SELECTED_LANDMARKS``, and the shoulder/hip midpoints.

    Args:
        path: Path of the video file.
        sample_every_n: Process one frame out of every ``sample_every_n``.

    Returns:
        ``(df, info)``: the landmark table and the video metadata. When no
        pose is detected the table is empty but still has the expected columns.

    Raises:
        ValueError: If ``sample_every_n`` is smaller than 1.
    """
    if sample_every_n < 1:
        raise ValueError("sample_every_n debe ser >= 1")
    info = read_video_info(path)
    # Loop-invariant: same fallback for every frame's timestamp.
    fps = info.fps if info.fps > 0 else 30.0
    records = []
    cap = cv2.VideoCapture(path)
    # try/finally so the capture is released even if pose processing raises.
    try:
        with mp.solutions.pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            enable_segmentation=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        ) as pose:
            frame_idx = 0
            while cap.isOpened():
                ok, frame_bgr = cap.read()
                if not ok:
                    break
                if frame_idx % sample_every_n == 0:
                    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
                    res = pose.process(frame_rgb)
                    if res.pose_landmarks:
                        records.append(
                            _pose_row(res.pose_landmarks.landmark, frame_idx, frame_idx / fps)
                        )
                frame_idx += 1
    finally:
        cap.release()

    if records:
        df = pd.DataFrame.from_records(records)
    else:
        # Keep the schema stable so downstream column lookups do not fail
        # with KeyError when no pose was detected at all.
        columns = ["frame", "time_s"]
        for landmark in SELECTED_LANDMARKS:
            key = landmark.name.lower()
            columns += [f"{key}_x", f"{key}_y", f"{key}_v"]
        columns += ["shoulder_mid_x", "shoulder_mid_y", "hip_mid_x", "hip_mid_y"]
        df = pd.DataFrame(columns=columns)
    return df, info

# Run the extraction on the chosen video and show a quick summary.
pose_df, video_info = extract_pose_dataframe(video_path)
print(video_info)
print("Frames con pose detectada:", len(pose_df))
pose_df.head()



In [None]:
# Preprocesamiento: suavizado y utilidades geométricas

def smooth_series(y: np.ndarray, window: int = 11, poly: int = 2) -> np.ndarray:
    """Smooth a signal along axis 0 with a Savitzky-Golay filter.

    Inputs with fewer than 5 samples are returned unchanged. The window is
    limited to the largest odd length that fits the data (an even window is
    reduced by one, but never below 3) and the polynomial order is capped at
    ``window - 1``. If the filter raises, the input is returned as is.

    Args:
        y: Array whose first axis is the sample axis.
        window: Requested filter window length.
        poly: Requested polynomial order.

    Returns:
        The filtered array, or ``y`` itself when no filtering was applied.
    """
    n_samples = y.shape[0]
    if n_samples < 5:
        return y

    largest_odd_fit = n_samples if n_samples % 2 else n_samples - 1
    win = min(window, largest_odd_fit)
    if win % 2 == 0:
        win = max(3, win - 1)
    order = min(poly, win - 1)

    try:
        smoothed = savgol_filter(y, window_length=win, polyorder=order, axis=0)
    except Exception:
        return y
    return smoothed


def angle_between(v1: np.ndarray, v2: np.ndarray, eps: float = 1e-8) -> float:
    """Return the angle between two vectors, in degrees.

    Each vector is divided by its norm plus ``eps`` (so a zero vector does not
    divide by zero), and the cosine is clipped to [-1, 1] before ``arccos``.
    """

    def unit(vec: np.ndarray) -> np.ndarray:
        return vec / (np.linalg.norm(vec) + eps)

    cos_theta = np.dot(unit(v1), unit(v2))
    cos_theta = np.clip(cos_theta, -1.0, 1.0)
    theta_rad = np.arccos(cos_theta)
    return float(np.degrees(theta_rad))


def get_xy(df: pd.DataFrame, base: str) -> tuple[np.ndarray, np.ndarray]:
    """Return the ``<base>_x`` and ``<base>_y`` columns of ``df`` as NumPy arrays."""
    xs = df[f"{base}_x"].to_numpy()
    ys = df[f"{base}_y"].to_numpy()
    return xs, ys



In [None]:
# Extracción de características: ángulos, inclinación, velocidades

def compute_features(df: pd.DataFrame, fps: float) -> pd.DataFrame:
    """Derive trunk inclination, joint angles and landmark speeds from pose data.

    Adds these columns to a copy of ``df``:

    - ``trunk_incl_deg`` / ``_raw``: angle between the hip-to-shoulder
      midpoint vector and the direction (0, -1).
    - ``{left,right}_knee_deg`` / ``_raw``: angle at the knee between the
      knee-to-hip and knee-to-ankle vectors.
    - ``{left,right}_hip_deg`` / ``_raw``: angle at the hip between the
      hip-to-shoulder-midpoint and hip-to-knee vectors.
    - ``<landmark>_speed`` for wrists, knees and ankles: smoothed magnitude of
      the time derivative of the (x, y) position.

    Args:
        df: Pose table with ``<name>_x`` / ``<name>_y`` columns for the
            landmarks and midpoints used above, and normally ``time_s``.
        fps: Frame rate. Used as a uniform time step only when ``time_s`` is
            missing or not strictly increasing.

    Returns:
        A new DataFrame; ``df`` itself is not modified.
    """
    out = df.copy()

    def point(base: str) -> np.ndarray:
        """Return the (n, 2) array of x/y coordinates for ``base``."""
        return out[[f"{base}_x", f"{base}_y"]].to_numpy()

    def rowwise_angles(v1: np.ndarray, v2: np.ndarray) -> np.ndarray:
        """Angle in degrees between matching rows of two (n, 2) arrays."""
        return np.array([angle_between(a, b) for a, b in zip(v1, v2)], dtype=float)

    shoulder_mid = point("shoulder_mid")

    # Trunk inclination with respect to the vertical direction (0, -1)
    trunk_vec = shoulder_mid - point("hip_mid")
    vertical = np.broadcast_to(np.array([0.0, -1.0]), trunk_vec.shape)
    trunk_incl_deg = rowwise_angles(trunk_vec, vertical)
    out["trunk_incl_deg_raw"] = trunk_incl_deg
    out["trunk_incl_deg"] = smooth_series(trunk_incl_deg)

    for side in ("left", "right"):
        hip_pt = point(f"{side}_hip")
        knee_pt = point(f"{side}_knee")
        ankle_pt = point(f"{side}_ankle")

        # Knee angle: between (knee->hip) and (knee->ankle)
        knee_deg = rowwise_angles(hip_pt - knee_pt, ankle_pt - knee_pt)
        out[f"{side}_knee_deg_raw"] = knee_deg
        out[f"{side}_knee_deg"] = smooth_series(knee_deg)

        # Hip angle: between (hip->shoulder midpoint) and (hip->knee)
        hip_deg = rowwise_angles(shoulder_mid - hip_pt, knee_pt - hip_pt)
        out[f"{side}_hip_deg_raw"] = hip_deg
        out[f"{side}_hip_deg"] = smooth_series(hip_deg)

    # Rows are not guaranteed to be evenly spaced in time (rows may be missing
    # for some frames), so differentiate against the real timestamps when
    # they are usable instead of assuming a constant 1/fps step.
    uniform_dt = 1.0 / max(fps, 1.0)
    times = out["time_s"].to_numpy(dtype=float) if "time_s" in out.columns else None
    use_timestamps = (
        times is not None
        and len(times) >= 2
        and bool(np.all(np.diff(times) > 0))
    )

    def deriv(a: np.ndarray) -> np.ndarray:
        """Time derivative of ``a``; zeros when there are fewer than two samples."""
        if len(a) < 2:
            return np.zeros_like(a)
        if use_timestamps:
            return np.gradient(a, times, axis=0)
        return np.gradient(a, uniform_dt, axis=0)

    # Speeds for a few key landmarks
    for base in [
        "left_wrist", "right_wrist",
        "left_knee", "right_knee",
        "left_ankle", "right_ankle",
    ]:
        x, y = get_xy(out, base)
        vx, vy = deriv(x), deriv(y)
        out[f"{base}_speed"] = smooth_series(np.sqrt(vx**2 + vy**2))

    return out

# Build the feature table; fps is floored at 1.0 before being passed in.
features_df = compute_features(pose_df, fps=max(video_info.fps, 1.0))
features_df.head()



In [None]:
# EDA: estadísticas y visualizaciones
import matplotlib.pyplot as plt
import numpy as np

# One figure per entry: (title, [(column, plot kwargs), ...], draw legend?)
figure_specs = [
    (
        "Ángulo de rodilla (suavizado)",
        [("left_knee_deg", {"label": "Left knee"}), ("right_knee_deg", {"label": "Right knee"})],
        True,
    ),
    (
        "Ángulo de cadera (suavizado)",
        [("left_hip_deg", {"label": "Left hip"}), ("right_hip_deg", {"label": "Right hip"})],
        True,
    ),
    (
        "Inclinación del tronco (grados)",
        [("trunk_incl_deg", {"color": "tab:orange"})],
        False,
    ),
]

for fig_title, curves, with_legend in figure_specs:
    plt.figure(figsize=(12, 4))
    for column, line_kwargs in curves:
        plt.plot(features_df["time_s"], features_df[column], **line_kwargs)
    plt.title(fig_title)
    plt.xlabel("Tiempo (s)")
    plt.ylabel("Grados")
    if with_legend:
        plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

# Smoothed angles and speeds summarised with describe()
summary_cols = [
    "left_knee_deg",
    "right_knee_deg",
    "left_hip_deg",
    "right_hip_deg",
    "trunk_incl_deg",
    "left_wrist_speed",
    "right_wrist_speed",
    "left_knee_speed",
    "right_knee_speed",
    "left_ankle_speed",
    "right_ankle_speed",
]
print("Resumen estadístico de características clave:")
display(features_df[summary_cols].describe().T)



In [None]:
# (Opcional) Cargar anotaciones y analizar distribución por clases
import pandas as pd
from typing import Optional


def load_annotations(csv_path: str) -> Optional[pd.DataFrame]:
    """Load the optional annotations CSV.

    Args:
        csv_path: Path of the CSV file; an empty string means "no annotations".

    Returns:
        The parsed DataFrame, or ``None`` (after printing a notice) when
        ``csv_path`` is empty or the file does not exist.
    """
    if not csv_path:
        print("No se proporcionó ANNOTATIONS_CSV. Saltando.")
        return None

    csv_file = Path(csv_path)
    if csv_file.exists():
        annotations = pd.read_csv(csv_file)
        print("Anotaciones cargadas:", annotations.shape)
        return annotations

    print(f"No existe el archivo de anotaciones: {csv_path}")
    return None


def align_annotations_per_frame(ann: pd.DataFrame, fps: float, frames: int) -> Optional[pd.Series]:
    """Expand annotations into one label per video frame.

    Two layouts are supported; column names are matched case-insensitively:

    - Intervals: columns ``start_s``, ``end_s``, ``label``. Every frame from
      ``round(start_s * fps)`` to ``round(end_s * fps)`` (inclusive, limited
      to the valid frame range) receives the label; later rows overwrite
      earlier ones where they overlap.
    - Framewise: columns ``frame``, ``label``. Frame numbers are clipped to
      ``[0, frames - 1]`` and labels are stored as strings.

    Args:
        ann: Annotation table in one of the layouts above.
        fps: Frame rate used to convert seconds to frame indices.
        frames: Total number of frames; the length of the result.

    Returns:
        A Series named ``label`` of length ``frames`` with ``None`` for frames
        without annotation, or ``None`` (after printing a notice) when the
        layout is not recognised.
    """
    # Detection is case-insensitive, so the columns are lower-cased before
    # access as well; otherwise e.g. "Start_s" is detected but then raises
    # KeyError. rename() returns a new frame, so the caller's is not modified.
    ann = ann.rename(columns=str.lower)
    cols = set(ann.columns)
    labels = pd.Series([None] * frames, name="label")

    if {"start_s", "end_s", "label"} <= cols:
        for start_s, end_s, label in zip(ann["start_s"], ann["end_s"], ann["label"]):
            first = int(max(0, round(start_s * fps)))
            last = int(min(frames - 1, round(end_s * fps)))
            labels.iloc[first : last + 1] = label
        return labels

    if {"frame", "label"} <= cols:
        # With no frames there is nothing to assign (and no valid clip range).
        if frames > 0:
            positions = ann["frame"].astype(int).clip(0, frames - 1).to_numpy()
            # Plain arrays keep the assignment purely positional.
            labels.iloc[positions] = ann["label"].astype(str).to_numpy()
        return labels

    print("Formato de anotaciones no reconocido. Esperado intervalos o por frame.")
    return None

ann_df = load_annotations(ANNOTATIONS_CSV)
# Per-frame labels exist only when a CSV was loaded and its layout was recognised.
labels_per_frame = (
    None
    if ann_df is None
    else align_annotations_per_frame(
        ann_df, fps=max(video_info.fps, 1.0), frames=video_info.frame_count
    )
)
if labels_per_frame is not None:
    # Trim to the frames present in features_df
    max_frame = features_df["frame"].max()
    labels_per_frame = labels_per_frame.iloc[: max_frame + 1]
    # Look labels up by original frame number, then renumber 0..n-1 for the
    # column assignment below (assumes features_df has a default index).
    label_aligned = (
        labels_per_frame
        .reindex(features_df["frame"].to_numpy(), fill_value=None)
        .reset_index(drop=True)
    )
    features_df["label"] = label_aligned
    print("Distribución de clases:")
    print(features_df["label"].value_counts(dropna=True))



In [None]:
# Exportar características a CSV
# Write the feature table (without the index) to features.csv in the working directory.
out_path = Path("features.csv")
features_df.to_csv(out_path, index=False)
print("Guardado:", out_path.resolve())