In [None]:
# make sure the framework is installed
!cd .. && uv pip install -e .
from tqdm.auto import tqdm
from robot_frames import RobotFrame
from robot_analysis_function import RobotUpsideDownAnalysis

from framework.prefetcher import FramePrefetcher
from framework.utils import (
    get_first_frame_index,
    get_last_frame_index,
    proportion_with_answer,
    warm_up_frame_class,
)

from framework.process_frames import process_frames

In [None]:
# initalize the groundlight client for the class
RobotFrame.initialize_gl()

# Warm up the robot detector - it will submit 10% of the frames to the robot detector to warm up the detector.
# The frames are picked at random to get a representative sample of the entire video.
# Then I'll go label these by hand with the assistance of the cloud labelers.
# Alternatively, if you want clear designation between training and testing data, you can provide a list of frame indices to warm up on, e.g.
# warm up on the first N minutes and test on the remaining M minutes.
warm_up_frame_class(frame_class=RobotFrame, proportion=0.1)

In [None]:
# Once I'm happy with the detector, I'll run all of the frames through groundlight.
start_index = get_first_frame_index()
end_index = get_last_frame_index()
indicies = list(range(start_index, end_index + 1))

prefetcher = FramePrefetcher(
    frame_class=RobotFrame,
    indicies=indicies,
    action="process",
)
for index in tqdm(indicies, desc="Processing frames"):
    frame = prefetcher.get_frame(index)

In [None]:
# Now I wait a minute for the frames above to be processed (and inference timeouts to resolve).
# Then I pull down updated inference results for frames where answers are available now but were not available before.
prefetcher = FramePrefetcher(
    frame_class=RobotFrame,
    indicies=indicies,
    action="update",
)
for index in tqdm(indicies, desc="Updating frames"):
    frame = prefetcher.get_frame(index)

# Check what proportion of frames have an answer.
proportion = proportion_with_answer(
    frame_class=RobotFrame,
    indices=indicies,
    has_answer_function=RobotFrame.has_answer,
)
print(f"Proportion of frames with an answer: {proportion}")


In [None]:
# My analysis function is a class that counts the number of frames the robot is upside down and stores that to state.
# it then draws the state on the frame and returns the frame.
analysis_class = RobotUpsideDownAnalysis()


process_frames(
    run_name="robot_upside_down_detection",
    indices=indicies,
    output_path="output.mp4",
    analysis_function=analysis_class.analyze_frame,
    frame_class=RobotFrame,
    input_video_path="../data/boston_dynamics.mp4",
)