# Pose Landmarks with MediaPipe — From Local Videos & Folders Using Python

This notebook is both a **guided lesson** and a **working pipeline** for detecting human pose landmarks from **local video files** or **entire folders** of videos using **MediaPipe Tasks**.

## Goal

1. Set up a clean Python 3.12 environment and verify required packages.
2. Understand each step and terminologies.
3. Download and select a Pose Landmarker model (**lite / full / heavy**) and understand accuracy–speed trade-offs.
4. Read videos with OpenCV and run inference in **`RunningMode.VIDEO`** with correct **timestamps**.
5. Export results as tidy CSVs for analysis: **2D image-normalized** and **3D world** landmarks.
6. Create an **annotated MP4** showing the skeleton overlay.
7. Build intuition for **visibility**, **image vs. world coordinates**, and simple feature engineering (e.g., joint angles).

> **Built for learning:** Along the way you’ll see short callouts explaining *why* each step exists (e.g., timestamps in VIDEO mode), how coordinate spaces differ, and how to tune speed vs. accuracy.

## After completing this guide, you will be able to

* Load one video—or loop through an entire folder—and extract the coordinates of the landmark bodypoints frame-by-frame.
* Save two analysis-ready CSVs per video: one for **2D normalized** landmarks and one for **3D world** coordinates.
* Produce an **annotated MP4** with landmarks and connections overlaid.
* Explain and adjust **`RunningMode.VIDEO`**, **per-frame timestamps**, **visibility filtering**, **image vs. world coordinates**, and model variants (**lite/full/heavy**).

> **Prerequisites**
>
> * Python **3.12** virtual environment selected as the active Jupyter kernel. In case yo8u need help, please refer to the "LS100_Guide 3_Introduction to Pose Estimation Using MediaPipe.pdf" guide.
> * Installed packages: `mediapipe opencv-python pandas numpy tqdm matplotlib seaborn`
> * One or more local video files (e.g., `.mp4`) to test.

> **Ethics & consent**
>
> * If processing videos of people, obtain consent and store data securely. Avoid uploading sensitive content to third-party services.

### References for learners

* MediaPipe Pose Landmarker (Python guide): [https://ai.google.dev/edge/mediapipe/solutions/vision/pose_landmarker/python](https://ai.google.dev/edge/mediapipe/solutions/vision/pose_landmarker/python)
* Pose Landmarker API: [https://ai.google.dev/edge/api/mediapipe/python/mp/tasks/vision/PoseLandmarker](https://ai.google.dev/edge/api/mediapipe/python/mp/tasks/vision/PoseLandmarker)
* Model card (BlazePose GHUM 3D; lite/full/heavy):
  [https://storage.googleapis.com/mediapipe-assets/Model%20Card%20BlazePose%20GHUM%203D.pdf](https://storage.googleapis.com/mediapipe-assets/Model%20Card%20BlazePose%20GHUM%203D.pdf)

---


# 0. Environment Setup and Verification (LS100 Standard)

Before running any code, make sure you’re using the **LS100_PoseEstimation_MP** kernel that was created in your Python 3.12 virtual environment.
This section verifies your environment and installs all required packages.

---

### **What you should already have**

✅ Python 3.12 installed

✅ Virtual environment activated (`(MediaPipeEnv)` should appear in your terminal)

✅ Kernel registered as **LS100_PoseEstimation_MP**

If you haven’t completed those steps, revisit the **LS100_Guide 3_Introduction to Pose Estimation Using MediaPipe.pdf** document.

---

### **Required packages**

This notebook uses the following libraries:

* `mediapipe` – pose landmark model and API
* `opencv-python` – video I/O (input/output) and frame conversion
* `pandas` & `numpy` – data handling and analysis
* `tqdm` – progress bars for video processing
* `matplotlib` & `seaborn` – visualization and data inspection

Run the next cell to ensure these are installed and to confirm the environment details.

---

### **Learning focus**

* Why virtual environments prevent version conflicts
* Why we require **Python 3.12** (MediaPipe Tasks currently supports Python 3.9–3.12 only)
* How each library fits into the MediaPipe Pose pipeline
---



## 0. Environment setup

> If running locally (VS Code/Jupyter), run the following cell once; it might take about a minute to run.


In [4]:
# ============================================
# 0. Environment Setup and Package Verification
# ============================================

import sys
import importlib
import subprocess

# ---- 1. Check Python version ----
py_version = sys.version_info
print(f"🧠 Python version: {py_version.major}.{py_version.minor}.{py_version.micro}")
if py_version < (3, 9) or py_version >= (3, 13):
    print("⚠️ MediaPipe Tasks officially supports Python 3.9–3.12.")
    print("⚠️ Please switch to Python 3.12 for this notebook (as used in LS100).")

# ---- 2. Define required packages ----
required_packages = [
    "mediapipe",
    "opencv-python",
    "pandas",
    "numpy",
    "tqdm",
    "matplotlib",
    "seaborn",
]

# ---- 3. Function to check and install ----
def install_if_missing(pkg):
    """
    Try importing the package; if not found, install it quietly.
    """
    try:
        importlib.import_module(pkg.split("==")[0])
        print(f"✅ {pkg} already installed")
    except ImportError:
        print(f"⬇️ Installing {pkg} ...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])

# ---- 4. Verify each dependency ----
for package in required_packages:
    install_if_missing(package)

# ---- 5. Print package versions for reproducibility ----
import mediapipe as mp
import cv2, pandas as pd, numpy as np, tqdm, matplotlib, seaborn

print("\n📦 Package versions:")
print(f"mediapipe      : {mp.__version__}")
print(f"opencv-python  : {cv2.__version__}")
print(f"pandas         : {pd.__version__}")
print(f"numpy          : {np.__version__}")
print(f"matplotlib     : {matplotlib.__version__}")
print(f"seaborn        : {seaborn.__version__}")

print("\n✅ Environment is ready to proceed!")


🧠 Python version: 3.12.12
✅ mediapipe already installed
⬇️ Installing opencv-python ...
✅ pandas already installed
✅ numpy already installed
✅ tqdm already installed
✅ matplotlib already installed
✅ seaborn already installed

📦 Package versions:
mediapipe      : 0.10.21
opencv-python  : 4.11.0
pandas         : 2.3.3
numpy          : 1.26.4
matplotlib     : 3.10.7
seaborn        : 0.13.2

✅ Environment is ready to proceed!



## 1. Imports & version checks


---
# 1. Imports and Version Verification

Now that your environment is ready, let’s import the main libraries used throughout this notebook.

This step helps confirm that:

* The correct packages are installed inside your LS100 virtual environment
* MediaPipe loads successfully (and we can access its **Tasks API**)
* OpenCV, NumPy, and Pandas are working properly

If an import fails, it usually means you’re running the notebook in a different kernel (not the one you registered).
You can fix that by selecting **Kernel → Change Kernel → LS100_PoseEstimation_MP** (or the name you chose).

---

In [1]:
# ======================================
# 1. Import Libraries and Verify Versions (fixed for MediaPipe >=0.10)
# ======================================

import os, cv2, numpy as np, pandas as pd, matplotlib, seaborn as sns
from tqdm import tqdm

import mediapipe as mp
from mediapipe.tasks import python as mp_python
from mediapipe.tasks.python import vision as mp_vision

print("✅ MediaPipe Tasks API imported successfully!\n")
print(f"mediapipe version : {mp.__version__}")
print(f"opencv version    : {cv2.__version__}")
print(f"pandas version    : {pd.__version__}")
print(f"numpy version     : {np.__version__}")

# Optional: check GPU availability
backend = "GPU" if cv2.cuda.getCudaEnabledDeviceCount() > 0 else "CPU"
print(f"⚙️ Running on {backend}")

# ---- Smoke test: confirm Tasks API symbols exist ----
BaseOptions = mp_python.BaseOptions
PoseLandmarker = mp_vision.PoseLandmarker
PoseLandmarkerOptions = mp_vision.PoseLandmarkerOptions
RunningMode = mp_vision.RunningMode

print("\n MediaPipe Tasks API is available:")
print(f"- BaseOptions           : {BaseOptions is not None}")
print(f"- PoseLandmarker        : {PoseLandmarker is not None}")
print(f"- PoseLandmarkerOptions : {PoseLandmarkerOptions is not None}")
print(f"- RunningMode           : {RunningMode is not None}")

✅ MediaPipe Tasks API imported successfully!

mediapipe version : 0.10.21
opencv version    : 4.11.0
pandas version    : 2.3.3
numpy version     : 1.26.4
⚙️ Running on CPU

 MediaPipe Tasks API is available:
- BaseOptions           : True
- PoseLandmarker        : True
- PoseLandmarkerOptions : True
- RunningMode           : True


---

### **Notes**

* **Why this matters:** ensures that the environment is truly isolated and reproducible.
* **Discussion prompt:** Can you tell *why* we check MediaPipe imports *before* running the pipeline? (to confirm the **Tasks** API is available and working).
* **TASK:** Print `mp.__file__` to confirm MediaPipe’s path. This helps you understand where packages live inside the venv.

---


## 2. How Pose Landmarker works

- **Running modes:** `IMAGE`, `VIDEO`, `LIVE_STREAM`. For offline videos we use **`VIDEO`** and must pass a **timestamp (ms)** for each frame; the task uses **tracking** to avoid re-running the full model on every frame (reduces latency at the same accuracy settings).  
- **Outputs:**  
  - **2D normalized landmarks** in image coordinates (*x,y in [0,1] relative to width/height; z is a depth-like value; visibility in [0,1]*).  
  - **3D world landmarks** (meters, origin near hip center; handy for biomechanical features).  
- **Variants:** **lite / full / heavy**. Heavier models = more accurate, slower (see model card).  
- **Accuracy vs speed knobs:** `num_poses` (usually 1 for single-person), `min_pose_detection_confidence`, `min_pose_presence_confidence`, `min_tracking_confidence`, and **frame stride** (e.g., analyze every 2nd/3rd frame).

> We’ll expose all of these transparently in helper functions below.



## 3. Download a Pose Landmarker model (`.task` bundle)

Choose one of: `"lite"`, `"full"`, `"heavy"` (default).  
URLs follow Google’s published pattern; we try `latest/…` first and then fall back to version `1/…`.

> You only need to download once; it will be cached under `models/`.


In [3]:
# ================================
# 3. Model Selection & Download
# ================================
import os
import pathlib
import urllib.request
import urllib.error

import mediapipe as mp
from mediapipe.tasks import python as mp_python
from mediapipe.tasks.python import vision as mp_vision

# ---- Where to save models ----
MODELS_DIR = pathlib.Path("models")
MODELS_DIR.mkdir(parents=True, exist_ok=True)

# ---- Official model URLs (latest, with fallback to v1) ----
MODEL_URLS = {
    "lite": [
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_lite/float16/latest/pose_landmarker_lite.task",
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_lite/float16/1/pose_landmarker_lite.task",
    ],
    "full": [
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_full/float16/latest/pose_landmarker_full.task",
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_full/float16/1/pose_landmarker_full.task",
    ],
    "heavy": [
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/latest/pose_landmarker_heavy.task",
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task",
    ],
}

def download_pose_model(variant: str = "heavy") -> str:
    """
    Download the selected model variant (.task) to MODELS_DIR.
    Returns the local file path.
    """
    variant = variant.lower().strip()
    assert variant in MODEL_URLS, f"Unknown variant '{variant}'. Choose: lite, full, heavy."

    out_path = MODELS_DIR / f"pose_landmarker_{variant}.task"
    if out_path.exists() and out_path.stat().st_size > 50_000:
        print(f"✔ Model already present: {out_path}")
        return str(out_path)

    last_err = None
    for url in MODEL_URLS[variant]:
        try:
            print(f"Downloading {variant} model from:\n  {url}")
            with urllib.request.urlopen(url, timeout=60) as r, open(out_path, "wb") as f:
                f.write(r.read())
            if out_path.stat().st_size <= 50_000:
                raise RuntimeError("Downloaded file seems too small; trying fallback...")
            print(f"✔ Saved to {out_path} ({out_path.stat().st_size/1e6:.2f} MB)")
            return str(out_path)
        except Exception as e:
            print(f"… failed: {e}")
            last_err = e
    raise RuntimeError(f"Could not download model for variant '{variant}'. Last error: {last_err}")

# ---- Choose your default model here ----
# If the previous cell set `selected_model`, use it; otherwise default to "heavy".
try:
    MODEL_VARIANT = selected_model.lower().strip()
except NameError:
    MODEL_VARIANT = "heavy"   # default

MODEL_PATH = download_pose_model(MODEL_VARIANT)

# ---- Verify we can initialize the Pose Landmarker (VIDEO mode) ----
BaseOptions = mp_python.BaseOptions
PoseLandmarker = mp_vision.PoseLandmarker
PoseLandmarkerOptions = mp_vision.PoseLandmarkerOptions
RunningMode = mp_vision.RunningMode

options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=MODEL_PATH),
    running_mode=RunningMode.VIDEO,
    num_poses=1,
    min_pose_detection_confidence=0.5,
    min_pose_presence_confidence=0.5,
    min_tracking_confidence=0.5,
    output_segmentation_masks=False,
)

try:
    with PoseLandmarker.create_from_options(options) as landmarker:
        print("✅ PoseLandmarker initialized successfully (VIDEO mode).")
        print(f"   Model: {MODEL_VARIANT} → {MODEL_PATH}")
except Exception as e:
    print("❌ Failed to initialize PoseLandmarker. Check the model file and MediaPipe version.")
    raise


✔ Model already present: models/pose_landmarker_heavy.task
✅ PoseLandmarker initialized successfully (VIDEO mode).
   Model: heavy → models/pose_landmarker_heavy.task


I0000 00:00:1761584076.985338 53872720 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 88.1), renderer: Apple M2 Max
W0000 00:00:1761584077.082610 53919524 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761584077.170321 53919532 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


## 4. VIDEO mode: timestamps & inference loop

For offline videos, we must use RunningMode.VIDEO and pass a monotonic timestamp (ms) for each frame:

* We read frames with OpenCV, compute timestamp_ms = int((frame_idx / fps) * 1000), and call
landmarker.detect_for_video(mp_image, timestamp_ms).

* The Task returns normalized 2D landmarks (x, y ∈ [0,1], z depth-like, plus visibility) and world 3D landmarks (x_m, y_m, z_m in meters).

* We’ll save tidy CSV files for 2D and 3D landmarks.

* We’ll also write an annotated MP4 by drawing a simple skeleton over each frame.

#### Parameters you can tune

* `MODEL_VARIANT` (lite/full/heavy), `num_poses` (usually 1), `frame_stride` (skip frames for speed),

* `min_pose_detection_confidence`, `min_pose_presence_confidence`, `min_tracking_confidence`.


# 5. Choose Your Parameters

Before running extraction, set the **tunable parameters** in the next cell.  
These control **model accuracy**, **processing speed**, **output organization**, and **post-processing filters** (anti-jitter smoothing).

---

### **Model Variant**

* **`MODEL_VARIANT`** — choose one of:
  * `lite` → fastest but least accurate  
  * `full` → balanced (medium accuracy & speed)  
  * `heavy` → **most accurate** *(default; recommended for LS100 on modern hardware)*

> Changing `MODEL_VARIANT` automatically downloads the correct `.task` file to your local `models/` folder if needed.

---

### **Inference Settings**

* **`frame_stride`** — process every *k*-th frame  
  * `1` = every frame (maximum precision)  
  * `2` = every other frame (faster)  
  * `3+` = skip more frames (fastest, least temporal detail)

* **`num_poses`** — number of people to detect per frame  
  * Use `1` for single-person videos (default in LS100)

* **Confidence thresholds**  
  * `min_pose_detection_confidence` — confidence for detecting a pose  
  * `min_pose_presence_confidence` — confidence that a person is visible  
  * `min_tracking_confidence` — confidence for stable tracking across frames

---

### **Output Settings**

* **`make_annotated_video`** — if `True`, saves an annotated `.mp4` showing the skeleton overlay.  
* **`outputs_subdir_name`** — defines where outputs are saved:
  * All **CSVs** and optional **annotated MP4s** are written to an `outputs/` folder placed **next to each input video** (same directory).

---

### **Post-Processing Filters (Anti-Jitter)**

After landmark extraction, you can smooth or clean the data:

* **`visibility_thresh`** — discard landmarks with confidence below threshold  
* **`hampel_window`** / **`hampel_nsigmas`** — outlier removal using a Hampel filter  
  * Removes sudden jumps and replaces them with local medians  
* **`rolling_window`** — rolling average smoother (reduces frame-to-frame jitter)

> 💡 **Tip:**  
> - If you have a slow computer, yo can choose `MODEL_VARIANT = "lite"` or `frame_stride = 2` to reduce load.  
> - After extraction, apply filtering to clean up the 2D CSV before using it in analysis.

---

*The Pose Landmarker returns:*
- **2D normalized landmarks**: `(x, y ∈ [0,1])`, `z` (depth-like, unitless), `visibility` (0–1 confidence).  
- **3D world landmarks**: `(x_m, y_m, z_m)` in meters.  

Outputs:
- **CSV files** for both 2D and 3D landmarks.  
- Optional annotated **MP4** with the skeleton overlay.



In [None]:
# =========================================
# 5. Parameters — YOU CAN EDIT THIS BLOCK
# =========================================

# --- Model choice ---
MODEL_VARIANT = "heavy"          # options: "lite", "full", "heavy"

# --- Inference behavior ---
frame_stride = 1                 # 1=every frame; 2=every other; 3=every third, etc.
num_poses = 1                    # typically 1 for single-person videos

# Confidence thresholds
min_pose_detection_confidence = 0.5
min_pose_presence_confidence  = 0.5
min_tracking_confidence       = 0.5

# --- Output location ---
# If you input a single video file → outputs will be saved to: <video_dir>/<outputs_subdir_name>/
# If you input a folder path → outputs will be saved to: <parent_of_folder>/<outputs_subdir_name>/
outputs_subdir_name  = "outputs"
make_annotated_video = True      # set False to skip saving annotated MP4s

# --- Post-processing filters (applied AFTER extraction to the 2D CSV) ---
# NOTE: Filtering improves smoothness but is slower. Turn off to speed up runs.
enable_filtering  = True        # ← students toggle this (True/False)
visibility_thresh = 0.5          # keep rows where visibility >= this
hampel_window     = 7            # odd int (in frames); robust outlier window
hampel_nsigmas    = 3.0          # sensitivity for Hampel (higher = fewer outliers)
rolling_window    = 3            # odd int (in frames); centered rolling average for x,y



# ======================================================
# 5.a DO NOT EDIT BELOW — this ensures everything runs correctly
# ======================================================
from pathlib import Path

# --- Ensure filtering utility is available (only defines if missing) ---
try:
    apply_filters_to_pose2d
except NameError:
    import pandas as pd
    from pathlib import Path

    def _hampel(series: pd.Series, window: int, nsigmas: float) -> pd.Series:
        med = series.rolling(window, center=True, min_periods=1).median()
        diff = (series - med).abs()
        mad  = diff.rolling(window, center=True, min_periods=1).median()
        thr  = nsigmas * 1.4826 * mad.fillna(0)
        outlier = diff > thr
        return series.where(~outlier, med)

    def apply_filters_to_pose2d(csv2d_path: str,
                                visibility_thresh: float = 0.5,
                                rolling_window: int = 3,
                                hampel_window: int = 7,
                                hampel_nsigmas: float = 3.0) -> str:
        """
        Saves a filtered CSV next to the original as *_filtered.csv.
        Returns the filtered path.
        """
        df = pd.read_csv(csv2d_path)
        if "visibility" in df.columns:
            df = df[df["visibility"].fillna(0.0) >= visibility_thresh].copy()

        # Sort for stable rolling ops
        df = df.sort_values(["video","landmark_index","frame"])

        # Hampel (robust outlier removal), then rolling mean smooth
        for coord in ("x","y"):
            if coord in df.columns:
                df[coord] = (
                    df.groupby(["video","landmark_index"], group_keys=False)[coord]
                      .apply(lambda s: _hampel(s, hampel_window, hampel_nsigmas))
                      .rolling(rolling_window, center=True, min_periods=1).mean()
                )

        out_path = Path(csv2d_path).with_name(Path(csv2d_path).stem + "_filtered.csv")
        df.to_csv(out_path, index=False)
        return str(out_path)

# Ensure MODEL_PATH exists (downloaded earlier)
try:
    MODEL_PATH
except NameError:
    raise RuntimeError("MODEL_PATH not found. Please run the previous model download cell first.")

# --- Helper: Ensure odd window sizes for filters ---
def _ensure_odd(n: int) -> int:
    try:
        n = int(n)
    except Exception:
        return 3
    return n if n % 2 == 1 else n + 1

hampel_window  = _ensure_odd(hampel_window)
rolling_window = _ensure_odd(rolling_window)

# --- Helper: Resolve output folder location ---
VIDEO_EXTS = {".mp4", ".mov", ".m4v", ".avi", ".mkv"}

def resolve_outputs_dir(input_path: str | Path, outputs_subdir_name: str = "outputs") -> Path:
    """
    If input is a file (has a known video extension):
        -> <file_dir>/<outputs_subdir_name>/
    If input is a folder (no extension):
        -> <parent_of_folder>/<outputs_subdir_name>/
    """
    p = Path(input_path)
    if p.is_file() or p.suffix.lower() in VIDEO_EXTS:
        return p.parent / outputs_subdir_name
    else:
        return p.parent / outputs_subdir_name

# --- Sanity check summary ---
print("\n===== Parameter Summary =====")
print(f"MODEL_VARIANT                  : {MODEL_VARIANT}")
print(f"MODEL_PATH                     : {MODEL_PATH}")
print(f"frame_stride                   : {frame_stride}")
print(f"num_poses                      : {num_poses}")
print(f"confidences (detect,pres,track): {min_pose_detection_confidence}, "
      f"{min_pose_presence_confidence}, {min_tracking_confidence}")
print(f"outputs_subdir_name            : {outputs_subdir_name}")
print(f"make_annotated_video           : {make_annotated_video}")
print(f"enable_filtering               : {enable_filtering}")
if enable_filtering:
    print(f"   visibility_thresh           : {visibility_thresh}")
    print(f"   hampel_window / nsigmas     : {hampel_window} / {hampel_nsigmas}")
    print(f"   rolling_window              : {rolling_window}")
else:
    print("   ↳ Filtering is OFF (fast mode: no smoothing or visibility threshold applied)")
print("=============================\n")




===== Parameter Summary =====
MODEL_VARIANT                  : heavy
MODEL_PATH                     : models/pose_landmarker_heavy.task
frame_stride                   : 1
num_poses                      : 1
confidences (detect,pres,track): 0.5, 0.5, 0.5
outputs_subdir_name            : outputs
make_annotated_video           : True
enable_filtering               : False
   ↳ Filtering is OFF (fast mode: no smoothing or visibility threshold applied)



In [14]:
# =========================================================
# 6. Function: Extract pose landmarks from a video file
#    - Writes RAW outputs according to the new folder rules
#    - Filtering (if enabled) happens in the NEXT block
# =========================================================

# Reuse landmark names if already defined; else define here.
try:
    landmark_index_to_name
except NameError:
    POSE_LANDMARK_NAMES = [
        "nose","left_eye_inner","left_eye","left_eye_outer",
        "right_eye_inner","right_eye","right_eye_outer",
        "left_ear","right_ear","mouth_left","mouth_right",
        "left_shoulder","right_shoulder","left_elbow","right_elbow",
        "left_wrist","right_wrist","left_pinky","right_pinky",
        "left_index","right_index","left_thumb","right_thumb",
        "left_hip","right_hip","left_knee","right_knee",
        "left_ankle","right_ankle","left_heel","right_heel",
        "left_foot_index","right_foot_index",
    ]
    landmark_index_to_name = {i: n for i, n in enumerate(POSE_LANDMARK_NAMES)}

from pathlib import Path
from typing import Optional, Union, Dict

def extract_pose_from_video(
    video_path: Union[str, Path],
    model_path: Union[str, Path],
    make_annotated_video: bool = False,
    frame_stride: int = 1,
    num_poses: int = 1,
    min_pose_detection_confidence: float = 0.5,
    min_pose_presence_confidence: float = 0.5,
    min_tracking_confidence: float = 0.5,
    output_segmentation_masks: bool = False,
    # If provided, write outputs here; else follow file/folder rules via resolve_outputs_dir(...)
    base_outputs_dir: Optional[Union[str, Path]] = None,
) -> Dict[str, Optional[str]]:
    """
    Extracts pose landmarks from a single video and saves:
      - 2D CSV (RAW):   <outputs>/<video_stem>_pose2d.csv
      - 3D CSV (RAW):   <outputs>/<video_stem>_pose3d.csv  (if world landmarks available)
      - MP4 (optional): <outputs>/<video_stem>_annotated.mp4

    Output folder rules:
      • If base_outputs_dir is given → use it.
      • Else (single-file default)   → <video_dir>/<outputs_subdir_name>/
        (outputs_subdir_name is set in the Parameters cell).

    NOTE: Any smoothing/jitter filtering is performed in the NEXT block.
    """
    import cv2, numpy as np, pandas as pd
    import mediapipe as mp
    from mediapipe.tasks import python as mp_python
    from mediapipe.tasks.python import vision as mp_vision

    video_path = Path(video_path)
    model_path = str(model_path)

    # Determine output directory
    if base_outputs_dir is not None:
        out_dir = Path(base_outputs_dir)
    else:
        try:
            out_dir = resolve_outputs_dir(video_path, outputs_subdir_name=outputs_subdir_name)
        except NameError:
            out_dir = video_path.parent / (outputs_subdir_name if 'outputs_subdir_name' in globals() else 'outputs')
    out_dir.mkdir(parents=True, exist_ok=True)

    stem = video_path.stem
    csv2d   = out_dir / f"{stem}_pose2d.csv"
    csv3d   = out_dir / f"{stem}_pose3d.csv"
    mp4_out = out_dir / f"{stem}_annotated.mp4"

    # Optional: echo filter toggle (if defined) for clarity
    if 'enable_filtering' in globals():
        print(f"[extract] enable_filtering = {enable_filtering} (filtering runs after extraction)")

    # --- OpenCV video IO ---
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise FileNotFoundError(f"Cannot open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or fps <= 1e-6:
        fps = 30.0  # safe fallback
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    writer = None
    if make_annotated_video:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(str(mp4_out), fourcc, fps / max(1, frame_stride), (width, height))

    # --- MediaPipe Tasks (VIDEO mode) ---
    BaseOptions = mp_python.BaseOptions
    PoseLandmarker = mp_vision.PoseLandmarker
    PoseLandmarkerOptions = mp_vision.PoseLandmarkerOptions
    RunningMode = mp_vision.RunningMode

    options = PoseLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=model_path),
        running_mode=RunningMode.VIDEO,
        num_poses=num_poses,
        min_pose_detection_confidence=min_pose_detection_confidence,
        min_pose_presence_confidence=min_pose_presence_confidence,
        min_tracking_confidence=min_tracking_confidence,
        output_segmentation_masks=output_segmentation_masks,
    )

    # --- Helpers (image conversion + simple skeleton overlay) ---
    def _mp_image_from_bgr(bgr):
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        return mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)

    def _draw_skeleton(bgr, norm_landmarks, visibility_thresh: float = 0.5):
        h, w = bgr.shape[:2]
        pts = {}
        for i, lm in enumerate(norm_landmarks):
            vis = getattr(lm, "visibility", 1.0) or 0.0
            if vis >= visibility_thresh:
                x, y = int(lm.x * w), int(lm.y * h)
                pts[i] = (x, y)
                cv2.circle(bgr, (x, y), 2, (255, 255, 255), -1)
        for a, b in [
            (11,13),(13,15),(12,14),(14,16),(11,12),(23,24),(11,23),(12,24),
            (23,25),(25,27),(24,26),(26,28),(27,29),(29,31),(28,30),(30,32)
        ]:
            if a in pts and b in pts:
                cv2.line(bgr, pts[a], pts[b], (255, 255, 255), 2)

    rows2d, rows3d = [], []

    with PoseLandmarker.create_from_options(options) as landmarker:
        frame_idx = 0
        while True:
            ok, bgr = cap.read()
            if not ok:
                break

            # Frame skipping for speed
            if frame_stride > 1 and (frame_idx % frame_stride != 0):
                frame_idx += 1
                continue

            # VIDEO mode requires monotonic ms timestamps
            ts_ms = int((frame_idx / fps) * 1000.0)
            mp_image = _mp_image_from_bgr(bgr)
            result = landmarker.detect_for_video(mp_image, ts_ms)

            for pose_id, nlands in enumerate(result.pose_landmarks):
                # 2D normalized landmarks (+ visibility)
                for li, lm in enumerate(nlands):
                    rows2d.append({
                        "video": video_path.name,
                        "frame": frame_idx,
                        "time_ms": ts_ms,
                        "landmark_index": li,
                        "landmark_name": landmark_index_to_name.get(li, str(li)),
                        "x": lm.x, "y": lm.y, "z": lm.z,
                        "visibility": getattr(lm, "visibility", np.nan),
                    })

                # 3D world landmarks (meters), if available
                if len(result.pose_world_landmarks) > pose_id:
                    wlands = result.pose_world_landmarks[pose_id]
                    for li, lm in enumerate(wlands):
                        rows3d.append({
                            "video": video_path.name,
                            "frame": frame_idx,
                            "time_ms": ts_ms,
                            "landmark_index": li,
                            "landmark_name": landmark_index_to_name.get(li, str(li)),
                            "x_m": lm.x, "y_m": lm.y, "z_m": lm.z,
                            "visibility": getattr(lm, "visibility", np.nan),
                        })

                # Optional overlay
                if writer is not None and len(nlands) > 0:
                    bgr_draw = bgr.copy()
                    _draw_skeleton(bgr_draw, nlands, visibility_thresh=0.5)
                    writer.write(bgr_draw)

            frame_idx += 1

    cap.release()
    if writer is not None:
        writer.release()

    # --- Save RAW CSVs ---
    import pandas as pd
    pd.DataFrame(rows2d).to_csv(csv2d, index=False)
    if rows3d:
        pd.DataFrame(rows3d).to_csv(csv3d, index=False)
        csv3d_str = str(csv3d)
    else:
        csv3d_str = None

    # Return RAW paths; the next block may also produce a *_filtered.csv
    return {
        "csv2d": str(csv2d),
        "csv3d": csv3d_str,
        "annotated_mp4": (str(mp4_out) if make_annotated_video else None),
    }

# --- Quick peek helper (unchanged) ---
def peek_csv(path, n=5):
    import pandas as pd
    df = pd.read_csv(path)
    print(f"{path} → shape={df.shape}")
    display(df.head(n))
    return df


## Input the path of the video or the folder containing videos

In [15]:
# =========================================
# 7. Paste your input path (file OR folder)
# =========================================
# Examples:
# input_path = "/path/to/video.mp4"
# input_path = "/path/to/folder_with_videos"

input_path = "/Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/Kevin_2022_Day5_CRNCH.mp4"  # ← paste here (keep quotes)


## Now, run the coe blcok below.

Pleae note that if you set the enable_filtering (the jitter filter) to `True`, it will take loger time.

In [16]:
# =========================================================
# 8. Run extraction and save outputs (CSV + annotated MP4s)
#    - If input_path is a single video file: outputs → <video_dir>/<outputs_subdir_name>/
#    - If input_path is a folder: outputs (shared) → <parent_of_folder>/<outputs_subdir_name>/
#    - Writes a manifest CSV when processing a folder
# =========================================================
from pathlib import Path
import pandas as pd

if not input_path or not str(input_path).strip():
    raise ValueError("Please set `input_path` in the previous cell.")

p = Path(input_path).expanduser().resolve()

# Ensure resolver exists (it was defined in §5.a)
try:
    resolve_outputs_dir
except NameError:
    # Minimal fallback (same logic as earlier)
    VIDEO_EXTS = {".mp4", ".mov", ".m4v", ".avi", ".mkv"}
    def resolve_outputs_dir(input_path, outputs_subdir_name="outputs"):
        ip = Path(input_path)
        if ip.is_file() or ip.suffix.lower() in VIDEO_EXTS:
            return ip.parent / outputs_subdir_name
        else:
            return ip.parent / outputs_subdir_name

VIDEO_EXTS = VIDEO_EXTS if "VIDEO_EXTS" in globals() else {".mp4", ".mov", ".m4v", ".avi", ".mkv"}

def _is_video_file(path: Path) -> bool:
    return path.is_file() and path.suffix.lower() in VIDEO_EXTS

if _is_video_file(p):
    # -------- Single video mode --------
    base_out = resolve_outputs_dir(p, outputs_subdir_name)
    base_out.mkdir(parents=True, exist_ok=True)
    print(f"Single video detected.\nOutputs will be saved to: {base_out}")

    outs = extract_pose_from_video(
        video_path=str(p),
        model_path=MODEL_PATH,
        make_annotated_video=make_annotated_video,
        frame_stride=frame_stride,
        num_poses=num_poses,
        min_pose_detection_confidence=min_pose_detection_confidence,
        min_pose_presence_confidence=min_pose_presence_confidence,
        min_tracking_confidence=min_tracking_confidence,
        base_outputs_dir=base_out,  # important
    )
    print("\n✔ Done.")
    print("2D CSV :", outs.get("csv2d"))
    print("3D CSV :", outs.get("csv3d"))
    print("MP4    :", outs.get("annotated_mp4"))

else:
    # -------- Folder mode --------
    if not p.exists() or not p.is_dir():
        raise NotADirectoryError(f"Not a directory: {p}")

    # Shared outputs placed alongside the folder
    base_out = resolve_outputs_dir(p, outputs_subdir_name)
    base_out.mkdir(parents=True, exist_ok=True)
    print(f"Folder detected. Outputs will be saved to: {base_out}")

    # Find videos (non-recursive by default; flip to rglob for recursive)
    videos = sorted([str(f) for f in p.iterdir() if _is_video_file(f)])
    if not videos:
        # Try recursive as a helpful fallback
        videos = sorted([str(f) for f in p.rglob("*") if _is_video_file(f)])
        if videos:
            print(f"Found {len(videos)} video(s) (recursive search).")
        else:
            raise FileNotFoundError(f"No supported video files found in: {p}")

    records = []
    for i, vp in enumerate(videos, 1):
        print(f"[{i}/{len(videos)}] {vp}")
        try:
            outs = extract_pose_from_video(
                video_path=vp,
                model_path=MODEL_PATH,
                make_annotated_video=make_annotated_video,
                frame_stride=frame_stride,
                num_poses=num_poses,
                min_pose_detection_confidence=min_pose_detection_confidence,
                min_pose_presence_confidence=min_pose_presence_confidence,
                min_tracking_confidence=min_tracking_confidence,
                base_outputs_dir=base_out,  # important
            )
            records.append({"video": vp, **outs, "status": "ok", "error": ""})
        except Exception as e:
            records.append({"video": vp, "csv2d": None, "csv3d": None,
                            "annotated_mp4": None, "status": "error", "error": str(e)})

    manifest = base_out / "outputs_manifest.csv"
    pd.DataFrame.from_records(records).to_csv(manifest, index=False)
    print(f"\n✔ Batch complete. Manifest saved to: {manifest}")


Single video detected.
Outputs will be saved to: /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs
[extract] enable_filtering = False (filtering runs after extraction)


I0000 00:00:1761594301.579539 53872720 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 88.1), renderer: Apple M2 Max
W0000 00:00:1761594301.656664 54157257 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761594301.732664 54157256 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.



✔ Done.
2D CSV : /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs/Kevin_2022_Day5_CRNCH_pose2d.csv
3D CSV : /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs/Kevin_2022_Day5_CRNCH_pose3d.csv
MP4    : /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs/Kevin_2022_Day5_CRNCH_annotated.mp4



> **Notes**
> - **2D normalized coordinates**: `x,y∈[0,1]` relative to image width/height (values can be outside the range if the estimated point is out-of-frame). `z` is depthlike (negative is closer).
> - **3D world coordinates**: `x,y,z` are in **meters** in a world coordinate space centered near the hips.  
> - **visibility**: confidence for each landmark’s presence in the frame.



## 7. Optional: compute simple joint angles

#### We will explore this in he next guide
Once you have landmarks, you can compute feature engineering targets like **elbow** or **knee angles**. Below is a tiny utility to compute an angle between three named landmarks per frame.


In [None]:

def _angle_between(a, b, c):
    # a,b,c are 2D points (x,y) or 3D (x,y,z) — here we'll use 2D image coords
    a, b, c = np.array(a), np.array(b), np.array(c)
    ba = a - b
    bc = c - b
    cosang = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-9)
    cosang = np.clip(cosang, -1.0, 1.0)
    return np.degrees(np.arccos(cosang))

def compute_joint_angle_csv(csv2d_path: str, joint=("left_shoulder","left_elbow","left_wrist")) -> pd.DataFrame:
    df = pd.read_csv(csv2d_path)
    # wide pivot: columns like x_left_shoulder, y_left_shoulder, etc.
    wide = df.pivot_table(index=["video","frame","time_ms"], columns="landmark_name", values=["x","y"])
    # helper to get a point
    def P(name):
        return np.c_[wide["x"][name].values, wide["y"][name].values]
    A,B,C = P(joint[0]), P(joint[1]), P(joint[2])
    angles = np.array([_angle_between(a,b,c) for a,b,c in zip(A,B,C)])
    out = pd.DataFrame({
        "video": wide.index.get_level_values("video"),
        "frame": wide.index.get_level_values("frame"),
        "time_ms": wide.index.get_level_values("time_ms"),
        f"angle_{'_'.join(joint)}": angles
    })
    return out

# Example (after extraction):
# angle_df = compute_joint_angle_csv("outputs/yourvideo_pose2d.csv", ("left_shoulder","left_elbow","left_wrist"))
# angle_df.head()



## 8. Notes & best practices

- **Timestamps matter:** in `VIDEO` mode you *must* pass `timestamp_ms` that increases with frames; we compute it from frame index and FPS.  
- **Tracking saves compute:** in `VIDEO`/`LIVE_STREAM` the task performs pose tracking so the full model isn’t re-run every frame (helps latency).  
- **Out-of-frame landmarks:** 2D normalized `x,y` can be outside `[0,1]` if a joint is off‑screen; use `visibility` to filter.  
- **Model choice:** start with **full**, switch to **lite** for underpowered laptops or large batches, use **heavy** when you need the highest accuracy and can afford the speed.  
- **Stride:** a cheap speedup is `frame_stride=2` (½ the frames) or higher.  
- **Ethics & consent:** if students process videos of people, teach consent, privacy, and secure storage.



---

### Troubleshooting
- If you see `NoneType` for results, ensure the **model path exists** and your video actually contains a person.  
- If you get slowdowns or memory pressure, try `frame_stride=2` or the `"lite"` model.  
- On some platforms OpenCV MP4 writing may need codecs; if a saved video is empty, try a different `fourcc` (e.g., `cv2.VideoWriter_fourcc(*"avc1")`) or install `opencv-python-headless` alternatives.

Happy exploring!
