diff --git a/src/rai_bench/rai_bench/examples/manipulation_o3de.py b/src/rai_bench/rai_bench/examples/manipulation_o3de.py index bd0257c5c..675ea8c01 100644 --- a/src/rai_bench/rai_bench/examples/manipulation_o3de.py +++ b/src/rai_bench/rai_bench/examples/manipulation_o3de.py @@ -31,7 +31,6 @@ model_name=args.model_name, vendor=args.vendor, ) - run_benchmark( llm=llm, out_dir=experiment_dir, diff --git a/src/rai_bench/rai_bench/examples/tool_calling_custom_agent.py b/src/rai_bench/rai_bench/examples/tool_calling_custom_agent.py new file mode 100644 index 000000000..7fc878781 --- /dev/null +++ b/src/rai_bench/rai_bench/examples/tool_calling_custom_agent.py @@ -0,0 +1,100 @@ +# Copyright (C) 2025 Robotec.AI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import uuid +from datetime import datetime +from pathlib import Path + +from rai.agents.langchain.core import ( + Executor, + create_megamind, + get_initial_megamind_state, +) + +from rai_bench import ( + define_benchmark_logger, +) +from rai_bench.tool_calling_agent.benchmark import ToolCallingAgentBenchmark +from rai_bench.tool_calling_agent.interfaces import TaskArgs +from rai_bench.tool_calling_agent.tasks.warehouse import SortingTask +from rai_bench.utils import get_llm_for_benchmark + +if __name__ == "__main__": + now = datetime.now() + out_dir = f"src/rai_bench/rai_bench/experiments/tool_calling/{now.strftime('%Y-%m-%d_%H-%M-%S')}" + experiment_dir = Path(out_dir) + experiment_dir.mkdir(parents=True, exist_ok=True) + bench_logger = define_benchmark_logger(out_dir=experiment_dir, level=logging.DEBUG) + + task = SortingTask(task_args=TaskArgs(extra_tool_calls=50)) + task.set_logger(bench_logger) + + supervisor_name = "gpt-4o" + + executor_name = "gpt-4o-mini" + model_name = f"supervisor-{supervisor_name}_executor-{executor_name}" + supervisor_llm = get_llm_for_benchmark(model_name=supervisor_name, vendor="openai") + executor_llm = get_llm_for_benchmark( + model_name=executor_name, + vendor="openai", + ) + + benchmark = ToolCallingAgentBenchmark( + tasks=[task], + logger=bench_logger, + model_name=model_name, + results_dir=experiment_dir, + ) + manipulation_system_prompt = """You are a manipulation specialist robot agent. +Your role is to handle object manipulation tasks including picking up and droping objects using provided tools. + +Ask the VLM for objects detection and positions before perfomring any manipulation action. +If VLM doesn't see objects that are objectives of the task, return this information, without proceeding""" + + navigation_system_prompt = """You are a navigation specialist robot agent. +Your role is to handle navigation tasks in space using provided tools. + +After performing navigation action, always check your current position to ensure success""" + + executors = [ + Executor( + name="manipulation", + llm=executor_llm, + tools=task.manipulation_tools(), + system_prompt=manipulation_system_prompt, + ), + Executor( + name="navigation", + llm=executor_llm, + tools=task.navigation_tools(), + system_prompt=navigation_system_prompt, + ), + ] + agent = create_megamind( + megamind_llm=supervisor_llm, + megamind_system_prompt=task.get_system_prompt(), + executors=executors, + task_planning_prompt=task.get_planning_prompt(), + ) + + experiment_id = uuid.uuid4() + benchmark.run_next( + agent=agent, + initial_state=get_initial_megamind_state(task=task.get_prompt()), + experiment_id=experiment_id, + ) + + bench_logger.info("===============================================================") + bench_logger.info("ALL SCENARIOS DONE. BENCHMARK COMPLETED!") + bench_logger.info("===============================================================") diff --git a/src/rai_bench/rai_bench/tool_calling_agent/benchmark.py b/src/rai_bench/rai_bench/tool_calling_agent/benchmark.py index 44923c860..d1843b843 100644 --- a/src/rai_bench/rai_bench/tool_calling_agent/benchmark.py +++ b/src/rai_bench/rai_bench/tool_calling_agent/benchmark.py @@ -19,14 +19,14 @@ from typing import Iterator, List, Optional, Sequence, Tuple from langchain_core.language_models import BaseChatModel -from langchain_core.messages import BaseMessage +from langchain_core.messages import AIMessage, BaseMessage from langchain_core.runnables.config import RunnableConfig from langgraph.errors import GraphRecursionError from langgraph.graph.state import CompiledStateGraph from rai.agents.langchain.core import ( create_conversational_agent, ) -from rai.messages import HumanMultimodalMessage +from rai.agents.langchain.core.react_agent import ReActAgentState from rai_bench.agents import create_multimodal_to_tool_agent from rai_bench.base_benchmark import BaseBenchmark, TimeoutException @@ -38,9 +38,6 @@ TaskResult, ToolCallingAgentRunSummary, ) -from rai_bench.tool_calling_agent.tasks.spatial import ( - SpatialReasoningAgentTask, -) from rai_bench.utils import get_llm_model_name @@ -67,7 +64,12 @@ def __init__( self.tasks_results: List[TaskResult] = [] self.csv_initialize(self.results_filename, TaskResult) - def run_next(self, agent: CompiledStateGraph, experiment_id: uuid.UUID) -> None: + def run_next( + self, + agent: CompiledStateGraph, + initial_state: ReActAgentState, + experiment_id: uuid.UUID, + ) -> None: """Runs the next task of the benchmark. Parameters @@ -87,14 +89,16 @@ def run_next(self, agent: CompiledStateGraph, experiment_id: uuid.UUID) -> None: ) callbacks = self.score_tracing_handler.get_callbacks() run_id = uuid.uuid4() - # NOTE (jmatejcz) recursion limit calculated as all_nodes_num -> one pass though whole node - # plus (task.max_tool_calls_number-1 because the first pass is already added in) - # times number of nodes - 2 because we dont cout start and end node - # this can be to much for larger graphs that dont use all nodes on extra calls - # in such ase adjust this value - recurssion_limit = len(agent.get_graph().nodes) + ( - task.max_tool_calls_number - 1 - ) * (len(agent.get_graph().nodes) - 2) + # NOTE (jmatejcz) recursion limit calculated as (all_nodes_num - 2) * required tool calls + # -2 because we don't want to include START and END node + # then we add numer of additional calls that can be made + # and +2 as we have to pass once though START and END + + recurssion_limit = ( + (len(agent.get_graph().nodes) - 2) * task.required_calls + + task.additional_calls + + 2 + ) config: RunnableConfig = { "run_id": run_id, "callbacks": callbacks, @@ -113,40 +117,27 @@ def run_next(self, agent: CompiledStateGraph, experiment_id: uuid.UUID) -> None: messages: List[BaseMessage] = [] prev_count: int = 0 try: - with self.time_limit(20 * task.max_tool_calls_number): - if isinstance(task, SpatialReasoningAgentTask): - for state in agent.stream( - { - "messages": [ - HumanMultimodalMessage( - content=task.get_prompt(), images=task.get_images() - ) - ] - }, - config=config, - ): - node = next(iter(state)) - all_messages = state[node]["messages"] - for new_msg in all_messages[prev_count:]: - messages.append(new_msg) - prev_count = len(messages) - else: - for state in agent.stream( - { - "messages": [ - HumanMultimodalMessage(content=task.get_prompt()) - ] - }, - config=config, - ): - node = next(iter(state)) + with self.time_limit(200 * task.max_tool_calls_number): + for state in agent.stream( + initial_state, + config=config, + ): + node = next(iter(state)) + if "messages" in state[node]: all_messages = state[node]["messages"] for new_msg in all_messages[prev_count:]: messages.append(new_msg) + if isinstance(new_msg, AIMessage): + self.logger.debug( + f"Message from node '{node}': {new_msg.content}, tool_calls: {new_msg.tool_calls}" + ) prev_count = len(messages) except TimeoutException as e: self.logger.error(msg=f"Task timeout: {e}") except GraphRecursionError as e: + tool_calls = task.get_tool_calls_from_messages(messages=messages) + score = task.validate(tool_calls=tool_calls) + score = 0.0 self.logger.error(msg=f"Reached recursion limit {e}") tool_calls = task.get_tool_calls_from_messages(messages=messages) diff --git a/src/rai_bench/rai_bench/tool_calling_agent/interfaces.py b/src/rai_bench/rai_bench/tool_calling_agent/interfaces.py index 7d8cb3187..a92fbd57b 100644 --- a/src/rai_bench/rai_bench/tool_calling_agent/interfaces.py +++ b/src/rai_bench/rai_bench/tool_calling_agent/interfaces.py @@ -585,6 +585,14 @@ def max_tool_calls_number(self) -> int: + self.extra_tool_calls ) + @property + def additional_calls(self) -> int: + """number of additional calls that can be done to still pass task. + Includes extra tool calls params. + and optional tool calls number which depends on task. + """ + return self.optional_tool_calls_number + self.extra_tool_calls + @property def required_calls(self) -> int: """Minimal number of calls required to complete task""" diff --git a/src/rai_bench/rai_bench/tool_calling_agent/tasks/warehouse.py b/src/rai_bench/rai_bench/tool_calling_agent/tasks/warehouse.py new file mode 100644 index 000000000..423430fc7 --- /dev/null +++ b/src/rai_bench/rai_bench/tool_calling_agent/tasks/warehouse.py @@ -0,0 +1,533 @@ +# Copyright (C) 2025 Robotec.AI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict, List, Optional, Tuple + +from langchain_core.tools import BaseTool, tool + +from rai_bench.tool_calling_agent.interfaces import Task, TaskArgs, Validator +from rai_bench.tool_calling_agent.subtasks import CheckArgsToolCallSubTask +from rai_bench.tool_calling_agent.validators import OrderedCallsValidator + +WAREHOUSE_ENVIRONMENT_DESCRIPTION = """ +WAREHOUSE LAYOUT: + +TABLE WITH SLOTS: +- Table location: x=10-11, y=1-7 +- Slot 1: (10.0, 1.5) +- Slot 2: (10.0, 3.0) +- Slot 3: (10.0, 4.5) +- Slot 4: (10.0, 6.0) +When navigating to the table remember that you can't navigate into it, +always approach from the side that is closer to rack (use x=10). + +Each slot can contain at most 1 item that can be picked up. +New Items won't appear during the task, so if you picked objects from a ceratin slot, +it will be empty for the rest of the task. + +STORAGE RACKS: +Storage Rack 1 location: x=2-6 y=5-6 +- Boxes: (3.0, 5.0), (5.0, 5.0) +When navigating to the tack remember that you can't navigate into it, +always approach from the side that is closer to starting position (use y=5). + +ROBOT STARTING POSITION: +- Robot starting location: (4.0, 2.0) +""" +SYSTEM_PROMPT = """You are a mobile robot operating in a warehouse environment for pick-and-place operations.""" + + +class EnvStateManager: + """Enhanced env state manager that tracks objects, boxes, and robot state""" + + def __init__(self): + self._state = { + "robot_position": (4.0, 2.0), + "gripper_state": "open", + } + + self._objects = { + "obj_1": { + "world_position": (10.5, 1.5), # Slot 1 position + "color": "blue", + # when picked up by the robot the obj will "disappear" from the vlm view + # when dropped the object will appear with different values + "picked_up": False, + "relative": (0.02, 0.1, 0.05), # relative to robot when at slot + }, + "obj_2": { + "world_position": (10.5, 3.0), # Slot 2 + "color": "red", + "picked_up": False, + "relative": (-0.2, 0.05, 0.05), + }, + "obj_3": { + "world_position": (10.5, 4.5), # Slot 3 + "color": "green", + "picked_up": False, + "relative": (0.1, 0.4, 0.05), + }, + "obj_4": { + "world_position": (10.5, 6.0), # Slot 4 + "color": "green", + "picked_up": False, + "relative": (0.15, -0.25, 0.05), + }, + } + + self._boxes = { + "box_1": { + "world_position": (3.0, 5.0), + "objects": [], # List of objects in this box + "relative": (0.2, 0, 0.05), # relative when robot is at box + }, + "box_2": { + "world_position": (5.0, 5.0), + "objects": [], + "relative": (0.1, -0.05, 0.05), + }, + } + + def get_position(self) -> Tuple[float, float]: + return self._state["robot_position"] + + def set_position(self, x: float, y: float): + self._state["robot_position"] = (x, y) + + def get_held_object(self) -> Optional[str]: + return self._state.get("held_object") + + def pick_up_object_at_position( + self, relative_pos: Tuple[float, float, float] + ) -> Optional[str]: + """Pick up object at relative position from current robot location""" + robot_x, robot_y = self.get_position() + + # Find object at the relative position + for obj, obj_data in self._objects.items(): + if not obj_data["picked_up"]: + # Check if this object is at the current location with matching relative position + if relative_pos == obj_data["relative"]: + # Check if robot is at the right slot for this object + if ( + abs(robot_x - obj_data["world_position"][0]) <= 0.5 + and abs(robot_y - obj_data["world_position"][1]) <= 0.5 + ): + obj_data["picked_up"] = True + self._state["held_object"] = obj + return obj + return None + + def drop_object_at_position(self, relative_pos: Tuple[float, float, float]) -> None: + """Drop held object at relative position from current robot location""" + # Check if placed in box, if yes, change env state + robot_x, robot_y = self.get_position() + # Find which box we're dropping into + for box_id, box_data in self._boxes.items(): + if relative_pos == box_data["relative"]: + # Check if robot is at the right position for this box + if ( + robot_x == box_data["world_position"][0] + and robot_y == box_data["world_position"][1] + ): + # Drop object into box + obj_id = self._state["held_object"] + box_data["objects"].append(obj_id) + + # Update object position to be in the box + self._objects[obj_id]["world_position"] = ( + box_data["world_position"][0] + relative_pos[0], + box_data["world_position"][1] + relative_pos[1], + ) + + self._state["held_object"] = None + + def get_visible_objects_at_position(self) -> List[Dict]: + """Get objects visible at current robot position""" + robot_x, robot_y = self.get_position() + visible_objects = [] + + # Check for objects at table slots + if abs(robot_x - 10.0) <= 0.5: # At sorting table + for obj_id, obj_data in self._objects.items(): + if not obj_data["picked_up"]: + obj_world_pos = obj_data["world_position"] + # Check if object is at current slot + expected_robot_y = obj_world_pos[1] - obj_data["relative"][1] + if abs(robot_y - expected_robot_y) <= 0.5: + visible_objects.append( + { + "id": obj_id, + "color": obj_data["color"], + "relative_position": obj_data["relative"], + } + ) + + return visible_objects + + def get_visible_boxes_at_position(self) -> List[Dict]: + """Get boxes visible at current robot position""" + robot_x, robot_y = self.get_position() + visible_boxes = [] + + # Check for boxes at storage rack + if 2 <= robot_x <= 6 and abs(robot_y - 5.5) <= 0.5: + for box_id, box_data in self._boxes.items(): + box_world_pos = box_data["world_position"] + if abs(robot_x - box_world_pos[0]) <= 0.5: + visible_boxes.append( + { + "id": box_id, + "relative_position": box_data["relative"], + "contents": [ + self._objects[obj_id]["color"] + for obj_id in box_data["objects"] + ], + } + ) + + return visible_boxes + + def get_state_summary(self) -> Dict: + """Get complete state for debugging""" + return { + "robot_position": self._state["robot_position"], + "gripper_state": self._state["gripper_state"], + "held_object": self._state.get("held_object"), + "objects": self._objects, + "boxes": self._boxes, + } + + +class SortingTask(Task): + complexity = "hard" + type = "warehouse" + + def __init__( + self, + task_args: TaskArgs, + validators: Optional[List[Validator]] = None, + **kwargs: Any, + ) -> None: + if not validators: + # after every navigate call + # the where am i should probably be called? should it be mandatory? + # it is for now + # Should ask vlm be called after manipulaiton action? + # So robot can confirm if it pick or droppped object + where_am_i_subtask = CheckArgsToolCallSubTask( + expected_tool_name="where_am_i", + expected_args={}, # No parameters expected + ) + ask_vlm_subtask = CheckArgsToolCallSubTask( + expected_tool_name="ask_vlm", + expected_args={}, + ) + + #### navigate to table, detect and pick up object + navigate_to_slot1_subtask = CheckArgsToolCallSubTask( + expected_tool_name="nav_tool", + expected_args={ + "x": 10.0, + "y": 1.5, + }, + ) + pick_up_1_subtask = CheckArgsToolCallSubTask( + expected_tool_name="pick_up_object", + expected_args={"x": 0.02, "y": 0.1, "z": 0.05}, + ) + #### navigate to the box and drop object + navigate_to_box1_subtask = CheckArgsToolCallSubTask( + expected_tool_name="nav_tool", + expected_args={ + "x": 3.0, + "y": 5.0, + }, + ) + drop_subtask_1 = CheckArgsToolCallSubTask( + expected_tool_name="drop_object", + expected_args={"x": 0.2, "y": 0, "z": 0.05}, + ) + + #### navigate to the table and pick up second object + navigate_to_slot2_subtask = CheckArgsToolCallSubTask( + expected_tool_name="nav_tool", + expected_args={ + "x": 10.0, + "y": 3.0, + }, + ) + # there was no green or blue object so navigate to the next slot + navigate_to_slot3_subtask = CheckArgsToolCallSubTask( + expected_tool_name="nav_tool", + expected_args={ + "x": 10.0, + "y": 4.5, + }, + ) + pick_up_3_subtask = CheckArgsToolCallSubTask( + expected_tool_name="pick_up_object", + expected_args={"x": 0.1, "y": 0.4, "z": 0.05}, + ) + + #### navigate to the 2nd box and drop + navigate_to_box2_subtask = CheckArgsToolCallSubTask( + expected_tool_name="nav_tool", + expected_args={ + "x": 5.0, + "y": 5.0, + }, + ) + drop_subtask_2 = CheckArgsToolCallSubTask( + expected_tool_name="drop_object", + expected_args={"x": 0.1, "y": -0.05, "z": 0.05}, + ) + #### navigate to 4th slot and check for object, its empty so end the task + navigate_to_slot4_subtask = CheckArgsToolCallSubTask( + expected_tool_name="nav_tool", + expected_args={ + "x": 10.0, + "y": 3.0, + }, + ) + validators = [ + #### navigate to slot1, detect and pick up 1st object + OrderedCallsValidator( + subtasks=[ + navigate_to_slot1_subtask, + where_am_i_subtask, + ask_vlm_subtask, + pick_up_1_subtask, + ] + ), + #### navigate to the box1 and drop object + OrderedCallsValidator( + subtasks=[ + navigate_to_box1_subtask, + where_am_i_subtask, + ask_vlm_subtask, + drop_subtask_1, + ] + ), + #### navigate to slot2, detect - there is no blue or green obj + # so navigate to slot3, detect and pick up + OrderedCallsValidator( + subtasks=[ + navigate_to_slot2_subtask, + where_am_i_subtask, + ask_vlm_subtask, + navigate_to_slot3_subtask, + where_am_i_subtask, + ask_vlm_subtask, + pick_up_3_subtask, + ] + ), + #### navigate to the 2nd box and drop + OrderedCallsValidator( + subtasks=[ + navigate_to_box2_subtask, + where_am_i_subtask, + ask_vlm_subtask, + drop_subtask_2, + ] + ), + #### navigate to 4th slot and check for object, its empty so end the task + OrderedCallsValidator( + subtasks=[ + navigate_to_slot4_subtask, + where_am_i_subtask, + ask_vlm_subtask, + ] + ), + ] + super().__init__(validators=validators, task_args=task_args, **kwargs) + self.env_state = EnvStateManager() + + # define tools + @tool + def nav_tool(x: float, y: float): + """Navigate to certain coordinates in the warehouse.""" + self.env_state.set_position(x, y) + return ( + f"Navigating to x: {x}, y: {y} ...\n" + "Check you current position to ensure if movement was done properly" + ) + + @tool + def where_am_i() -> Dict[str, float]: + """Returns your current position""" + x, y = self.env_state.get_position() + return {"x": x, "y": y} + + @tool + def pick_up_object(x: float, y: float, z: float) -> str: + """Move gripper and close it to pick up object from a certain coordinates relative to you""" + held_obj = self.env_state.get_held_object() + if not held_obj: + obj_id = self.env_state.pick_up_object_at_position((x, y, z)) + if obj_id: + obj_color = self.env_state._objects[obj_id]["color"] + return f"Successfully picked up {obj_color} object ({obj_id}) at relative position x: {x}, y: {y}, z: {z}" + else: + return f"No object grabbed successfully at relative position x: {x}, y: {y}, z: {z}" + else: + return f"Can't perform pick up action as you are already holding an {held_obj} object." + + @tool + def drop_object(x: float, y: float, z: float) -> str: + """Move gripper and open it to drop object at a certain coordinates relative to you""" + held_obj = self.env_state.get_held_object() + if not held_obj: + return "Failed to drop - you are not holding any object." + else: + self.env_state.drop_object_at_position((x, y, z)) + return f"Successfully dropped object ({held_obj}) at relative position x: {x}, y: {y}, z: {z}" + + @tool + def ask_vlm() -> str: + """Ask VLM to detect objects at your current location and return their coordinates relative to you""" + visible_objects = self.env_state.get_visible_objects_at_position() + visible_boxes = self.env_state.get_visible_boxes_at_position() + + current_pos = self.env_state.get_position() + x, y = current_pos + + # Generate response based on what's actually visible + responses = [] + + if visible_objects: + for obj in visible_objects: + rel_pos = obj["relative_position"] + responses.append( + f"I see a {obj['color']} object at x: {rel_pos[0]}, y: {rel_pos[1]}, z: {rel_pos[2]} relative to you" + ) + + if visible_boxes: + for box in visible_boxes: + rel_pos = box["relative_position"] + box_num = "1" if "box_1" in box["id"] else "2" + contents_str = ( + f" (contains: {', '.join(box['contents'])} objects)" + if box["contents"] + else " (empty)" + ) + responses.append( + f"I see Box {box_num} at x: {rel_pos[0]}, y: {rel_pos[1]}, z: {rel_pos[2]} relative to you{contents_str}" + ) + + if not responses: + # Check what area we're in for context + if abs(x - 10.5) < 0.5: # At sorting table + slot_num = None + if 1 <= y <= 2: + slot_num = 1 + elif 2.5 <= y <= 3.5: + slot_num = 2 + elif 4 <= y <= 5: + slot_num = 3 + elif 5.5 <= y <= 6.5: + slot_num = 4 + + if slot_num: + return f"I see Slot {slot_num}, but it appears to be empty." + + elif 2 <= x <= 6 and abs(y - 5.5) < 0.5: # At storage rack + return "I see the storage rack area, but no objects or boxes are immediately visible from this position." + + return "I don't see any relevant objects here." + + return " ".join(responses) + + @tool + def get_slot_position(slot_id: str) -> Dict[str, float]: + """Returns the world position of a slot, pass in slot id as integer, for example 1""" + + x, y = self.env_state._slots[f"{slot_id}"]["world_position"] + return {"x": x, "y": y} + + @tool + def get_default_box_position() -> Dict[str, float]: + """Returns the world position of the default box""" + x, y = self.env_state._boxes["default"]["world_position"] + return {"x": x, "y": y} + + self.nav_tool = nav_tool + self.where_am_i = where_am_i + self.pick_up_object = pick_up_object + self.drop_object = drop_object + self.ask_vlm = ask_vlm + + self.nav_tool = nav_tool + self.where_am_i = where_am_i + self.pick_up_object = pick_up_object + self.drop_object = drop_object + self.ask_vlm = ask_vlm + self.get_slot_position = get_slot_position + self.get_default_box_position = get_default_box_position + + @property + def optional_tool_calls_number(self) -> int: + return 10 + + def get_base_prompt(self) -> str: + return ( + "Sort blue and green objects from slots to separate boxes on the rack. " + "Blue objects should go to the 1st box (x: 3.0, y: 5.0), green objects should go to the second box (x: 5.0, y: 5.0). " + "Check the slots in order. If you checked all of them and sorted all blue and green objects the task is done." + ) + + def get_prompt(self) -> str: + return self.get_base_prompt() + + def get_planning_prompt(self) -> str: + """ + Planning prompt help generate summary info for high level + task planning to undrestand the overall progress. + """ + return """ +Determine success and provide brief explanation of what happened by slot, +for example Slot 1: Object color: BLUE, actions: [NAVIGATED to SLOT->CHECKED OBJECTS->PICKED up OBJECT->NAVIGATED to BOX ->DROPPED OBJECT->COMPLETED]. +Mark a slot as COMPLETED only if object from this slot was dropped. +If the slot doesn't contain the right object, for example Slot 2: Object color: RED, actions: [NAVIGATED to SLOT->CHECKED OBJECTS->NOT THE RIGHT COLOR->NOTHING TO DO->COMPLETED]. +""" + + def manipulation_tools(self) -> List[BaseTool]: + return [ + self.pick_up_object, + self.drop_object, + self.ask_vlm, + ] + + def navigation_tools(self) -> List[BaseTool]: + return [ + self.nav_tool, + self.where_am_i, + ] + + @property + def available_tools(self) -> List[BaseTool]: + return [ + self.nav_tool, + self.where_am_i, + self.pick_up_object, + self.drop_object, + self.ask_vlm, + ] + + def get_system_prompt(self) -> str: + return SYSTEM_PROMPT + "\n" + WAREHOUSE_ENVIRONMENT_DESCRIPTION + + def report_sorting_status(self): + print("** Reporting sorting status") + self.env_state.report_sorting_status() diff --git a/src/rai_bench/rai_bench/utils.py b/src/rai_bench/rai_bench/utils.py index e1150082c..79e0e0f51 100644 --- a/src/rai_bench/rai_bench/utils.py +++ b/src/rai_bench/rai_bench/utils.py @@ -123,12 +123,12 @@ def parse_manipulation_o3de_benchmark_args(): return parser.parse_args() -def define_benchmark_logger(out_dir: Path) -> logging.Logger: +def define_benchmark_logger(out_dir: Path, level: int = logging.INFO) -> logging.Logger: log_file = out_dir / "benchmark.log" out_dir.mkdir(parents=True, exist_ok=True) file_handler = logging.FileHandler(log_file) - file_handler.setLevel(logging.INFO) + file_handler.setLevel(level) formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) @@ -137,7 +137,7 @@ def define_benchmark_logger(out_dir: Path) -> logging.Logger: bench_logger = logging.getLogger("Benchmark logger") for handler in bench_logger.handlers: bench_logger.removeHandler(handler) - bench_logger.setLevel(logging.INFO) + bench_logger.setLevel(level) bench_logger.addHandler(file_handler) return bench_logger diff --git a/src/rai_core/rai/agents/__init__.py b/src/rai_core/rai/agents/__init__.py index f439c7be8..ef4bc749e 100644 --- a/src/rai_core/rai/agents/__init__.py +++ b/src/rai_core/rai/agents/__init__.py @@ -13,7 +13,10 @@ # limitations under the License. from rai.agents.base import BaseAgent -from rai.agents.langchain import BaseStateBasedAgent, ReActAgent +from rai.agents.langchain import ( + BaseStateBasedAgent, + ReActAgent, +) from rai.agents.runner import AgentRunner, wait_for_shutdown __all__ = [ diff --git a/src/rai_core/rai/agents/langchain/core/__init__.py b/src/rai_core/rai/agents/langchain/core/__init__.py index ea6cb46bf..87c7b39bc 100644 --- a/src/rai_core/rai/agents/langchain/core/__init__.py +++ b/src/rai_core/rai/agents/langchain/core/__init__.py @@ -14,18 +14,23 @@ from .conversational_agent import State as ConversationalAgentState from .conversational_agent import create_conversational_agent +from .megamind import Executor, create_megamind, get_initial_megamind_state from .react_agent import ( ReActAgentState, create_react_runnable, ) from .state_based_agent import create_state_based_runnable -from .tool_runner import ToolRunner +from .tool_runner import SubAgentToolRunner, ToolRunner __all__ = [ "ConversationalAgentState", + "Executor", "ReActAgentState", + "SubAgentToolRunner", "ToolRunner", "create_conversational_agent", + "create_megamind", "create_react_runnable", "create_state_based_runnable", + "get_initial_megamind_state", ] diff --git a/src/rai_core/rai/agents/langchain/core/megamind.py b/src/rai_core/rai/agents/langchain/core/megamind.py new file mode 100644 index 000000000..68a5b40bb --- /dev/null +++ b/src/rai_core/rai/agents/langchain/core/megamind.py @@ -0,0 +1,305 @@ +# Copyright (C) 2025 Robotec.AI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +### NOTE (jmatejcz) this agent is still in process of testing and refining +from dataclasses import dataclass +from functools import partial +from typing import ( + Annotated, + List, + Optional, +) + +from langchain.chat_models.base import BaseChatModel +from langchain_core.messages import ( + BaseMessage, + HumanMessage, + SystemMessage, +) +from langchain_core.tools import BaseTool, InjectedToolCallId, tool +from langgraph.graph import END, START, MessagesState, StateGraph +from langgraph.graph.state import CompiledStateGraph +from langgraph.prebuilt import create_react_agent +from langgraph.types import Command +from pydantic import BaseModel, Field + +from rai.agents.langchain.core.tool_runner import SubAgentToolRunner +from rai.messages import ( + HumanMultimodalMessage, +) + + +class StepSuccess(BaseModel): + """Output of success attacher""" + + success: bool = Field(description="Whether the task was completed successfully") + explanation: str = Field(description="Explanation of what happened") + + +class MegamindState(MessagesState): + original_task: str + steps_done: List[str] + step: Optional[str] + step_success: StepSuccess + step_messages: List[BaseMessage] + + +def llm_node( + llm: BaseChatModel, + system_prompt: Optional[str], + state: MegamindState, +) -> MegamindState: + """Process messages using the LLM - returns the agent's response.""" + messages = state["step_messages"].copy() + if not state["step"]: + raise ValueError("Step should be defined at this point") + if system_prompt: + messages.insert(0, HumanMessage(state["step"])) + messages.insert(0, SystemMessage(content=system_prompt)) + + ai_msg = llm.invoke(messages) + # append to both + state["step_messages"].append(ai_msg) + state["messages"].append(ai_msg) + return state + + +def analyzer_node( + llm: BaseChatModel, + planning_prompt: Optional[str], + state: MegamindState, +) -> MegamindState: + """Analyze the conversation and return structured output.""" + if not planning_prompt: + planning_prompt = "" + analyzer = llm.with_structured_output(StepSuccess) + analysis = analyzer.invoke( + [ + SystemMessage( + content=f""" +Analyze if this task was completed successfully: + +Task: {state["step"]} + +{planning_prompt} +Below you have messages of agent doing the task:""" + ), + *state["step_messages"], + ] + ) + state["step_success"] = StepSuccess( + success=analysis.success, explanation=analysis.explanation + ) + state["steps_done"].append(f"{state['step_success'].explanation}") + return state + + +def should_continue_or_structure(state: MegamindState) -> str: + """Decide whether to continue with tools or return structured output.""" + last_message = state["step_messages"][-1] + + # If AI message has tool calls, continue to tools + if hasattr(last_message, "tool_calls") and last_message.tool_calls: + return "tools" + + # Otherwise, return structured output + return "structured_output" + + +def create_react_structured_agent( + llm: BaseChatModel, + tools: Optional[List[BaseTool]] = None, + system_prompt: Optional[str] = None, + planning_prompt: Optional[str] = None, +) -> CompiledStateGraph: + """Create a react agent that returns structured output.""" + + graph = StateGraph(MegamindState) + graph.add_edge(START, "llm") + + if tools: + tool_runner = SubAgentToolRunner(tools) + graph.add_node("tools", tool_runner) + + bound_llm = llm.bind_tools(tools) + graph.add_node("llm", partial(llm_node, bound_llm, system_prompt)) + + graph.add_node( + "structured_output", partial(analyzer_node, llm, planning_prompt) + ) + + graph.add_conditional_edges( + "llm", + should_continue_or_structure, + {"tools": "tools", "structured_output": "structured_output"}, + ) + graph.add_edge("tools", "llm") + graph.add_edge("structured_output", END) + else: + graph.add_node("llm", partial(llm_node, llm, system_prompt)) + graph.add_node( + "structured_output", partial(analyzer_node, llm, planning_prompt) + ) + graph.add_edge("llm", "structured_output") + graph.add_edge("structured_output", END) + + return graph.compile() + + +def create_handoff_tool(agent_name: str, description: str = None): + """Create a handoff tool for transferring tasks to specialist agents.""" + name = f"transfer_to_{agent_name}" + description = description or f" {agent_name} for help." + + @tool(name, description=description) + def handoff_tool( + task_instruction: str, # The specific task for the agent + tool_call_id: Annotated[str, InjectedToolCallId], + ) -> Command: + return Command( + goto=agent_name, + # Send only the task message to the specialist agent, not the full history + update={"step": task_instruction, "step_messages": []}, + graph=Command.PARENT, + ) + + return handoff_tool + + +@dataclass +class Executor: + name: str + llm: BaseChatModel + tools: List[BaseTool] + system_prompt: str + + +def get_initial_megamind_state(task: str): + return MegamindState( + { + "original_task": task, + "messages": [HumanMultimodalMessage(content=task)], + "step": "", + "steps_done": [], + "step_success": StepSuccess(success=False, explanation=""), + "step_messages": [], + } + ) + + +def plan_step(megamind_agent: BaseChatModel, state: MegamindState) -> MegamindState: + """Initial planning step.""" + if "original_task" not in state: + state["original_task"] = state["messages"][0].content[0]["text"] + if "steps_done" not in state: + state["steps_done"] = [] + if "step" not in state: + state["step"] = None + + megamind_prompt = f"You are given objective to complete: {state['original_task']}" + if state["steps_done"]: + megamind_prompt += "\n\n" + megamind_prompt += "Steps that were already done successfully:\n" + steps_done = "\n".join( + [f"{i + 1}. {step}" for i, step in enumerate(state["steps_done"])] + ) + megamind_prompt += steps_done + megamind_prompt += "\n" + + if state["step"]: + if not state["step_success"]: + raise ValueError("Step success should be specified at this point") + + megamind_prompt += "\nBased on that outcome and past steps come up with the next step and delegate it to selected agent." + + else: + megamind_prompt += "\n" + megamind_prompt += ( + "Come up with the fist step and delegate it to selected agent." + ) + + megamind_prompt += "\n\n" + megamind_prompt += ( + "When you decide that the objective is completed return response to user." + ) + messages = [ + HumanMultimodalMessage(content=megamind_prompt), + ] + # NOTE (jmatejcz) the response of megamind isnt appended to messages + # as Command from handoff instantly transitions to next node + megamind_agent.invoke({"messages": messages}) + return state + + +def create_megamind( + megamind_llm: BaseChatModel, + megamind_system_prompt: str, + executors: List[Executor], + task_planning_prompt: Optional[str] = None, +) -> CompiledStateGraph: + """Create a megamind langchain agent + + Args: + executors (List[Executor]): Subagents for megamind, each can be a specialist with + its own tools llm and system prompt + task_planning_prompt (Optional[str]): Prompt that helps summarize the step in a way + that helps planning task + """ + executor_agents = {} + handoff_tools = [] + for executor in executors: + executor_agents[executor.name] = create_react_structured_agent( + llm=executor.llm, + tools=executor.tools, + system_prompt=executor.system_prompt, + planning_prompt=task_planning_prompt, + ) + + handoff_tools.append( + create_handoff_tool( + agent_name=executor.name, + description=f"Assign task to {executor.name} agent.", + ) + ) + if not megamind_system_prompt: + # make a generic system prompt that list executors and their tools + specialists_info = [] + for executor in executors: + tool_names = [tool.name for tool in executor.tools] + tool_list = ", ".join(tool_names) + specialists_info.append(f"- {executor.name}: Available tools: {tool_list}") + + specialists_section = "\n".join(specialists_info) + megamind_system_prompt = f"""You manage specialists to whom you will delegate tasks to complete objective. +Available specialists and their capabilities: +{specialists_section} + +The single task should be delegated to only 1 agent and should be doable by only 1 agent.""" + + megamind_agent = create_react_agent( + megamind_llm, + tools=handoff_tools, + prompt=megamind_system_prompt, + name="megamind", + ) + + graph = StateGraph(MegamindState).add_node( + "megamind", partial(plan_step, megamind_agent) + ) + for agent_name, agent in executor_agents.items(): + graph.add_node(agent_name, agent) + graph.add_edge(agent_name, "megamind") + + graph.add_edge(START, "megamind") + return graph.compile() diff --git a/src/rai_core/rai/agents/langchain/core/plan_agent.py b/src/rai_core/rai/agents/langchain/core/plan_agent.py new file mode 100644 index 000000000..e01247034 --- /dev/null +++ b/src/rai_core/rai/agents/langchain/core/plan_agent.py @@ -0,0 +1,275 @@ +# Copyright (C) 2025 Robotec.AI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Any, Dict, List, Optional, Tuple, Union + +from langchain.chat_models.base import BaseChatModel +from langchain_core.messages import BaseMessage, SystemMessage +from langchain_core.tools import BaseTool +from langgraph.graph import END, START, StateGraph +from langgraph.graph.state import CompiledStateGraph +from pydantic import BaseModel, Field + +from rai.agents.langchain.core import ReActAgentState +from rai.agents.langchain.core.react_agent import create_react_runnable +from rai.initialization import get_llm_model +from rai.messages import HumanMultimodalMessage + + +class Plan(BaseModel): + """A plan to help solve a user request.""" + + steps: List[str] = Field( + description="different steps to follow, should be in sorted order" + ) + + +class Response(BaseModel): + """Response to user.""" + + response: str + + +class Act(BaseModel): + """Action to take.""" + + action: Union[Response, Plan] = Field( + description="Action to perform. If you want to respond to user, use Response. " + "If you need to further use tools to get the answer, use Plan." + ) + + +class PlanExecuteState(ReActAgentState): + """State for the plan and execute agent.""" + + # NOTE (jmatejcz) should original_task be replaced with + # passing first message? The message can contain images etc. + original_task: str + plan: List[str] + past_steps: List[Tuple[str, str]] + response: str + + +def should_end(state: PlanExecuteState) -> str: + """Check if we should end or continue planning.""" + if state["response"]: + return END + else: + return "agent" + + +def create_plan_execute_agent( + tools: List[BaseTool], + planner_llm: Optional[BaseChatModel] = None, + executor_llm: Optional[BaseChatModel] = None, + replanner_llm: Optional[BaseChatModel] = None, + system_prompt: Optional[str] = None, +) -> CompiledStateGraph: + """Create a plan and execute agent that can break down complex tasks into steps. + + Parameters + ---------- + tools : List[BaseTool] + List of tools the agent can use during execution + llm : Optional[BaseChatModel], default=None + Language model to use. If None, will use complex_model from config + system_prompt : Optional[str | SystemMultimodalMessage], default=None + System prompt to use (currently not used in this implementation) + + Returns + ------- + CompiledStateGraph + Compiled state graph for the plan and execute agent + + Raises + ------ + ValueError + If tools are not provided or invalid + """ + if planner_llm is None: + planner_llm = get_llm_model("complex_model", streaming=True) + if executor_llm is None: + executor_llm = get_llm_model("complex_model", streaming=True) + if replanner_llm is None: + replanner_llm = get_llm_model("complex_model", streaming=True) + + if not tools: + raise ValueError("Tools must be provided for plan and execute agent") + if system_prompt is None: + system_prompt = "" + + planner_prompt = """For the given objective, come up with a simple step by step plan. + +When creating your plan: +- Design each step to leverage the most appropriate tool from the list above +- Be specific about what information each step should gather or what action it should perform +- Frame steps as clear instructions that can be executed using the available tools +- Do NOT actually call or use any tools yourself - only create the plan +- Each step should be actionable and tool-appropriate + +This plan should involve individual tasks, that if executed correctly will yield the correct answer. +Do not add any superfluous steps. The result of the final step should be the final answer. +Make sure that each step has all the information needed - do not skip steps.""" + + agent_executor = create_react_runnable( + llm=executor_llm, system_prompt=system_prompt, tools=tools + ) + # the prompt will be filled with values when passed to invoke + planner_llm_with_tools = planner_llm.bind_tools(tools) + planner = planner_llm_with_tools.with_structured_output(Plan) # type: ignore + replanner = replanner_llm.with_structured_output(Act) # type: ignore + + def execute_step(state: PlanExecuteState): + """Execute the current step of the plan.""" + + plan = state["plan"] + if not plan: + return {} + task = plan[0] + task_formatted = f"""You are tasked with executing task: {task}.""" + + agent_response = agent_executor.invoke( + {"messages": [HumanMultimodalMessage(content=task_formatted)]}, + config={"recursion_limit": 50}, + ) + return { + "past_steps": [(task, agent_response["messages"][-1].content)], + } + + def plan_step(state: PlanExecuteState): + """Initial planning step.""" + messages = [ + SystemMessage(content=system_prompt + "\n" + planner_prompt), + HumanMultimodalMessage(content=state["original_task"]), + ] + plan = planner.invoke(messages) + return {"plan": plan.steps} + + def replan_step(state: PlanExecuteState): + """Replan based on execution results.""" + # Format past steps for the prompt + past_steps_str = "\n".join( + [ + f"{step}: {result}" + for i, (step, result) in enumerate(state["past_steps"]) + ] + ) + + # Format remaining plan + plan_str = "\n".join([step for i, step in enumerate(state["plan"])]) + + replanner_prompt = f"""For the given objective, come up with a simple step by step plan. +This plan should involve individual tasks, that if executed correctly will yield the correct answer. +Do not add any superfluous steps. The result of the final step should be the final answer. +Make sure that each step has all the information needed - do not skip steps. + +Your objective was this: +{state["original_task"]} + +Your current plan is: +{plan_str} + +You have currently done the following steps: +{past_steps_str} + +Update your plan accordingly if needed. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan.""" + + messages = [ + SystemMessage(content=system_prompt), + HumanMultimodalMessage(content=replanner_prompt), + ] + output = replanner.invoke(messages) + + if isinstance(output.action, Response): + return {"response": output.action.response} + else: + return {"plan": output.action.steps} + + workflow = StateGraph(PlanExecuteState) + + workflow.add_node("planner", plan_step) + workflow.add_node("agent", execute_step) + workflow.add_node("replan", replan_step) + + workflow.add_edge(START, "planner") + # From plan we go to agent + workflow.add_edge("planner", "agent") + # From agent, we replan + workflow.add_edge("agent", "replan") + + workflow.add_conditional_edges( + "replan", + should_end, + ["agent", END], + ) + + return workflow.compile() + + +def create_initial_plan_execute_state( + original_task: str, + messages: Optional[List[BaseMessage]] = None, +) -> PlanExecuteState: + """Create initial state for the plan and execute agent. + + Parameters + ---------- + input_text : str + The user's input/objective to accomplish + messages : Optional[List[BaseMessage]], default=None + Initial messages for the conversation + + Returns + ------- + PlanExecuteState + Initial state for the agent + """ + if messages is None: + messages = [] + + return PlanExecuteState( + messages=messages, + original_task=original_task, + plan=[], + past_steps=[], + response="", + ) + + +def run_plan_execute_agent( + agent: CompiledStateGraph, + original_task: str, + config: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """Run the plan and execute agent on a given input. + + Parameters + ---------- + agent : CompiledStateGraph + The compiled plan and execute agent + input_text : str + The user's input/objective + config : Optional[Dict[str, Any]], default=None + Configuration for the agent execution + + Returns + ------- + Dict[str, Any] + Final state after execution + """ + initial_state = create_initial_plan_execute_state(original_task) + + # Execute the agent + result = agent.invoke(initial_state, config=config) + + return result diff --git a/src/rai_core/rai/agents/langchain/core/react_agent.py b/src/rai_core/rai/agents/langchain/core/react_agent.py index d49fd4617..34424a84e 100644 --- a/src/rai_core/rai/agents/langchain/core/react_agent.py +++ b/src/rai_core/rai/agents/langchain/core/react_agent.py @@ -16,7 +16,6 @@ from typing import ( List, Optional, - TypedDict, cast, ) @@ -26,6 +25,7 @@ from langchain_core.tools import BaseTool from langgraph.graph import START, StateGraph from langgraph.prebuilt.tool_node import tools_condition +from typing_extensions import TypedDict from rai.agents.langchain.core.tool_runner import ToolRunner from rai.initialization import get_llm_model diff --git a/src/rai_core/rai/agents/langchain/core/tool_runner.py b/src/rai_core/rai/agents/langchain/core/tool_runner.py index 156f19dac..216748e2d 100644 --- a/src/rai_core/rai/agents/langchain/core/tool_runner.py +++ b/src/rai_core/rai/agents/langchain/core/tool_runner.py @@ -47,15 +47,25 @@ def __init__( tool_ = create_tool(tool_) self.tools_by_name[tool_.name] = tool_ + def get_messages(self, input: dict[str, Any]) -> List: + """Get fields from from input that will be processed.""" + return input.get("messages", []) + + def update_input_with_outputs( + self, input: dict[str, Any], outputs: List[Any] + ) -> None: + """Update input with tool outputs.""" + input["messages"].extend(outputs) + def _func(self, input: dict[str, Any], config: RunnableConfig) -> Any: config["max_concurrency"] = ( 1 # TODO(maciejmajek): use better mechanism for task queueing ) - if messages := input.get("messages", []): - message = messages[-1] - else: + messages = self.get_messages(input) + if not messages: raise ValueError("No message found in input") + message = messages[-1] if not isinstance(message, AIMessage): raise ValueError("Last message is not an AIMessage") @@ -142,5 +152,19 @@ def run_one(call: ToolCall): # we sort the messages by type so that the tool messages are sent first # for more information see implementation of ToolMultimodalMessage.postprocess outputs.sort(key=lambda x: x.__class__.__name__, reverse=True) - input["messages"].extend(outputs) + + self.update_input_with_outputs(input, outputs) return input + + +class SubAgentToolRunner(ToolRunner): + """ToolRunner that works with 'step_messages' key used by subagents""" + + def get_messages(self, input: dict[str, Any]) -> List: + return input.get("step_messages", []) + + def update_input_with_outputs( + self, input: dict[str, Any], outputs: List[Any] + ) -> None: + input["messages"].extend(outputs) + input["step_messages"].extend(outputs) diff --git a/src/rai_sim/rai_sim/simulation_bridge.py b/src/rai_sim/rai_sim/simulation_bridge.py index a39ff2906..21c9ebc2b 100644 --- a/src/rai_sim/rai_sim/simulation_bridge.py +++ b/src/rai_sim/rai_sim/simulation_bridge.py @@ -173,9 +173,9 @@ class SimulationBridge(ABC): """ def __init__(self, logger: Optional[logging.Logger] = None): - self.spawned_entities: List[SpawnedEntity] = ( - [] - ) # list of spawned entities with their initial poses + self.spawned_entities: List[ + SpawnedEntity + ] = [] # list of spawned entities with their initial poses if logger is None: self.logger = logging.getLogger(__name__) else: