In [1]:
import robotic as ry
import numpy as np
import time
import unified_planning as up
from unified_planning.shortcuts import *
from unified_planning.plot import plot_sequential_plan


print("Initializing Rai Config...")
C = ry.Config()
C.addFile(ry.raiPath('../rai-robotModels/task.g'))
viewer = C.view()

Location = UserType("Location")
Object = UserType("Object")
Tool = UserType("Tool")

tray1 = up.model.Object("tray1", Object)
tray2 = up.model.Object("tray2", Object)
mug   = up.model.Object("mug", Object)
sponge    = up.model.Object("sponge", Tool)
l_gripper = up.model.Object("l_gripper", Tool)
table1    = up.model.Object("table1", Location)
bin       = up.model.Object("bin", Location)

on = Fluent("on", BoolType(), obj=Object, loc=Location)
holding = Fluent("holding", BoolType(), tool=Tool)
ready_to_pick = Fluent("ready_to_pick", BoolType(), obj=Object)
needs_processing = Fluent("needs_processing", BoolType(), obj=Object)
dirty = Fluent("dirty", BoolType(), loc=Location)
clean = Fluent("clean", BoolType(), loc=Location)
wiping_allowed = Fluent("wiping_allowed", BoolType())
usable_tool = Fluent("usable_tool", BoolType(), obj=Object, tool=Tool)

pick_and_place = InstantaneousAction("pick_and_place", obj=Object, src=Location, dest=Location, tool=Tool)
pick_place_obj, pick_place_src, pick_place_dest, pick_place_tool = pick_and_place.parameters
pick_and_place.add_precondition(on(pick_place_obj, pick_place_src))
pick_and_place.add_precondition(ready_to_pick(pick_place_obj))
pick_and_place.add_precondition(usable_tool(pick_place_obj, pick_place_tool))
pick_and_place.add_precondition(needs_processing(pick_place_obj))
pick_and_place.add_effect(on(pick_place_obj, pick_place_src), False)
pick_and_place.add_effect(on(pick_place_obj, pick_place_dest), True)
pick_and_place.add_effect(needs_processing(pick_place_obj), False)
pick_and_place.add_effect(ready_to_pick(pick_place_obj), False)

allow_next_pick_tray2 = InstantaneousAction("allow_next_pick_tray2")
allow_next_pick_tray2.add_precondition(on(tray1, bin))
allow_next_pick_tray2.add_precondition(needs_processing(tray2))
allow_next_pick_tray2.add_effect(ready_to_pick(tray2), True)

allow_next_pick_mug = InstantaneousAction("allow_next_pick_mug")
allow_next_pick_mug.add_precondition(on(tray2, bin))
allow_next_pick_mug.add_precondition(needs_processing(mug))
allow_next_pick_mug.add_effect(ready_to_pick(mug), True)

allow_wiping = InstantaneousAction("allow_wiping")
allow_wiping.add_precondition(on(tray1, bin))
allow_wiping.add_precondition(on(tray2, bin))
allow_wiping.add_precondition(on(mug, bin))
allow_wiping.add_effect(wiping_allowed, True)

wipe = InstantaneousAction("wipe", loc=Location, tool=Tool)
wipe_loc, wipe_tool = wipe.parameters
wipe.add_precondition(dirty(wipe_loc))
wipe.add_precondition(wiping_allowed)
wipe.add_effect(dirty(wipe_loc), False)
wipe.add_effect(clean(wipe_loc), True)

problem = Problem("table_cleaning_with_pick_and_place")
problem.add_fluent(on, default_initial_value=False)
problem.add_fluent(holding, default_initial_value=False)
problem.add_fluent(ready_to_pick, default_initial_value=False)
problem.add_fluent(needs_processing, default_initial_value=False)
problem.add_fluent(dirty, default_initial_value=False)
problem.add_fluent(clean, default_initial_value=False)
problem.add_fluent(wiping_allowed, default_initial_value=False)
problem.add_fluent(usable_tool, default_initial_value=False)

problem.add_action(pick_and_place)
problem.add_action(allow_next_pick_tray2)
problem.add_action(allow_next_pick_mug)
problem.add_action(allow_wiping)
problem.add_action(wipe)

problem.add_objects([tray1, tray2, mug, sponge, l_gripper, table1, bin])
problem.set_initial_value(on(tray1, table1), True)
problem.set_initial_value(on(tray2, table1), True)
problem.set_initial_value(on(mug, table1), True)
problem.set_initial_value(dirty(table1), True)
problem.set_initial_value(holding(l_gripper), False)
problem.set_initial_value(holding(sponge), False)
problem.set_initial_value(wiping_allowed, False)
problem.set_initial_value(ready_to_pick(tray1), True)  
problem.set_initial_value(ready_to_pick(tray2), False)
problem.set_initial_value(ready_to_pick(mug), False)
problem.set_initial_value(needs_processing(tray1), True)
problem.set_initial_value(needs_processing(tray2), True)
problem.set_initial_value(needs_processing(mug), True)

problem.set_initial_value(usable_tool(tray1, l_gripper), True)
problem.set_initial_value(usable_tool(tray2, l_gripper), True)
problem.set_initial_value(usable_tool(mug, l_gripper), True)

problem.add_goal(on(tray1, bin))
problem.add_goal(on(tray2, bin))
problem.add_goal(on(mug, bin))
problem.add_goal(clean(table1))

def unified_komo_dynamic_with_rrt(C, tasks, duration=6.0, steps=40):
    komo = ry.KOMO(C, duration, steps, 1, False)
    komo.addControlObjective([], 1, 1e-1)   
    komo.addQuaternionNorms([], 3.0)      

    for i, task in enumerate(tasks):
        object_name = task["object_name"]
        target_name = task["target_name"]
        grasp_time = 2 * i + 1
        place_time = 2 * i + 2
        try:
            object_frame = C.getFrame(object_name)
            object_size = object_frame.info().get("size", [0.3, 0.3, 0.1])
            object_height = object_size[2]
            initial_orientation = object_frame.getQuaternion()
        except Exception as e:
            print(f"Error retrieving properties for {object_name}: {e}")
            object_size = [0.3, 0.3, 0.1]
            object_height = 0.1
            initial_orientation = [1, 0, 0, 0]
        if "placement_position" in task:
            next_position = task["placement_position"]
        else:
            target_frame = C.getFrame(target_name)
            next_position = target_frame.getPosition()
            next_position[2] += object_height / 2.0

        print(f"Calculated position for {object_name}: {next_position}")
        komo.addModeSwitch([grasp_time, place_time], ry.SY.stable, ["l_gripper", object_name])
        komo.addObjective([grasp_time], ry.FS.positionDiff, ["l_gripper", object_name], ry.OT.sos, [1e1])
        komo.addObjective([grasp_time], ry.FS.quaternionDiff, ["l_gripper", object_name],
                          ry.OT.eq, [1e2], initial_orientation)
        komo.addModeSwitch([place_time, -1.], ry.SY.stable, [target_name, object_name])
        komo.addObjective([place_time], ry.FS.position, [object_name], ry.OT.eq, [1e2], next_position)
        komo.addObjective([place_time], ry.FS.quaternionDiff, [object_name, target_name],
                          ry.OT.eq, [1e2], initial_orientation)


    try:
        komo.updateRootObjects(C)
        ret = ry.NLP_Solver(komo.nlp(), verbose=0).solve()
        print("KOMO result:", ret)
    except Exception as e:
        print(f"Error during KOMO optimization: {e}")
        return
    try:
        print("Playing KOMO trajectory for multiple tasks...")
        komo.view_play(True, 0.1)
    except Exception as e:
        print(f"Viewer playback error: {e}")
    print("Using RRT for path feasibility across multiple pick-and-places...")
    komo_path = komo.getPath()
    current_state = C.getJointState()
    feasible_path = None
    attempts = 0

    while feasible_path is None:
        attempts += 1
        print(f"RRT Attempt {attempts}")
        rrt = ry.PathFinder()
        rrt.setProblem(C, [current_state], [komo_path[-1]])
        result = rrt.psbi_solve()

        if result and result.x is not None:
            feasible_path = result.x
            print(f"Feasible path found after {attempts} attempt(s).")
        else:
            print("No feasible path found, retrying...")
    print("Visualizing RRT trajectory for multiple objects...")
    for t in range(feasible_path.shape[0]):
        C.setJointState(feasible_path[t])
        C.view(False, f"RRT Step {t}")
        time.sleep(0.1)
    for task in tasks:
        obj = task["object_name"]
        tgt = task["target_name"]
        try:
            C.attach(obj, tgt)
            print(f"Attached {obj} to {tgt}")
        except Exception as e:
            print(f"Error attaching {obj} to {tgt}: {e}")
    for task in tasks:
        obj = task["object_name"]
        try:
            pos = C.getFrame(obj).getPosition()
            print(f"Final position of {obj} -> {pos}")
        except Exception as e:
            print(f"Error retrieving final position of {obj}: {e}")
    print("Done with multi-object planning. Updating Config view...")
    C.view()
    time.sleep(2)

def execute_plan_with_multiobject_komo(C, plan):
    current_state = C.getJointState()
    tasks = []  

    for idx, action_instance in enumerate(plan.actions):
        action_name = action_instance.action.name
        params = [str(arg) for arg in action_instance.actual_parameters]

        if action_name == "pick_and_place":
            if len(params) != 4:
                print(f"Warning: 'pick_and_place' expects 4 parameters, got {len(params)}. Skipping.")
                continue
            obj, src, dest, tool = params
            tasks.append({"object_name": obj, "target_name": dest})

        else:
            print(f"Executing symbolic action: '{action_name}' with params {params}...")
            if action_name == "allow_next_pick_tray2":
                print("Allowing next pick for tray2.")
            elif action_name == "allow_next_pick_mug":
                print("Allowing next pick for mug.")
            elif action_name == "allow_wiping":
                print("Allowing wiping.")
            elif action_name == "wipe":
                print("Wiping the table.")

    if len(tasks) > 0:
        print("\nExecuting all pick-and-place actions in a single KOMO call...")
        unified_komo_dynamic_with_rrt(C, tasks, duration=8.0, steps=50)
        current_state = C.getJointState()  # Update the joint state after execution

    return current_state

def main():
    with OneshotPlanner(name="pyperplan") as planner:
        result = planner.solve(problem)
        if result.status == up.engines.PlanGenerationResultStatus.SOLVED_SATISFICING:
            print("PDDL Plan found:")
            for action in result.plan.actions:
                print(" ->", action)
            print("\n=== Executing plan with multi-object pick-and-place ===")
            final_state = execute_plan_with_multiobject_komo(C, result.plan)
            print("Done. Final state reached.")
        else:
            print("No valid PDDL Plan found.")

if __name__ == "__main__":
    main()


Initializing Rai Config...
[96m[1mNOTE: To disable printing of planning engine credits, add this line to your code: `up.shortcuts.get_environment().credits_stream = None`
[0m[96m  *** Credits ***
[0m[96m  * In operation mode `OneshotPlanner` at line 238 of `/tmp/ipykernel_2993/2306333524.py`, [0m[96myou are using the following planning engine:
[0m[96m  * Engine name: pyperplan
  * Developers:  Albert-Ludwigs-Universität Freiburg (Yusra Alkhazraji, Matthias Frorath, Markus Grützner, Malte Helmert, Thomas Liebetraut, Robert Mattmüller, Manuela Ortlieb, Jendrik Seipp, Tobias Springenberg, Philip Stahl, Jan Wülfing)
[0m[96m  * Description: [0m[96mPyperplan is a lightweight STRIPS planner written in Python.[0m[96m
[0m[96m
[0mPDDL Plan found:
 -> pick_and_place(tray1, table1, bin, l_gripper)
 -> allow_next_pick_tray2
 -> pick_and_place(tray2, table1, bin, l_gripper)
 -> allow_next_pick_mug
 -> pick_and_place(mug, table1, bin, l_gripper)
 -> allow_wiping
 -> wipe(table1, l_