# **Setup Environment**

In [None]:
# Change directory to the project root so that relative paths in .env 
# (e.g., model weights, data directories) resolve correctly
%cd ../

In [None]:
import sys
from dotenv import load_dotenv
sys.path.append('.')
load_dotenv()

from typing import Iterator
import itertools
from app.human_pose_estimator import (
    HumanPoseEstimator2D,
    HumanDetectorConfig,
    PoseEstimatorConfig
)
from app.human_pose_estimator.types import RGBFrame
from app.s3 import files_service
from app.video import (
    decode_from_minio_streaming,
    encode_and_upload_to_minio_streaming,
    render_annotated_frame,
    stream_video_from_presigned_url
)

human_detector_config = HumanDetectorConfig()
pose_estimator_config = PoseEstimatorConfig()

In [None]:
# Start MinIO â€” required by code that interacts with object storage
!pixi run just minio-up

In [None]:
files_service.create_bucket()

In [None]:
# # After finishing your work, shut down MinIO to free resources
# !pixi run just minio-down

In [None]:
# Change directory to 'assets/' so that files can be referenced directly 
# (e.g., 'test.png' instead of 'assets/test.png')
%cd playground-testing/assets/

## **Test**

In [None]:
def annotated_frame_generator(
    frame_generator: Iterator[RGBFrame],
    estimator: HumanPoseEstimator2D
) -> Iterator[RGBFrame]:
    """
    Transform a stream of raw video frames into annotated overlays.

    This generator applies pose estimation to each input frame and renders
    visual annotations (bounding box, keypoints, and skeleton) directly onto
    the image. It is resolution-agnostic and processes frames sequentially,
    making it suitable for memory-constrained streaming pipelines.

    Parameters
    ----------
    frame_generator : Iterator[RGBFrame]
        Source of RGB video frames to annotate.
    estimator : HumanPoseEstimator2D
        Pose estimation model to apply on each frame.

    Yields
    ------
    RGBFrame
        Annotated frame with overlaid pose visualization.
    """

    for frame in frame_generator:
        result = estimator(frame)
        annotated = render_annotated_frame(
            frame=frame,
            keypoints=result.keypoints,
            bounding_box=result.bbox
        )
        yield annotated

In [None]:
def test_visualization_end_to_end(
    input_video_path: str,
    output_video_path: str,
    input_storage_key: str,
    output_storage_key: str,
    fps_target: float,
    crf: int
) -> None:
    """
    End-to-end test for streaming video pose estimation and annotated video export.

    Parameters
    ----------
    input_video_path : str
        Local path to the source video file.
    output_video_path : str
        Local path where the annotated video will be saved.
    input_storage_key : str
        MinIO object key for the input video.
    output_storage_key : str
        MinIO object key for the output video.
    fps_target : float
        Target frame rate for decoding and encoding.
    crf : int
        Constant Rate Factor for H.264 encoding (lower = higher quality).
        Must be in `[0, 51]`.
    """

    # 1. Upload input video to MinIO
    with open(input_video_path, "rb") as f:
        files_service.upload_fileobj(storage_key=input_storage_key, fileobj=f)

    # 2. Initialize raw frame stream from MinIO
    raw_frame_gen = decode_from_minio_streaming(
        storage_key=input_storage_key,
        expires=300,
        fps_target=fps_target
    )

    # 3. Extract first frame to determine resolution
    try:
        first_frame = next(raw_frame_gen)
    except StopIteration:
        raise RuntimeError("Input video contains no frames")

    height, width = first_frame.shape[:2]

    # 4. Reconstruct full frame stream including the first frame
    full_raw_gen = itertools.chain([first_frame], raw_frame_gen)

    # 5. Initialize pose estimator
    human_pose_estimator = HumanPoseEstimator2D(
        human_detector_config=human_detector_config,
        pose_estimator_config=pose_estimator_config
    )

    # 6. Wrap in annotation generator
    annotated_gen = annotated_frame_generator(
        frame_generator=full_raw_gen,
        estimator=human_pose_estimator
    )

    # 7. Encode and upload annotated video to MinIO
    encode_and_upload_to_minio_streaming(
        frame_generator=annotated_gen,
        storage_key=output_storage_key,
        width=width,
        height=height,
        fps_target=fps_target,
        crf=crf
    )

    # 8. Download resulting video for local inspection
    presigned_url = files_service.generate_presigned_url(
        storage_key=output_storage_key,
        expires=300
    )

    # 9. Save resulting video
    with open(output_video_path, "wb") as f:
        for chunk in stream_video_from_presigned_url(presigned_url):
            f.write(chunk)

    print(f"Successfully loaded video file '{input_video_path}', estimated pose, and saved resulting video to '{output_video_path}'.")

In [None]:
test_visualization_end_to_end(
    input_video_path = "test.mp4",
    output_video_path = "3.annotated_test.mp4",
    input_storage_key = "test3/input_video",
    output_storage_key = "test3/output_video",
    fps_target = 5.0,
    crf = 30
)