In [29]:
import json
import numpy as np
import pandas as pd
from pathlib import Path
from pose_format import Pose
from pose_format.numpy import NumPyPoseBody
from pose_format.pose_header import PoseHeader, PoseHeaderComponent, PoseHeaderDimensions

## üì• Read the .pose file first

In [2]:
def load_pose_file(pose_path: str) -> Pose:
    """
    Reads a .pose file and returns a Pose object.
    """
    with open(pose_path, 'rb') as f:
        pose = Pose.read(f.read())
    return pose


In [3]:
path_pose = '../data/pose_files/example.pose'
pose = load_pose_file(path_pose)

## 1Ô∏è‚É£ Save as .pose (original format)

In [30]:
save_path = "../output/convert_pose_formats"

In [5]:
def save_as_pose(pose: Pose, output_path: str):
    """
    Saves a Pose object to a .pose file.
    """
    with open(output_path, 'wb') as f:
        pose.write(f)
    print(f"‚úÖ Saved to: {output_path}")


save_as_pose(pose, f"{save_path}/output.pose")

‚úÖ Saved to: ../output/convert_pose_formats/output.pose


## 2Ô∏è‚É£ Convert to JSON

In [6]:
def pose_to_json(pose: Pose, output_path: str = None, include_header: bool = True):
    """
    Convert Pose to JSON.

    Parameters:
    -----------
    pose : Pose
        Pose object.
    output_path : str, optional
        Output file path. If not provided, returns the dict only.
    include_header : bool
        Include header information.
    """
    # Convert data to Python lists
    data = pose.body.data.filled(0).tolist()  # replace missing values with 0
    confidence = pose.body.confidence.tolist()
    
    result = {
        "data": data,
        "confidence": confidence,
        "fps": pose.body.fps,
        "shape": {
            "frames": pose.body.data.shape[0],
            "people": pose.body.data.shape[1],
            "points": pose.body.data.shape[2],
            "dimensions": pose.body.data.shape[3]
        }
    }
    
    if include_header:
        result["header"] = {
            "version": pose.header.version,
            "dimensions": {
                "width": pose.header.dimensions.width,
                "height": pose.header.dimensions.height,
                "depth": pose.header.dimensions.depth
            },
            "components": [
                {
                    "name": comp.name,
                    "points": comp.points,
                    "format": comp.format,
                    "limbs": comp.limbs,
                    "num_points": len(comp.points)
                }
                for comp in pose.header.components
            ]
        }
    
    if output_path:
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2)
        print(f"‚úÖ Saved to: {output_path}")
    
    return result



In [7]:
json_data = pose_to_json(pose, f"{save_path}/output.json")

‚úÖ Saved to: ../output/convert_pose_formats/output.json


In [31]:
def build_header_from_dict(header_dict: dict) -> PoseHeader:
    """
    Build a PoseHeader from a JSON header dictionary.
    """
    dimensions = PoseHeaderDimensions(
        width=header_dict["dimensions"]["width"],
        height=header_dict["dimensions"]["height"],
        depth=header_dict["dimensions"].get("depth", 0)
    )
    components = []
    for comp in header_dict.get("components", []):
        limbs = [tuple(limb) for limb in comp.get("limbs", [])]
        colors = comp.get("colors")
        if not colors:
            colors = [(255, 255, 255)] * len(limbs)
        components.append(
            PoseHeaderComponent(
                name=comp["name"],
                points=comp["points"],
                limbs=limbs,
                colors=colors,
                point_format=comp.get("format", "XYZC")
            )
        )
    return PoseHeader(
        version=header_dict.get("version", 0.1),
        dimensions=dimensions,
        components=components
    )

def clone_header(
    reference_header: PoseHeader,
    width: int = None,
    height: int = None,
    depth: int = None,
    version: float = None
 ) -> PoseHeader:
    """
    Create a new header based on a reference header, with optional overrides.
    """
    dimensions = PoseHeaderDimensions(
        width if width is not None else reference_header.dimensions.width,
        height if height is not None else reference_header.dimensions.height,
        depth if depth is not None else reference_header.dimensions.depth
    )
    return PoseHeader(
        version=version if version is not None else reference_header.version,
        dimensions=dimensions,
        components=reference_header.components,
        is_bbox=getattr(reference_header, "is_bbox", False)
    )

def build_pose_from_arrays(
    data: np.ndarray,
    confidence: np.ndarray,
    fps: float,
    header: PoseHeader,
    mask: np.ndarray = None
 ) -> Pose:
    """
    Build a Pose object from raw arrays.
    IMPORTANT: Uses NumPyPoseBody to ensure proper write() functionality.
    """
    if mask is not None:
        data = np.ma.MaskedArray(data, mask=mask)
    # Use NumPyPoseBody instead of generic PoseBody for write support
    body = NumPyPoseBody(fps, data, confidence)
    return Pose(header, body)

def json_to_pose(
    json_path: str,
    output_path: str,
    reference_pose: Pose = None
 ) -> Pose:
    """
    Convert JSON back to a .pose file.
    If the JSON does not include a header, a reference pose is required.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        json_data = json.load(f)

    header_dict = json_data.get("header")
    if header_dict:
        header = build_header_from_dict(header_dict)
    elif reference_pose is not None:
        header = reference_pose.header
    else:
        raise ValueError("Header is missing. Provide a reference_pose.")

    data = np.array(json_data["data"], dtype=np.float32)
    confidence = np.array(json_data["confidence"], dtype=np.float32)
    fps = float(json_data.get("fps", reference_pose.body.fps if reference_pose else 30))
    pose = build_pose_from_arrays(data, confidence, fps, header)
    save_as_pose(pose, output_path)
    return pose



In [33]:
# Example: JSON -> .pose
restored_pose_json = json_to_pose(f"{save_path}/output.json", f"{save_path}/restored_from_json.pose")

‚úÖ Saved to: ../output/convert_pose_formats/restored_from_json.pose


In [10]:
def pose_to_json_compact(pose: Pose, output_path: str):
    """
    Convert Pose to compact JSON (includes full header for standalone conversion).
    """
    data = pose.body.data.filled(0).tolist()
    confidence = pose.body.confidence.tolist()
    
    # Include full header for independent reconstruction
    result = {
        "d": data,
        "c": confidence,
        "f": float(pose.body.fps),
        "h": {
            "v": float(pose.header.version),
            "w": int(pose.header.dimensions.width),
            "h": int(pose.header.dimensions.height),
            "d": int(pose.header.dimensions.depth),
            "comps": [
                {
                    "n": comp.name,
                    "pts": comp.points,
                    "fmt": comp.format,
                    "limbs": [[int(x) for x in limb] if hasattr(limb, '__iter__') else int(limb) for limb in comp.limbs],
                    "colors": [[int(x) for x in c] if hasattr(c, '__iter__') else int(c) for c in getattr(comp, "colors", [])] if hasattr(comp, "colors") else None
                }
                for comp in pose.header.components
            ]
        }
    }
    
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, separators=(',', ':'))
    
    print(f"‚úÖ Saved (compact) to: {output_path}")

In [34]:
def json_compact_to_pose(
    json_path: str,
    output_path: str,
    reference_pose: Pose = None
 ) -> Pose:
    """
    Convert compact JSON back to a .pose file (now standalone - no reference needed).
    """
    with open(json_path, "r", encoding="utf-8") as f:
        compact = json.load(f)

    data = np.array(compact["d"], dtype=np.float32)
    confidence = np.array(compact["c"], dtype=np.float32)
    fps = float(compact["f"])
    
    # Reconstruct header from embedded data
    header_data = compact.get("h")
    if header_data:
        # Build header from embedded info
        dimensions = PoseHeaderDimensions(
            width=header_data["w"],
            height=header_data["h"],
            depth=header_data.get("d", 0)
        )
        components = []
        for comp in header_data.get("comps", []):
            limbs = [tuple(limb) for limb in comp.get("limbs", [])]
            colors = comp.get("colors")
            if not colors:
                colors = [(255, 255, 255)] * len(limbs)
            components.append(
                PoseHeaderComponent(
                    name=comp["n"],
                    points=comp["pts"],
                    limbs=limbs,
                    colors=colors,
                    point_format=comp.get("fmt", "XYZC")
                )
            )
        header = PoseHeader(
            version=header_data.get("v", 0.1),
            dimensions=dimensions,
            components=components
        )
    elif reference_pose is not None:
        # Fallback to reference if no header
        header = clone_header(
            reference_header=reference_pose.header,
            width=int(compact.get("w", reference_pose.header.dimensions.width)),
            height=int(compact.get("h", reference_pose.header.dimensions.height))
        )
    else:
        raise ValueError("No header found in compact JSON and no reference_pose provided.")
    
    pose = build_pose_from_arrays(data, confidence, fps, header)
    
    # Save to file
    with open(output_path, 'wb') as f:
        pose.write(f)
    
    return pose


In [None]:
# Example: compact JSON -> .pose (now standalone!)
pose_to_json_compact(pose, f"{save_path}/output.compact.json")


‚úÖ Saved (compact) to: ../output/convert_pose_formats/output.compact.json


In [35]:
restored_pose_compact = json_compact_to_pose(f"{save_path}/output.compact.json", f"{save_path}/restored_from_compact.pose")

## 3Ô∏è‚É£ Convert to NumPy (.npz)

In [13]:
def pose_to_npz(pose: Pose, output_path: str, compressed: bool = True):
    """
    Convert Pose to NumPy (.npz) with full header information for standalone conversion.
    
    Parameters:
    -----------
    pose : Pose
        Pose object.
    output_path : str
        Output file path.
    compressed : bool
        Use compression (smaller size but slower read/write).
    """
    # Extract data
    data = np.array(pose.body.data.filled(0), dtype=np.float32)
    confidence = np.array(pose.body.confidence, dtype=np.float32)
    mask = np.array(pose.body.data.mask, dtype=bool)
    
    # Serialize components info as JSON string
    components_data = []
    for comp in pose.header.components:
        components_data.append({
            "name": comp.name,
            "points": comp.points,
            "format": comp.format,
            "limbs": [[int(x) for x in limb] if hasattr(limb, '__iter__') else int(limb) for limb in comp.limbs],
            "colors": [[int(x) for x in c] if hasattr(c, '__iter__') else int(c) for c in getattr(comp, "colors", [])] if hasattr(comp, "colors") else None
        })
    components_json = json.dumps(components_data)
    
    # Extra metadata
    metadata = {
        'fps': np.array([pose.body.fps]),
        'width': np.array([pose.header.dimensions.width]),
        'height': np.array([pose.header.dimensions.height]),
        'depth': np.array([pose.header.dimensions.depth]),
        'version': np.array([pose.header.version]),
        'components': np.array([components_json], dtype=object)
    }
    
    # Save
    save_func = np.savez_compressed if compressed else np.savez
    save_func(
        output_path,
        data=data,
        confidence=confidence,
        mask=mask,
        **metadata
    )
    
    print(f"‚úÖ Saved to: {output_path}")
    print(f"   üì¶ File size: {Path(output_path).stat().st_size / 1024:.2f} KB")



In [14]:
pose_to_npz(pose, f"{save_path}/output.npz", compressed=True)

‚úÖ Saved to: ../output/convert_pose_formats/output.npz
   üì¶ File size: 306.53 KB


In [36]:
def npz_to_pose(npz_path: str, output_path: str, reference_pose: Pose = None) -> Pose:
    """
    Convert NPZ back to a .pose file (now standalone - no reference needed).
    """
    loaded = np.load(npz_path, allow_pickle=True)
    
    data = loaded['data'].astype(np.float32)
    confidence = loaded['confidence'].astype(np.float32)
    mask = loaded['mask'] if 'mask' in loaded.files else None
    fps = float(loaded['fps'][0]) if 'fps' in loaded.files else 30.0
    
    # Try to load header from NPZ
    if 'components' in loaded.files:
        # Reconstruct header from saved components
        width = int(loaded['width'][0]) if 'width' in loaded.files else data.shape[2]
        height = int(loaded['height'][0]) if 'height' in loaded.files else data.shape[1]
        depth = int(loaded['depth'][0]) if 'depth' in loaded.files else 0
        version = float(loaded['version'][0]) if 'version' in loaded.files else 0.1
        
        dimensions = PoseHeaderDimensions(width, height, depth)
        
        # Deserialize components
        components_json = str(loaded['components'][0])
        components_data = json.loads(components_json)
        
        components = []
        for comp_data in components_data:
            limbs = [tuple(limb) for limb in comp_data.get("limbs", [])]
            colors = comp_data.get("colors")
            if not colors:
                colors = [(255, 255, 255)] * len(limbs)
            components.append(
                PoseHeaderComponent(
                    name=comp_data["name"],
                    points=comp_data["points"],
                    limbs=limbs,
                    colors=colors,
                    point_format=comp_data.get("format", "XYZC")
                )
            )
        
        header = PoseHeader(
            version=version,
            dimensions=dimensions,
            components=components
        )
    elif reference_pose is not None:
        # Fallback to reference
        width = int(loaded['width'][0]) if 'width' in loaded.files else reference_pose.header.dimensions.width
        height = int(loaded['height'][0]) if 'height' in loaded.files else reference_pose.header.dimensions.height
        depth = int(loaded['depth'][0]) if 'depth' in loaded.files else reference_pose.header.dimensions.depth
        version = float(loaded['version'][0]) if 'version' in loaded.files else reference_pose.header.version
        
        header = clone_header(
            reference_header=reference_pose.header,
            width=width,
            height=height,
            depth=depth,
            version=version
        )
    else:
        raise ValueError("No header found in NPZ and no reference_pose provided.")
    
    pose = build_pose_from_arrays(data, confidence, fps, header, mask=mask)
    
    # Save to file
    with open(output_path, 'wb') as f:
        pose.write(f)
    
    return pose



In [37]:
# Example: NPZ -> .pose (now standalone!)
restored_pose_npz = npz_to_pose(f"{save_path}/output.npz", f"{save_path}/restored_from_npz.pose")

## 4Ô∏è‚É£ Convert to Parquet

### ‚ö†Ô∏è Important Note
If you get `ArrowKeyError: A type extension with name pandas.period already defined`, restart the kernel and run all cells from the beginning. This is a known pyarrow/pandas compatibility issue.

In [17]:
def pose_to_parquet(pose: Pose, output_path: str, component_name: str = None):
    """
    Convert Pose to Parquet (good for large analytics).
    
    Schema:
    - frame_id: frame index
    - person_id: person index
    - point_name: point name
    - component: component name
    - x, y, z: coordinates
    - confidence: confidence score
    """
    rows = []
    
    data = pose.body.data
    confidence = pose.body.confidence
    
    frames, people, points, dims = data.shape
    
    # Build point list with names
    point_info = []
    for comp in pose.header.components:
        if component_name and comp.name != component_name:
            continue
        for point_name in comp.points:
            point_info.append((comp.name, point_name))
    
    print(f"üîÑ Converting... ({frames} frames)")
    
    for frame_idx in range(frames):
        for person_idx in range(people):
            for point_idx, (comp_name, point_name) in enumerate(point_info):
                conf = confidence[frame_idx, person_idx, point_idx]
                
                # Skip points with zero confidence
                if conf == 0:
                    continue
                
                coords = data[frame_idx, person_idx, point_idx]
                
                row = {
                    'frame_id': frame_idx,
                    'time_sec': frame_idx / pose.body.fps,
                    'person_id': person_idx,
                    'component': comp_name,
                    'point_name': point_name,
                    'point_index': point_idx,
                    'x': float(coords[0]),
                    'y': float(coords[1]),
                    'z': float(coords[2]) if dims > 2 else 0.0,
                    'confidence': float(conf)
                }
                rows.append(row)
    
    # Create DataFrame and save
    df = pd.DataFrame(rows)
    
    # Try different methods to avoid pyarrow conflicts
    try:
        df.to_parquet(output_path, index=False, compression='snappy', engine='pyarrow')
    except Exception as e:
        print(f"‚ö†Ô∏è PyArrow error (try restarting kernel): {str(e)[:100]}")
        print("üí° Trying alternative method...")
        try:
            # Try fastparquet as alternative
            df.to_parquet(output_path, index=False, compression='snappy', engine='fastparquet')
        except:
            # Final fallback: save without compression
            print("üí° Using fallback method (no compression)...")
            df.to_parquet(output_path, index=False, engine='pyarrow', compression=None)
    
    print(f"‚úÖ Saved to: {output_path}")
    print(f"   üìä Rows: {len(df):,}")
    print(f"   üì¶ File size: {Path(output_path).stat().st_size / 1024:.2f} KB")
    
    return df


In [18]:
df = pose_to_parquet(pose, f"{save_path}/output.parquet")

üîÑ Converting... (133 frames)
‚úÖ Saved to: ../output/convert_pose_formats/output.parquet
   üìä Rows: 26,705
   üì¶ File size: 653.23 KB


In [19]:
def pose_to_parquet_efficient(pose: Pose, output_path: str):
    """
    Convert Pose to Parquet efficiently (one row per frame) with full header metadata.
    
    This method stores each frame as a single row with points as arrays.
    Includes header as metadata for standalone reconstruction.
    """
    rows = []
    frames = pose.body.data.shape[0]
    
    for frame_idx in range(frames):
        row = {
            'frame_id': frame_idx,
            'time_sec': frame_idx / pose.body.fps,
            'data': pose.body.data[frame_idx].filled(0).flatten().tolist(),
            'confidence': pose.body.confidence[frame_idx].flatten().tolist()
        }
        rows.append(row)
    
    df = pd.DataFrame(rows)
    
    # Prepare header metadata
    components_data = []
    for comp in pose.header.components:
        components_data.append({
            "name": comp.name,
            "points": comp.points,
            "format": comp.format,
            "limbs": [[int(x) for x in limb] if hasattr(limb, '__iter__') else int(limb) for limb in comp.limbs],
            "colors": [[int(x) for x in c] if hasattr(c, '__iter__') else int(c) for c in getattr(comp, "colors", [])] if hasattr(comp, "colors") else None
        })
    
    # Save metadata in a separate JSON file instead of parquet metadata
    metadata_path = output_path.rsplit('.', 1)[0] + '_metadata.json'
    metadata = {
        'fps': float(pose.body.fps),
        'version': float(pose.header.version),
        'width': int(pose.header.dimensions.width),
        'height': int(pose.header.dimensions.height),
        'depth': int(pose.header.dimensions.depth),
        'components': components_data,
        'shape': [int(x) for x in pose.body.data.shape]
    }
    
    with open(metadata_path, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2)
    
    # Save parquet without custom_metadata (compatibility issue)
    df.to_parquet(output_path, index=False)
    
    print(f"‚úÖ Saved (efficient) to: {output_path}")
    print(f"   üìã Metadata: {metadata_path}")
    print(f"   üìä Frames: {len(df)}")
    
    return df

In [38]:
def parquet_to_pose(parquet_path: str, output_path: str, reference_pose: Pose = None) -> Pose:
    """
    Convert long-format Parquet back to a .pose file (now standalone when metadata available).
    """
    df = pd.read_parquet(parquet_path)
    if df.empty:
        raise ValueError("Parquet file is empty.")

    frames = int(df["frame_id"].max()) + 1
    people = int(df["person_id"].max()) + 1
    dims = 3 if "z" in df.columns else 2
    
    # Try to get header from parquet metadata
    try:
        import pyarrow.parquet as pq
        parquet_file = pq.ParquetFile(parquet_path)
        metadata = parquet_file.schema_arrow.metadata
        
        if metadata and b'components' in metadata:
            # Reconstruct from metadata
            fps = float(metadata.get(b'fps', b'30').decode())
            version = float(metadata.get(b'version', b'0.1').decode())
            width = int(metadata.get(b'width', b'1920').decode())
            height = int(metadata.get(b'height', b'1080').decode())
            depth_val = int(metadata.get(b'depth', b'0').decode())
            shape = json.loads(metadata[b'shape'].decode())
            points = shape[2]
            
            dimensions = PoseHeaderDimensions(width, height, depth_val)
            
            components_data = json.loads(metadata[b'components'].decode())
            components = []
            for comp_data in components_data:
                limbs = [tuple(limb) for limb in comp_data.get("limbs", [])]
                colors = comp_data.get("colors")
                if not colors:
                    colors = [(255, 255, 255)] * len(limbs)
                components.append(
                    PoseHeaderComponent(
                        name=comp_data["name"],
                        points=comp_data["points"],
                        limbs=limbs,
                        colors=colors,
                        point_format=comp_data.get("format", "XYZC")
                    )
                )
            
            header = PoseHeader(version=version, dimensions=dimensions, components=components)
        else:
            raise ValueError("No metadata found")
    except Exception as e:
        if reference_pose is None:
            raise ValueError(f"Cannot reconstruct header from Parquet metadata and no reference_pose provided. Error: {e}")
        header = reference_pose.header
        points = reference_pose.header.total_points()
        fps = reference_pose.body.fps
    
    data = np.zeros((frames, people, points, dims), dtype=np.float32)
    confidence = np.zeros((frames, people, points), dtype=np.float32)
    
    for row in df.itertuples(index=False):
        frame_idx = int(row.frame_id)
        person_idx = int(row.person_id)
        point_idx = int(row.point_index)
        data[frame_idx, person_idx, point_idx, 0] = float(row.x)
        data[frame_idx, person_idx, point_idx, 1] = float(row.y)
        if dims > 2:
            data[frame_idx, person_idx, point_idx, 2] = float(getattr(row, "z", 0.0))
        confidence[frame_idx, person_idx, point_idx] = float(row.confidence)
    
    pose = build_pose_from_arrays(data, confidence, fps, header)
    
    # Save to file
    with open(output_path, 'wb') as f:
        pose.write(f)
    
    return pose

def parquet_efficient_to_pose(parquet_path: str, output_path: str, reference_pose: Pose = None) -> Pose:
    """
    Convert efficient Parquet (one row per frame) back to a .pose file (now standalone with metadata JSON).
    """
    df = pd.read_parquet(parquet_path)
    frames = len(df)
    
    # Try to load metadata from companion JSON file
    metadata_path = parquet_path.rsplit('.', 1)[0] + '_metadata.json'
    header = None
    fps = 30.0
    
    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
        
        # Reconstruct header from metadata
        fps = float(metadata["fps"])
        version = float(metadata["version"])
        width = int(metadata["width"])
        height = int(metadata["height"])
        depth_val = int(metadata["depth"])
        shape = metadata["shape"]
        
        people, points, dims = shape[1], shape[2], shape[3]
        
        dimensions = PoseHeaderDimensions(width, height, depth_val)
        
        components_data = metadata["components"]
        components = []
        for comp_data in components_data:
            limbs = [tuple(limb) for limb in comp_data.get("limbs", [])]
            colors = comp_data.get("colors")
            if not colors:
                colors = [(255, 255, 255)] * len(limbs)
            components.append(
                PoseHeaderComponent(
                    name=comp_data["name"],
                    points=comp_data["points"],
                    limbs=limbs,
                    colors=colors,
                    point_format=comp_data.get("format", "XYZC")
                )
            )
        
        header = PoseHeader(version=version, dimensions=dimensions, components=components)
        print(f"‚úÖ Loaded header from: {metadata_path}")
    except FileNotFoundError:
        if reference_pose is None:
            raise ValueError(f"Metadata file not found: {metadata_path}. Provide reference_pose or ensure metadata file exists.")
        header = reference_pose.header
        people = reference_pose.body.data.shape[1]
        points = reference_pose.body.data.shape[2]
        dims = reference_pose.body.data.shape[3]
        fps = reference_pose.body.fps
        print("‚ö†Ô∏è No metadata file found, using reference_pose")
    
    data = np.array(df["data"].to_list(), dtype=np.float32)
    confidence = np.array(df["confidence"].to_list(), dtype=np.float32)
    data = data.reshape(frames, people, points, dims)
    confidence = confidence.reshape(frames, people, points)
    
    pose = build_pose_from_arrays(data, confidence, fps, header)
    
    # Save to file
    with open(output_path, 'wb') as f:
        pose.write(f)
    
    return pose



In [40]:
# Example: Parquet -> .pose (now standalone!)
restored_pose_parquet_efficient = parquet_efficient_to_pose(f"{save_path}/output.parquet", f"{save_path}/restored_from_parquet_efficient.pose")

KeyError: 'width'

## 5Ô∏è‚É£ Convert to CSV

In [22]:
def pose_to_csv(pose: Pose, output_path: str):
    """
    Convert Pose to CSV (exports all components) with header metadata in separate JSON file.
    
    Parameters:
    -----------
    pose : Pose
        Pose object.
    output_path : str
        Output file path (a .json metadata file will be created alongside).
    """
    rows = []
    
    data = pose.body.data
    confidence = pose.body.confidence
    frames, people, _, _ = data.shape
    
    # Build point info for all components
    point_idx = 0
    point_info = []
    for comp in pose.header.components:
        for p_name in comp.points:
            point_info.append((point_idx, comp.name, p_name))
            point_idx += 1
    
    print("üîÑ Converting to CSV...")
    
    for frame_idx in range(frames):
        for person_idx in range(people):
            for p_idx, comp_name, point_name in point_info:
                conf = confidence[frame_idx, person_idx, p_idx]
                if conf == 0:
                    continue
                    
                coords = data[frame_idx, person_idx, p_idx]
                rows.append({
                    'frame': frame_idx,
                    'time': round(frame_idx / pose.body.fps, 4),
                    'person': person_idx,
                    'component': comp_name,
                    'point': point_name,
                    'x': round(float(coords[0]), 4),
                    'y': round(float(coords[1]), 4),
                    'z': round(float(coords[2]), 4) if len(coords) > 2 else 0,
                    'confidence': round(float(conf), 4)
                })
    
    df = pd.DataFrame(rows)
    df.to_csv(output_path, index=False)
    
    # Save header metadata as companion JSON file
    metadata_path = output_path.rsplit('.', 1)[0] + '_metadata.json'
    components_data = []
    for comp in pose.header.components:
        components_data.append({
            "name": comp.name,
            "points": comp.points,
            "format": comp.format,
            "limbs": [[int(x) for x in limb] if hasattr(limb, '__iter__') else int(limb) for limb in comp.limbs],
            "colors": [[int(x) for x in c] if hasattr(c, '__iter__') else int(c) for c in getattr(comp, "colors", [])] if hasattr(comp, "colors") else None
        })
    
    metadata = {
        "fps": float(pose.body.fps),
        "version": float(pose.header.version),
        "dimensions": {
            "width": int(pose.header.dimensions.width),
            "height": int(pose.header.dimensions.height),
            "depth": int(pose.header.dimensions.depth)
        },
        "components": components_data,
        "shape": [int(x) for x in pose.body.data.shape]
    }
    
    with open(metadata_path, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2)
    
    print(f"‚úÖ Saved to: {output_path}")
    print(f"   üìã Metadata: {metadata_path}")
    print(f"   üìä Rows: {len(df):,}")
    
    return df



In [23]:
# Example: export to CSV
df = pose_to_csv(pose, f"{save_path}/output.csv")

üîÑ Converting to CSV...
‚úÖ Saved to: ../output/convert_pose_formats/output.csv
   üìã Metadata: ../output/convert_pose_formats/output_metadata.json
   üìä Rows: 26,705


In [41]:
def csv_to_pose(csv_path: str, output_path: str, reference_pose: Pose = None) -> Pose:
    """
    Convert CSV back to a .pose file (now standalone with metadata JSON file).
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        raise ValueError("CSV file is empty.")

    # Try to load metadata from companion JSON file
    metadata_path = csv_path.rsplit('.', 1)[0] + '_metadata.json'
    header = None
    fps = 30.0
    
    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
        
        # Reconstruct header from metadata
        fps = float(metadata["fps"])
        version = float(metadata["version"])
        dimensions = PoseHeaderDimensions(
            width=metadata["dimensions"]["width"],
            height=metadata["dimensions"]["height"],
            depth=metadata["dimensions"]["depth"]
        )
        
        components = []
        for comp_data in metadata["components"]:
            limbs = [tuple(limb) for limb in comp_data.get("limbs", [])]
            colors = comp_data.get("colors")
            if not colors:
                colors = [(255, 255, 255)] * len(limbs)
            components.append(
                PoseHeaderComponent(
                    name=comp_data["name"],
                    points=comp_data["points"],
                    limbs=limbs,
                    colors=colors,
                    point_format=comp_data.get("format", "XYZC")
                )
            )
        
        header = PoseHeader(version=version, dimensions=dimensions, components=components)
        print(f"‚úÖ Loaded header from: {metadata_path}")
    except FileNotFoundError:
        if reference_pose is None:
            raise ValueError(f"Metadata file not found: {metadata_path}. Provide reference_pose or ensure metadata file exists.")
        header = reference_pose.header
        fps = reference_pose.body.fps
        print("‚ö†Ô∏è No metadata file found, using reference_pose")
    
    frames = int(df["frame"].max()) + 1
    people = int(df["person"].max()) + 1
    points = header.total_points()
    dims = 3 if "z" in df.columns else 2
    
    # Map (component, point) -> point index
    point_map = {}
    idx = 0
    for comp in header.components:
        for p_name in comp.points:
            point_map[(comp.name, p_name)] = idx
            idx += 1
    
    data = np.zeros((frames, people, points, dims), dtype=np.float32)
    confidence = np.zeros((frames, people, points), dtype=np.float32)
    
    for row in df.itertuples(index=False):
        frame_idx = int(row.frame)
        person_idx = int(row.person)
        point_idx = int(point_map[(row.component, row.point)])
        data[frame_idx, person_idx, point_idx, 0] = float(row.x)
        data[frame_idx, person_idx, point_idx, 1] = float(row.y)
        if dims > 2:
            data[frame_idx, person_idx, point_idx, 2] = float(getattr(row, "z", 0.0))
        confidence[frame_idx, person_idx, point_idx] = float(row.confidence)
    
    pose = build_pose_from_arrays(data, confidence, fps, header)
    
    # Save to file
    with open(output_path, 'wb') as f:
        pose.write(f)
    
    return pose


In [42]:
# Example: CSV -> .pose (now standalone!)
restored_pose_csv = csv_to_pose(f"{save_path}/output.csv", f"{save_path}/restored_from_csv.pose")

‚úÖ Loaded header from: ../output/convert_pose_formats/output_metadata.json


## 6Ô∏è‚É£ Comprehensive conversion function

## ‚ú® Standalone Conversion Tests

All conversion methods now work **independently** without requiring `reference_pose`!
- ‚úÖ Compact JSON: Includes full header
- ‚úÖ NPZ: Stores components as JSON
- ‚úÖ Parquet: Uses custom metadata
- ‚úÖ CSV: Creates companion `_metadata.json` file

In [26]:
# Test Standalone Conversions (No reference_pose needed!)
print("üß™ Testing Standalone Conversions...\n")

# 1. Test Compact JSON
print("1Ô∏è‚É£ Testing Compact JSON...")
pose_to_json_compact(pose, f"{save_path}/test.compact.json")
restored_compact = json_compact_to_pose(f"{save_path}/test.compact.json", f"{save_path}/test_compact_restored.pose")
print(f"   ‚úÖ Original shape: {pose.body.data.shape}")
print(f"   ‚úÖ Restored shape: {restored_compact.body.data.shape}")
print("‚úÖ Compact JSON: PASSED\n")

# 2. Test NPZ
print("2Ô∏è‚É£ Testing NPZ...")
pose_to_npz(pose, f"{save_path}/test.npz", compressed=True)
restored_npz = npz_to_pose(f"{save_path}/test.npz", f"{save_path}/test_npz_restored.pose")
print(f"   ‚úÖ Original shape: {pose.body.data.shape}")
print(f"   ‚úÖ Restored shape: {restored_npz.body.data.shape}")
print("‚úÖ NPZ: PASSED\n")

# 3. Test CSV
print("3Ô∏è‚É£ Testing CSV...")
pose_to_csv(pose, f"{save_path}/test.csv")
restored_csv = csv_to_pose(f"{save_path}/test.csv", f"{save_path}/test_csv_restored.pose")
print(f"   ‚úÖ Original shape: {pose.body.data.shape}")
print(f"   ‚úÖ Restored shape: {restored_csv.body.data.shape}")
print("‚úÖ CSV: PASSED\n")

# 4. Test Parquet (efficient format)
print("4Ô∏è‚É£ Testing Parquet...")
pose_to_parquet_efficient(pose, f"{save_path}/test_efficient.parquet")
restored_parquet = parquet_efficient_to_pose(f"{save_path}/test_efficient.parquet", f"{save_path}/test_parquet_restored.pose")
print(f"   ‚úÖ Original shape: {pose.body.data.shape}")
print(f"   ‚úÖ Restored shape: {restored_parquet.body.data.shape}")
print("‚úÖ Parquet: PASSED\n")

print("=" * 60)
print("üéâ ALL CONVERSIONS ARE NOW STANDALONE!")
print("=" * 60)
print("No need for reference_pose anymore! ‚ú®")

üß™ Testing Standalone Conversions...

1Ô∏è‚É£ Testing Compact JSON...
‚úÖ Saved (compact) to: ../output/convert_pose_formats/test.compact.json
   ‚úÖ Original shape: (133, 1, 203, 3)
   ‚úÖ Restored shape: (133, 1, 203, 3)
‚úÖ Compact JSON: PASSED

2Ô∏è‚É£ Testing NPZ...
‚úÖ Saved to: ../output/convert_pose_formats/test.npz
   üì¶ File size: 306.53 KB
   ‚úÖ Original shape: (133, 1, 203, 3)
   ‚úÖ Restored shape: (133, 1, 203, 3)
‚úÖ NPZ: PASSED

3Ô∏è‚É£ Testing CSV...
üîÑ Converting to CSV...
‚úÖ Saved to: ../output/convert_pose_formats/test.csv
   üìã Metadata: ../output/convert_pose_formats/test_metadata.json
   üìä Rows: 26,705
‚úÖ Loaded header from: ../output/convert_pose_formats/test_metadata.json
   ‚úÖ Original shape: (133, 1, 203, 3)
   ‚úÖ Restored shape: (133, 1, 203, 3)
‚úÖ CSV: PASSED

4Ô∏è‚É£ Testing Parquet...
‚úÖ Saved (efficient) to: ../output/convert_pose_formats/test_efficient.parquet
   üìã Metadata: ../output/convert_pose_formats/test_efficient_metadata.jso

In [82]:
def convert_pose(pose_path: str, output_format: str, output_path: str = None):
    """
    Comprehensive conversion function.
    
    Parameters:
    -----------
    pose_path : str
        Path to the .pose file.
    output_format : str
        Desired format: 'json', 'npz', 'parquet', 'csv'.
    output_path : str, optional
        Output file path (auto-generated if not provided).
    """
    # Load file
    pose = load_pose_file(pose_path)
    
    # Build output filename
    base_path = Path(pose_path).stem
    
    format_map = {
        'json': ('.json', pose_to_json),
        'npz': ('.npz', pose_to_npz),
        'parquet': ('.parquet', pose_to_parquet),
        'csv': ('.csv', pose_to_csv)
    }
    
    if output_format not in format_map:
        raise ValueError(
            f"Unsupported format: {output_format}. Supported formats: {list(format_map.keys())}"
        )
    
    ext, converter = format_map[output_format]
    
    if output_path is None:
        output_path = f"{base_path}{ext}"
    
    return converter(pose, output_path)

# Usage examples:
# convert_pose("input.pose", "json")
# convert_pose("input.pose", "npz")
# convert_pose("input.pose", "parquet")
# convert_pose("input.pose", "csv")

## 7Ô∏è‚É£ Batch conversion

In [None]:
from pathlib import Path
from tqdm import tqdm

def batch_convert(input_dir: str, output_format: str, output_dir: str = None):
    """
    Convert a folder of .pose files.
    """
    input_path = Path(input_dir)
    pose_files = list(input_path.glob("*.pose"))
    
    if not pose_files:
        print("‚ö†Ô∏è No .pose files found")
        return
    
    print(f"üìÅ Found {len(pose_files)} files")
    
    # Create output folder
    if output_dir is None:
        output_dir = input_path / f"converted_{output_format}"
    else:
        output_dir = Path(output_dir)
    
    output_dir.mkdir(exist_ok=True)
    
    # Convert files
    for pose_file in tqdm(pose_files, desc="Converting"):
        try:
            ext = {'json': '.json', 'npz': '.npz', 'parquet': '.parquet', 'csv': '.csv'}[output_format]
            output_path = output_dir / f"{pose_file.stem}{ext}"
            convert_pose(str(pose_file), output_format, str(output_path))
        except Exception as e:
            print(f"‚ùå Error in {pose_file.name}: {e}")
    
    print(f"\n‚úÖ Converted files saved to: {output_dir}")

# batch_convert("pose_files_folder", "parquet")

## 8Ô∏è‚É£ Compare file sizes

In [43]:
def compare_file_sizes(pose_path: str):
    """
    Compare file sizes across different formats.
    """
    import tempfile
    import os
    
    pose = load_pose_file(pose_path)
    original_size = Path(pose_path).stat().st_size
    
    results = {'pose (original)': original_size}
    
    with tempfile.TemporaryDirectory() as tmpdir:
        # JSON
        json_path = os.path.join(tmpdir, "test.json")
        pose_to_json(pose, json_path)
        results['json'] = Path(json_path).stat().st_size
        
        # NPZ (compressed)
        npz_path = os.path.join(tmpdir, "test.npz")
        pose_to_npz(pose, npz_path, compressed=True)
        results['npz (compressed)'] = Path(npz_path).stat().st_size
        
        # Parquet
        parquet_path = os.path.join(tmpdir, "test.parquet")
        pose_to_parquet_efficient(pose, parquet_path)
        results['parquet (efficient)'] = Path(parquet_path).stat().st_size
    
    # Show results
    print("\n" + "=" * 50)
    print("üìä File size comparison")
    print("=" * 50)
    
    for fmt, size in sorted(results.items(), key=lambda x: x[1]):
        ratio = size / original_size * 100
        print(f"{fmt:25} {size/1024:10.2f} KB  ({ratio:6.1f}%)")



In [45]:
compare_file_sizes(f"{save_path}/restored_from_npz.pose")

‚úÖ Saved to: C:\Users\micrk\AppData\Local\Temp\tmp1hk83buf\test.json
‚úÖ Saved to: C:\Users\micrk\AppData\Local\Temp\tmp1hk83buf\test.npz
   üì¶ File size: 306.53 KB
‚úÖ Saved (efficient) to: C:\Users\micrk\AppData\Local\Temp\tmp1hk83buf\test.parquet
   üìã Metadata: C:\Users\micrk\AppData\Local\Temp\tmp1hk83buf\test_metadata.json
   üìä Frames: 133

üìä File size comparison
npz (compressed)              306.53 KB  (  72.1%)
pose (original)               424.93 KB  ( 100.0%)
parquet (efficient)           680.45 KB  ( 160.1%)
json                         3498.86 KB  ( 823.4%)


## üìö Format summary

| Format | Pros | Cons | Best use |
|--------|------|------|----------|
| `.pose` | Native library format, compressed | Library-specific | Storage and library workflows |
| `.json` | Human-readable, web-friendly | Large size | Web, debugging |
| `.npz` | Efficient for NumPy, compressed | Requires NumPy | Scientific analysis |
| `.parquet` | Efficient for big data | Requires pandas | Large-scale analytics |
| `.csv` | Compatible with most tools | Very large size | Excel, other tools |
