In [35]:
from openai import OpenAI
import json
import pandas as pd
import os
import random
import re
from bt_validator import validate_behavior_tree

random.seed(42)

In [36]:
data_path = "../data"
captions_path = os.path.join(data_path, "captions.json")
robot_base_actions_path = os.path.join(data_path, "robot_base_actions.json")
robot_optional_actions_path = os.path.join(data_path, "robot_optional_actions.json")

with open(captions_path, 'r') as f:
    captions_data = json.load(f)

with open(robot_base_actions_path, 'r') as f:
    robot_base_actions_data = json.load(f)

with open(robot_optional_actions_path, 'r') as f:
    robot_optional_actions_data = json.load(f)


df_captions = pd.DataFrame(captions_data)
df_robot_base_actions = pd.DataFrame(robot_base_actions_data)
df_robot_optional_actions = pd.DataFrame(robot_optional_actions_data)
df_robot_actions = pd.concat([df_robot_base_actions, df_robot_optional_actions])

display(df_captions.head())
display(df_robot_actions.head())

Unnamed: 0,0
0,a man in a white shirt and hat
1,a herd of sheep walking down the street
2,a stop sign on a street
3,a sheep grazing in a field of grass
4,a white bathroom with a sink and a toilet


Unnamed: 0,action,description,parameters
0,NavigateTo,"Moves the robot to a specified target (e.g., l...",[target]
1,Pick,Picks up a specified object.,[object]
2,Place,Places an object at a specified location.,"[object, location]"
3,LocateObject,Searches for a specified object in the environ...,[object]
4,RequestAssistance,Requests help for a specific task.,[task]


In [37]:

# Generate Object Context from captions data
def generate_object_context(captions):
    object_context = [
        f"{caption[0]} ({round(random.uniform(1, 100), 1)}m)"
        for _, caption in captions.iterrows()
    ]
    return "\n".join(object_context)

# Generate Actions Dictionary for system prompt
def generate_actions_dictionary(robot_actions):
    actions_dict = [
        f"{action['action']}: {action['description']} (Arguments: {', '.join(action['parameters'])})"
        for _, action in robot_actions.iterrows()
    ]
    return "\n".join(actions_dict)


In [38]:
captions = df_captions
base_actions = df_robot_base_actions
optional_actions = df_robot_optional_actions

actions = pd.concat([base_actions, optional_actions])

object_context = generate_object_context(captions)
actions_dictionary = generate_actions_dictionary(actions)

In [39]:
def generate_query_prompt(object_context, actions_dictionary, last_queries):
    base_prompt = """
You are simulating a human planning a task for a robot. The robot is equipped with sensors, manipulators, and mobility capabilities, and can perform tasks based on the objects in its environment and the actions available. It can also plan multi-step tasks or respond to impossible queries by indicating they cannot be completed.

Object Context: 
{object_context}

Actions Dictionary:
{actions_dictionary}

Generate a human-like query for the robot based on the given context and actions. Ensure that the queries vary in complexity:
- Some queries should be straightforward (e.g., single-step tasks).
- Some queries should be multi-step or involve reasoning (e.g., combining multiple actions or making decisions based on object attributes).
- Occasionally, generate queries that are impossible to execute (e.g., due to missing objects, incompatible actions, or physical limitations).

Here is the list of old queries that you created:
{last_queries}

Be creative and simulate realistic scenarios. The queries should be natural and diverse, for example:
- Go find something to eat that's not too salty.
- Close the hot water valve in the kitchen.
- Go make dinner and set the table.
- Help the firefighter put out the fire by spraying water on it.

Generate only a single query.
"""
    # select first 10-30 random captions and 5-15 random actions
    captions_sample = captions.sample(n=random.randint(3, 6))
    actions_sample = actions.sample(n=random.randint(2, 4))

    object_context = generate_object_context(captions_sample)
    actions_dictionary = generate_actions_dictionary(actions_sample)

    full_prompt = base_prompt.format(
        object_context=object_context,
        actions_dictionary=actions_dictionary,
        last_queries=last_queries
    )
    return full_prompt


last_queries = "test\ntest2"

query = generate_query_prompt(object_context, actions_dictionary, last_queries)
print(query)


You are simulating a human planning a task for a robot. The robot is equipped with sensors, manipulators, and mobility capabilities, and can perform tasks based on the objects in its environment and the actions available. It can also plan multi-step tasks or respond to impossible queries by indicating they cannot be completed.

Object Context: 
a group of people riding horses in a forest (37.4m)
a stop sign on a street (90.3m)
a white toilet (64.8m)

Actions Dictionary:
CheckBatteryLevel: Checks the robot's current battery level. (Arguments: )
ReportError: Reports an encountered error to the user. (Arguments: error)
LogEvent: Logs an event or data point for future reference. (Arguments: event)
ToggleSwitch: Toggles a switch on or off. (Arguments: switch)

Generate a human-like query for the robot based on the given context and actions. Ensure that the queries vary in complexity:
- Some queries should be straightforward (e.g., single-step tasks).
- Some queries should be multi-step or 

In [40]:
def generate_query(query_prompt):
    client = OpenAI()

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": query_prompt}],
        max_tokens=1000,
        temperature=0.7
    )
    return response.choices[0].message.content.strip()


In [41]:
def generate_bt_prompt(object_context, actions_dictionary, query):
    base_prompt = """
You are a robot that generate Behavior Tree with actions and context in a ROS2-compatible environment. The robot is equipped with sensors, manipulators, and mobility capabilities, and can execute tasks using behavior trees.

Object Context: 
{object_context}

Actions Dictionary:
{actions_dictionary}

Generate a Behavior Tree XML structure based on the given context and actions. The tree should:
- Include nodes for sequence (`Sequence`), fallback (`Fallback`), and actions based on the actions dictionary.
- Use realistic action nodes with specified parameters for the given scenario.
- Allow for graceful recovery from failures using fallback nodes.
- If the query is not possible to execute, you must explain why and return an empty Behavior Tree XML.


Detail what you are going to do and then generate a Behavior Tree XML for it. Exemple for the query: "Go find something to eat that's not too salty."

You must respect the following format:

[Explanation]
Good, I will check in my database which objects match the query. If there is a good match, I will navigate to it and pick it up. Otherwise, I will try to find a kitchen, open the fridge, and try to find something to eat that's not too salty. Then I'll bring it back to the table.

[Behavior Tree]
<root main_tree_to_execute="MainTree">
  <BehaviorTree ID="MainTree">
    <Fallback>
      <Sequence>
        <Action ID="LocateObject" object="food_item"/>
        <Action ID="NavigateTo" target="food_item"/>
        <Action ID="Pick" object="food_item"/>
        <Action ID="Place" object="food_item" location="table"/>
      </Sequence>
      <Sequence>
        <Action ID="NavigateTo" target="kitchen"/>
        <Action ID="LocateObject" object="fridge"/>
        <Action ID="OpenDoor" door="fridge"/>
        <Action ID="InspectObject" object="fridge_contents"/>
        <Action ID="IdentifyObject" object="low_sodium_food"/>
        <Fallback>
          <Sequence>
            <Action ID="Pick" object="low_sodium_food"/>
            <Action ID="Place" object="low_sodium_food" location="table"/>
          </Sequence>
          <Action ID="RequestAssistance" task="find_food"/>
        </Fallback>
      </Sequence>
    </Fallback>
  </BehaviorTree>
</root>


The answer must contains only those two parts (Explanation and Behavior Tree XML). 
Query: {query}
""" # Generate samples for object context and actions 
  
    object_context_snippet = "\n".join(object_context.split("\n")[:3]) # Limit to 3 lines for brevity 
    actions_dictionary_snippet = "\n".join(actions_dictionary.split("\n")[:6]) # Limit to 6 actions for brevity
    full_prompt = base_prompt.format(
      object_context=object_context_snippet,
      actions_dictionary=actions_dictionary_snippet,
      query=query
      )
    return full_prompt


query = "Go find something to eat that's not too salty."
bt_prompt = generate_bt_prompt(object_context, actions_dictionary, query)
print(bt_prompt)


You are a robot that generate Behavior Tree with actions and context in a ROS2-compatible environment. The robot is equipped with sensors, manipulators, and mobility capabilities, and can execute tasks using behavior trees.

Object Context: 
a man in a white shirt and hat (64.3m)
a herd of sheep walking down the street (3.5m)
a stop sign on a street (28.2m)

Actions Dictionary:
NavigateTo: Moves the robot to a specified target (e.g., location or object). (Arguments: target)
Pick: Picks up a specified object. (Arguments: object)
Place: Places an object at a specified location. (Arguments: object, location)
LocateObject: Searches for a specified object in the environment. (Arguments: object)
RequestAssistance: Requests help for a specific task. (Arguments: task)
Wait: Waits for a specified amount of time. (Arguments: duration)

Generate a Behavior Tree XML structure based on the given context and actions. The tree should:
- Include nodes for sequence (`Sequence`), fallback (`Fallback`),

In [42]:
def extract_explanation_and_bt(bt_prompt):
    explanation_match = re.search(r"\[Explanation\]\n(.*?)\n\[Behavior Tree\]", bt_prompt, re.S)
    bt_match = re.search(r"\[Behavior Tree\]\n(.*)", bt_prompt, re.S)  # Adjusted to capture everything after [Behavior Tree]
    
    explanation = explanation_match.group(1).strip() if explanation_match else ""
    bt = bt_match.group(1).strip() if bt_match else ""
    return explanation, bt

explanation, bt = extract_explanation_and_bt(bt_prompt)
print(explanation)
print(bt)

Good, I will check in my database which objects match the query. If there is a good match, I will navigate to it and pick it up. Otherwise, I will try to find a kitchen, open the fridge, and try to find something to eat that's not too salty. Then I'll bring it back to the table.
<root main_tree_to_execute="MainTree">
  <BehaviorTree ID="MainTree">
    <Fallback>
      <Sequence>
        <Action ID="LocateObject" object="food_item"/>
        <Action ID="NavigateTo" target="food_item"/>
        <Action ID="Pick" object="food_item"/>
        <Action ID="Place" object="food_item" location="table"/>
      </Sequence>
      <Sequence>
        <Action ID="NavigateTo" target="kitchen"/>
        <Action ID="LocateObject" object="fridge"/>
        <Action ID="OpenDoor" door="fridge"/>
        <Action ID="InspectObject" object="fridge_contents"/>
        <Action ID="IdentifyObject" object="low_sodium_food"/>
        <Fallback>
          <Sequence>
            <Action ID="Pick" object="low_sodium_

In [43]:
def generate_dataset(df_captions, df_robot_actions, dataset_size=20):
    dataset = []
    last_queries = []  # To store the last 10 queries
    
    for _ in range(dataset_size):
        # Randomly select captions and actions
        captions_sample = df_captions.sample(n=random.randint(3, 6))
        actions_base = df_robot_base_actions
        actions_optional_sample = df_robot_optional_actions.sample(n=random.randint(2, 4))
        actions_sample = pd.concat([actions_base, actions_optional_sample])

        # Generate object context and actions dictionary
        object_context = generate_object_context(captions_sample)
        actions_dictionary = generate_actions_dictionary(actions_sample)
        
        # Generate query
        query_prompt = generate_query_prompt(object_context, actions_dictionary, last_queries)
        query = generate_query(query_prompt)
        bt_prompt = generate_bt_prompt(object_context, actions_dictionary, query)
        answer = generate_query(bt_prompt)
        explanation, bt = extract_explanation_and_bt(answer)

        
        # Update the last queries
        last_queries.append(query)
        if len(last_queries) > 10:
            last_queries.pop(0)  # Keep only the last 10 queries
        
        # Save the data entry
        dataset_entry = {
            "object_context": object_context,
            "actions_dictionary": actions_dictionary,
            "query": query,
            "explanation": explanation,
            "bt": bt
        }
        dataset.append(dataset_entry)
    

    
    print(f"Dataset of {dataset_size} entries generated")
    return dataset

# Generate the dataset
dataset = generate_dataset(df_captions, df_robot_actions, dataset_size=100)

Dataset of 100 entries generated


In [44]:
# validate the behavior tree
for entry in dataset:
    is_valid, errors = validate_behavior_tree(entry['bt'])
    if not is_valid:
        print(f"Invalid behavior tree: {errors}")


Invalid behavior tree: ["Unexpected error: [Errno 2] No such file or directory: ''"]
Invalid behavior tree: ["Unexpected error: [Errno 2] No such file or directory: ''"]
Invalid behavior tree: ["Unexpected error: [Errno 2] No such file or directory: ''"]
Invalid behavior tree: ['XML Parsing Error: mismatched tag: line 23, column 12']
Invalid behavior tree: ["Unexpected error: [Errno 2] No such file or directory: ''"]


In [45]:
save_path="../data/queries_dataset.json"
# Save the dataset to a JSON file
with open(save_path, "w") as file:
    json.dump(dataset, file, indent=4)