In [None]:
import os
import sys

import nest_asyncio

# Enable nested event loops for Jupyter notebooks.
# NOTE(review): presumably required because the notebook kernel already runs
# an asyncio event loop while code in later cells starts its own — confirm
# against the nest_asyncio documentation.
nest_asyncio.apply()

# Root of the RAI checkout. This cell does NOT change the working directory;
# every path below is built as an absolute path from this root.
# Set RAI_PROJECT_ROOT before starting the kernel to point at another checkout;
# an unset or empty variable falls back to the original default location.
project_root = (
    os.environ.get("RAI_PROJECT_ROOT") or "/home/pkotowski/AMD-RosCon2025-Demo/rai"
)
install_root = f"{project_root}/install"

# Workspace packages listed in AMENT_PREFIX_PATH, in lookup order; the ROS 2
# distribution prefix is appended last.
ament_packages = [
    "robotic_manipulation",
    "rai_open_set_vision",
    "rai_nomad",
    "rai_interfaces",
    "rai_bringup",
    "led_strip",
]
ament_prefix_path = ":".join(
    [f"{install_root}/{package}" for package in ament_packages] + ["/opt/ros/humble"]
)

# Set ROS2 environment variables
os.environ.update(
    {
        "ROS_LOCALHOST_ONLY": "1",
        "ROS_DISTRO": "humble",
        "ROS_PYTHON_VERSION": "3",
        "ROS_VERSION": "2",
        "COLCON_PREFIX_PATH": install_root,
        "AMENT_PREFIX_PATH": ament_prefix_path,
    }
)


def prepend_sys_path(entry):
    """Move ``entry`` to the front of ``sys.path``.

    Any existing occurrence is removed first, so re-running this cell does not
    pile up duplicate entries.

    Args:
        entry: Directory to place at index 0 of ``sys.path``.
    """
    while entry in sys.path:
        sys.path.remove(entry)
    sys.path.insert(0, entry)


# Each entry is pushed to the FRONT of sys.path in turn, so the LAST entry of
# this list ends up with the highest import priority.
python_path_entries = [
    # ROS2 system paths (equivalent to sourcing /opt/ros/humble)
    "/opt/ros/humble/lib/python3.10/site-packages",
    "/opt/ros/humble/local/lib/python3.10/dist-packages",
    # RAI package paths
    f"{project_root}/src/rai_core",
    f"{project_root}/src/rai_sim",
    f"{project_root}/src/rai_s2s",
    f"{project_root}/src/rai_bench",
    f"{project_root}/src/rai_whoami",
    # ROS2 extension packages
    f"{project_root}/build/rai_open_set_vision",
    f"{project_root}/src/rai_extensions/rai_open_set_vision",
    f"{project_root}/src/rai_extensions/rai_nomad",
]
for entry in python_path_entries:
    prepend_sys_path(entry)


print("✅ RAI Framework setup complete!")
print(f"Working directory: {os.getcwd()}")
print(f"ROS_DISTRO: {os.environ.get('ROS_DISTRO')}")

In [None]:
import logging

# Test the new Python interpreter with tools mechanism
import sys
from typing import List

import rclpy
import rclpy.qos
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.tools import BaseTool
from rai import get_llm_model
from rai.communication.ros2 import wait_for_ros2_services, wait_for_ros2_topics
from rai.communication.ros2.connectors import ROS2Connector
from rai.tools.ros2.manipulation import (
    GetObjectPositionsTool,
    MoveObjectFromToTool,
    ResetArmTool,
)
from rai_open_set_vision.tools import GetGrabbingPointTool

# NOTE(review): this entry is relative, so it resolves against the current
# working directory (the setup cell does not change the working directory),
# and `rai` has already been imported above — confirm whether this insert is
# still needed.
sys.path.insert(0, "src/rai_core")

# Interpreter tool class and the factory used below to build the coding agent.
from rai.tools.python_interpreter_with_tools import (
    PythonInterpreterWithTools,
    create_coding_agent_with_tools,
)

logger = logging.getLogger(__name__)

# rclpy.init() raises when the default context is already initialised, which
# is what happens when this cell is re-run in a live kernel. Initialise only
# when the context is not up yet.
if not rclpy.ok():
    rclpy.init()
connector = ROS2Connector(executor_type="single_threaded")

# Wait for the required services and camera topics before continuing.
required_services = ["/grounded_sam_segment", "/grounding_dino_classify"]
required_topics = ["/color_image5", "/depth_image5", "/color_camera_info5"]
wait_for_ros2_services(connector, required_services)
wait_for_ros2_topics(connector, required_topics)

node = connector.node
# Declaring a parameter twice on the same node raises, so declare it only
# when it is missing.
# NOTE(review): presumably "conversion_ratio" is read by one of the tools
# created later — confirm.
if not node.has_parameter("conversion_ratio"):
    node.declare_parameter("conversion_ratio", 1.0)

print("Testing Python interpreter with tools...")

# The agent with the manipulation tools exposed is created in the next cell.

In [None]:
def format_pose(pose):
    """Render the position of ``pose`` as ``(x=..., y=..., z=...)``.

    Args:
        pose: Object exposing ``position.x``, ``position.y`` and ``position.z``.

    Returns:
        A string with each coordinate formatted to two decimal places.
    """
    point = pose.position
    return f"(x={point.x:.2f}, y={point.y:.2f}, z={point.z:.2f})"


# Install the notebook's format_pose as a static method on GetObjectPositionsTool.
GetObjectPositionsTool.format_pose = staticmethod(format_pose)

# Frames and camera topics shared by the tools built below.
MANIPULATOR_FRAME = "panda_link0"
CAMERA_FRAME = "RGBDCamera5"
COLOR_TOPIC = "/color_image5"
DEPTH_TOPIC = "/depth_image5"
CAMERA_INFO_TOPIC = "/color_camera_info5"

# Scene description and behaviour rules handed to the agent factory.
EMBODIMENT_PROMPT = """You are a helpful robotic manipulation assistant. You are given a task to manipulate objects in a warehouse. You have access to the following tools: GetObjectPositionsTool, MoveObjectFromToTool, ResetArmTool. You can use the tools to get the positions of the objects, move the objects, and reset the arm.

Objects in the scene: cube (4), apple (2), carrot (1), maize (1)

The table is a rectangle with the following dimensions:
- length (x dim) = 0.4 m
- width (y dim) = 1m

The table's available area is at:
- x > 0.2 and x < 0.6
- y > -0.5 and y < 0.5 (this is the width of the table)

When I say move forward/backward, you move along the x axis.
When I say move left/right, you move along the y axis.
When I say move up/down, you move along the z axis.

Rules:
- Always make sure you do not put one object on top of another object unless it is necessary
- Always obtain current positions of the objects and calculate the target positions of the objects before moving them
- Always put obtained positions in variables and use them in the code
- Never use hardcoded positions in the code
- Always use `move_object_from_to` function to move the object from the source position to the target position.
- Always use detected objects positions directly to grab them. Do not add any offset to the objects positions
- Always check objects positions after each move action to verify if the object is in the correct position
"""

# Tool instances are created in the same order as they appear in `tools`;
# later cells index into the list by position.
grabbing_point_tool = GetGrabbingPointTool(connector=connector)
object_positions_tool = GetObjectPositionsTool(
    connector=connector,
    target_frame=MANIPULATOR_FRAME,
    source_frame=CAMERA_FRAME,
    camera_topic=COLOR_TOPIC,
    depth_topic=DEPTH_TOPIC,
    camera_info_topic=CAMERA_INFO_TOPIC,
    get_grabbing_point_tool=grabbing_point_tool,
)
move_object_tool = MoveObjectFromToTool(
    connector=connector, manipulator_frame=MANIPULATOR_FRAME
)
reset_arm_tool = ResetArmTool(connector=connector, manipulator_frame=MANIPULATOR_FRAME)

tools: List[BaseTool] = [object_positions_tool, move_object_tool, reset_arm_tool]

llm = get_llm_model(model_type="complex_model", streaming=True, vendor="anthropic")

# The factory returns the agent together with the system prompt it assembled.
coding_agent, system_prompt = create_coding_agent_with_tools(
    tools=tools,
    llm=llm,
    embodiment_prompt=EMBODIMENT_PROMPT,
)

print("✅ Coding agent created successfully!")

In [None]:
# Show the system prompt returned by create_coding_agent_with_tools for inspection.
print(system_prompt)

In [None]:
# Interactive chat loop: read prompts from the user, stream the agent's reply
# and echo any Python code the agent submits to the interpreter tool.
# Enter "exit", "quit" or "q" to stop.


from langchain_core.messages import AIMessage


def _chunk_text(content) -> str:
    """Return the printable text carried by a streamed message's ``content``.

    Message content may be a plain string, a single content-block dict, or a
    list mixing strings and content-block dicts. Only textual parts are kept;
    blocks without a string ``"text"`` entry contribute nothing.

    Args:
        content: The ``content`` attribute of a streamed message chunk.

    Returns:
        The concatenated text, or an empty string when there is none.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, dict):
        text = content.get("text", "")
        return text if isinstance(text, str) else ""
    if isinstance(content, list):
        return "".join(_chunk_text(part) for part in content)
    return ""


def _print_tool_calls(message) -> None:
    """Print the code of every ``python_interpreter`` call in ``message``.

    Messages that are not an ``AIMessage`` or that carry no tool calls are
    ignored. Calls to any other tool are reported as unknown.
    """
    if not (isinstance(message, AIMessage) and message.tool_calls):
        return
    for tool_call in message.tool_calls:
        if tool_call["name"] == "python_interpreter":
            print("\n-------------------------------")
            print("Running code:")
            print(tool_call["args"]["code"])
            print("\n--------------------------------")
        else:
            print(f"Unknown tool: {tool_call['name']}")


# NOTE(review): only the user's prompts are appended to `messages`; the
# agent's replies are not, so each turn is sent with the earlier prompts only.
# Confirm this is intended.
messages: List[BaseMessage] = []

while True:
    prompt = input("Enter a prompt: ")
    command = prompt.strip().lower()
    if command in ("exit", "quit", "q"):
        break
    if not command:
        # Do not send an empty message to the model.
        continue
    messages.append(HumanMessage(content=prompt))
    for chunk in coding_agent.stream(
        {"messages": messages}, stream_mode=["messages", "updates"]
    ):
        # With a list of stream modes, each item is a (mode, payload) pair.
        stream_mode, payload = chunk
        if stream_mode == "messages":
            message_chunk, _metadata = payload
            text = _chunk_text(message_chunk.content)
            if text:
                print(text, end="", flush=True)
        elif stream_mode == "updates":
            # Only updates from the "thinker" node are inspected; results
            # coming back from the tools are intentionally not printed.
            if "thinker" in payload:
                _print_tool_calls(payload["thinker"]["messages"][-1])

In [None]:
# Manual check: call the third entry of `tools` (the ResetArmTool instance)
# directly, without going through the agent.
tools[2]._run()

In [None]:
# Manual check: call the first entry of `tools` (the GetObjectPositionsTool
# instance) directly with "cube" as its argument.
tools[0]._run("cube")

In [None]:
# Same direct GetObjectPositionsTool call as in the previous cell.
# NOTE(review): looks like a repeat kept to compare readings between runs —
# confirm, or drop this duplicate cell.
tools[0]._run("cube")

In [None]:
# Instantiate PythonInterpreterWithTools directly with the same `tools` list.
# NOTE(review): `python_tool` is not used in the visible cells — presumably
# kept for manual experiments; confirm.
python_tool = PythonInterpreterWithTools(tools=tools)