# 6156 Capstone Project

Group 5: Connor Lynch, Harrison Kim

In [1]:
# Clone directly from GitHub
!git clone https://github.com/harrisonkimdev/6156-capstone-project.git
%cd 6156-capstone-project

import os
print("Cloned Repository!")
print(f"Current location: {os.getcwd()}")

fatal: destination path '6156-capstone-project' already exists and is not an empty directory.
/Users/harrisonkim/code/repos/6156-capstone-project/notebooks/6156-capstone-project
Cloned Repository!
Current location: /Users/harrisonkim/code/repos/6156-capstone-project/notebooks/6156-capstone-project


## Install required packages and configure pose_ai module path

In [3]:
# 1. Install packages (protobuf>=5.29.1 avoids Colab resolver conflicts)
import os
import sys
import subprocess
from pathlib import Path

def find_repo_root() -> Path:
    candidates = [
        Path.cwd(),
        Path.cwd() / '6156-capstone-project',
        Path('/content/6156-capstone-project'),
    ]
    candidates.extend(Path.cwd().parents)
    seen = set()
    for candidate in candidates:
        candidate = candidate.resolve()
        if candidate in seen:
            continue
        seen.add(candidate)
        if (candidate / 'src').exists():
            return candidate
    raise FileNotFoundError('Could not locate repo root containing src/. Make sure the repo is cloned.')

repo_root = find_repo_root()
if Path.cwd().resolve() != repo_root:
    os.chdir(repo_root)
    print(f'Changed working directory to repo root: {repo_root}')
else:
    print(f'Using existing working directory: {repo_root}')

requirements_path = repo_root / 'requirements.txt'
fallback_packages = [
    'numpy',
    'pandas',
    'scikit-learn',
    'xgboost',
    'opencv-python',
    'mediapipe',
    'protobuf>=5.29.1,<7',
]

def _pip_install(*args):
    cmd = [sys.executable, '-m', 'pip', *args]
    print('Running:', ' '.join(cmd))
    subprocess.run(cmd, check=True)

_pip_install('install', '-q', '--upgrade', 'pip')
if requirements_path.exists():
    _pip_install('install', '-q', '-r', str(requirements_path))
else:
    print(f'⚠️ requirements.txt not found at {requirements_path}, installing fallback packages.')
    _pip_install('install', '-q', *fallback_packages)

# 2. Configure path to import pose_ai module
# Add src folder to Python path
src_path = repo_root / 'src'

if src_path.exists():
    if str(src_path) not in sys.path:
        sys.path.insert(0, str(src_path))
    os.environ.setdefault('PYTHONPATH', str(src_path) + (':' + os.environ.get('PYTHONPATH', '')))
    print(f"✓ src path added: {src_path}")
else:
    print(f"⚠️ Warning: src folder not found: {src_path}")
    print('   Please ensure the repository is cloned or src/ folder exists.')

# 3. Verify pose_ai import
try:
    import pose_ai as _pose_ai
    print(f"✓ pose_ai module imported successfully! (version: {getattr(_pose_ai, '__version__', 'unknown')})")
except Exception as e:  # pylint: disable=broad-exception-caught
    print(f"✗ pose_ai import failed: {e}")
    print('  Solutions:')
    print('  1. Verify the repository is cloned correctly')
    print('  2. Check if src/pose_ai/ folder exists')
    print('  3. Refer to "How to Use in Google Colab" cell above for repo setup')

# Display current working directory
print(f"Current working directory: {Path.cwd()}")
print(f"src added to Python path: {str(src_path) if src_path.exists() else 'N/A'}")


SyntaxError: unterminated string literal (detected at line 79) (498671747.py, line 79)

# Scripts Python Collection

This notebook aggregates the Python scripts from the `scripts` directory.

## scripts/extract_frames.py

In [4]:
"""CLI for extracting frame sequences from climbing videos."""

from __future__ import annotations

import argparse
import logging
from pathlib import Path

from pose_ai.data import FrameExtractionResult, extract_frames_every_n_seconds, iter_video_files


LOGGER = logging.getLogger("pose_ai.scripts.extract_frames")


def configure_logging(verbose: bool) -> None:
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )


def extract_from_directory(
    source_dir: Path,
    *,
    output_root: Path,
    interval_seconds: float,
    recursive: bool,
    overwrite: bool,
    write_manifest: bool,
) -> list[FrameExtractionResult]:
    results: list[FrameExtractionResult] = []
    for video_path in iter_video_files(source_dir, recursive=recursive):
        LOGGER.info("Processing %s", video_path)
        result = extract_frames_every_n_seconds(
            video_path,
            interval_seconds=interval_seconds,
            output_root=output_root,
            write_manifest=write_manifest,
            overwrite=overwrite,
        )
        LOGGER.info(
            "Saved %d frames for %s",
            result.saved_frames,
            video_path.name,
        )
        results.append(result)
    if not results:
        LOGGER.warning("No video files found in %s", source_dir)
    return results


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Extract frame sequences from climbing videos.",
    )
    parser.add_argument(
        "video_dir",
        type=Path,
        help="Directory containing source video files.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("data") / "frames",
        help="Directory where frame folders will be stored.",
    )
    parser.add_argument(
        "--interval",
        type=float,
        default=1.0,
        help="Seconds between captured frames (default: 1.0).",
    )
    parser.add_argument(
        "--recursive",
        action="store_true",
        help="Search for videos recursively.",
    )
    parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing extracted frames.",
    )
    parser.add_argument(
        "--no-manifest",
        dest="write_manifest",
        action="store_false",
        help="Disable writing manifest.json files.",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose logging.",
    )
    return parser


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()

    configure_logging(verbose=args.verbose)
    extract_from_directory(
        args.video_dir,
        output_root=args.output,
        interval_seconds=args.interval,
        recursive=args.recursive,
        overwrite=args.overwrite,
        write_manifest=args.write_manifest,
    )


if __name__ == "__main__":
    main()


ModuleNotFoundError: No module named 'pose_ai'

## scripts/run_feature_export.py

In [None]:
"""CLI to export pose-derived features from manifests."""

from __future__ import annotations

import argparse
import json
from pathlib import Path

from pose_ai.service import export_features_for_manifest


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Export pose feature rows from pose_results.json.")
    parser.add_argument("manifest", type=Path, help="Path to manifest.json")
    parser.add_argument(
        "--holds",
        type=Path,
        help="Optional JSON describing holds (name -> coords, normalized, etc).",
    )
    parser.add_argument(
        "--out",
        type=Path,
        default=None,
        help="Output directory (defaults to manifest directory).",
    )
    return parser


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()
    output_path = export_features_for_manifest(
        args.manifest,
        holds_path=args.holds,
        output_root=args.out,
    )
    print(f"Feature rows saved to {output_path}")


if __name__ == "__main__":
    main()


## scripts/run_pipeline.py

## scripts/generate_holds_and_features.py


In [None]:
"""Generate holds.json and enriched pose_features.json for an existing frame directory.

Usage:
    python scripts/generate_holds_and_features.py /path/to/frames/manifest.json --model yolov8m.pt
"""

from __future__ import annotations

import argparse
from pathlib import Path
import sys

ROOT_DIR = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT_DIR / "src"
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))

from pose_ai.service.feature_service import export_features_for_manifest
from pose_ai.service.hold_extraction import extract_and_cluster_holds, export_holds_json


def build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description="Hold extraction + enriched feature export")
    p.add_argument("manifest", type=Path, help="Path to manifest.json")
    p.add_argument("--model", default="yolov8n.pt", help="YOLO model weights")
    p.add_argument("--device", default=None, help="Optional torch device")
    return p


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()
    manifest_path = args.manifest.expanduser().resolve()
    if not manifest_path.exists():
        raise SystemExit(f"Manifest not found: {manifest_path}")
    frame_dir = manifest_path.parent
    image_paths = sorted(frame_dir.glob("*.jpg"))
    if not image_paths:
        print("No frame images found; aborting hold extraction.")
        holds_path = None
    else:
        print(f"Detecting holds in {len(image_paths)} frames using {args.model}")
        clustered = extract_and_cluster_holds(image_paths, model_name=args.model, device=args.device)
        if clustered:
            holds_path = export_holds_json(clustered, output_path=frame_dir / "holds.json")
            print(f"Exported {len(clustered)} clustered holds to {holds_path}")
        else:
            holds_path = None
            print("No holds detected.")
    print("Exporting enriched pose features (with wall angle & holds)...")
    export_features_for_manifest(manifest_path, holds_path=holds_path, auto_wall_angle=True)
    print("Done.")


if __name__ == "__main__":
    main()


In [None]:
"""End-to-end pipeline: extract frames, run pose estimation, features, segments, visualize."""

from __future__ import annotations

import argparse
import sys
from pathlib import Path

ROOT_DIR = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT_DIR / "src"
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))

from pose_ai.service import (  # type: ignore  # pylint: disable=wrong-import-position
    estimate_poses_from_manifest,
    export_features_for_manifest,
    generate_segment_report,
)
from extract_frames import extract_frames_every_n_seconds, iter_video_files  # type: ignore
from visualize_pose import visualize_pose_results  # type: ignore

# NOTE: For simplicity we call into script helpers directly for pose/feature export
# and reuse service APIs for intermediate steps.

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Run entire pose analysis pipeline")
    parser.add_argument("video_dir", type=Path, help="Directory containing videos (.mp4, etc.)")
    parser.add_argument("--out", type=Path, default=Path("data/frames"), help="Frame output directory")
    parser.add_argument("--interval", type=float, default=1.0, help="Extraction interval (seconds)")
    parser.add_argument("--skip-visuals", action="store_true", help="Skip visualization step")
    return parser


def extract_frames(video_dir: Path, out_dir: Path, interval: float) -> list[Path]:
    manifests = []
    out_dir.mkdir(parents=True, exist_ok=True)
    for video_file in iter_video_files(video_dir):
        result = extract_frames_every_n_seconds(video_file, output_root=out_dir, interval_seconds=interval)
        manifests.append(result.frame_directory / "manifest.json")
    return manifests


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()
    manifests = extract_frames(args.video_dir, args.out, args.interval)
    for manifest in manifests:
        print(f"Processing manifest {manifest}")
        estimate_poses_from_manifest(manifest)
        export_features_for_manifest(manifest)
        generate_segment_report(manifest)
        if not args.skip_visuals:
            frame_dir = manifest.parent
            visualize_pose_results(frame_dir / "pose_results.json")
    print("Pipeline completed.")


if __name__ == "__main__":
    main()


## scripts/run_pose_estimation.py

In [None]:
"""CLI to run pose estimation on extracted frame sequences."""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path

from pose_ai.pose import PoseEstimator
from pose_ai.service import estimate_poses_for_directory, estimate_poses_from_manifest


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Run MediaPipe pose estimation on frame manifests.")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--manifest", type=Path, help="Path to a manifest.json file.")
    group.add_argument("--frames-root", type=Path, help="Directory containing extracted frame folders.")
    parser.add_argument(
        "--json",
        action="store_true",
        help="Print pose results as JSON instead of a textual summary.",
    )
    parser.add_argument(
        "--no-save",
        action="store_true",
        help="Disable writing pose_results.json files alongside frames.",
    )
    return parser


def frames_to_dict(frames):
    return [
        {
            "image_path": str(frame.image_path),
            "timestamp_seconds": frame.timestamp_seconds,
            "detection_score": frame.detection_score,
            "landmarks": [
                {
                    "name": landmark.name,
                    "x": landmark.x,
                    "y": landmark.y,
                    "z": landmark.z,
                    "visibility": landmark.visibility,
                }
                for landmark in frame.landmarks
            ],
        }
        for frame in frames
    ]


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()

    estimator = PoseEstimator()
    try:
        if args.manifest:
            frames = estimate_poses_from_manifest(
                args.manifest,
                estimator=estimator,
                save_json=not args.no_save,
            )
            if args.json:
                print(json.dumps(frames_to_dict(frames), indent=2))
            else:
                print(f"Processed {len(frames)} frames from {args.manifest}")
        else:
            results = estimate_poses_for_directory(
                args.frames_root,
                estimator=estimator,
                save_json=not args.no_save,
            )
            if args.json:
                payload = {manifest: frames_to_dict(frames) for manifest, frames in results.items()}
                print(json.dumps(payload, indent=2))
            else:
                for manifest, frames in results.items():
                    print(f"{manifest}: {len(frames)} frames")
    except ModuleNotFoundError as exc:
        parser.error(
            f"{exc}. Ensure mediapipe is installed in your environment (e.g. `pip install mediapipe`)."
        )
    finally:
        estimator.close()


if __name__ == "__main__":
    main()


## scripts/train_model.py


In [None]:
#!/usr/bin/env python3
"""Train multitask model (BiLSTM or Transformer).

Usage:
    # Train BiLSTM (default)
    python scripts/train_model.py \
        --data data/features \
        --epochs 100 \
        --device cuda

    # Train Transformer
    python scripts/train_model.py \
        --data data/features \
        --model-type transformer \
        --num-layers 4 \
        --num-heads 8 \
        --epochs 100 \
        --device cuda
"""

import argparse
import sys
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

try:
    import torch
    from torch.utils.data import DataLoader
except ImportError:
    print("Error: PyTorch is required. Install with: pip install torch")
    sys.exit(1)

from pose_ai.ml.dataset import create_datasets_from_directory
from pose_ai.ml.models import (
    BiLSTMMultitaskModel,
    TransformerMultitaskModel,
    ModelConfig,
    TransformerConfig,
    count_parameters,
)
from pose_ai.ml.train import Trainer, TrainingConfig


def main():
    parser = argparse.ArgumentParser(description="Train multitask model (BiLSTM or Transformer)")
    
    # Model selection
    parser.add_argument(
        "--model-type",
        choices=["bilstm", "transformer"],
        default="bilstm",
        help="Model architecture type (default: bilstm)"
    )
    
    # Data and checkpointing
    parser.add_argument(
        "--data",
        type=Path,
        required=True,
        help="Directory containing feature JSON files"
    )
    parser.add_argument(
        "--checkpoint-dir",
        type=Path,
        default=Path("models/checkpoints"),
        help="Directory for model checkpoints (default: models/checkpoints)"
    )
    
    # Training hyperparameters
    parser.add_argument("--epochs", type=int, default=100, help="Number of training epochs (default: 100)")
    parser.add_argument("--batch-size", type=int, default=32, help="Batch size (default: 32)")
    parser.add_argument("--window-size", type=int, default=32, help="Window size for sliding windows (default: 32)")
    parser.add_argument("--stride", type=int, default=1, help="Stride for sliding windows (default: 1)")
    parser.add_argument("--dropout", type=float, default=0.3, help="Dropout rate (default: 0.3)")
    parser.add_argument("--lr", type=float, default=0.001, help="Learning rate (default: 0.001)")
    parser.add_argument("--weight-decay", type=float, default=0.0001, help="Weight decay (default: 0.0001)")
    parser.add_argument("--patience", type=int, default=10, help="Early stopping patience (default: 10)")
    parser.add_argument("--device", default="cuda", help="Device to use (cuda/cpu, default: cuda)")
    parser.add_argument("--train-split", type=float, default=0.7, help="Training split ratio (default: 0.7)")
    parser.add_argument("--val-split", type=float, default=0.2, help="Validation split ratio (default: 0.2)")
    
    # BiLSTM-specific arguments
    parser.add_argument("--hidden-dim", type=int, default=128, help="LSTM hidden dimension (default: 128)")
    parser.add_argument("--num-layers", type=int, default=2, help="Number of LSTM/Transformer layers (default: 2 for BiLSTM, 4 for Transformer)")
    parser.add_argument("--no-attention", action="store_true", help="[BiLSTM only] Disable attention pooling")
    
    # Transformer-specific arguments
    parser.add_argument("--d-model", type=int, default=128, help="[Transformer only] Model dimension (default: 128)")
    parser.add_argument("--num-heads", type=int, default=8, help="[Transformer only] Number of attention heads (default: 8)")
    parser.add_argument("--dim-feedforward", type=int, default=512, help="[Transformer only] Feedforward network dimension (default: 512)")
    parser.add_argument("--pooling", choices=["mean", "max", "cls"], default="mean", help="[Transformer only] Pooling strategy (default: mean)")
    parser.add_argument("--positional-encoding", choices=["sinusoidal", "learnable"], default="sinusoidal", help="[Transformer only] Positional encoding type (default: sinusoidal)")
    
    args = parser.parse_args()
    
    # Validate data directory
    if not args.data.exists():
        print(f"Error: Data directory not found: {args.data}")
        sys.exit(1)
    
    # Create checkpoint directory
    args.checkpoint_dir.mkdir(parents=True, exist_ok=True)
    
    print("=" * 60)
    print(f"{args.model_type.upper()} Multitask Model Training")
    print("=" * 60)
    
    # Create datasets
    print("\nCreating datasets...")
    train_dataset, val_dataset, test_dataset = create_datasets_from_directory(
        args.data,
        window_size=args.window_size,
        stride=args.stride,
        train_split=args.train_split,
        val_split=args.val_split,
        normalize=True,
    )
    
    if train_dataset is None or val_dataset is None:
        print("Error: Failed to create datasets")
        sys.exit(1)
    
    print(f"  Training samples: {len(train_dataset)}")
    print(f"  Validation samples: {len(val_dataset)}")
    if test_dataset:
        print(f"  Test samples: {len(test_dataset)}")
    
    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True, num_workers=4, pin_memory=True if args.device == "cuda" else False)
    val_loader = DataLoader(val_dataset, batch_size=args.batch_size, shuffle=False, num_workers=4, pin_memory=True if args.device == "cuda" else False)
    
    # Create model based on type
    print("\nCreating model...")
    
    if args.model_type == "bilstm":
        model_config = ModelConfig(
            input_dim=60,
            hidden_dim=args.hidden_dim,
            num_layers=args.num_layers,
            dropout=args.dropout,
            num_action_classes=5,
            bidirectional=True,
            use_attention=not args.no_attention,
        )
        model = BiLSTMMultitaskModel(model_config)
        print(f"  Model: BiLSTM (bidirectional={model_config.bidirectional})")
        print(f"  Hidden dim: {args.hidden_dim}")
        print(f"  Layers: {args.num_layers}")
        print(f"  Attention: {model_config.use_attention}")
    elif args.model_type == "transformer":
        num_layers = args.num_layers if args.num_layers != 2 else 4
        transformer_config = TransformerConfig(
            input_dim=60,
            d_model=args.d_model,
            nhead=args.num_heads,
            num_layers=num_layers,
            dim_feedforward=args.dim_feedforward,
            dropout=args.dropout,
            num_action_classes=5,
            pooling=args.pooling,
            positional_encoding=args.positional_encoding,
        )
        model = TransformerMultitaskModel(transformer_config)
        print(f"  Model: Transformer")
        print(f"  d_model: {args.d_model}")
        print(f"  Layers: {num_layers}")
        print(f"  Heads: {args.num_heads}")
        print(f"  Pooling: {args.pooling}")
        print(f"  Positional encoding: {args.positional_encoding}")
    else:
        print(f"Error: Unknown model type: {args.model_type}")
        sys.exit(1)
    
    num_params = count_parameters(model)
    print(f"  Total parameters: {num_params:,}")
    
    # Create trainer
    print("\nSetting up trainer...")
    training_config = TrainingConfig(
        epochs=args.epochs,
        batch_size=args.batch_size,
        learning_rate=args.lr,
        weight_decay=args.weight_decay,
        patience=args.patience,
        device=args.device,
        checkpoint_dir=args.checkpoint_dir,
        log_interval=10,
    )
    
    trainer = Trainer(model=model, train_loader=train_loader, val_loader=val_loader, config=training_config)
    
    # Train
    print("\n" + "=" * 60)
    trainer.train()
    
    # Save final model
    print("\nSaving final model...")
    final_model_path = args.checkpoint_dir / f"{args.model_type}_multitask.pt"
    model.save(final_model_path)
    print(f"Model saved to: {final_model_path}")
    
    # Save normalization parameters
    if train_dataset.feature_mean is not None:
        import numpy as np
        norm_path = args.checkpoint_dir / "normalization.npz"
        np.savez(norm_path, mean=train_dataset.feature_mean, std=train_dataset.feature_std)
        print(f"Normalization parameters saved to: {norm_path}")
    
    print("\nTraining complete!")


if __name__ == "__main__":
    main()


## scripts/evaluate_model.py


In [None]:
#!/usr/bin/env python3
"""Evaluate multitask model (BiLSTM or Transformer).

Usage:
    python scripts/evaluate_model.py \
        --model models/checkpoints/bilstm_multitask.pt \
        --data data/features \
        --device cuda
"""

import argparse
import sys
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

try:
    import torch
    from torch.utils.data import DataLoader
    import numpy as np
except ImportError:
    print("Error: PyTorch and NumPy are required. Install with: pip install torch numpy")
    sys.exit(1)

from pose_ai.ml.dataset import create_datasets_from_directory
from pose_ai.ml.models import BiLSTMMultitaskModel, TransformerMultitaskModel, MultitaskLoss


def load_model(model_path: Path, device: str):
    """Load model with auto-detection of type."""
    checkpoint = torch.load(model_path, map_location=device)
    model_type = checkpoint.get("model_type", "bilstm")
    print(f"Detected model type: {model_type}")
    
    if model_type == "bilstm":
        model = BiLSTMMultitaskModel.load(model_path, device=device)
    elif model_type == "transformer":
        model = TransformerMultitaskModel.load(model_path, device=device)
    else:
        raise ValueError(f"Unknown model type: {model_type}")
    
    return model, model_type


def evaluate_model(model, data_loader, criterion, device) -> dict:
    """Evaluate model on dataset."""
    model.eval()
    total_loss = 0.0
    total_eff_loss = 0.0
    total_action_loss = 0.0
    num_batches = 0
    
    all_eff_pred = []
    all_eff_true = []
    all_action_pred = []
    all_action_true = []
    
    with torch.no_grad():
        for features, eff_labels, action_labels in data_loader:
            features = features.to(device)
            eff_labels = eff_labels.to(device)
            action_labels = action_labels.to(device)
            
            eff_pred, action_logits = model(features)
            loss, loss_dict = criterion(eff_pred, eff_labels, action_logits, action_labels)
            
            total_loss += loss_dict["total"]
            total_eff_loss += loss_dict["efficiency"]
            total_action_loss += loss_dict["action"]
            num_batches += 1
            
            all_eff_pred.extend(eff_pred.cpu().numpy().flatten())
            all_eff_true.extend(eff_labels.cpu().numpy())
            all_action_pred.extend(action_logits.argmax(dim=-1).cpu().numpy())
            all_action_true.extend(action_labels.cpu().numpy())
    
    all_eff_pred = np.array(all_eff_pred)
    all_eff_true = np.array(all_eff_true)
    all_action_pred = np.array(all_action_pred)
    all_action_true = np.array(all_action_true)
    
    eff_mae = np.abs(all_eff_pred - all_eff_true).mean()
    eff_rmse = np.sqrt(((all_eff_pred - all_eff_true) ** 2).mean())
    eff_corr = np.corrcoef(all_eff_pred, all_eff_true)[0, 1] if len(all_eff_pred) > 1 else 0.0
    
    ss_res = ((all_eff_true - all_eff_pred) ** 2).sum()
    ss_tot = ((all_eff_true - all_eff_true.mean()) ** 2).sum()
    r2 = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0
    
    action_acc = (all_action_pred == all_action_true).mean()
    
    per_class_acc = {}
    for class_id in range(5):
        mask = all_action_true == class_id
        if mask.sum() > 0:
            per_class_acc[class_id] = (all_action_pred[mask] == all_action_true[mask]).mean()
    
    return {
        "loss": {
            "total": total_loss / num_batches,
            "efficiency": total_eff_loss / num_batches,
            "action": total_action_loss / num_batches,
        },
        "efficiency": {
            "mae": float(eff_mae),
            "rmse": float(eff_rmse),
            "correlation": float(eff_corr),
            "r2": float(r2),
        },
        "action": {
            "accuracy": float(action_acc),
            "per_class": {k: float(v) for k, v in per_class_acc.items()},
        },
    }


def main():
    parser = argparse.ArgumentParser(description="Evaluate multitask model (BiLSTM or Transformer)")
    parser.add_argument("--model", type=Path, required=True, help="Path to trained model (.pt file)")
    parser.add_argument("--data", type=Path, required=True, help="Directory containing feature JSON files")
    parser.add_argument("--batch-size", type=int, default=32, help="Batch size (default: 32)")
    parser.add_argument("--device", default="cuda", help="Device to use (cuda/cpu, default: cuda)")
    parser.add_argument("--split", default="test", choices=["train", "val", "test", "all"], help="Which split to evaluate (default: test)")
    parser.add_argument("--window-size", type=int, default=32, help="Window size (default: 32)")
    
    args = parser.parse_args()
    
    if not args.model.exists():
        print(f"Error: Model file not found: {args.model}")
        sys.exit(1)
    
    if not args.data.exists():
        print(f"Error: Data directory not found: {args.data}")
        sys.exit(1)
    
    if args.device == "cuda" and not torch.cuda.is_available():
        print("CUDA not available, using CPU")
        args.device = "cpu"
    device = torch.device(args.device)
    
    print("=" * 60)
    print("Model Evaluation")
    print("=" * 60)
    
    print(f"\nLoading model from: {args.model}")
    model, model_type = load_model(args.model, args.device)
    print(f"Model type: {model_type}")
    print(f"Model loaded on: {args.device}")
    
    print("\nLoading datasets...")
    train_dataset, val_dataset, test_dataset = create_datasets_from_directory(
        args.data, window_size=args.window_size, stride=1, normalize=True
    )
    
    criterion = MultitaskLoss(efficiency_weight=1.0, action_weight=0.5, efficiency_loss="huber")
    
    if args.split == "all":
        splits = [("Train", train_dataset), ("Val", val_dataset), ("Test", test_dataset)]
    elif args.split == "train":
        splits = [("Train", train_dataset)]
    elif args.split == "val":
        splits = [("Val", val_dataset)]
    else:
        splits = [("Test", test_dataset)]
    
    for split_name, dataset in splits:
        if dataset is None:
            print(f"\n{split_name} dataset not available")
            continue
        
        print(f"\nEvaluating on {split_name} set ({len(dataset)} samples)...")
        data_loader = DataLoader(dataset, batch_size=args.batch_size, shuffle=False, num_workers=4)
        results = evaluate_model(model, data_loader, criterion, device)
        
        print(f"\n{split_name} Results:")
        print("=" * 60)
        print("\nLoss:")
        print(f"  Total: {results['loss']['total']:.4f}")
        print(f"  Efficiency: {results['loss']['efficiency']:.4f}")
        print(f"  Action: {results['loss']['action']:.4f}")
        print("\nEfficiency Regression:")
        print(f"  MAE: {results['efficiency']['mae']:.4f}")
        print(f"  RMSE: {results['efficiency']['rmse']:.4f}")
        print(f"  Correlation: {results['efficiency']['correlation']:.4f}")
        print(f"  R²: {results['efficiency']['r2']:.4f}")
        print("\nNext-Action Classification:")
        print(f"  Accuracy: {results['action']['accuracy']:.4f}")
        print("\n  Per-Class Accuracy:")
        class_names = ["no_change", "left_hand", "right_hand", "left_foot", "right_foot"]
        for class_id, acc in results['action']['per_class'].items():
            print(f"    {class_names[class_id]}: {acc:.4f}")
    
    print("\nEvaluation complete!")


if __name__ == "__main__":
    main()


## scripts/run_segment_report.py

In [None]:
"""CLI to generate segment-level metrics."""

from __future__ import annotations

import argparse

from pose_ai.service import generate_segment_report


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Export segment metrics (COM, joints, contacts)")
    parser.add_argument("manifest", type=str, help="Path to manifest.json")
    parser.add_argument("--holds", type=str, help="Optional holds JSON path", default=None)
    return parser


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()
    metrics = generate_segment_report(args.manifest, holds_path=Path(args.holds) if args.holds else None)
    print(f"Saved {len(metrics)} segments")


if __name__ == "__main__":
    main()


## scripts/run_segmentation.py

In [None]:
"""CLI to run rule-based segmentation over extracted frame manifests."""

from __future__ import annotations

import argparse
import json
from pathlib import Path

from pose_ai.service import segment_video_from_manifest, segment_videos_under_directory


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Produce rest/movement segments from frame manifests.")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--manifest", type=Path, help="Path to a manifest.json file.")
    group.add_argument("--frames-root", type=Path, help="Directory containing subfolders with manifest.json files.")
    parser.add_argument("--json", action="store_true", help="Print segmentation results as JSON.")
    return parser


def _segment_to_dict(segment):
    return {
        "start_time": segment.start_time,
        "end_time": segment.end_time,
        "label": segment.label,
        "duration": segment.duration,
        "frame_indices": segment.frame_indices,
    }


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()

    if args.manifest:
        segments = segment_video_from_manifest(args.manifest)
        if args.json:
            print(json.dumps([_segment_to_dict(seg) for seg in segments], indent=2))
        else:
            print(f"Segments for {args.manifest}:")
            for seg in segments:
                print(
                    f"- {seg.label:9s} {seg.start_time:5.2f}s → {seg.end_time:5.2f}s "
                    f"(duration {seg.duration:4.2f}s, frames {seg.frame_indices})"
                )
    else:
        results = segment_videos_under_directory(args.frames_root)
        if args.json:
            payload = {
                manifest: [_segment_to_dict(seg) for seg in segments]
                for manifest, segments in results.items()
            }
            print(json.dumps(payload, indent=2))
        else:
            for manifest, segments in results.items():
                print(f"Segments for {manifest}:")
                for seg in segments:
                    print(
                        f"  - {seg.label:9s} {seg.start_time:5.2f}s → {seg.end_time:5.2f}s "
                        f"(duration {seg.duration:4.2f}s, frames {seg.frame_indices})"
                    )


if __name__ == "__main__":
    main()


## Advanced Efficiency and Planner Modules

These modules provide advanced efficiency scoring and next-move planning capabilities.


In [None]:
# ============================================================================
# ADVANCED PLANNER MODULE
# ============================================================================
# Rule-based next-move planner with efficiency simulation
# ============================================================================

from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, MutableMapping, Optional, Sequence, Tuple
import math

try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False
    np = None

Point = Tuple[float, float]


@dataclass(slots=True)
class MoveCandidate:
    """Represents a candidate next move."""
    limb: str
    hold_id: str
    hold_position: Point
    simulated_efficiency: float
    efficiency_delta: float
    reasoning: str
    constraint_violations: List[str]
    hold_type: str | None = None


@dataclass(slots=True)
class PlannerConfig:
    """Configuration for the rule-based planner."""
    k_candidates: int = 10
    upward_bias: float = 0.3
    min_support_count: int = 2
    max_reach_ratio: float = 1.2
    com_polygon_tolerance: float = 0.15
    stability_alpha: float = 4.0
    prefer_jug_when_low_support: bool = True
    jug_bonus: float = 0.05
    reach_hold_bonus: float = 0.03
    
    def get_adjusted_reach_ratio(
        self,
        climber_wingspan: float | None = None,
        climber_height: float | None = None,
        climber_flexibility: float | None = None,
    ) -> float:
        """Compute personalized reach ratio based on climber attributes."""
        adjusted_ratio = self.max_reach_ratio
        if climber_wingspan is not None and climber_height is not None and climber_height > 0:
            wingspan_ratio = climber_wingspan / climber_height
            wingspan_multiplier = min(1.2, max(0.8, 0.9 + 0.2 * wingspan_ratio))
            adjusted_ratio *= wingspan_multiplier
        if climber_flexibility is not None:
            flexibility_bonus = 0.05 + 0.05 * climber_flexibility
            adjusted_ratio *= (1.0 + flexibility_bonus)
        return adjusted_ratio


def _safe_float(value) -> float | None:
    if value is None:
        return None
    try:
        return float(value)
    except Exception:
        return None


def _distance(p1: Point, p2: Point) -> float:
    return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)


def _point_in_polygon(point: Point, polygon: Sequence[Point]) -> bool:
    """Ray casting algorithm for point-in-polygon test."""
    if len(polygon) < 3:
        return False
    x, y = point
    inside = False
    for i in range(len(polygon)):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % len(polygon)]
        cond = ((y1 > y) != (y2 > y)) and (x < (x2 - x1) * (y - y1) / ((y2 - y1) or 1e-6) + x1)
        if cond:
            inside = not inside
    return inside


def _distance_to_polygon(point: Point, polygon: Sequence[Point]) -> float:
    """Distance from point to polygon."""
    if not polygon:
        return float("inf")
    if len(polygon) == 1:
        return _distance(point, polygon[0])
    if _point_in_polygon(point, polygon):
        return 0.0
    min_dist = float("inf")
    for i in range(len(polygon)):
        a = polygon[i]
        b = polygon[(i + 1) % len(polygon)]
        ax, ay = a
        bx, by = b
        px, py = point
        abx = bx - ax
        aby = by - ay
        if abs(abx) < 1e-9 and abs(aby) < 1e-9:
            dist = _distance(point, a)
        else:
            t = max(0.0, min(1.0, ((px - ax) * abx + (py - ay) * aby) / (abx * abx + aby * aby)))
            proj_x = ax + t * abx
            proj_y = ay + t * aby
            dist = math.sqrt((px - proj_x) ** 2 + (py - proj_y) ** 2)
        min_dist = min(min_dist, dist)
    return min_dist


def _compute_stability_score(com: Point, support_points: List[Point], body_scale: float, alpha: float = 4.0) -> float:
    """Compute support polygon stability score."""
    if not support_points:
        return 0.0
    distance = _distance_to_polygon(com, support_points)
    return math.exp(-alpha * (distance / (body_scale + 1e-6)))


class NextMovePlanner:
    """Rule-based planner for next move recommendations."""
    
    def __init__(self, config: Optional[PlannerConfig] = None):
        self.config = config or PlannerConfig()
    
    def plan_next_move(
        self,
        current_row: MutableMapping[str, object],
        holds: Sequence[Dict[str, object]],
        *,
        top_k: int = 3,
    ) -> List[MoveCandidate]:
        """Generate ranked next-move candidates with efficiency simulation."""
        if not holds:
            return []
        
        com_x = _safe_float(current_row.get("com_x"))
        com_y = _safe_float(current_row.get("com_y"))
        body_scale = _safe_float(current_row.get("body_scale")) or 1.0
        
        if com_x is None or com_y is None:
            return []
        
        com = (com_x, com_y)
        used_holds = set()
        for limb in ("left_hand", "right_hand", "left_foot", "right_foot"):
            hold_id = current_row.get(f"{limb}_contact_hold")
            if hold_id:
                used_holds.add(str(hold_id))
        
        candidates: List[Tuple[str, str, Point, float, str | None]] = []
        
        for hold in holds:
            hold_id = str(hold.get("hold_id") or hold.get("name", ""))
            if hold_id in used_holds:
                continue
            
            coords = hold.get("coords")
            if not isinstance(coords, (list, tuple)) or len(coords) < 2:
                continue
            
            hx, hy = float(coords[0]), float(coords[1])
            hold_pos = (hx, hy)
            hold_type = hold.get("hold_type")
            
            dist = _distance(com, hold_pos)
            dist_score = 1.0 / (dist / body_scale + 1e-6)
            
            upward_score = 0.0
            if hy < com_y:
                upward_score = self.config.upward_bias * (com_y - hy)
            
            type_bonus = 0.0
            if hold_type:
                current_support = sum(1 for limb in ("left_hand", "right_hand", "left_foot", "right_foot") 
                                    if current_row.get(f"{limb}_contact_on"))
                if self.config.prefer_jug_when_low_support and hold_type == "jug" and current_support < 3:
                    type_bonus = self.config.jug_bonus
                if hy < com_y - 0.15 * body_scale and hold_type in ("crimp", "sloper"):
                    type_bonus = max(type_bonus, self.config.reach_hold_bonus)
            
            sample_score = dist_score + upward_score + type_bonus
            
            if hy < com_y - 0.1 * body_scale:
                for limb in ("left_hand", "right_hand"):
                    if not current_row.get(f"{limb}_contact_on"):
                        candidates.append((limb, hold_id, hold_pos, sample_score, hold_type))
                        break
            else:
                for limb in ("left_foot", "right_foot"):
                    if not current_row.get(f"{limb}_contact_on"):
                        candidates.append((limb, hold_id, hold_pos, sample_score, hold_type))
                        break
        
        candidates.sort(key=lambda x: x[3], reverse=True)
        candidates = candidates[:self.config.k_candidates]
        
        move_candidates: List[MoveCandidate] = []
        for limb, hold_id, hold_pos, _, hold_type in candidates:
            # Simulate efficiency (simplified)
            support_points: List[Point] = []
            for other_limb in ("left_hand", "right_hand", "left_foot", "right_foot"):
                if other_limb == limb:
                    support_points.append(hold_pos)
                else:
                    if current_row.get(f"{other_limb}_contact_on"):
                        tx = _safe_float(current_row.get(f"{other_limb}_target_x"))
                        ty = _safe_float(current_row.get(f"{other_limb}_target_y"))
                        if tx is not None and ty is not None:
                            support_points.append((tx, ty))
            
            violations: List[str] = []
            if len(support_points) < self.config.min_support_count:
                violations.append(f"low_support_count_{len(support_points)}")
            
            stability = _compute_stability_score(com, support_points, body_scale, self.config.stability_alpha)
            sim_eff = stability * 0.5 + (1.0 - len(violations) * 0.1)
            delta_eff = sim_eff - 0.5
            
            reasoning_parts = []
            if delta_eff > 0:
                reasoning_parts.append(f"Improves efficiency by {delta_eff:.2f}")
            if hold_pos[1] < com_y:
                reasoning_parts.append("Upward progression")
            if hold_type:
                reasoning_parts.append(f"Hold type: {hold_type}")
            if not violations:
                reasoning_parts.append("No constraint violations")
            
            reasoning = "; ".join(reasoning_parts) if reasoning_parts else "Candidate move"
            
            move_candidates.append(
                MoveCandidate(
                    limb=limb,
                    hold_id=hold_id,
                    hold_position=hold_pos,
                    simulated_efficiency=sim_eff,
                    efficiency_delta=delta_eff,
                    reasoning=reasoning,
                    constraint_violations=violations,
                    hold_type=hold_type,
                )
            )
        
        valid_candidates = [c for c in move_candidates if not any("exceeded" in v or "low_support" in v for v in c.constraint_violations)]
        if not valid_candidates:
            valid_candidates = move_candidates
        
        valid_candidates.sort(key=lambda c: c.simulated_efficiency, reverse=True)
        return valid_candidates[:top_k]


print("Advanced planner module loaded")
print("Usage: planner = NextMovePlanner(); candidates = planner.plan_next_move(current_row, holds, top_k=3)")


## scripts/train_xgboost.py

In [None]:
"""Train an XGBoost model on pose feature data (CLI)."""

from __future__ import annotations

import argparse
from pathlib import Path

from pose_ai.ml.xgb_trainer import TrainParams, params_from_dict, train_from_file


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Train XGBoost on pose feature rows.")
    parser.add_argument("features", type=Path, help="Path to pose_features.json")

    # Data/label
    parser.add_argument("--label-column", default="detection_score")
    parser.add_argument("--label-threshold", type=float, default=None)
    parser.add_argument("--drop-columns", nargs="*", default=["image_path"])
    parser.add_argument("--task", choices=["classification", "regression"], default="classification")
    parser.add_argument("--test-size", type=float, default=0.2)
    parser.add_argument("--random-state", type=int, default=42)

    # XGBoost hyperparameters
    parser.add_argument("--n-estimators", type=int, default=300)
    parser.add_argument("--learning-rate", type=float, default=0.05)
    parser.add_argument("--max-depth", type=int, default=4)
    parser.add_argument("--subsample", type=float, default=0.8)
    parser.add_argument("--colsample-bytree", type=float, default=0.8)
    parser.add_argument("--scale-pos-weight", type=float, default=None)
    parser.add_argument("--n-jobs", type=int, default=0)
    parser.add_argument("--tree-method", default=None, help="e.g., hist or gpu_hist")

    # Training behaviour
    parser.add_argument("--early-stopping-rounds", type=int, default=30)
    parser.add_argument("--eval-metric-cls", default="logloss")
    parser.add_argument("--eval-metric-reg", default="rmse")

    # Outputs
    parser.add_argument("--model-out", type=Path, default=Path("models/xgb_pose.json"))
    parser.add_argument("--metrics-out", type=Path, default=None)
    parser.add_argument("--feature-out", type=Path, default=None)
    parser.add_argument("--importance-out", type=Path, default=None)
    return parser.parse_args()


def main() -> None:
    args = _parse_args()
    params = params_from_dict(
        {
            "task": args.task,
            "label_column": args.label_column,
            "label_threshold": args.label_threshold,
            "drop_columns": args.drop_columns,
            "test_size": args.test_size,
            "random_state": args.random_state,
            "n_estimators": args.n_estimators,
            "learning_rate": args.learning_rate,
            "max_depth": args.max_depth,
            "subsample": args.subsample,
            "colsample_bytree": args.colsample_bytree,
            "scale_pos_weight": args.scale_pos_weight,
            "n_jobs": args.n_jobs,
            "tree_method": args.tree_method,
            "early_stopping_rounds": args.early_stopping_rounds,
            "eval_metric_cls": args.eval_metric_cls,
            "eval_metric_reg": args.eval_metric_reg,
            "model_out": args.model_out,
            "metrics_out": args.metrics_out,
            "feature_out": args.feature_out,
            "importance_out": args.importance_out,
        }
    )

    metrics = train_from_file(args.features, params)
    print(f"Model saved to {params.model_out}")
    print("Metrics:", metrics)


if __name__ == "__main__":
    main()


In [None]:
# ============================================================================
# IMU-BASED WALL ANGLE ESTIMATION (Additional Functions)
# ============================================================================
# These functions extend the wall angle estimation module with IMU sensor support

def quaternion_to_euler(w: float, x: float, y: float, z: float) -> tuple[float, float, float]:
    """Convert quaternion to Euler angles (pitch, roll, yaw) in degrees."""
    sinr_cosp = 2.0 * (w * x + y * z)
    cosr_cosp = 1.0 - 2.0 * (x * x + y * y)
    roll = np.arctan2(sinr_cosp, cosr_cosp)
    
    sinp = 2.0 * (w * y - z * x)
    if abs(sinp) >= 1:
        pitch = np.copysign(np.pi / 2, sinp)
    else:
        pitch = np.arcsin(sinp)
    
    siny_cosp = 2.0 * (w * z + x * y)
    cosy_cosp = 1.0 - 2.0 * (y * y + z * z)
    yaw = np.arctan2(siny_cosp, cosy_cosp)
    
    return np.degrees(pitch), np.degrees(roll), np.degrees(yaw)


def compute_wall_angle_from_imu(
    quaternion: list[float] | None = None,
    euler_angles: list[float] | None = None,
) -> WallAngleResult:
    """Compute wall angle from IMU sensor data.
    
    Args:
        quaternion: Device orientation as [w, x, y, z]
        euler_angles: Device orientation as [pitch, roll, yaw] in degrees
    
    Returns:
        WallAngleResult with angle in degrees (0=horizontal, 90=vertical)
    """
    if quaternion is not None:
        if len(quaternion) != 4:
            return WallAngleResult(angle_degrees=None, confidence=0.0, method="imu_error", hough_lines=[])
        w, x, y, z = quaternion
        pitch, roll, yaw = quaternion_to_euler(w, x, y, z)
    elif euler_angles is not None:
        if len(euler_angles) != 3:
            return WallAngleResult(angle_degrees=None, confidence=0.0, method="imu_error", hough_lines=[])
        pitch, roll, yaw = euler_angles
    else:
        return WallAngleResult(angle_degrees=None, confidence=0.0, method="imu_missing", hough_lines=[])
    
    wall_angle = abs(pitch)
    wall_angle = max(0.0, min(180.0, wall_angle))
    confidence = 1.0
    if roll > 45 or roll < -45:
        confidence = 0.7
    
    return WallAngleResult(
        angle_degrees=wall_angle,
        confidence=confidence,
        method="imu_sensor",
        hough_lines=[],
        pca_angle=None,
    )


print("IMU wall angle functions loaded")
print("Usage: compute_wall_angle_from_imu(quaternion=[w,x,y,z] or euler_angles=[pitch,roll,yaw])")


## scripts/visualize_pose.py

In [None]:
"""Generate pose visualization overlays from pose_results.json."""

from __future__ import annotations

import argparse
import json
from pathlib import Path

import cv2

try:  # optional dependency for connection definitions
    from mediapipe.python.solutions.pose import PoseLandmark, POSE_CONNECTIONS
except ModuleNotFoundError:  # fallback if mediapipe not installed
    PoseLandmark = None
    POSE_CONNECTIONS = []


DEFAULT_CONNECTIONS = [
    ("left_shoulder", "right_shoulder"),
    ("left_shoulder", "left_elbow"),
    ("left_elbow", "left_wrist"),
    ("right_shoulder", "right_elbow"),
    ("right_elbow", "right_wrist"),
    ("left_hip", "right_hip"),
    ("left_shoulder", "left_hip"),
    ("right_shoulder", "right_hip"),
    ("left_hip", "left_knee"),
    ("left_knee", "left_ankle"),
    ("right_hip", "right_knee"),
    ("right_knee", "right_ankle"),
]


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Visualize pose landmarks on extracted frames.")
    parser.add_argument("pose_results", type=Path, help="Path to pose_results.json")
    parser.add_argument(
        "--output", type=Path, default=None,
        help="Directory to write visualized images (defaults to frame directory).",
    )
    parser.add_argument(
        "--include-missing", action="store_true",
        help="Include frames even when detection score/visibility is low.",
    )
    parser.add_argument(
        "--min-score",
        type=float,
        default=0.3,
        help="Minimum frame detection score required for visualization (default: 0.3).",
    )
    parser.add_argument(
        "--min-visibility",
        type=float,
        default=0.2,
        help="Minimum landmark visibility to draw a point/connection (default: 0.2).",
    )
    return parser


def _get_connections():
    if PoseLandmark is None or not POSE_CONNECTIONS:
        return DEFAULT_CONNECTIONS
    connections = []
    for a_idx, b_idx in POSE_CONNECTIONS:
        connections.append((PoseLandmark(a_idx).name.lower(), PoseLandmark(b_idx).name.lower()))
    return connections


def visualize_pose_results(
    pose_results_path: Path,
    output_dir: Path | None = None,
    *,
    include_missing: bool = False,
    min_score: float = 0.3,
    min_visibility: float = 0.2,
) -> int:
    payload = json.loads(pose_results_path.read_text(encoding="utf-8"))
    frames = payload.get("frames", [])
    count = 0
    connections = _get_connections()

    for frame in frames:
        image_path = Path(frame["image_path"])
        image = cv2.imread(str(image_path))
        if image is None:
            continue
        height, width = image.shape[:2]
        landmarks = frame.get("landmarks", [])
        detection_score = float(frame.get("detection_score", 0.0))
        if not include_missing and detection_score < min_score:
            continue
        if not landmarks and not include_missing:
            continue

        points = {}
        for landmark in landmarks:
            x = int(landmark["x"] * width)
            y = int(landmark["y"] * height)
            if landmark.get("visibility", 1.0) < min_visibility:
                continue
            points[landmark["name"]] = (x, y)
            cv2.circle(image, (x, y), 4, (0, 255, 0), -1)

        for start, end in connections:
            if start in points and end in points:
                cv2.line(image, points[start], points[end], (255, 0, 0), 2)

        target_dir = output_dir or image_path.parent / "visualized"
        target_dir.mkdir(parents=True, exist_ok=True)
        out_path = target_dir / f"{image_path.stem}_viz{image_path.suffix}"
        cv2.imwrite(str(out_path), image)
        count += 1
    return count


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()
    processed = visualize_pose_results(
        args.pose_results,
        args.output,
        include_missing=args.include_missing,
        min_score=args.min_score,
        min_visibility=args.min_visibility,
    )
    print(f"Saved {processed} annotated frames")


if __name__ == "__main__":
    main()


## Wall Angle Estimation Module

Automatic wall angle estimation using Hough line detection and PCA

In [None]:
# ============================================================================
# WALL ANGLE ESTIMATION
# ============================================================================
# Purpose: Automatically estimate climbing wall angle from video frames
# Method: Combines Hough line detection with optional PCA on hold positions
# Output: Angle in degrees (0 = horizontal, 90 = vertical wall)
# ============================================================================

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Sequence, Tuple
import cv2
import numpy as np


# Data structure for wall angle estimation results
@dataclass(slots=True)
class WallAngleResult:
    angle_degrees: float | None
    confidence: float
    method: str
    hough_lines: List[Tuple[Tuple[int, int], Tuple[int, int]]]
    pca_angle: float | None = None

    def as_dict(self) -> dict[str, object]:
        return {
            "angle_degrees": self.angle_degrees,
            "confidence": self.confidence,
            "method": self.method,
            "pca_angle": self.pca_angle,
            "hough_line_count": len(self.hough_lines),
        }


def estimate_wall_angle(
    image_path: Path | str,
    *,
    hold_centers: Optional[list] = None,
    canny_threshold1: int = 50,
    canny_threshold2: int = 150,
    hough_threshold: int = 120,
) -> WallAngleResult:
    """
    Estimate wall angle from a single frame image.
    
    Algorithm:
    1. Convert image to grayscale
    2. Apply Canny edge detection
    3. Use Hough line transform to find dominant lines
    4. Calculate mean angle from detected lines
    5. Optionally refine with PCA on hold center coordinates
    6. Blend estimates if both methods agree
    
    Parameters:
        image_path: Path to frame image
        hold_centers: Optional list of (x, y) hold coordinates for PCA refinement
        canny_threshold1: Lower threshold for Canny edge detection
        canny_threshold2: Upper threshold for Canny edge detection
        hough_threshold: Minimum votes for Hough line detection
        
    Returns:
        WallAngleResult with angle estimate and confidence score
    """
    path = Path(image_path)
    image = cv2.imread(str(path))
    if image is None:
        return WallAngleResult(
            angle_degrees=None, 
            confidence=0.0, 
            method="load-error", 
            hough_lines=[]
        )
    
    # Step 1: Prepare image for edge detection
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, canny_threshold1, canny_threshold2, L2gradient=True)
    
    # Step 2: Detect lines using probabilistic Hough transform
    lines_p = cv2.HoughLinesP(
        edges, 
        rho=1, 
        theta=np.pi/180.0, 
        threshold=hough_threshold,
        minLineLength=60, 
        maxLineGap=10
    )
    
    # Step 3: Extract line endpoints
    normalized_lines = []
    if lines_p is not None:
        for line in lines_p:
            x1, y1, x2, y2 = line[0]
            normalized_lines.append(((int(x1), int(y1)), (int(x2), int(y2))))
    
    # Step 4: Compute angle from Hough lines
    angles = []
    for (x1, y1), (x2, y2) in normalized_lines:
        dx, dy = x2 - x1, y2 - y1
        if abs(dx) < 1e-6 and abs(dy) < 1e-6:
            continue
        angle = float(np.degrees(np.arctan2(dy, dx)))
        if angle < 0:
            angle += 180.0
        angles.append(angle)
    
    hough_angle = None
    if angles:
        # Use circular mean for angle averaging
        radians = np.radians(angles)
        mean = float(np.degrees(np.arctan2(np.sum(np.sin(radians)), np.sum(np.cos(radians)))))
        if mean < 0:
            mean += 180.0
        hough_angle = mean
    
    # Step 5: Optional PCA refinement using hold positions
    pca_angle_value = None
    if hold_centers:
        pts = np.array(list(hold_centers), dtype=float)
        # Scale normalized coordinates to pixel space
        if pts.max() <= 1.2:
            h, w = gray.shape[:2]
            pts[:, 0] *= w
            pts[:, 1] *= h
        if pts.shape[0] >= 3:
            centered = pts - np.mean(pts, axis=0, keepdims=True)
            cov = np.cov(centered.T)
            eigvals, eigvecs = np.linalg.eig(cov)
            idx = int(np.argmax(eigvals))
            principal = eigvecs[:, idx]
            pca_angle_value = float(np.degrees(np.arctan2(principal[1], principal[0])))
            if pca_angle_value < 0:
                pca_angle_value += 180.0
    
    # Step 6: Combine estimates with confidence weighting
    if hough_angle is not None and pca_angle_value is not None:
        diff = abs(hough_angle - pca_angle_value)
        if diff < 15.0:  # Agreement threshold
            final_angle = (hough_angle + pca_angle_value) / 2.0
            method, confidence = "hough+pca", 0.9
        else:  # Disagreement - prefer Hough but lower confidence
            final_angle, method, confidence = hough_angle, "hough", 0.6
    elif hough_angle is not None:
        final_angle = hough_angle
        confidence = 0.7 if len(normalized_lines) >= 5 else 0.5
        method = "hough"
    else:
        final_angle = pca_angle_value
        confidence = 0.4 if pca_angle_value is not None else 0.0
        method = "pca" if pca_angle_value is not None else "none"
    
    return WallAngleResult(
        angle_degrees=final_angle,
        confidence=confidence,
        method=method,
        hough_lines=normalized_lines,
        pca_angle=pca_angle_value,
    )


# Module loaded confirmation
print("Wall angle estimation module loaded")
print("Usage: estimate_wall_angle(image_path, hold_centers=[(x1,y1), ...])")


## Hold Detection and Clustering Module

YOLO-based hold detection with DBSCAN clustering

In [None]:
# ============================================================================
# HOLD DETECTION AND CLUSTERING
# ============================================================================
# Purpose: Detect climbing holds from frames and cluster into stable positions
# Method: YOLO object detection + DBSCAN spatial clustering
# Output: List of unique holds with normalized coordinates
# ============================================================================

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import List, Sequence
import json
import numpy as np

try:
    from ultralytics import YOLO as UltralyticsYOLO
    YOLO_AVAILABLE = True
except ModuleNotFoundError:
    UltralyticsYOLO = None
    YOLO_AVAILABLE = False
    print("Warning: ultralytics not installed")
    print("Install with: pip install ultralytics")


# Data structures for hold detection
@dataclass(slots=True)
class HoldDetection:
    """Single hold detection from one frame"""
    frame_index: int
    label: str
    confidence: float
    x_center: float  # normalized 0-1
    y_center: float
    width: float
    height: float


@dataclass(slots=True)
class ClusteredHold:
    """Aggregated hold position from multiple detections"""
    hold_id: str
    label: str
    x: float
    y: float
    radius: float
    detections: int
    avg_confidence: float

    def as_dict(self) -> dict[str, object]:
        return {
            "hold_id": self.hold_id,
            "label": self.label,
            "coords": [self.x, self.y],
            "radius": self.radius,
            "detections": self.detections,
            "avg_confidence": self.avg_confidence,
            "normalized": True,
        }


def detect_holds(
    image_paths: Sequence[Path],
    *,
    model_name: str = "yolov8n.pt",
    device: str | None = None,
    hold_labels: tuple = ("hold", "foot_hold", "volume", "jug", "crimp", "sloper", "pinch"),
) -> List[HoldDetection]:
    """
    Run YOLO object detection to find holds in frame images.
    
    Process:
    1. Load YOLO model (pre-trained or fine-tuned)
    2. Run inference on all frames in batch
    3. Filter detections to hold-related classes only
    4. Normalize bounding box coordinates to [0,1] range
    
    Parameters:
        image_paths: List of frame image paths
        model_name: YOLO model weights file (e.g., 'yolov8n.pt' or custom)
        device: Torch device ('cpu', 'cuda:0', etc.)
        hold_labels: Tuple of class labels to keep
        
    Returns:
        List of HoldDetection objects with normalized coordinates
    """
    if not YOLO_AVAILABLE:
        print("YOLO not available - skipping hold detection")
        return []
    
    model = UltralyticsYOLO(model_name)
    if not image_paths:
        return []
    
    # Run batch inference
    results = model.predict(
        source=[str(p) for p in image_paths], 
        device=device, 
        imgsz=640, 
        stream=False, 
        verbose=False
    )
    
    detections = []
    
    # Process each frame result
    for frame_idx, result in enumerate(results):
        boxes = getattr(result, "boxes", None)
        if boxes is None:
            continue
        
        # Extract detection data (move from GPU if needed)
        xyxy = boxes.xyxy.cpu().tolist() if hasattr(boxes.xyxy, "cpu") else boxes.xyxy.tolist()
        cls = boxes.cls.cpu().tolist() if hasattr(boxes.cls, "cpu") else boxes.cls.tolist()
        conf = boxes.conf.cpu().tolist() if hasattr(boxes.conf, "cpu") else boxes.conf.tolist()
        names = result.names or {}
        
        # Filter and normalize hold detections
        for box_idx, bbox in enumerate(xyxy):
            class_idx = int(cls[box_idx]) if box_idx < len(cls) else -1
            label = str(names.get(class_idx, class_idx)).lower()
            confidence = float(conf[box_idx]) if box_idx < len(conf) else 0.0
            
            # Keep only hold-related classes
            if label not in hold_labels:
                continue
            
            # Convert bbox to normalized center + size
            x1, y1, x2, y2 = (float(v) for v in bbox[:4])
            width = max(1e-6, x2 - x1)
            height = max(1e-6, y2 - y1)
            h, w = result.orig_shape
            
            detections.append(HoldDetection(
                frame_index=frame_idx,
                label=label,
                confidence=confidence,
                x_center=(x1 + width/2.0) / w,
                y_center=(y1 + height/2.0) / h,
                width=width / w,
                height=height / h,
            ))
    
    return detections


def cluster_holds(
    detections: Sequence[HoldDetection], 
    *, 
    eps: float = 0.03, 
    min_samples: int = 3
) -> List[ClusteredHold]:
    """
    Cluster hold detections across frames to find stable hold positions.
    
    Algorithm:
    1. Extract 2D coordinates from all detections
    2. Apply DBSCAN density-based clustering
    3. For each cluster, compute centroid and statistics
    4. Assign unique hold IDs to each cluster
    
    Parameters:
        detections: List of hold detections from all frames
        eps: DBSCAN epsilon (max distance between points in cluster)
        min_samples: Minimum detections required to form cluster
        
    Returns:
        List of ClusteredHold objects representing unique holds
    """
    if not detections:
        return []
    
    points = np.array([[d.x_center, d.y_center] for d in detections], dtype=float)
    labels_arr = np.array([d.label for d in detections], dtype=object)
    confidences = np.array([d.confidence for d in detections], dtype=float)
    
    # Apply DBSCAN clustering
    try:
        from sklearn.cluster import DBSCAN
        clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(points)
        cluster_ids = clustering.labels_
    except Exception:
        # Fallback: no clustering
        cluster_ids = np.full(points.shape[0], -1, dtype=int)
    
    unique_ids = [cid for cid in sorted(set(int(c) for c in cluster_ids)) if cid >= 0]
    clustered = []
    
    if not unique_ids:
        # Fallback: treat each detection as unique hold
        for idx, d in enumerate(detections):
            clustered.append(ClusteredHold(
                hold_id=f"hold_{idx}",
                label=d.label,
                x=d.x_center,
                y=d.y_center,
                radius=max(d.width, d.height) / 2.0,
                detections=1,
                avg_confidence=d.confidence,
            ))
        return clustered
    
    # Aggregate clusters
    for cid in unique_ids:
        mask = cluster_ids == cid
        cluster_pts = points[mask]
        cluster_labels = labels_arr[mask]
        cluster_conf = confidences[mask]
        
        # Compute cluster centroid
        x_mean = float(cluster_pts[:, 0].mean())
        y_mean = float(cluster_pts[:, 1].mean())
        
        # Compute cluster radius (mean distance to centroid)
        dists = np.linalg.norm(cluster_pts - np.array([[x_mean, y_mean]]), axis=1)
        radius = float(dists.mean() + 0.01)
        
        # Find dominant label
        lbl_values, counts = np.unique(cluster_labels, return_counts=True)
        dominant_label = str(lbl_values[int(np.argmax(counts))])
        
        clustered.append(ClusteredHold(
            hold_id=f"hold_{cid}",
            label=dominant_label,
            x=x_mean,
            y=y_mean,
            radius=radius,
            detections=int(mask.sum()),
            avg_confidence=float(cluster_conf.mean()),
        ))
    
    return clustered


def export_holds_json(
    clustered: Sequence[ClusteredHold], 
    *, 
    output_path: Path | str
) -> Path:
    """
    Export clustered holds to JSON file.
    
    Output format:
    {
        "hold_0": {"coords": [x, y], "label": "jug", ...},
        "hold_1": {"coords": [x, y], "label": "crimp", ...},
        ...
    }
    """
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {hold.hold_id: hold.as_dict() for hold in clustered}
    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    return path


# Module loaded confirmation
print("Hold detection and clustering module loaded")
print(f"YOLO available: {YOLO_AVAILABLE}")


## Efficiency Scoring and Recommendation Module

Heuristic-based efficiency scoring and next hold recommendation

In [None]:
# ============================================================================
# EFFICIENCY SCORING AND RECOMMENDATION
# ============================================================================
# Purpose: Calculate movement efficiency and suggest next holds
# Method: Multi-component heuristic scoring + proximity-based ranking
# Output: Efficiency score (0-1) and ranked hold suggestions
# ============================================================================

from __future__ import annotations
from dataclasses import dataclass
from typing import List, Sequence
import math

# Weight configuration for efficiency components
WEIGHTS = {
    "detection": 0.30,  # Pose detection quality
    "joint": 0.20,      # Joint angle efficiency (less extreme = better)
    "com": 0.20,        # Center of mass stability
    "contact": 0.20,    # Contact point stability
    "hip": 0.10,        # Hip alignment with wall
}


@dataclass(slots=True)
class EfficiencyResult:
    """Efficiency score with component breakdown"""
    score: float
    components: dict[str, float]

    def as_dict(self) -> dict[str, float]:
        payload = {"score": self.score}
        payload.update(self.components)
        return payload


def _safe_float(value) -> float:
    """Convert value to float, return NaN on failure"""
    try:
        return float(value)
    except Exception:
        return float("nan")


def efficiency_from_frames(
    frame_rows: Sequence[dict[str, object]], 
    window: int = 5
) -> EfficiencyResult:
    """
    Calculate efficiency score from pose feature frames.
    
    Algorithm:
    1. Take last N frames (window) for recent movement context
    2. Compute 5 components:
       - detection: Average pose detection quality
       - joint: Joint angle efficiency (normalized by 180 degrees)
       - com: Center of mass stability (inverse variance)
       - contact: Limb contact stability (hold consistency)
       - hip: Hip alignment with wall angle
    3. Weighted average of components
    
    Parameters:
        frame_rows: List of feature dictionaries (from pose_features.json)
        window: Number of recent frames to analyze
        
    Returns:
        EfficiencyResult with overall score and component breakdown
    """
    if not frame_rows:
        return EfficiencyResult(score=float("nan"), components={})
    
    # Use recent frames for temporal context
    recent = list(frame_rows[-window:]) if len(frame_rows) >= window else list(frame_rows)
    
    # Component 1: Detection quality
    detection_scores = [
        _safe_float(row.get("detection_score")) 
        for row in recent 
        if row.get("detection_score") is not None
    ]
    detection_component = (
        float(sum(detection_scores) / len(detection_scores)) 
        if detection_scores else 0.0
    )
    
    # Component 2: Joint efficiency (lower angles = more efficient)
    joint_keys = [k for k in recent[-1].keys() if k.endswith("_angle")]
    joint_values = []
    for row in recent:
        for key in joint_keys:
            v = row.get(key)
            if isinstance(v, (int, float)):
                joint_values.append(abs(float(v)))
    
    joint_component = (
        1.0 - (sum(joint_values) / (len(joint_values) * 180.0)) 
        if joint_values else 0.5
    )
    
    # Component 3: COM stability (perpendicular to wall)
    com_perp_vals = [
        _safe_float(row.get("com_perp_wall")) 
        for row in recent 
        if row.get("com_perp_wall") is not None
    ]
    
    if len(com_perp_vals) >= 2:
        mean_val = sum(com_perp_vals) / len(com_perp_vals)
        variance = sum((v - mean_val) ** 2 for v in com_perp_vals) / len(com_perp_vals)
        max_var = max(variance, 1e-6)
        com_component = 1.0 / (1.0 + variance / max_var)
    else:
        com_component = 0.5
    
    # Component 4: Contact stability (limbs staying on same holds)
    contact_keys = [k for k in recent[-1].keys() if k.endswith("_target")]
    stable_counts = 0
    total_contacts = 0
    
    if contact_keys and len(recent) >= 2:
        prev_row = recent[0]
        for row in recent[1:]:
            for key in contact_keys:
                if key in row and key in prev_row:
                    total_contacts += 1
                    if row.get(key) == prev_row.get(key):
                        stable_counts += 1
            prev_row = row
    
    contact_component = (stable_counts / total_contacts) if total_contacts else 0.5
    
    # Component 5: Hip alignment with wall
    hip_alignment = _safe_float(recent[-1].get("hip_alignment_error"))
    hip_component = (
        1.0 - (hip_alignment / 90.0) 
        if hip_alignment == hip_alignment  # Check for NaN
        else 0.5
    )
    
    # Weighted combination
    score = (
        WEIGHTS["detection"] * detection_component
        + WEIGHTS["joint"] * joint_component
        + WEIGHTS["com"] * com_component
        + WEIGHTS["contact"] * contact_component
        + WEIGHTS["hip"] * hip_component
    )
    
    return EfficiencyResult(
        score=score,
        components={
            "detection_component": detection_component,
            "joint_component": joint_component,
            "com_component": com_component,
            "contact_component": contact_component,
            "hip_component": hip_component,
        },
    )


def suggest_next_holds(
    current_row: dict[str, object],
    all_holds: Sequence[dict[str, object]],
    *,
    top_k: int = 3,
) -> List[dict[str, object]]:
    """
    Suggest next holds based on proximity and novelty.
    
    Ranking heuristic:
        score = 0.7 * (1 - distance) + 0.3 * novelty
        
    Where:
        - distance: Euclidean distance from COM to hold (normalized)
        - novelty: 1 if hold not currently contacted, 0 otherwise
    
    Parameters:
        current_row: Latest feature row (current climber state)
        all_holds: List of available holds (from holds.json)
        top_k: Number of holds to recommend
        
    Returns:
        List of top-k hold dictionaries sorted by score
    """
    com_x = _safe_float(current_row.get("com_x"))
    com_y = _safe_float(current_row.get("com_y"))
    
    # Require valid COM position
    if any(math.isnan(v) for v in (com_x, com_y)):
        return []
    
    # Track currently used holds
    used_targets = {
        str(current_row.get(k))
        for k in current_row.keys()
        if k.endswith("_target") and current_row.get(k)
    }
    
    # Rank all holds
    ranked = []
    for hold in all_holds:
        coords = hold.get("coords")
        if not isinstance(coords, (list, tuple)) or len(coords) != 2:
            continue
        
        hx, hy = float(coords[0]), float(coords[1])
        
        # Calculate distance component
        dist = math.sqrt((hx - com_x) ** 2 + (hy - com_y) ** 2)
        dist_norm = min(dist, 1.0)
        
        # Calculate novelty component
        novel = 0 if hold.get("hold_id") in used_targets else 1
        
        # Combined score
        score = 0.7 * (1.0 - dist_norm) + 0.3 * novel
        ranked.append((score, hold))
    
    # Sort and return top-k
    ranked.sort(key=lambda t: t[0], reverse=True)
    return [hold for _, hold in ranked[:top_k]]


# Module loaded confirmation
print("Efficiency and recommendation module loaded")
print("Component weights:", WEIGHTS)


## Complete Pipeline Demo

End-to-end demonstration of all new features

In [None]:
# ============================================================================
# COMPLETE PIPELINE DEMONSTRATION
# ============================================================================
# Purpose: Show integration of all new features in end-to-end workflow
# Steps:
#   1. Hold detection and clustering
#   2. Wall angle estimation
#   3. Load pose features
#   4. Calculate efficiency score
#   5. Generate hold recommendations
# ============================================================================

from pathlib import Path
import json

# Configuration: Set your frame directory path
FRAME_DIR = Path("data/frames/video01")  # Change to your actual path
manifest_path = FRAME_DIR / "manifest.json"
pose_results_path = FRAME_DIR / "pose_results.json"

print("=" * 60)
print("BetaMove Enhanced Pipeline Demo")
print("=" * 60)

# ----------------------------------------------------------------------------
# STEP 1: Hold Detection and Clustering
# ----------------------------------------------------------------------------
print("\n[Step 1/5] Hold Detection and Clustering")
print("-" * 60)

if YOLO_AVAILABLE:
    # Get sample frames (limit to 20 for demo speed)
    image_paths = sorted(FRAME_DIR.glob("*.jpg"))[:20]
    
    if image_paths:
        # Run YOLO detection
        holds_detections = detect_holds(image_paths, model_name="yolov8n.pt")
        print(f"Detected {len(holds_detections)} hold instances across frames")
        
        # Cluster detections into unique holds
        clustered_holds = cluster_holds(holds_detections, eps=0.03, min_samples=2)
        print(f"Clustered into {len(clustered_holds)} unique holds")
        
        if clustered_holds:
            # Save to JSON
            holds_json_path = export_holds_json(
                clustered_holds, 
                output_path=FRAME_DIR / "holds.json"
            )
            print(f"Exported to: {holds_json_path}")
            
            # Display sample holds
            print("\nSample holds:")
            for h in clustered_holds[:3]:
                print(f"  {h.hold_id}: {h.label} at ({h.x:.3f}, {h.y:.3f})")
                print(f"    confidence={h.avg_confidence:.2f}, detections={h.detections}")
    else:
        print("Warning: No frame images found")
        clustered_holds = []
else:
    print("Warning: YOLO not available - skipping hold detection")
    clustered_holds = []

# ----------------------------------------------------------------------------
# STEP 2: Wall Angle Estimation
# ----------------------------------------------------------------------------
print("\n[Step 2/5] Wall Angle Estimation")
print("-" * 60)

first_image = next(FRAME_DIR.glob("*.jpg"), None)
if first_image:
    # Use hold centers for PCA refinement if available
    hold_centers = [(h.x, h.y) for h in clustered_holds] if clustered_holds else None
    
    # Estimate wall angle
    wall_result = estimate_wall_angle(first_image, hold_centers=hold_centers)
    
    if wall_result.angle_degrees is not None:
        print(f"Estimated angle: {wall_result.angle_degrees:.1f} degrees")
        print(f"Confidence: {wall_result.confidence:.2f}")
        print(f"Method: {wall_result.method}")
        print(f"Hough lines detected: {len(wall_result.hough_lines)}")
        if wall_result.pca_angle:
            print(f"PCA angle: {wall_result.pca_angle:.1f} degrees")
    else:
        print("Failed to estimate wall angle")
else:
    print("Warning: No images found for wall angle estimation")
    wall_result = None

# ----------------------------------------------------------------------------
# STEP 3: Load Pose Features
# ----------------------------------------------------------------------------
print("\n[Step 3/5] Loading Pose Features")
print("-" * 60)

features_path = FRAME_DIR / "pose_features.json"
if features_path.exists():
    with open(features_path) as f:
        feature_rows = json.load(f)
    
    print(f"Loaded {len(feature_rows)} feature rows")
    
    # Check for new features
    if feature_rows:
        sample = feature_rows[0]
        print(f"Total features per row: {len(sample)}")
        
        new_features = [
            k for k in sample.keys() 
            if k in ['wall_angle', 'hip_alignment_error', 'com_along_wall', 'com_perp_wall']
        ]
        if new_features:
            print(f"New wall features present: {', '.join(new_features)}")
else:
    print("Warning: pose_features.json not found")
    print("Tip: Run feature export with auto_wall_angle=True")
    feature_rows = []

# ----------------------------------------------------------------------------
# STEP 4: Calculate Efficiency Score
# ----------------------------------------------------------------------------
print("\n[Step 4/5] Efficiency Score Calculation")
print("-" * 60)

if feature_rows:
    # Calculate efficiency using recent frames
    eff_result = efficiency_from_frames(feature_rows, window=5)
    
    print(f"Overall Efficiency Score: {eff_result.score:.3f}")
    print("\nComponent Breakdown:")
    for comp, val in eff_result.components.items():
        print(f"  {comp:20s}: {val:.3f}")
else:
    print("Warning: No features available for efficiency calculation")
    eff_result = None

# ----------------------------------------------------------------------------
# STEP 5: Next Hold Recommendations
# ----------------------------------------------------------------------------
print("\n[Step 5/5] Next Hold Recommendations")
print("-" * 60)

if feature_rows and clustered_holds:
    # Convert holds to dictionary format
    holds_dicts = [h.as_dict() for h in clustered_holds]
    
    # Get recommendations based on current state
    next_holds = suggest_next_holds(feature_rows[-1], holds_dicts, top_k=3)
    
    print(f"Top {len(next_holds)} recommended holds:")
    for i, hold in enumerate(next_holds, 1):
        coords = hold.get('coords', [])
        print(f"  {i}. {hold.get('hold_id')} ({hold.get('label')})")
        print(f"     Position: ({coords[0]:.3f}, {coords[1]:.3f})")
        print(f"     Confidence: {hold.get('avg_confidence', 0):.2f}")
else:
    print("Warning: Missing features or holds for recommendations")

# ----------------------------------------------------------------------------
# Summary
# ----------------------------------------------------------------------------
print("\n" + "=" * 60)
print("Demo Complete")
print("=" * 60)

print("\nSummary:")
print(f"  Holds detected: {len(clustered_holds)}")
if wall_result and wall_result.angle_degrees:
    print(f"  Wall angle: {wall_result.angle_degrees:.1f} degrees ({wall_result.method})")
else:
    print("  Wall angle: Not available")
if eff_result:
    print(f"  Efficiency score: {eff_result.score:.3f}")
else:
    print("  Efficiency score: Not available")

print("\nTo run full pipeline with new features:")
print("  Option 1: Use web UI - upload video (auto-runs all features)")
print("  Option 2: CLI - python scripts/generate_holds_and_features.py <manifest_path>")
