## rekep+cot

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import os
import time
from vlm import MultiVLM          
# Settings forwarded to MultiVLM(...) inside run_once(): API key, endpoint base
# URL, model name, system message and the value passed as the `erp` keyword.
# The key, URL and erp value are blank in this file.
# NOTE(review): presumably these must be filled in (or loaded from the
# environment) before the script can reach the model -- confirm.
API_KEY     = ""
BASE_URL    = ""
MODEL       = "gpt-4o"
SYSTEM_MSG  = "You are a helpful assistant."
ERP_USER    = ""

# Root output directory; run_once() writes into SAVE_ROOT/<task_id>/.
SAVE_ROOT   = "./rekep_results_cot"

# Per-task inputs, keyed by integer task id. Each entry provides:
#   "instruction": task text substituted into every prompt template,
#   "objects":     scene inventory text used by build_content_scene(),
#   "image_url":   list of image paths handed to the VLM with each prompt.
TASK_CONFIG = {
    0: {
        "instruction": "Pick up a tennis ball",
        "objects": "two tennis ball, a robot, a desk",
        # NOTE(review): every other task uses "./rekep_results/<task_id>/1/...";
        # the "d" segment here looks like a typo for "0" -- confirm on disk.
        "image_url": ["./rekep_results/d/1/query_img.png"]
    },
    1: {
        "instruction": "Open the door of the microwave oven",
        "objects": "a microwave oven, a tennis ball, a robot, a desk",
        "image_url": ["./rekep_results/1/1/query_img.png"]
    },
    2: {
        "instruction": "reorient the pen and drop it into a holder",
        "objects": "a pen, two holder, a robot, a desk",
        "image_url": ["./rekep_results/2/1/query_img.png"]
    },
    3: {
        "instruction": "pick up the holder horizontally or vertically, Mark 1 represents horizontal grasping, mark 0 represents vertical grasping",
        "objects": "a pen holder, a robot, a drawer, a desk",
        "image_url": ["./rekep_results/3/1/query_img.png"]
    },
    4: {
        "instruction": "close the drawer",
        "objects": "a drawer, a toy, a robot, a desk",
        "image_url": ["./rekep_results/4/1/query_img.png"]
    },
    5: {
        "instruction": "reorient the pen and drop it into a holder",
        "objects": "an apple, a pen, two holder, a robot, a desk",
        "image_url": ["./rekep_results/5/1/query_img.png"]
    },
    6: {
        "instruction": "Put the apple and the pen holder on the drawer, and the apple should be placed in the pen holder",
        "objects": "an apple, a pen holder, a drawer, a robot, a desk",
        "image_url": ["./rekep_results/6/1/query_img.png"]
    },
    7: {
        "instruction": "Put the apple and the tennis ball in either the drawer or the pen holder, together or separately. Ensure the drawer is closed.",
        "objects": "an apple, a tennis ball, a pen holder, a drawer, a robot, a desk",
        "image_url": ["./rekep_results/7/1/query_img.png"]
    }
}

def ensure_dir(path):
    """Create directory ``path`` together with any missing parent directories.

    A directory that already exists is left as it is and is not treated as
    an error.
    """
    os.makedirs(name=path, mode=0o777, exist_ok=True)

def build_content_scene(instruction, objects):
    """Build the first-stage prompt asking the model to describe the scene.

    Args:
        instruction: Task text inserted after "...complete is to".
        objects: Scene inventory text inserted into the scenario description.

    Returns:
        The filled-in prompt with surrounding whitespace removed.
    """
    # Trailing spaces inside the literals are part of the prompt text.
    prompt_lines = (
        "The task that the robot needs to complete is to {instruction}. ",
        "Scenario description: There are {objects} in the picture",
        "You need to first analyze what objects each label represents and the state of each object. ",
        "Format Example: ",
        "Label: label 1 is an apple, label 2 is a drawer, and label 3 is a pen holder. ",
        "Status: Apple is on table, drawer is on table, drawer is closed, pen holder is on table.",
        "Note: We define articulated objects as open only when they are fully open, and if they are only slightly open, they are considered closed. ",
    )
    filled = "\n".join(prompt_lines).format(instruction=instruction, objects=objects)
    return filled.strip()

def build_content_notice(instruction, scene_state):
    """Build the second-stage prompt asking for a one-sentence precaution.

    The prompt lists the available action skills and a few worked examples
    of side effects a plan should avoid, then asks the model which
    precaution applies to this task.

    Args:
        instruction: Task text inserted after "...complete is to".
        scene_state: Scene description text (run_once passes the model's
            first-stage answer here).

    Returns:
        The filled-in prompt with surrounding whitespace removed.
    """
    # Trailing spaces inside the literals are part of the prompt text.
    prompt_lines = (
        "The task that the robot needs to complete is to {instruction}. ",
        "To complete the task, you need to specify a plan based on the picture.",
        "Scenario description: {scene_state}.",
        "Action skills that the robot can use are:",
        "- pick",
        "- place on",
        "- place in",
        "- open",
        "- close",
        "",
        "# Pay attention to the size of objects and the relationships between objects in three-dimensional space. ",
        "# Please note that the actions in the plan should not cause damage to other environmental objects. ",
        "For example:",
        "- Opening the drawer may knock over the vase, you should first move the vase away.",
        # Fixed: this example used to end with "move the vase away", which
        # contradicted its own premise about the pen holder.
        "- Picking up an apple may accidentally knock over the pen holder next to the apple, you should first move the pen holder away.",
        "- To fit the mouse into a box, it is necessary to select a box of appropriate size.",
        "- To prevent blocking the apples underneath when opening the drawer, you should first move the apples away before opening the drawer.",
        "What are the precautions when specifying a plan for this task, describe in one sentence?",
    )
    filled = "\n".join(prompt_lines).format(
        instruction=instruction, scene_state=scene_state
    )
    return filled.strip()

def build_content_plan(instruction, scene_state, notice):
    """Build the final-stage prompt asking for the action sequence.

    Args:
        instruction: Task text inserted after "...complete is to".
        scene_state: Scene description text placed in the scenario line.
        notice: Precaution text placed on the "Notice:" line.

    Returns:
        The filled-in prompt with surrounding whitespace removed.
    """
    # Trailing spaces inside the literals are part of the prompt text.
    prompt_lines = (
        "The task that the robot needs to complete is to {instruction}. ",
        "To complete the task, you need to specify a plan based on the picture.",
        "Scenario description: {scene_state}.",
        "Action skills that the robot can use are:",
        "- pick",
        "- place on",
        "- place in",
        "- open",
        "- close",
        "# Only after opening the articulated objects can other objects be inserted inside. ",
        "# After picking up an object, the robot cannot perform opening or closing actions until the object is put down.",
        "Notice: {notice}",
        "Directly output action sequence:",
    )
    filled = "\n".join(prompt_lines).format(
        instruction=instruction, scene_state=scene_state, notice=notice
    )
    return filled.strip()

def run_once(task_id, cfg):
    """Run one scene -> notice -> plan query chain for a task and save the plan.

    A single MultiVLM instance is created and queried three times with the
    task's image list: first for a scene description, then for a precaution
    based on that description, and finally for the action plan based on both.
    Each answer is printed as it arrives.

    Args:
        task_id: Task identifier; used in log lines and as the output
            sub-directory name under SAVE_ROOT.
        cfg: Mapping with "instruction", "objects" and "image_url" entries.

    Returns:
        Path of the text file holding the final prompt, the image list and
        the plan returned by the model.
    """
    instruction, objects, image_url = (
        cfg["instruction"],
        cfg["objects"],
        cfg["image_url"],
    )
    tag = f"[Task{task_id}]"

    # Stage 1: describe the scene.
    scene_prompt = build_content_scene(instruction, objects)
    model = MultiVLM(API_KEY, MODEL, SYSTEM_MSG, BASE_URL, erp=ERP_USER)
    scene_state = model.call_model_with_multi(scene_prompt, image_url)
    print(f"{tag} Scene State: {scene_state}")

    # Stage 2: ask for the precaution that applies to this scene.
    notice_prompt = build_content_notice(instruction, scene_state)
    notice = model.call_model_with_multi(notice_prompt, image_url)
    print(f"{tag} Notice: {notice}")

    # Stage 3: ask for the action sequence.
    plan_prompt = build_content_plan(instruction, scene_state, notice)
    plan = model.call_model_with_multi(plan_prompt, image_url)
    print(f"{tag} Plan: {plan}")

    # Output file is named after the local time, at one-second resolution.
    out_dir = os.path.join(SAVE_ROOT, str(task_id))
    ensure_dir(out_dir)
    stamp = time.strftime('%Y%m%d%H%M%S')
    file_path = os.path.join(out_dir, stamp + ".txt")

    record = "\n".join(
        (
            f"content: {plan_prompt}",
            f"image_url: {image_url}",
            f"response: {plan}",
        )
    ).strip()

    with open(file_path, "w", encoding="utf-8") as out_file:
        out_file.write(record)
    print(f"{tag} Saved → {file_path}\n")
    return file_path

def main(task_range=(3, 4), samples=10):
    """Run every configured task in ``task_range`` ``samples`` times.

    Args:
        task_range: Arguments unpacked into ``range(...)`` to produce the
            task ids; ids missing from TASK_CONFIG are reported and skipped.
        samples: Number of rounds executed per task.

    An exception raised by a round is printed and the loop moves on to the
    next round, so one failed query does not abort the whole run.
    """
    for task_id in range(*task_range):
        if task_id in TASK_CONFIG:
            task_cfg = TASK_CONFIG[task_id]
        else:
            print(f"Skip undefined task {task_id}")
            continue

        print(f"\n========== Task {task_id} begin ({samples} rounds) ==========")
        for index in range(samples):
            round_no = index + 1
            print(f"\n>>> Task {task_id} / Round {round_no}")
            try:
                run_once(task_id, task_cfg)
            except Exception as err:
                print(f"[Task{task_id} Round{round_no}] Error: {err}")
        print(f"========== Task {task_id} done ==========\n")

if __name__ == "__main__":
    main()



>>> Task 3 / Round 1
[Task3] Scene State: Label: Label 0 represents the holder for vertical grasping, Label 1 represents the holder for horizontal grasping. 

Status: The pen holder is on an orange box which is placed on top of a red box, the drawer is on the desk and appears to be closed, the desk is unoccupied otherwise with the exception of the robot's equipment visible in the background. The pen holder is on the box on the desk.
[Task3] Notice: The robot must carefully select the horizontal or vertical grasping action based on the position and stability of the pen holder to avoid tipping it over or disrupting nearby objects.
[Task3] Plan: pick(1)
place on(desk)
[Task3] Saved → /home/dell/workspace/xwj/Enhanced_ReKep4xarm_Tinker-ros2_migration/result/rekep_cot/3/20251129174312.txt


>>> Task 3 / Round 2
[Task3] Scene State: Label Analysis:
- Label 0 is the position to grasp the pen holder vertically.
- Label 1 is the position to grasp the pen holder horizontally.

State Analysis:
