In [3]:
from PIL import Image
from pathlib import Path
from vlmx.agent import Agent, AgentConfig
from dotenv import load_dotenv
from vlmx.context_agent import ContextAwareAgent
import os
import tempfile
from vlmx.tool_use_agent import ToolUseAgent, TOOL_INSTRUCTION
from vlmx.artifact import ArtifactDisplayHandler, artifacts_to_prompt_parts, ArtifactCollector
from typing import Dict, Any, Optional, List, Union
import sys
from io import StringIO
from vlmx.utils import string_to_file, join_path, extract_code_from_string
from contextlib import contextmanager
from io import StringIO
from PIL import Image
from pathlib import Path

In [4]:
def construct_prompt(prompt_txt_path, content_dict, special_separator="<|>"):
    """Build a list of prompt parts from a template file containing placeholders.

    A placeholder is a key wrapped in ``special_separator`` on both sides,
    e.g. ``<|>SKILL<|>``. The template is scanned once, left to right:

    * a placeholder whose value is a ``str`` is spliced into the surrounding
      text, so it never starts a new part;
    * a placeholder whose value is anything else (e.g. an image object) becomes
      its own element of the returned list, splitting the text around it.

    String values are inserted verbatim and are never re-scanned, so a value
    may itself contain ``special_separator`` without confusing the parser.

    Args:
        prompt_txt_path: Path of the template text file (read as UTF-8).
        content_dict: Mapping from placeholder key to its value.
        special_separator: Non-empty delimiter that surrounds each key.

    Returns:
        A list alternating between text (``str``) and the non-string values,
        in template order. Empty text segments are omitted.

    Raises:
        ValueError: If ``special_separator`` is empty, if an opening separator
            has no closing separator, or if a placeholder key is missing from
            ``content_dict``.
    """
    if not special_separator:
        # str.find("") matches at every position, so the scan could never advance.
        raise ValueError("special_separator must be a non-empty string")

    # Explicit encoding so the template reads the same on every platform.
    with open(prompt_txt_path, 'r', encoding='utf-8') as f:
        prompt = f.read()

    sep_len = len(special_separator)
    prompt_parts = []
    text_chunks = []  # text gathered since the last non-string part

    def _flush_text():
        # Emit the gathered text as one part, skipping empty segments.
        merged = "".join(text_chunks)
        if merged:
            prompt_parts.append(merged)
        text_chunks.clear()

    current_pos = 0
    while True:
        # Find next opening separator
        next_sep = prompt.find(special_separator, current_pos)
        if next_sep == -1:
            # No more placeholders: the rest of the template is plain text.
            text_chunks.append(prompt[current_pos:])
            break

        # Text before the placeholder
        text_chunks.append(prompt[current_pos:next_sep])

        # Find the closing separator of this placeholder
        end_sep = prompt.find(special_separator, next_sep + sep_len)
        if end_sep == -1:
            raise ValueError(f"Unmatched separator at position {next_sep}")

        key = prompt[next_sep + sep_len:end_sep]
        if key not in content_dict:
            raise ValueError(f"Key {key} not found in content_dict")

        value = content_dict[key]
        if isinstance(value, str):
            # Strings stay inline with the neighbouring text.
            text_chunks.append(value)
        else:
            # Non-string content (e.g. an image) is a separate part.
            _flush_text()
            prompt_parts.append(value)

        current_pos = end_sep + sep_len

    _flush_text()
    return prompt_parts


In [20]:
# Smoke test: fill the skill-completion template with one text field and one
# image field, then print every resulting prompt part.
# The placeholder image is created here so the cell runs on a fresh kernel.
# NOTE(review): the saved output only shows an RGB 100x100 PIL image; the solid
# red fill is presumed from the variable name — confirm.
red_image = Image.new('RGB', (100, 100), color='red')
text_only_dict = {'SKILL': 'pick up the blue block', 'CURRENT_IMAGE': red_image}
prompt_parts = construct_prompt('./prompts/skill_completion.txt', text_only_dict)
for part in prompt_parts:
    print(part)

You are providing guidance to a robot as it completes a manipulation task.
In particular, you need to tell the robot when it has successfully completed the current skill (subtask) so it knows when it can move on to the next step.
We will show you an image that shows the current scene, and we will ask you if a particular skill (subtask) has been successfully completed.
You must answer with either 'True' or 'False'.
Ok here is the image of the current state:

<PIL.Image.Image image mode=RGB size=100x100 at 0x11A51EDD0>

Has the robot successfully completed the skill: "pick up the blue block" (answer 'True' or 'False')


In [5]:
# System prompt returned by HelperAgent._make_system_instruction.
SYSTEM_INSTRUCTION = "Please follow the instructions in the prompt."


class HelperAgent(Agent):
    """Thin Agent subclass: passes the prompt through untouched and prints the reply."""

    # Presumably consumed by the Agent base class as the result file name — TODO confirm.
    OUT_RESULT_PATH = "test.txt"

    def _make_system_instruction(self):
        """Return the module-level ``SYSTEM_INSTRUCTION`` string."""
        system_instruction = SYSTEM_INSTRUCTION
        return system_instruction

    def _make_prompt_parts(self, question: str):
        """Return ``question`` unchanged; no extra prompt parts are added."""
        prompt_parts = question
        return prompt_parts

    def parse_response(self, response):
        """Print ``response.text`` prefixed with ``response:``; returns ``None``."""
        reply_text = response.text
        print("response:", reply_text)
        # Persisting the reply to disk (e.g. via string_to_file) is not enabled here.

In [32]:
# Test the skill_completion_predicate.txt prompt on four frames of the
# "pick up the blue block" skill (PRE, ALMOST, GRASP, LIFTING).

# Load variables from a .env file first so they are visible to the lookups below.
load_dotenv()

# Directory holding the frames. Override with the PI0_TRANSITIONS_DIR environment
# variable; the default is the original author's local path.
video_dir = os.environ.get(
    'PI0_TRANSITIONS_DIR',
    '/Users/christopherwatson/Documents/ML/pi0_transitions/stack_blocks/perfect',
)

# Keep the paths and the opened images in separate dicts so neither changes type.
image_paths = {
    stage: Path(video_dir) / f'pick_up_the_blue_block_{stage}.jpg'
    for stage in ('PRE', 'ALMOST', 'GRASP', 'LIFTING')
}
images_dict = {stage: Image.open(path) for stage, path in image_paths.items()}

# One HelperAgent per model name.
model_name_list = ['gpt-4o']
model_dict = {}
for model_name in model_name_list:
    cfg = AgentConfig(model_name=model_name, out_dir="test_results")
    model_dict[model_name] = HelperAgent(cfg)

for image_name, image in images_dict.items():
    # The prompt depends only on the image, so build it once per image
    # rather than once per (image, model) pair.
    messages = construct_prompt(
        './prompts/skill_completion_predicate.txt',
        {
            'SKILL': 'pick up the blue block',
            'PRE_IMAGE': images_dict['PRE'],
            'CURRENT_IMAGE': image,
            'PREDICATE': 'GRASPING BLUE BLOCK',
        },
    )
    for model_name, model in model_dict.items():
        print(f"Trying {model_name}... on {image_name}")
        response = model.generate_prediction(messages, gen_config={'temperature': 0.0})
        print(response.text)





Trying gpt-4o... on PRE
response: False
False
Trying gpt-4o... on ALMOST
response: False
False
Trying gpt-4o... on GRASP
response: False
False
Trying gpt-4o... on LIFTING
response: False
False
