In [1]:
%load_ext autoreload
%autoreload 2

In [5]:
from action import Action
from PDDL import PDDL_Parser
import sys, pprint
from collections import OrderedDict
from typing import List, Tuple, Dict, Iterable
from collections import OrderedDict
import re, copy
import itertools
import z3
from skill_classes import EffectTypePDDL, SkillPDDL
from utils import product_dict, nested_list_replace, get_atoms

In [6]:
# Paths to the PDDL domain/problem pair that the next cell parses.
# NOTE(review): the `zeno_` prefix looks like a leftover from an earlier example —
# both paths point at the existential-taxi files.
zeno_dom = "examples/existential-taxi/taxi-domain.pddl"
zeno_prob = "examples/existential-taxi/prob01.pddl"

In [7]:
# Parse the selected domain/problem pair and dump everything the parser extracted.
# `parser` is a notebook-level global that later cells (e.g. make_z3_atoms) read.
domain, problem = zeno_dom, zeno_prob

parser = PDDL_Parser()
print('----------------------------')
print('----------------------------')
parser.parse_domain(domain)
parser.parse_problem(problem)
print('Domain name: ' + parser.domain_name)
print('~~~Types~~~')
for t in parser.types: print(t)
print('~~~Type Hierarchy~~~')
for k,v in parser.type_hierarchy.items():
    print(f"{k}: {v}")
print('~~~Predicates~~~')
for nm, args in parser.predicates.items():
    # Close the parenthesis so the printed signature is balanced.
    print(f"{nm}({args})")
print('----------------------------')
print('~~~Functions~~~')
for nm, args in parser.functions.items():
    print(f"{nm}({args})")
print('----------------------------')
print('~~~Actions~~~')
for act in parser.actions:
    print(act)
    print("\n")
print('----------------------------')
print('Problem name: ' + parser.problem_name)
print('Objects: ' + str(parser.objects))
print('State: ' + str(parser.state))
print('Positive goals: ' + str(parser.positive_goals))
print('Negative goals: ' + str(parser.negative_goals))

----------------------------
----------------------------
Domain name: universal_multipasstaxi
~~~Types~~~
object
passenger
taxi
~~~Type Hierarchy~~~
object: ['passenger', 'taxi']
passenger: []
taxi: []
~~~Predicates~~~
passenger-in-taxi(OrderedDict([('?p', 'passenger'), ('?t', 'taxi')])
----------------------------
~~~Functions~~~
passenger-y(OrderedDict([('?p', 'passenger')])
passenger-x(OrderedDict([('?p', 'passenger')])
taxi-x(OrderedDict([('?t', 'taxi')])
taxi-y(OrderedDict([('?t', 'taxi')])
----------------------------
~~~Actions~~~
action: pickup
  parameters: [['?p', 'passenger'], ['?t', 'taxi']]
  positive_preconditions: [['=', ['passenger-x', '?p'], ['taxi-x', '?t']], ['=', ['passenger-y', '?p'], ['taxi-y', '?t']], ['forall', ['?pn', '-', 'passenger'], ['not', ['passenger-in-taxi', '?pn', '?t']]]]
  negative_preconditions: []
  add_effects: [['passenger-in-taxi', '?p', '?t']]
  del_effects: []



action: dropoff
  parameters: [['?p', 'passenger'], ['?t', 'taxi']]
  positive_p

In [8]:
def list_is_flat(l):
    """Return True when none of the elements of ``l`` is itself a list."""
    return not any(isinstance(item, list) for item in l)

## Make z3 vars

Create one z3 variable for every grounded predicate (`z3.Bool`) and every grounded function (`z3.Int`).

In [9]:
def make_z3_atoms(things_dict, z3_class, str2var = None):
    """
    Create one z3 variable per grounding of every entry in ``things_dict``.

    things_dict: mapping name -> {variable name: type name}; should be
        parser.predicates or parser.functions.
    z3_class: constructor called with the grounded name; should be z3.Bool or z3.Int.
    str2var: dictionary to update in place. If None, a new OrderedDict is created.

    Returns the (possibly newly created) dictionary, keyed by strings of the
    form "name(obj1, obj2)". Reads the notebook-level ``parser`` to look up the
    objects of each argument type, and asserts that no key is produced twice.
    """
    table = OrderedDict() if str2var is None else str2var
    for name, signature in things_dict.items():
        # Candidate objects for every argument, in declaration order.
        domains = OrderedDict()
        for arg_name, arg_type in signature.items():
            domains[arg_name] = parser.get_objects_of_type(arg_type)
        for binding in product_dict(**domains):
            key = "".join([name, "(", ", ".join(binding.values()), ")"])
            atom = z3_class(key)
            assert key not in table, f"{key}: {table[key]}, {atom}"
            table[key] = atom
    return table

In [11]:
# Build one shared "name(args)" -> z3 variable table: predicates are created with
# z3.Bool and functions with z3.Int. Both calls update the same dict in place.
str2var = OrderedDict()
make_z3_atoms(parser.predicates, z3.Bool, str2var)
make_z3_atoms(parser.functions, z3.Int, str2var)
print(str2var)

OrderedDict([('passenger-in-taxi(curly, t0)', passenger-in-taxi(curly, t0)), ('passenger-y(curly)', passenger-y(curly)), ('passenger-x(curly)', passenger-x(curly)), ('taxi-x(t0)', taxi-x(t0)), ('taxi-y(t0)', taxi-y(t0))])


In [12]:
# Show each grounded name next to its z3 variable and that variable's Python type.
for k, v in str2var.items():
    print(k,v, type(v))

passenger-in-taxi(curly, t0) passenger-in-taxi(curly, t0) <class 'z3.z3.BoolRef'>
passenger-y(curly) passenger-y(curly) <class 'z3.z3.ArithRef'>
passenger-x(curly) passenger-x(curly) <class 'z3.z3.ArithRef'>
taxi-x(t0) taxi-x(t0) <class 'z3.z3.ArithRef'>
taxi-y(t0) taxi-y(t0) <class 'z3.z3.ArithRef'>


In [13]:
# Maps a PDDL operator token to a two-argument callable that combines operands which
# have already been compiled; compile_expression looks the operator up here via expr[0].
# The callables just apply Python operators, so the result is whatever the operands'
# overloads produce.
# "increase" / "decrease" / "assign" return the NEW value of the fluent (a + b, a - b, b);
# they do not perform an assignment themselves.
# Maybe useful to look at how we traversed the ast in the RDDL parser.
str2operator = {
    "<": lambda a, b: a < b,
    "<=": lambda a, b: a <= b,
    ">": lambda a, b: a > b,
    ">=": lambda a, b: a >= b,
    "=": lambda a, b: a == b,
    "*": lambda a, b: a * b,
    "+": lambda a, b: a + b,
    "/": lambda a, b: a / b,
    "-": lambda a, b: a - b,
    "increase": lambda a, b: a + b,
    "decrease": lambda a, b: a - b,
    "assign": lambda a, b: b
}

In [14]:
# Pick the action at index 2 of parser.actions and list its positive preconditions.
a = list(parser.actions)[2]
for p in a.positive_preconditions: print(p)

['forall', ['?pn', '-', 'passenger'], ['not', ['passenger-in-taxi', '?pn', '?t']]]


In [15]:
def list2var_str(x):
    """Format ``[name, arg1, arg2, ...]`` as the string ``"name(arg1, arg2, ...)"``."""
    arguments = ", ".join(x[1:])
    return "".join([x[0], "(", arguments, ")"])

# Recursive condition compiler

The compiler can be passed:
1. A flat list of strings to convert into a z3 var
2. A nested list, where each sublist must be converted into a z3 expression before being combined via the operator, which is the first element


TODO give compile_condition and compile_precondition more intuitive names

In [16]:
def _substitute_symbol(expr, symbol, value):
    """Return a copy of nested-list ``expr`` with every element equal to ``symbol`` replaced by ``value``."""
    if isinstance(expr, list):
        return [_substitute_symbol(sub, symbol, value) for sub in expr]
    return value if expr == symbol else expr


def _parse_typed_variables(typed_list):
    """Split a typed variable list such as ``['?a', '?b', '-', 'passenger']`` into ``[(var, type), ...]``."""
    pairs, pending = [], []
    tokens = iter(typed_list)
    for token in tokens:
        if token != "-":
            pending.append(token)
            continue
        var_type = next(tokens, None)
        if var_type is None:
            raise ValueError(f"Missing type after '-' in: {typed_list}")
        pairs.extend((var, var_type) for var in pending)
        pending = []
    if pending:
        raise NotImplementedError(f"Don't understand untyped quantified variables: {typed_list}")
    return pairs


def compile_expression(expr):
    """
    Recursively compile a parsed PDDL expression.

    expr may be:
      * an int, returned unchanged;
      * a string, converted with int() (ValueError propagates for non-numeric strings);
      * a flat list ``[name, arg, ...]``, looked up in ``str2var`` by its grounded name;
      * a nested list ``[op, ...]`` where ``op`` is
          - "forall": ``['forall', typed_vars, body]`` is expanded into a conjunction of
            ``body`` over every object of the quantified type(s), using the
            notebook-level ``parser``; an empty domain yields ``z3.BoolVal(True)``;
          - "not": ``['not', sub]`` becomes ``z3.Not`` of the compiled sub-expression;
          - any key of ``str2operator``: both operands are compiled and combined.

    Raises NotImplementedError for non-list inputs of other types and for unknown
    operators; KeyError if a grounded name is missing from ``str2var``.
    """
    if isinstance(expr, int):
        return expr
    if isinstance(expr, str):
        # TODO deal with other cases? What do pvars with no args look like?
        return int(expr)
    if not isinstance(expr, list):
        raise NotImplementedError(f"Don't understand how to compile non lists: {expr}; {type(expr)}")

    if list_is_flat(expr):
        return str2var[list2var_str(expr)]

    head = expr[0]

    if head == "forall":
        assert len(expr) == 3, f"Don't understand how to compile: {expr}"
        # Ground the body once per combination of objects of the quantified types.
        bodies = [expr[2]]
        for var, var_type in _parse_typed_variables(expr[1]):
            candidates = list(parser.get_objects_of_type(var_type))
            bodies = [_substitute_symbol(body, var, obj) for body in bodies for obj in candidates]
        clauses = [compile_expression(body) for body in bodies]
        if not clauses:
            # Quantifying over an empty set of objects is vacuously true.
            return z3.BoolVal(True)
        return z3.And(*clauses)

    if head == "not":
        assert len(expr) == 2, f"Don't understand how to compile: {expr}"
        return z3.Not(compile_expression(expr[1]))

    assert len(expr) == 3, f"Don't understand how to compile: {expr}"
    try:
        apply_operator = str2operator[head]
    except KeyError:
        raise NotImplementedError(f"Don't understand how to compile operator: {head}") from None
    operands = [compile_expression(sub) for sub in expr[1:]]
    return apply_operator(*operands)
            

In [17]:
# Show the action currently bound to `a`.
print(a)

action: move-north
  parameters: [['?t', 'taxi']]
  positive_preconditions: [['forall', ['?pn', '-', 'passenger'], ['not', ['passenger-in-taxi', '?pn', '?t']]]]
  negative_preconditions: []
  add_effects: [['increase', ['taxi-y', '?t'], '1']]
  del_effects: []



In [18]:
# Ground every action schema; entry i holds the groundings of the i-th action in parser.actions.
str_grounded_actions = [parser.get_action_groundings(a) for a in parser.actions]
# Print the groundings of the first action only.
print("\n".join([str(x) for x in str_grounded_actions[0]]))

action: pickup
  parameters: [['curly', 'passenger'], ['t0', 'taxi']]
  positive_preconditions: [['=', ['passenger-x', 'curly'], ['taxi-x', 't0']], ['=', ['passenger-y', 'curly'], ['taxi-y', 't0']], ['forall', ['?pn', '-', 'passenger'], ['not', ['passenger-in-taxi', '?pn', 't0']]]]
  negative_preconditions: []
  add_effects: [['passenger-in-taxi', 'curly', 't0']]
  del_effects: []



In [21]:
def action2precondition(a: Action) -> z3.BoolRef:
    """
    Build the precondition formula of action ``a``.

    Every positive precondition is compiled with compile_expression; every
    negative precondition is compiled and wrapped in z3.Not. The resulting
    clauses (positives first) are combined with z3.And.
    """
    conjuncts = []
    for positive in a.positive_preconditions:
        conjuncts.append(compile_expression(positive))
    for negative in a.negative_preconditions:
        conjuncts.append(z3.Not(compile_expression(negative)))
    return z3.And(*conjuncts)

In [22]:
def z3_identical(a, b):
    """Treat ``a`` and ``b`` as identical when their sorts compare equal and they print the same."""
    same_sort = a.sort() == b.sort()
    if not same_sort:
        return same_sort
    return str(a) == str(b)

In [24]:
# Sanity check for get_atoms / z3_identical on a small integer constraint.
# NOTE(review): this rebinds `a`, which other cells use for an Action.
a, b, c = z3.Ints("a b c")
dum = a + b < c
print(dum)
# Atoms of the constraint, excluding the ones identical to `a`.
[x for x in get_atoms(dum) if not z3_identical(x,a)]

a + b < c


[b, c]

In [31]:
def action2effect_types(a: "Action") -> "Tuple[EffectTypePDDL, ...]":
    """
    Translate the effects of a grounded action into EffectTypePDDL objects.

    add_effects:
      * ``[op, fluent, value]`` with op in "increase" / "decrease" / "assign": the
        effected variable is the fluent's entry in ``str2var`` and the effect
        details are the compiled new value of the fluent. ``params`` holds the atoms
        of that value other than the effected variable itself (empty when the new
        value is a plain int constant).
      * anything else is treated as a boolean atom that becomes True.
    del_effects: assumed for now to only set boolean atoms to False.

    Returns a tuple, ordered as add effects followed by delete effects.
    """
    # Effect operators whose first operand is the fluent being modified.
    fluent_updates = ("increase", "decrease", "assign")
    effect_types = []

    for eff in a.add_effects:
        if eff[0] in fluent_updates:
            effected_var = str2var[list2var_str(eff[1])]
            effect_details = compile_expression(eff)
            if isinstance(effect_details, int):
                # e.g. (assign f 3): compile_expression returns a plain int, so
                # there are no atoms to collect.
                params = ()
            else:
                params = tuple(x for x in get_atoms(effect_details) if not z3_identical(x, effected_var))
            effect_types.append(EffectTypePDDL(effected_var, effect_details, params=params))
        else:
            # This may accidentally cause different EffectTypes to be identified bc True == 1.
            effect_types.append(EffectTypePDDL(compile_expression(eff), True))

    # Assume for now that del_effects only sets bools to false
    for eff in a.del_effects:
        # This may accidentally cause different EffectTypes to be identified bc False == 0.
        effect_types.append(EffectTypePDDL(compile_expression(eff), False))

    return tuple(effect_types)


In [32]:
?SkillPDDL

[0;31mInit signature:[0m
[0mSkillPDDL[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mprecondition[0m[0;34m:[0m[0mz3[0m[0;34m.[0m[0mz3[0m[0;34m.[0m[0mExprRef[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0maction[0m[0;34m:[0m[0mUnion[0m[0;34m[[0m[0mstr[0m[0;34m,[0m [0mList[0m[0;34m[[0m[0mstr[0m[0;34m][0m[0;34m,[0m [0mTuple[0m[0;34m[[0m[0mstr[0m[0;34m][0m[0;34m][0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0meffects[0m[0;34m:[0m[0mUnion[0m[0;34m[[0m[0mIterable[0m[0;34m[[0m[0mskill_classes[0m[0;34m.[0m[0mEffectTypePDDL[0m[0;34m][0m[0;34m,[0m [0mskill_classes[0m[0;34m.[0m[0mEffectTypePDDL[0m[0;34m][0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mside_effects[0m[0;34m:[0m[0mUnion[0m[0;34m[[0m[0mIterable[0m[0;34m[[0m[0mskill_classes[0m[0;34m.[0m[0mEffectTypePDDL[0m[0;34m][0m[0;34m,[0m [0mskill_classes[0m[0;34m.[0m[0mEffectTypePDDL[0m[0;34m][0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;

In [33]:
def action2skill(a):
    """Wrap action ``a`` as a SkillPDDL built from its precondition, its name and its effect types."""
    pre = action2precondition(a)
    effs = action2effect_types(a)
    return SkillPDDL(pre, a.name, effs)

In [34]:
# Convert the first grounding of every action into a skill and print both.
# NOTE(review): `a_id` and `a` stay bound after the loop and a later cell reads `a_id`;
# indexing `[0]` assumes every action has at least one grounding.
# NOTE(review): the saved output lists board/debark/fly-slow, which do not match the
# taxi files configured at the top — it appears to come from an earlier run.
for a_id in range(len(str_grounded_actions)):
    a = str_grounded_actions[a_id][0]
    print(a)
    print(action2skill(a))

action: board
  parameters: [['person1', 'person'], ['plane1', 'aircraft'], ['city0', 'city']]
  positive_preconditions: [['located', 'person1', 'city0'], ['located', 'plane1', 'city0']]
  negative_preconditions: []
  add_effects: [['in', 'person1', 'plane1'], ['increase', ['onboard', 'plane1'], '1']]
  del_effects: [['located', 'person1', 'city0']]

action: debark
  parameters: [['person1', 'person'], ['plane1', 'aircraft'], ['city0', 'city']]
  positive_preconditions: [['in', 'person1', 'plane1'], ['located', 'plane1', 'city0']]
  negative_preconditions: []
  add_effects: [['located', 'person1', 'city0'], ['decrease', ['onboard', 'plane1'], '1']]
  del_effects: [['in', 'person1', 'plane1']]

action: fly-slow
  parameters: [['plane1', 'aircraft'], ['city0', 'city'], ['city0', 'city']]
  positive_preconditions: [['located', 'plane1', 'city0'], ['>=', ['fuel', 'plane1'], ['*', ['distance', 'city0', 'city0'], ['slow-burn', 'plane1']]]]
  negative_preconditions: []
  add_effects: [['locat

In [35]:
# Re-select the first grounding of the action at index `a_id`.
# NOTE(review): `a_id` is not defined in this cell; it is whatever value a previously
# executed loop left behind.
a = str_grounded_actions[a_id][0]

In [36]:
# Display the effect types of the action currently bound to `a`.
action2effect_types(a)

(ET(capacity(plane1),True,()),)

In [21]:
# For the first grounding of every action, print each positive precondition next to
# its compiled form.
# NOTE(review): this cell's execution count (21) is lower than the cells above it, and
# its saved output shows board/debark/fly-slow rather than the taxi actions — it looks
# stale; re-run from a fresh kernel to confirm.
for a_id in range(len(str_grounded_actions)):
    a = str_grounded_actions[a_id][0]
    print(a)
    for p in a.positive_preconditions:
        print(p)
        print(compile_expression(p))
        print("\n")

action: board
  parameters: [['person1', 'person'], ['plane1', 'aircraft'], ['city0', 'city']]
  positive_preconditions: [['located', 'person1', 'city0'], ['located', 'plane1', 'city0']]
  negative_preconditions: []
  add_effects: [['in', 'person1', 'plane1'], ['increase', ['onboard', 'plane1'], '1']]
  del_effects: [['located', 'person1', 'city0']]

['located', 'person1', 'city0']
located(person1, city0)


['located', 'plane1', 'city0']
located(plane1, city0)


action: debark
  parameters: [['person1', 'person'], ['plane1', 'aircraft'], ['city0', 'city']]
  positive_preconditions: [['in', 'person1', 'plane1'], ['located', 'plane1', 'city0']]
  negative_preconditions: []
  add_effects: [['located', 'person1', 'city0'], ['decrease', ['onboard', 'plane1'], '1']]
  del_effects: [['in', 'person1', 'plane1']]

['in', 'person1', 'plane1']
in(person1, plane1)


['located', 'plane1', 'city0']
located(plane1, city0)


action: fly-slow
  parameters: [['plane1', 'aircraft'], ['city0', 'city'], ['

In [22]:
# For the first grounding of every action, print the action and its combined
# precondition formula.
# NOTE(review): execution count 22 is out of order relative to the cells above, and the
# saved output does not match the taxi domain configured at the top — likely stale.
for a_id in range(len(str_grounded_actions)):
    a = str_grounded_actions[a_id][0]
    print(a)
    print(action2precondition(a))
    print("\n")

action: board
  parameters: [['person1', 'person'], ['plane1', 'aircraft'], ['city0', 'city']]
  positive_preconditions: [['located', 'person1', 'city0'], ['located', 'plane1', 'city0']]
  negative_preconditions: []
  add_effects: [['in', 'person1', 'plane1'], ['increase', ['onboard', 'plane1'], '1']]
  del_effects: [['located', 'person1', 'city0']]

And(located(person1, city0), located(plane1, city0))


action: debark
  parameters: [['person1', 'person'], ['plane1', 'aircraft'], ['city0', 'city']]
  positive_preconditions: [['in', 'person1', 'plane1'], ['located', 'plane1', 'city0']]
  negative_preconditions: []
  add_effects: [['located', 'person1', 'city0'], ['decrease', ['onboard', 'plane1'], '1']]
  del_effects: [['in', 'person1', 'plane1']]

And(in(person1, plane1), located(plane1, city0))


action: fly-slow
  parameters: [['plane1', 'aircraft'], ['city0', 'city'], ['city0', 'city']]
  positive_preconditions: [['located', 'plane1', 'city0'], ['>=', ['fuel', 'plane1'], ['*', ['dis

In [23]:
# type(z3.BoolRef(True)) # This breaks the kernel :/