In [1]:
import pddlgym
import pddlparser, circuit

In [2]:
domain_fname = "cafeworld2.pddl"
problem_dir = "cafeworld_problem/"

# Get the model for this domain and problem
model1 = pddlparser.load_and_ground_pddl(domain_fname, problem_dir)
model2 = pddlparser.load_and_ground_pddl(domain_fname, problem_dir)

# Check if models are compatible; i.e., they have the same grounded predicates and actions (but each action has potentially different precondition or effect)
if pddlparser.are_models_compatible(model1, model2):
    print("Models are compatible!")
else:
    print("WARNING: Models are not compatible!")

# Map predicates to circuit variables
predicates_to_vars = {model1.predicates[i].pddl_str(): i for i in range(len(model1.predicates))}

Models are compatible!


In [3]:
# Takes initial state distribution, a list of PPDDL actions, and a model, and returns the final PC representing state distribution
def apply_action_sequence(initial_pc, action_sequence, model, predicates_to_vars, verbose=True):
    final_pc = initial_pc.copy()
    for t, a in enumerate(action_sequence):
        if verbose:
            print(f"Timestep t={t + 1}, action {a}")
        precondition = model.preconditions[a]
        effect = model.effects[a]

        final_pc = circuit.update_pc_with_action(final_pc, precondition, effect, predicates_to_vars)
    return final_pc

In [25]:
# We just want to initialize a distribution where the precondition holds for an action
action1 = 3

precondition1 = model1.preconditions[model1.actions[action1]]
effect1 = model1.effects[model1.actions[action1]]

action2 = 0

precondition2 = model1.preconditions[model1.actions[action2]]
effect2 = model1.effects[model1.actions[action2]]

In [27]:
initial_state_1 = circuit.initial_state_pc({p.pddl_str(): True for p in precondition1.literals}, predicates_to_vars)
initial_state_2 = circuit.initial_state_pc({p.pddl_str(): True for p in precondition2.literals}, predicates_to_vars)

In [28]:
circuit.precondition_holds(initial_state_2, precondition2, predicates_to_vars)

True

In [29]:
u1 = apply_action_sequence(initial_state_1, [model1.actions[action1], model1.actions[2], model1.actions[3], model1.actions[4]] * 3, model1, predicates_to_vars)
u2 = apply_action_sequence(initial_state_2, [model2.actions[action2], model2.actions[5], model2.actions[6], model2.actions[4]] * 3, model2, predicates_to_vars)

Timestep t=1, action grasp(gripper:manipulator,counter:location,canred:can,fetch:robot)
Timestep t=2, action move(tablered:location,counter:location,fetch:robot)
Timestep t=3, action grasp(gripper:manipulator,counter:location,canred:can,fetch:robot)
Timestep t=4, action teleport(tablered:location,fetch:robot)
Timestep t=5, action grasp(gripper:manipulator,counter:location,canred:can,fetch:robot)
Timestep t=6, action move(tablered:location,counter:location,fetch:robot)
Timestep t=7, action grasp(gripper:manipulator,counter:location,canred:can,fetch:robot)
Timestep t=8, action teleport(tablered:location,fetch:robot)
Timestep t=9, action grasp(gripper:manipulator,counter:location,canred:can,fetch:robot)
Timestep t=10, action move(tablered:location,counter:location,fetch:robot)
Timestep t=11, action grasp(gripper:manipulator,counter:location,canred:can,fetch:robot)
Timestep t=12, action teleport(tablered:location,fetch:robot)
Timestep t=1, action grasp(gripper:manipulator,tablered:location

In [30]:
import time

start = time.time()
dist = circuit.cw_dist(u1, u2)
end = time.time()
print(f"Distance of {dist} computed in {end - start} seconds!")

KeyboardInterrupt: 

In [31]:
u1.size()

29347

In [32]:
u2.size()

4433

In [33]:
start = time.time()
tv_dist, tv_assign = circuit.approximate_tv(u1, u2)
end = time.time()

print(f"Lower-bound TV distance of {tv_dist} in {end - start} seconds!")

Lower-bound TV distance of 0.7949905155801774 in 1.6690647602081299 seconds!


In [34]:
tv_assign

{3: 1, 6: 0, 5: 1, 4: 0, 2: 0, 0: 1, 7: 1, 1: 0}

In [35]:
u1.forward(tv_assign)

0.0

In [36]:
u2.forward(tv_assign)

0.7949905155801774