#### Install the `nimbro_api` and `nimbro_api_interfaces` packages:

**1.:** Clone both repositories to the `src` folder of your ROS2 workspace.

**2.:** `colcon build --packages-select nimbro_api nimbro_api_interfaces --symlink-install`

**3.:** `source install/local_setup.bash` 

## Install API keys

#### Save the API keys you want to use to an environment variable:

OpenAI: `echo "export OPENAI_API_KEY='yourkey'" >> ~/.bashrc`

Mistral: `echo "export MISTRAL_API_KEY='yourkey'" >> ~/.bashrc`

OpenRouter: `echo "export OPENROUTER_API_KEY='yourkey'" >> ~/.bashrc`

vLLM: `echo "export VLLM_API_KEY='yourkey'" >> ~/.bashrc`

## Launch

#### Launch multiple completions nodes along with a completions_multiplexer node in your ROS2 environment. Each completions node represents a distinct LLM with its own context and set of model parameters, defined tools, etc. The completions_multiplexer node serves as a central communication hub, managing allocation and relaying interactions between the completions nodes and the user.

`ros2 launch nimbro_api launch.py` # 7 completions + completions_multiplexer + usage_monitor + images + speech + embeddings

In [1]:
import rclpy
from rclpy.node import Node
from nimbro_api.utils.string import levenshtein_match
from nimbro_api.api_director import ApiDirector
from nimbro_api.agent.tools import ToolBase, AgentBase, ToolLog, ToolTerminate, ToolTime, ToolWeather, ToolAccomplish, ToolSpeak
from nimbro_api.utils.node import spin_node_with_multi_threaded_executor

In [2]:
class RosNode(Node):
    def __init__(self):
        super().__init__('ros_node')
        self.api_director = ApiDirector(self)
rclpy.init()
r = RosNode()

In [3]:
class ToolSpeakNaive(ToolBase):

    def __init__(self, parent, name="speak"):
        super().__init__(parent, name)
        self.set_arguments([('person', str), ('text', str), ('requires_answer', bool)])

    def define(self):
        tool = {
            'name': "speak",
            'description': "Speak to a person",
            'parameters': {
                'type': 'object',
                'properties': {
                    'person': {
                        'type': "string",
                        'description': "Specifies the person to speak to. Pass 'everyone' to address everyone in the robot's vicinity, rather than a specific person"    
                    },
                    "text": {
                        'type': "string",
                        'description': "Specifies the text to be said. Be friendly, concise, and helpful"
                    },
                    "requires_answer": {
                        'type': "boolean",
                        'description': "Signals that the spoken text requires an answer and makes the robot wait for it. The answer will then be returned with the response to this function call"
                    }
                },
                'required': ['person', "text", "requires_answer"]
            }
        }
        return tool

    def implement(self, args):
        person, text, requires_answer = self.retrieve_args(args)
        
        self._node.get_logger().info(self._logger_prefix + text)

        valid = True
        success = True
        print(f"Robot: {args['text']}")
        
        if requires_answer:
            
            answer = input(f"{person}: ")
            message = f"The robot successfully said '{text}' to '{person}', who answered '{answer}'."
        else:
            message = f"The robot successfully said '{text}' to '{person}'."
        image = None

        return valid, success, message, image

# Test the new ToolSpeak tool

In [4]:
r.agent = AgentBase(r, completions_auto_release=False)
r.agent.configure()
r.agent.set_tools([
    ToolTerminate(r.agent),
    ToolSpeak(r.agent)
]) 
r.agent.set_task("Ask my human friend Raphael how much he weighs.")
#r.agent.set_task("Ask my human friend Raphael how old he is.")#Ask John if he wants pizza for lunch and confirm his choice.
#r.agent.set_task("Ask John if he wants pizza for lunch and confirm his choice.")#Ask John if he wants pizza for lunch and confirm his choice.

(True,
 "Successfully initialized task: 'Ask my human friend Raphael how much he weighs.'")

In [5]:
r.agent.execute()

Robot: Hey Raphael! I hope you're doing well. I was wondering if you could share how much you weigh?
Raphael: I way 1000 kg.

Robot: Wow, that sounds quite heavy! Are you sure that's your weight? Could you please clarify?
Raphael: No, I meant 100 kg.



(True,
 True,
 'Successfully asked Raphael about his weight, and he clarified that he weighs 100 kg.',
 None)

In [6]:
success, message, context = r.api_director.get_context(completions_id=r.agent._completions_id)
if success:
    for i, m in enumerate(context):
        if i == 0:
            print("---\n")
        else:
            print("\n---\n")
        if len(str(m)) > 1000:
            print("image")
        else:
            print(m)
        if i == len(context) - 1:
            print("\n---")

---

{'role': 'user', 'content': [{'type': 'text', 'text': 'Ask my human friend Raphael how much he weighs.'}]}

---

{'role': 'assistant', 'content': None, 'tool_calls': [{'type': 'function', 'id': 'call_WGz6vc7EdsUm5EL1cMRAkXui', 'function': {'name': 'speak', 'arguments': '{"person":"Raphael","communication_objective":"I would like to know how much you weigh."}'}}]}

---

{'role': 'tool', 'tool_call_id': 'call_WGz6vc7EdsUm5EL1cMRAkXui', 'content': 'I successfully asked Raphael about his weight, and he clarified that he weighs 100 kg.'}

---

{'role': 'assistant', 'content': None, 'tool_calls': [{'type': 'function', 'id': 'call_n1v3gbDHnEzoRflobQ3OElgv', 'function': {'name': 'terminate_task_completion', 'arguments': '{"success":true,"message":"Successfully asked Raphael about his weight, and he clarified that he weighs 100 kg."}'}}]}

---


In [7]:
i = [r.agent._tools[i]._name for i in range(len(r.agent._tools))].index("speak")
success, message, context = r.api_director.get_context(completions_id=r.agent._tools[i].session_id)
if success:
    for i, m in enumerate(context):
        if i == 0:
            print("---\n")
        else:
            print("\n---\n")
        if len(str(m)) > 1000:
            print("image")
        else:
            print(m)
        if i == len(context) - 1:
            print("\n---")

---

{'role': 'system', 'content': "You are a communication assistant that must accomplish a communication objective. Don't give up easily and try again and again until the objective is accomplished. Interpret the user's answers at bare hand and make sure the answers provided by the user are sensible before stopping the conversation. Don't be too pushy, if you get your answer, thank the user then stop the conversation."}

---

{'role': 'user', 'content': [{'type': 'text', 'text': "Accomplish the following communication objective directed at 'Raphael':I would like to know how much you weigh."}]}

---

{'role': 'assistant', 'content': None, 'tool_calls': [{'type': 'function', 'id': 'call_DEPXGJV6LS6CXo7Nw3K5LCiH', 'function': {'name': 'speak', 'arguments': '{"text":"Hey Raphael! I hope you\'re doing well. I was wondering if you could share how much you weigh?","requires_answer":true}'}}]}

---

{'role': 'tool', 'tool_call_id': 'call_DEPXGJV6LS6CXo7Nw3K5LCiH', 'content': "The robot succes

# Test the ToolSpeakNaive

In [8]:
class ToolSpeakNaive(ToolBase):

    def __init__(self, parent, name="speak"):
        super().__init__(parent, name)
        self.set_arguments([('person', str), ('text', str), ('requires_answer', bool)])

    def define(self):
        tool = {
            'name': "speak",
            'description': "Speak to a person",
            'parameters': {
                'type': 'object',
                'properties': {
                    'person': {
                        'type': "string",
                        'description': "Specifies the person to speak to. Pass 'everyone' to address everyone in the robot's vicinity, rather than a specific person"    
                    },
                    "text": {
                        'type': "string",
                        'description': "Specifies the text to be said. Be friendly, concise, and helpful"
                    },
                    "requires_answer": {
                        'type': "boolean",
                        'description': "Signals that the spoken text requires an answer and makes the robot wait for it. The answer will then be returned with the response to this function call"
                    }
                },
                'required': ['person', "text", "requires_answer"]
            }
        }
        return tool

    def implement(self, args):
        person, text, requires_answer = self.retrieve_args(args)
        
        self._node.get_logger().info(self._logger_prefix + text)

        valid = True
        success = True
        print(f"Robot: {args['text']}")
        
        if requires_answer:
            
            answer = input(f"{person}: ")
            message = f"The robot successfully said '{text}' to '{person}', who answered '{answer}'."
        else:
            message = f"The robot successfully said '{text}' to '{person}'."
        image = None

        return valid, success, message, image

In [9]:
r.agent.set_tools([
    ToolTerminate(r.agent),
    ToolSpeakNaive(r.agent),
])
#r.agent.set_task("Test the log tool, by telling a joke.")
# r.agent.set_task("Use the log tool to report the current time.")
#r.agent.set_task("Ask my human friend Raphael how old he is.")
#r.agent.set_task("Tell Raphael a joke.")
#r.agent.set_task("Ask John if he wants pizza for lunch and confirm his choice.")#Ask John if he wants pizza for lunch and confirm his choice.
#r.agent.set_task("Ask my human friend Raphael how old he is.")#Ask John if he wants pizza for lunch and confirm his choice.
r.agent.set_task("Ask my human friend Raphael how much he weighs.")


(True,
 "Successfully updated task: 'Ask my human friend Raphael how much he weighs.'")

In [11]:
r.agent.execute()

Robot: Hi Raphael, could you please tell me how much you weigh?
Raphael: I am 1000 kg.


(True, True, 'Raphael responded that he weighs 1000 kg.', None)

In [12]:
success, message, context = r.api_director.get_context(completions_id=r.agent._completions_id)
if success:
    for i, m in enumerate(context):
        if i == 0:
            print("---\n")
        else:
            print("\n---\n")
        if len(str(m)) > 1000:
            print("image")
        else:
            print(m)
        if i == len(context) - 1:
            print("\n---")

---

{'role': 'user', 'content': [{'type': 'text', 'text': 'Ask my human friend Raphael how much he weighs.'}]}

---

{'role': 'assistant', 'content': None, 'tool_calls': [{'type': 'function', 'id': 'call_jG3AdzvRZGY0HCtDtCBgFDAx', 'function': {'name': 'speak', 'arguments': '{"person":"Raphael","text":"Hi Raphael, could you please tell me how much you weigh?","requires_answer":true}'}}]}

---

{'role': 'tool', 'tool_call_id': 'call_jG3AdzvRZGY0HCtDtCBgFDAx', 'content': "The robot successfully said 'Hi Raphael, could you please tell me how much you weigh?' to 'Raphael', who answered 'I am 1000 kg.'."}

---

{'role': 'assistant', 'content': None, 'tool_calls': [{'type': 'function', 'id': 'call_xqyinubnBWVGs7rjKbBKeaFL', 'function': {'name': 'terminate_task_completion', 'arguments': '{"success":true,"message":"Raphael responded that he weighs 1000 kg."}'}}]}

---
