In [1]:
import os
import json
import numpy as np
import robotic as ry
from tqdm import tqdm
from utils import get_rand_scale_vec
from lookup import features, contraint_types
from random_kitchen_generator import generate_random_kitchen

In [2]:
# C = ry.Config()
# C.addFile(ry.raiPath("scenarios/pandaOmnibase.g"))
C = generate_random_kitchen(seed=84979, root_path="/home/denis/rai-robotModels/robocasa/")
frame_names = C.getFrameNames()
features["qItself"][1] = len(C.getJointState())

Seed: 84979
Files found inside /home/denis/rai-robotModels/robocasa/rai_jointed/fixtures/stoves: 9
Files found inside /home/denis/rai-robotModels/robocasa/rai_jointed/fixtures/sinks: 10
Files found inside /home/denis/rai-robotModels/robocasa/rai_jointed/fixtures/ovens: 6
Files found inside /home/denis/rai-robotModels/robocasa/rai_jointed/fixtures/dishwashers: 3


## Generate KOMO texts

In [3]:
# Main Motions
max_phases = 4
max_phase_contraints = 4
target_prob = .7
scale_as_vec_prob = .2
slices = 32
accumulated_collisions = True

# Sub-motions
prob_sub = .5
max_sub = 4
uniform_sub_prob = .2

# Mode Switches
prob_mode_switch = .5
max_mode_switches = 2
# max_mode_switches = 0
uniform_mode_switch_prob = .1

# Special Features
lucky_frames = ["r_gripper", "r_palm"]
prob_lucky_frame = .7
lucky_features = ["positionDiff", "position", "vectorX", "vectorY", "vectorZ", "quaternion", "negDistance"]
prob_lucky_feature = .3

In [4]:
def gen_rand_contraint(start: float, end: float=None) -> str:

    if end != None:
        times = f"[{start}, {end}]"
    else:
        times = f"[{start}]"

    if prob_lucky_feature:
        feature_name = np.random.choice(lucky_features)
    else:
        feature_name = np.random.choice(list(features.keys()))

    contraint_type = np.random.choice(contraint_types)
    # scale_pow = np.random.randint(-2, 3) # As matrix?
    if feature_name == "negDistance":
        scale_pow = 1
    else:
        scale_pow = np.random.choice([-1, 1]) # As matrix?

    scale_as_vec = np.random.random() < scale_as_vec_prob
    if features[feature_name][1] != 1 and scale_as_vec:
        scale_vec = get_rand_scale_vec(features[feature_name][1])
        scale = "["
        for v in scale_vec:
            if v == 1:
                scale += f"1e{scale_pow}, "
            else:
                scale += "0, "
        scale += "]"
        scale = scale.replace(", ]", "]")
    else:
        scale = f"[1e{scale_pow}]"
    
    frames_count = features[feature_name][0]
    frames = np.random.choice(frame_names, frames_count, replace=False).tolist()
    
    if np.random.random() < prob_lucky_frame:
        lucky_frame = np.random.choice(lucky_frames)
        if frames_count == 2 and np.random.choice([0, 1]):
            frames[1] = str(lucky_frame)
        else:
            frames[0] = str(lucky_frame)

    specific_target = np.random.random() < target_prob
    if specific_target:
        if feature_name == "negDistance":
            target = np.round(np.random.uniform(-2, 0, 1), 3).tolist()
        else:
            if feature_name == "qItself":
                target_range = np.pi*2
            else:
                target_range = 2
            target = np.round(np.random.uniform(-target_range, target_range, (features[feature_name][1])), 3).tolist()
        contraint_line = f"komo.addObjective({times}, ry.FS.{feature_name}, {frames}, ry.OT.{contraint_type}, {scale}, {target})\n"
    else:
        contraint_line = f"komo.addObjective({times}, ry.FS.{feature_name}, {frames}, ry.OT.{contraint_type}, {scale})\n"

    return contraint_line


def random_start_end(phases_count: int, uniform_prob: float) -> tuple[float, float]:
    if np.random.random() < uniform_prob:
        start = np.random.uniform(0, phases_count-.1)
        start = np.round(start, 1)
        end = np.random.uniform(start+.1, phases_count)
        end = np.round(end, 1)
    else:
        start = np.random.randint(0, phases_count)
        end = np.random.randint(start+1, phases_count+1)

    return start, end


def generate_random_komo() -> str:
    phases_count = np.random.randint(1, max_phases+1)
    sub_count = np.random.randint(0, max_sub+1)
    mode_switch_count = np.random.randint(0, max_mode_switches+1)

    final_komo = ""
    final_komo += f"komo = ry.KOMO(C, {phases_count}, {slices}, 2, {accumulated_collisions})\n"
    final_komo += f"komo.addControlObjective([], 0, 1e-2)\n"
    final_komo += f"komo.addControlObjective([], 1, 1e-1)\n"
    final_komo += f"komo.addControlObjective([], 2, 1e0)\n"
    final_komo += f"komo.addObjective([], ry.FS.jointLimits, [], ry.OT.ineq)\n"
    if accumulated_collisions:
        final_komo += f"komo.addObjective([], ry.FS.accumulatedCollisions, [], ry.OT.eq)\n"

    # Main phases
    for i in range(phases_count):
        repeats = np.random.randint(1, max_phase_contraints+1)
        for _ in range(repeats):
            final_komo += gen_rand_contraint(i+1)

    # Sub-motions
    if np.random.random() < prob_sub:
        for i in range(sub_count):
            start, end = random_start_end(phases_count, uniform_sub_prob)
            final_komo += gen_rand_contraint(start, end)

    # Mode Switches
    if phases_count >= 2 and np.random.random() < prob_mode_switch:
        for i in range(mode_switch_count):
            first_switch = i == 0
            start, end = random_start_end(phases_count, uniform_mode_switch_prob)
            frames = np.random.choice(frame_names, 2, replace=False).tolist()
            if np.random.random() < prob_lucky_frame:
                lucky_frame = np.random.choice(lucky_frames)
                frame_idx = np.random.choice([0, 1])
                frames[frame_idx] = lucky_frame
            final_komo += f"komo.addModeSwitch([{start}, {end}], ry.SY.stable, {frames}, {first_switch})\n"

    return final_komo

In [5]:
komo_text = generate_random_komo()
print(komo_text)

komo = ry.KOMO(C, 3, 32, 2, True)
komo.addControlObjective([], 0, 1e-2)
komo.addControlObjective([], 1, 1e-1)
komo.addControlObjective([], 2, 1e0)
komo.addObjective([], ry.FS.jointLimits, [], ry.OT.ineq)
komo.addObjective([], ry.FS.accumulatedCollisions, [], ry.OT.eq)
komo.addObjective([1], ry.FS.vectorY, ['knob_front_right_visual0'], ry.OT.eq, [1e-1, 0, 1e-1])
komo.addObjective([1], ry.FS.vectorX, ['r_palm'], ry.OT.sos, [1e-1], [0.984, 1.373, 0.334])
komo.addObjective([1], ry.FS.negDistance, ['r_panda_finger_joint2_origin', 'r_gripper'], ry.OT.ineq, [1e1], [-0.365])
komo.addObjective([1], ry.FS.vectorX, ['r_gripper'], ry.OT.eq, [1e-1], [-0.534, -1.467, 0.223])
komo.addObjective([2], ry.FS.vectorY, ['r_gripper'], ry.OT.sos, [1e-1], [-1.919, 1.554, -0.086])
komo.addObjective([2], ry.FS.positionDiff, ['r_palm', 'object_visual3'], ry.OT.ineq, [1e-1], [1.384, -1.706, -1.696])
komo.addObjective([2], ry.FS.position, ['r_gripper'], ry.OT.eq, [1e-1], [-1.596, 1.596, -0.613])
komo.addObjective(

In [6]:
komo_samples = 100_000

outfolder = "./komo_samples"
text_outfolder = os.path.join(outfolder, "texts")
if not os.path.exists(text_outfolder):
    os.makedirs(text_outfolder)
else:
    for file in os.listdir(text_outfolder):
        file_path = os.path.join(text_outfolder, file)
        os.remove(file_path)

for i in tqdm(range(komo_samples)):
    komo_text = generate_random_komo()
    filename = os.path.join(text_outfolder, f"komo{i}.txt")
    with open(filename, "w") as file:
        file.write(komo_text)

100%|██████████| 100000/100000 [00:21<00:00, 4735.69it/s]


## Classify KOMO texts

In [7]:
total_komos = 0
total_feasible_komos = 0
total_error_komos = 0

feasible_komos = []
error_komos = []

for i in tqdm(range(komo_samples)):
    
    filename = os.path.join(text_outfolder, f"komo{i}.txt")
    with open(filename, "r") as file:
        komo_text = file.read()

    # print(f"Processing file {filename}...")
    total_komos += 1
    
    try:
        komo = ry.KOMO(C, 1, 1, 0, False)
        exec(komo_text)
        ret = ry.NLP_Solver(komo.nlp(), verbose=0).solve()
    except:
        total_error_komos += 1
        error_komos.append(i)
        continue
    
    if ret.feasible:
        total_feasible_komos += 1
        feasible_komos.append(i)
    
    if (i+1)%(komo_samples//10) == 0:
        print(f"Total feasible komos: {total_feasible_komos}")
        print(f"Total error komos: {total_error_komos}")
        print(f"Total komos: {total_komos}")
        print(f"{(total_feasible_komos/total_komos)*100:.2f}% feasible")

  0%|          | 3/100000 [00:03<35:54:17,  1.29s/it]== ERROR:frame.cpp:setParent:965(-2) CHECK failed: 'f!=this' -- loop at frame 'r_finger1' when connecting 'r_finger1' to parent 'r_palm'
== ERROR:frame.cpp:setParent:965(-2) CHECK failed: 'f!=this' -- loop at frame 'r_panda_rightfinger_0' when connecting 'r_panda_rightfinger_0' to parent 'r_panda_joint2'
  0%|          | 6/100000 [00:03<12:45:40,  2.18it/s]

STACK9 /bin/python3() [0x58238f]
STACK8 /home/denis/.local/lib/python3.12/site-packages/robotic/_robotic.so(+0x40a85) [0x7da38ff1fa85]
STACK7 /home/denis/.local/lib/python3.12/site-packages/robotic/_robotic.so(+0x92c0d) [0x7da38ff71c0d]
STACK6 KOMO::addModeSwitch(rai::ArrayDouble const&, rai::SkeletonSymbol, rai::Array<rai::String> const&, bool)
STACK5 KOMO::addSwitch(double, bool, bool, rai::JointType, rai::SwitchInitializationType, char const*, char const*, rai::Transformation const&)
STACK4 KOMO::addSwitch(double, bool, std::shared_ptr<rai::KinematicSwitch> const&)
STACK3 KOMO::applySwitch(rai::KinematicSwitch const&)
STACK2 rai::KinematicSwitch::apply(rai::Array<rai::Frame*>&) const
STACK1 rai::Frame::setParent(rai::Frame*, bool, bool)
STACK0 rai::LogToken::~LogToken()
STACK9 /bin/python3() [0x58238f]
STACK8 /home/denis/.local/lib/python3.12/site-packages/robotic/_robotic.so(+0x40a85) [0x7da38ff1fa85]
STACK7 /home/denis/.local/lib/python3.12/site-packages/robotic/_robotic.so(+0x92c

  0%|          | 25/100000 [00:42<41:54:00,  1.51s/it]== ERROR:frame.cpp:setParent:965(-2) CHECK failed: 'f!=this' -- loop at frame 'r_panda_joint2_origin' when connecting 'r_panda_joint2_origin' to parent 'r_panda_hand'
  0%|          | 27/100000 [00:44<36:02:11,  1.30s/it]

STACK9 /bin/python3() [0x58238f]
STACK8 /home/denis/.local/lib/python3.12/site-packages/robotic/_robotic.so(+0x40a85) [0x7da38ff1fa85]
STACK7 /home/denis/.local/lib/python3.12/site-packages/robotic/_robotic.so(+0x92c0d) [0x7da38ff71c0d]
STACK6 KOMO::addModeSwitch(rai::ArrayDouble const&, rai::SkeletonSymbol, rai::Array<rai::String> const&, bool)
STACK5 KOMO::addSwitch(double, bool, bool, rai::JointType, rai::SwitchInitializationType, char const*, char const*, rai::Transformation const&)
STACK4 KOMO::addSwitch(double, bool, std::shared_ptr<rai::KinematicSwitch> const&)
STACK3 KOMO::applySwitch(rai::KinematicSwitch const&)
STACK2 rai::KinematicSwitch::apply(rai::Array<rai::Frame*>&) const
STACK1 rai::Frame::setParent(rai::Frame*, bool, bool)
STACK0 rai::LogToken::~LogToken()


  0%|          | 40/100000 [01:05<57:52:43,  2.08s/it]== ERROR:frame.cpp:setParent:965(-2) CHECK failed: 'f!=this' -- loop at frame 'r_panda_link8' when connecting 'r_panda_link8' to parent 'r_gripper'


STACK9 /bin/python3() [0x58238f]
STACK8 /home/denis/.local/lib/python3.12/site-packages/robotic/_robotic.so(+0x40a85) [0x7da38ff1fa85]
STACK7 /home/denis/.local/lib/python3.12/site-packages/robotic/_robotic.so(+0x92c0d) [0x7da38ff71c0d]
STACK6 KOMO::addModeSwitch(rai::ArrayDouble const&, rai::SkeletonSymbol, rai::Array<rai::String> const&, bool)
STACK5 KOMO::addSwitch(double, bool, bool, rai::JointType, rai::SwitchInitializationType, char const*, char const*, rai::Transformation const&)
STACK4 KOMO::addSwitch(double, bool, std::shared_ptr<rai::KinematicSwitch> const&)
STACK3 KOMO::applySwitch(rai::KinematicSwitch const&)
STACK2 rai::KinematicSwitch::apply(rai::Array<rai::Frame*>&) const
STACK1 rai::Frame::setParent(rai::Frame*, bool, bool)
STACK0 rai::LogToken::~LogToken()


  0%|          | 42/100000 [01:07<46:09:46,  1.66s/it]

In [8]:
result = f"""Total feasible komos - Total error komos - Total komos: 
{total_feasible_komos} - {total_error_komos} - {total_komos}. 
({(total_feasible_komos/total_komos)*100:.2f}% feasible)"""
print(result)
print(feasible_komos)
results_file_path = os.path.join(outfolder, "feasibles.json")
results_dict = {
    "feasible_komos": feasible_komos,
    "error_komos": error_komos,
}
json.dump(results_dict, open(results_file_path, 'w'))

Total feasible komos - Total error komos - Total komos: 
0 - 0 - 10. 
(0.00% feasible)
[]


## Visualize feasible KOMOs

In [9]:
feasibe_samples_count = 10
feasibe_samples = np.random.choice(feasible_komos, feasibe_samples_count, replace=False)

for i in feasibe_samples:

    filename = os.path.join(text_outfolder, f"komo{i}.txt")
    with open(filename, "r") as file:
        komo_text = file.read()
    print(f"__________ SAMPLE {i} __________")
    print(komo_text)

    komo = ry.KOMO(C, 1, 1, 0, False)
    exec(komo_text)
    ret = ry.NLP_Solver(komo.nlp(), verbose=0).solve()

    komo.view_play(True)

ValueError: 'a' cannot be empty unless no samples are taken