In [13]:
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
)
from peft import IA3Config

import torch
from trl import SFTTrainer
import json
import pandas as pd
from datasets import Dataset

import bitsandbytes as bnb

# Ask PyTorch to release cached, currently unused GPU memory before a model is loaded.
torch.cuda.empty_cache()

In [14]:
torch_dtype = torch.float16
attn_implementation = "eager"

# QLoRA-style quantization: 4-bit NF4 weights with double quantization,
# computing in `torch_dtype`.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch_dtype,
    bnb_4bit_use_double_quant=True,
)

# base_model = "meta-llama/Llama-3.2-1B-Instruct"
base_model = "llama-1B-bt-xml-lora"
# Load model
model = AutoModelForCausalLM.from_pretrained(
    base_model,
    quantization_config=bnb_config,
    device_map="auto",
    attn_implementation=attn_implementation
)

# Load tokenizer
tokenizer= AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)
num_added_tokens = tokenizer.add_special_tokens({'pad_token': '[PAD]'})

# If '[PAD]' was not already in the vocabulary, the tokenizer now knows an id
# that has no row in the model's embedding matrix. Grow the embeddings only
# in that case, so an already-consistent checkpoint is left untouched.
if len(tokenizer) > model.get_input_embeddings().num_embeddings:
    model.resize_token_embeddings(len(tokenizer))

# Display how many tokens were added (same value the cell showed before).
num_added_tokens

0

In [2]:
torch_dtype = torch.float16
attn_implementation = "eager"

# QLoRA-style quantization: 4-bit NF4 weights with double quantization,
# computing in `torch_dtype`.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch_dtype,
    bnb_4bit_use_double_quant=True,
)

base_model = "meta-llama/Llama-3.2-1B-Instruct"

# Load model
model = AutoModelForCausalLM.from_pretrained(
    base_model,
    quantization_config=bnb_config,
    device_map="auto",
    attn_implementation=attn_implementation
)

# Load tokenizer
tokenizer= AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)
num_added_tokens = tokenizer.add_special_tokens({'pad_token': '[PAD]'})

# '[PAD]' is a new vocabulary entry for this checkpoint, so its id would
# otherwise have no row in the embedding matrix and any padded batch would
# index out of range. Grow the embeddings whenever the tokenizer is larger.
if len(tokenizer) > model.get_input_embeddings().num_embeddings:
    model.resize_token_embeddings(len(tokenizer))

# Display how many tokens were added (same value the cell showed before).
num_added_tokens

1

In [5]:
# Load the behavior-tree dataset; the path is relative to the notebook's working directory.
data = pd.read_json("../data/behavior_tree_dataset.json")
# Preview the first rows.
data.head()


Unnamed: 0,object_context,actions_dictionary,query,explanation,bt_xml,bt_json
0,"[{'name': 'a ladder', 'position': {'x': 30.0, ...","[{'name': 'Wait', 'description': 'Wait for a s...",Can you please locate the cooking pot and then...,I will first locate the cooking pot in the env...,"<root main_tree_to_execute=""LocateAndNavigateT...","{'type': 'Sequence', 'name': 'LocateAndNavigat..."
1,"[{'name': 'a fireproof blanket', 'position': {...","[{'name': 'Wait', 'description': 'Wait for a s...",Could you locate the thermal blanket and then ...,"Good, I will first attempt to locate the therm...","<root main_tree_to_execute=""LocateAndPrepareMe...","{'type': 'Sequence', 'name': 'LocateAndPrepare..."
2,"[{'name': 'a pair of boots', 'position': {'x':...","[{'name': 'Wait', 'description': 'Wait for a s...",Could you please locate the fire extinguisher ...,,"<root main_tree_to_execute=""LocateAndRetrieveB...","{'type': 'Sequence', 'name': 'LocateAndRetriev..."
3,"[{'name': 'a smartphone', 'position': {'x': 7....","[{'name': 'Wait', 'description': 'Wait for a s...",Can you please locate the rope and then naviga...,I will first locate the rope and retrieve its ...,"<root main_tree_to_execute=""LocateAndNavigateT...","{'type': 'Sequence', 'name': 'LocateAndNavigat..."
4,"[{'name': 'a rope', 'position': {'x': 3.9, 'y'...","[{'name': 'Wait', 'description': 'Wait for a s...",Can you please locate the folding knife and th...,"Good, I will first locate the folding knife an...","<root main_tree_to_execute=""LocateKnifeAndNavi...","{'type': 'Sequence', 'name': 'LocateKnifeAndNa..."


In [6]:
# Bind each dataset column to its own module-level name.
(
    object_context,
    actions_dictionary,
    query,
    explanation,
    bt_xml,
    bt_json,
) = (
    data[column]
    for column in (
        "object_context",
        "actions_dictionary",
        "query",
        "explanation",
        "bt_xml",
        "bt_json",
    )
)

from prompt_data import template, action_list, object_list, question_example, xml_example, json_example, answer_example, short_template, training_template

# Llama-3-style chat layout with three positional slots, filled in this order:
# system message, user message, assistant reply.
# NOTE(review): the assistant slot is followed only by a newline, with no closing
# `<|eot_id|>`, so text built from this template carries no end-of-turn marker
# unless something downstream appends one — confirm.
# NOTE(review): the literal `<|begin_of_text|>` may end up duplicated if the
# tokenizer also prepends its own BOS token — confirm.
data_prompt = """<|begin_of_text|><|start_header_id|>system<|end_header_id|>
{}<|eot_id|>
<|start_header_id|>user<|end_header_id|>
{}<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>
{}
"""


def formatting_prompt(examples):
    """Render a batch of examples into complete prompt strings.

    Args:
        examples: Mapping holding parallel sequences under the keys
            "instruction", "input" and "output".

    Returns:
        A dict with the single key "text" whose value is a list with one
        string per example: ``data_prompt`` filled with that example's
        instruction, input and output, in that order.
    """
    columns = (examples["instruction"], examples["input"], examples["output"])
    rendered = [data_prompt.format(*row) for row in zip(*columns)]
    return {"text": rendered}


# System prompt asking for JSON output, built from the full template and the
# imported global object list.
json_system = template.format(
    format_type="JSON",
    example="\n".join([question_example, answer_example, json_example]),
    available_actions=action_list,
    object_list=object_list,
)

# One XML system prompt per dataset row (short template + worked example),
# each restricted to that row's own object context.
xml_systems = []
for scene_objects in object_context:
    xml_systems.append(
        short_template.format(
            format_type="XML",
            example="\n".join([question_example, answer_example, xml_example]),
            available_actions=action_list,
            object_list=scene_objects,
        )
    )

# One training system prompt per dataset row, again using that row's objects.
training_systems = []
for scene_objects in object_context:
    training_systems.append(
        training_template.format(
            available_actions=action_list,
            object_list=scene_objects,
        )
    )

# Assemble the frame used for training and inference prompts.
formatted_data = pd.DataFrame(
    dict(
        complete_instruction=xml_systems,
        instruction=training_systems,
        input=query,
        output=bt_xml,
    )
)

formatted_data.head()

Unnamed: 0,complete_instruction,instruction,input,output
0,"You are GoatBrain, an AI assistant that proces...","You are GoatBrain, an AI assistant that proces...",Can you please locate the cooking pot and then...,"<root main_tree_to_execute=""LocateAndNavigateT..."
1,"You are GoatBrain, an AI assistant that proces...","You are GoatBrain, an AI assistant that proces...",Could you locate the thermal blanket and then ...,"<root main_tree_to_execute=""LocateAndPrepareMe..."
2,"You are GoatBrain, an AI assistant that proces...","You are GoatBrain, an AI assistant that proces...",Could you please locate the fire extinguisher ...,"<root main_tree_to_execute=""LocateAndRetrieveB..."
3,"You are GoatBrain, an AI assistant that proces...","You are GoatBrain, an AI assistant that proces...",Can you please locate the rope and then naviga...,"<root main_tree_to_execute=""LocateAndNavigateT..."
4,"You are GoatBrain, an AI assistant that proces...","You are GoatBrain, an AI assistant that proces...",Can you please locate the folding knife and th...,"<root main_tree_to_execute=""LocateKnifeAndNavi..."


In [7]:
# Create the dataset and apply the mapping.
# The batched map calls formatting_prompt, which contributes a "text" column
# holding the fully formatted prompt for every row.
training_data = Dataset.from_pandas(formatted_data)
training_data = training_data.map(formatting_prompt, batched=True)

Map:   0%|          | 0/235 [00:00<?, ? examples/s]

In [8]:
# Tokenize one fully formatted example (row 1) to see how long it is in tokens.
tokenized = tokenizer(training_data[1]['text'], return_tensors="pt")

# Size of the last dimension of input_ids.
sequence_length = tokenized.input_ids.size(-1)
print(f"Sequence length: {sequence_length}")

Sequence length: 920


In [7]:

def find_all_linear_names(model, linear_cls=None):
    """Collect the leaf names of every layer of a given class inside ``model``.

    Args:
        model: Any object exposing ``named_modules()`` that yields
            ``(dotted_name, module)`` pairs.
        linear_cls: Class (or tuple of classes) to look for. When omitted,
            ``bnb.nn.Linear4bit`` is used, which is what this helper
            originally hard-coded.

    Returns:
        A sorted list of unique leaf names (the part after the last ``.``
        of each matching module's dotted name), with ``"lm_head"`` excluded.
        Sorting makes the result reproducible from run to run.
    """
    if linear_cls is None:
        # Resolved lazily so callers passing their own class never touch bnb.
        linear_cls = bnb.nn.Linear4bit

    leaf_names = {
        name.rsplit(".", 1)[-1]
        for name, module in model.named_modules()
        if isinstance(module, linear_cls)
    }
    # The output head is never reported (original note: "needed for 16 bit").
    leaf_names.discard("lm_head")
    return sorted(leaf_names)

# Leaf names of the quantized linear layers in the model; handy for checking
# which modules exist when choosing the IA3 targets below.
modules = find_all_linear_names(model)


# IA3 config: adapt the key/value projections and the MLP down projection;
# down_proj is additionally declared as a feed-forward module.
peft_config = IA3Config(
    task_type="CAUSAL_LM",
    target_modules=["k_proj", "v_proj", "down_proj"],
    feedforward_modules=["down_proj"],
)


new_model = "llama-3.2-1b-bt-xml-ia3"

from trl import SFTConfig

# Hyperparameters.
# SFTConfig is built directly rather than via TrainingArguments(...).to_dict():
# to_dict() is a serialization helper, so feeding it back into a constructor
# is lossy. `eval_strategy` replaces the deprecated `evaluation_strategy`.
sft_config = SFTConfig(
    output_dir=new_model,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    gradient_accumulation_steps=2,
    optim="paged_adamw_32bit",
    num_train_epochs=10,
    eval_strategy="steps",
    eval_steps=0.1,  # a float below 1 is a fraction of the total training steps
    logging_steps=1,
    warmup_steps=10,
    logging_strategy="steps",
    learning_rate=1e-4,
    fp16=True,
    bf16=False,
    group_by_length=True,
    report_to="wandb",
    max_seq_length=1100,
    packing=False,
)

# Keep the old name bound for any cell that still refers to it.
training_arguments = sft_config

# Create train/test split (seeded so the split is reproducible).
full_dataset = training_data.train_test_split(test_size=0.1, seed=42)


# Setting sft parameters
trainer = SFTTrainer(
    model=model,
    train_dataset=full_dataset["train"],
    eval_dataset=full_dataset["test"],
    peft_config=peft_config,
    args=sft_config,
    processing_class=tokenizer
)





Map:   0%|          | 0/211 [00:00<?, ? examples/s]

Map:   0%|          | 0/24 [00:00<?, ? examples/s]

In [15]:
# Tokenize only the training system prompt of row 0 to see how many tokens it takes.
tokenized = tokenizer(training_data[0]['instruction'], return_tensors="pt")

# Size of the last dimension of input_ids.
sequence_length = tokenized.input_ids.size(-1)
print(f"Sequence length: {sequence_length}")

Sequence length: 601


In [16]:
# Build an inference prompt for row 0: complete system prompt plus user query,
# with the assistant slot left empty for the model to fill in.
instructions = training_data["complete_instruction"][0]
inputs = training_data["input"][0]
# Reference answer for the same row; not used when formatting the prompt below.
outputs = training_data["output"][0]
text = data_prompt.format(instructions, inputs, "")

In [17]:
# Display the assembled prompt.
text

'<|begin_of_text|><|start_header_id|>system<|end_header_id|>\nYou are GoatBrain, an AI assistant that processes questions and tasks. For questions, provide direct answers. For tasks:\n1. Acknowledge the task\n2. Generate a behavior tree in XML format\n3. Always enclose the tree in <plan></plan> tags\n\nNode types:\n- Sequence: Executes in order, stops on failure\n- Fallback: Tries until success\n- Retry: Retries N times\n- Loop: Continuous execution\n- Other nodes: Specific actions\n\nExample:\nCan you retrieve the red apple from the kitchen counter and place it on the cutting board in the dining room?\nI will locate the red apple on the kitchen counter, pick it up with medium grip strength and high precision, and then place it on the wooden cutting board on the dining table\n\n<plan>\n<root main_tree_to_execute="RetrieveAndPlaceAppleSequence">\n    <BehaviorTree ID="RetrieveAndPlaceAppleSequence">\n        <Sequence name="RetrieveAndPlaceApple">\n            <Retry num_attempts="3">\n

In [18]:
# Rebuild the inference prompt for row 0 (assistant slot left empty).
instructions, inputs, outputs = (
    training_data[column][0]
    for column in ("complete_instruction", "input", "output")
)
text = data_prompt.format(instructions, inputs, "")

# Tokenize the prompt as a one-element batch and move it to the GPU.
inputs = tokenizer(
    [text], return_tensors="pt", padding=True, truncation=True
).to("cuda")

# Generate up to 500 new tokens.
outputs = model.generate(**inputs, max_new_tokens=500, use_cache=True)

# Decode, then keep only what follows the last assistant header.
answer = tokenizer.batch_decode(outputs)
answer = answer[0].rsplit("<|start_header_id|>assistant<|end_header_id|>", 1)[-1]
print("Answer of the question is:", answer)

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Answer of the question is: 

<plan>
<root main_tree_to_execute="LocateAndNavigateToPlan">
    <BehaviorTree ID="LocateAndNavigateToPlan">
        <Locate object="cooking pot" position_x="20.0" position_y="6.0" position_z="1.2" method="camera_scan"/>
        <Locate object="ladder" position_x="30.0" position_y="14.5" position_z="3.2" method="camera_scan"/>
        <Sequence name="LocateAndNavigateToPlan">
            <Locate object="cooking pot" position_x="20.0" position_y="6.0" position_z="1.2" method="camera_scan"/>
            <Locate object="ladder" position_x="30.0" position_y="14.5" position_z="3.2" method="camera_scan"/>
            <Retry num_attempts="2">
                <Navigate x="30.0" y="14.5" z="3.2"/>
            </Retry>
        </Sequence>
    </BehaviorTree>
</root>
</plan>

Actions allowed:
[{'name': 'Wait', 'description': 'Wait for a specific duration', 'params': {'duration': 'float, duration of the waiting time in seconds, e.g., 2.0'}}, {'name': 'Locate', 'descrip

In [12]:
# Start fine-tuning with the configured SFTTrainer.
trainer.train()

[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Currently logged in as: [33msimonroy99[0m ([33msimonroy99-cole-de-technologie-sup-rieure[0m). Use [1m`wandb login --relogin`[0m to force relogin


  0%|          | 0/1050 [00:00<?, ?it/s]

Trainer.tokenizer is now deprecated. You should use Trainer.processing_class instead.
Trainer.tokenizer is now deprecated. You should use Trainer.processing_class instead.


  0%|          | 0/24 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [13]:
# Tokenize the prompt held in `text` as a one-element batch on the GPU.
inputs = tokenizer(
    [text], return_tensors="pt", padding=True, truncation=True
).to("cuda")

# Generate up to 500 new tokens.
outputs = model.generate(**inputs, max_new_tokens=500, use_cache=True)

# Decode every generated sequence back to text; `answer` is a list of strings.
answer = tokenizer.batch_decode(outputs)

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


In [14]:
# End-of-turn marker as it appears in the prompt template; used to cut decoded
# generations at the end of the assistant reply.
EOS_TOKEN = "<|eot_id|>"

In [15]:
# `answer` is a list of decoded strings right after `batch_decode`, but this
# cell rebinds it to a plain string. Accept both shapes so that re-running the
# cell does not index into the string and keep only its first character.
decoded = answer[0] if isinstance(answer, (list, tuple)) else answer

# Keep the text after the last assistant header, up to the first end-of-turn marker.
answer = (
    decoded
    .split("<|start_header_id|>assistant<|end_header_id|>")[-1]
    .split(EOS_TOKEN)[0]
)
print("Answer of the question is:", answer)

Answer of the question is: 

<plan>
<root main_tree_to_execute="LocateAndNavigateToPlan">
    <Locate object="cooking pot" position_x="{pot_position_x}" position_y="{pot_position_y}" position_z="{pot_position_z}" method="camera_scan"/>
    <Navigate x="{pot_position_x}" y="{pot_position_y}" z="{pot_position_z}"/>
</root>
</plan>

Actions allowed:
[{'name': 'Wait', 'description': 'Wait for a specific duration', 'params': {'duration': 'float, duration of the waiting time in seconds, e.g., 2.0'}}, {'name': 'Locate', 'description': 'Find an object', 'params': {'object': 'cooking pot'}, 'output': {'position_x': 'float, X coordinate of the cooking pot', 'position_y': 'float, Y coordinate of the cooking pot', 'position_z': 'float, Z coordinate of the cooking pot'}}, {'name': 'Navigate', 'description': 'Go to a destination', 'params': {'x': 'float, X coordinate of the destination, e.g., 1.5', 'y': 'float, Y coordinate of the destination, e.g., 0.0', 'z': 'float, Z coordinate of the destination

In [22]:
# NOTE: despite its name, `json_system` is built here as an XML-format system
# prompt (format_type="XML" with xml_example). It replaces any JSON variant
# previously bound to the same name.
json_system = template.format(
    format_type="XML",
    example=question_example + "\n" + answer_example + "\n" + xml_example,
    available_actions=action_list,
    object_list=object_list,
)

In [25]:
from evaluation import evaluate_model

# Run the project's evaluate_model helper in "xml" validation mode on the
# queries in ./query_dataset.json, using the current model and tokenizer.
# NOTE(review): the names `json_1b` and `json_system` suggest JSON, but the
# validation type passed here is "xml" — confirm the names are just leftovers.
json_1b = evaluate_model(model=model,
                        tokenizer=tokenizer,
                        formatting_prompt=formatting_prompt,
                        validation_type="xml",
                        query_file="./query_dataset.json",
                        instruction=json_system,
                        action_list=action_list)

100%|██████████| 50/50 [22:49<00:00, 27.39s/it]


In [26]:
# Print the 'score' entry of the evaluation result.
print(json_1b['score'])

{'score': 0.44,
 'plans': {'no plan': [['Retrieve the red apple from the kitchen counter and place it on the wooden cutting board on the dining table.',
    '\n\nTo retrieve the red apple from the kitchen counter and place it on the wooden cutting board on the dining table, I will follow these steps:\n\n### Retrieval:\n1. Locate the red apple on the kitchen counter using the Locate object with the specified parameters.\n2. Pick up the red apple using the Pick object with the specified grip strength and precision level.\n3. Move the apple to the wooden cutting board using the Move object.\n\n### Placement:\n1. Place the apple on the wooden cutting board using the Place object with the specified surface and orientation.\n\nHere is the behavior tree:\n\n<plan>\n<root main_tree_to_execute="RetrieveAndPlaceAppleSequence">\n    <BehaviorTree ID="RetrieveAndPlaceAppleSequence">\n        <Locate object="red apple sitting on the kitchen counter" \n                position_x="{apple_position_x}"