In [54]:
import copy
from enum import Enum

import z3

In [35]:
# Utility/Relational Functions
def part_with_conditions(key, conditions):
    """Return a spec dict tagging `key` and `conditions` with the "part-with-condition" relation."""
    spec = dict(key=key, conditions=conditions)
    spec["relation"] = "part-with-condition"
    return spec

def agent_with_conditions(key, conditions):
    """Return a spec dict tagging `key` and `conditions` with the "agent-with-condition" relation."""
    spec = dict(key=key, conditions=conditions)
    spec["relation"] = "agent-with-condition"
    return spec

def all_of(conditions):
    """Wrap `conditions` in an "all-of" relation dict."""
    return dict(conditions=conditions, relation="all-of")

def any_of(conditions):
    """Wrap `conditions` in an "any-of" relation dict."""
    return dict(conditions=conditions, relation="any-of")

def has_property_ge(prop, value):
    """Return a "has-property-ge" relation dict for property `prop` and `value`."""
    return dict(property=prop, value=value, relation="has-property-ge")

def has_property_le(prop, value):
    """Return a "has-property-le" relation dict for property `prop` and `value`."""
    return dict(property=prop, value=value, relation="has-property-le")

def has_property_lt(prop, value):
    """Return a "has-property-lt" relation dict for property `prop` and `value`."""
    return dict(property=prop, value=value, relation="has-property-lt")

def has_property_gt(prop, value):
    """Return a "has-property-gt" relation dict for property `prop` and `value`."""
    return dict(property=prop, value=value, relation="has-property-gt")

def property_add(value1,value2):
    """Return a "property-add" relation dict holding the two operands."""
    return dict(value1=value1, value2=value2, relation="property-add")

def property_subtract(value1,value2):
    """Return a "property-subtract" relation dict holding the two operands."""
    return dict(value1=value1, value2=value2, relation="property-subtract")

def property_multiply(value1,value2):
    """Return a "property-multiply" relation dict holding the two operands."""
    return dict(value1=value1, value2=value2, relation="property-multiply")

def property_divide(value1,value2):
    """Return a "property-divide" relation dict holding the two operands."""
    return dict(value1=value1, value2=value2, relation="property-divide")

def property_not(value):
    """Return a "property-not" relation dict holding `value`."""
    return dict(value=value, relation="property-not")

def is_part(part):
    """Return an "is-part" relation dict naming `part`."""
    return dict(part=part, relation="is-part")

def is_agent(agent):
    """Return an "is-agent" relation dict naming `agent`."""
    return dict(agent=agent, relation="is-agent")

def part_location(location):
    """Return a "has-property" relation dict for the "location" property set to `location`."""
    return dict(property="location", value=location, relation="has-property")

def has_property(prop,value):
    """Return a "has-property" relation dict for property `prop` and `value`."""
    return dict(property=prop, value=value, relation="has-property")

def created_part(key, conditions):
    """Return a spec dict tagging `key` and `conditions` with the "created-part" relation."""
    spec = dict(key=key, conditions=conditions)
    spec["relation"] = "created-part"
    return spec

def destroyed_part(key):
    """Return a "destroyed-part" relation dict for `key`."""
    return dict(key=key, relation="destroyed-part")

def previous_property(key, prop):
    """Return a "previous-property" relation dict referring to `prop` of the entry named `key`."""
    return dict(key=key, prop=prop, relation="previous-property")

def current_property(key, prop):
    """Return a "current-property" relation dict referring to `prop` of the entry named `key`."""
    return dict(key=key, prop=prop, relation="current-property")

In [30]:
# Agents available to perform tasks, keyed by identifier.
agents = {
    agent_id: {"name": display_name, "type": kind}
    for agent_id, display_name, kind in (
        ("panda", "Panda", "robot"),
        ("charlie", "Charlie", "human"),
    )
}

In [31]:
# Named locations in the workcell, keyed by their short code.
locations = {
    code: {"name": label}
    for code, label in (
        ("C", "Core Source"),
        ("G", "Gasket Source"),
        ("P", "Pan Source"),
        ("R1", "Pan Caulk Area 1"),
        ("R2", "Pan Caulk Area 2"),
        ("T1", "Gasket Caulk Area 1"),
        ("T2", "Gasket Caulk Area 2"),
        ("T3", "Gasket Caulk Area 3"),
        ("T4", "Gasket Caulk Area 4"),
        ("W", "Main Workspace"),
    )
}

In [241]:
# Part types: display name, typed properties with defaults, and the
# location code where new instances of the part spawn.
parts = dict(
    pan=dict(
        name="Pan",
        properties=dict(
            taped=dict(type="bool", default=False),
            caulked=dict(type="bool", default=False),
        ),
        spawn="P",
    ),
    gasket=dict(
        name="Gasket",
        properties=dict(
            caulked=dict(type="bool", default=False),
        ),
        spawn="G",
    ),
    core=dict(
        name="Core",
        properties=dict(
            pans=dict(type="int", default=0, max=2),
            gaskets=dict(type="int", default=0, max=4),
            rivets=dict(type="int", default=0, max=16),
            cleaned=dict(type="bool", default=False),
        ),
        spawn="C",
    ),
)

In [33]:
# Task specification: each task groups child actions, and each action lists
# the conditions required before it ("pre") and those it establishes ("post").
# Inside one action, a key such as "panned-core" names the same entry in both
# "pre" and "post", so the key must be spelled identically on both sides.
tasks = {
    "start_core": {
        "children": [
            {
                "id":"A0",
                "name":"Move core to W",
                "pre":[
                    part_with_conditions("initial-core", [
                        is_part("core"),
                        has_property("pans", 0),
                        has_property("gaskets", 0),
                        part_location("C")
                    ])
                ],
                "post":[
                    part_with_conditions("initial-core", [
                        part_location("W")
                    ])
                ],
                "primitives":[]
            }
        ]
    },
    "handle_pan": {
        "children": [
            {
                "id":"B0",
                "name":"Apply tape to pan",
                "pre":[
                    part_with_conditions("initial-pan", [
                        is_part("pan"),
                        has_property("taped", False),
                        part_location("P")
                    ])
                ],
                "post":[
                    part_with_conditions("initial-pan", [
                        has_property("taped", True)
                    ])
                ],
                "primitives":[]
            },
            {
                "id":"B1",
                "name":"Caulk Pan",
                "pre":[
                    part_with_conditions("taped-pan", [
                        is_part("pan"),
                        has_property("taped", True),
                        has_property("caulked", False),
                        part_location("P")
                    ])
                ],
                "post":[
                    part_with_conditions("taped-pan", [
                        has_property("caulked", True)
                    ])
                ],
                "primitives":[]
            }
        ]
    },
    "attach_pan": {
        "children": [
            {
                "id":"C0",
                "name":"Move Pan to W",
                "pre":[
                    part_with_conditions("caulked-pan", [
                        is_part("pan"),
                        has_property("taped", True),
                        has_property("caulked", True),
                        part_location("P")
                    ])
                ],
                "post":[
                    # Same key as the precondition so the move applies to the matched pan.
                    part_with_conditions("caulked-pan", [
                        part_location("W"),
                    ])
                ],
                "primitives":[]
            },
            {
                "id":"C1",
                "name":"Apply Pan 1 to Core",
                "pre":[
                    part_with_conditions("caulked-pan-input", [
                        is_part("pan"),
                        has_property("caulked",True),
                        part_location("W")
                    ]),
                    part_with_conditions("panned-core", [
                        is_part("core"),
                        part_location("W"),
                        has_property_ge("pans", 0),
                        has_property_le("pans", 1)
                    ])
                ],
                "post":[
                    part_with_conditions("panned-core", [
                        has_property("pans",property_add(previous_property("panned-core", "pans"), 1))
                    ]),
                    destroyed_part("caulked-pan-input")
                ],
                "primitives":[]
            },
            {
                "id":"C2",
                "name":"Cleanup Core",
                "pre":[
                    part_with_conditions("panned-core", [
                        is_part("core"),
                        part_location("W"),
                        has_property("pans", 2),
                        has_property("cleaned", False)
                    ])
                ],
                "post":[
                    part_with_conditions("panned-core", [
                        has_property("cleaned", True)
                    ])
                ],
                "primitives":[]
            }
        ]
    },
    "handle_gasket": {
        "children": [
            {
                "id":"D0",
                "name":"Move Gasket to T",
                "pre":[
                    part_with_conditions("default-gasket", [
                        is_part("gasket"),
                        has_property("caulked", False),
                        part_location("G")
                    ])
                ],
                "post":[
                    part_with_conditions("default-gasket", [
                        any_of([
                            part_location("T1"),
                            part_location("T2"),
                            part_location("T3"),
                            part_location("T4"),
                        ])
                    ])
                ],
                "primitives":[]
            },
            {
                "id":"D1",
                "name":"Caulk Gasket",
                "pre":[
                    part_with_conditions("caulked-gasket", [
                        is_part("gasket"),
                        has_property("caulked", False),
                        any_of([
                            part_location("T1"),
                            part_location("T2"),
                            part_location("T3"),
                            part_location("T4"),
                        ])
                    ])
                ],
                "post":[
                    part_with_conditions("caulked-gasket", [
                        has_property("caulked", True),
                    ])
                ],
                "primitives":[]
            }
        ]
    },
    "attach_gasket": {
        "children": [
            {
                "id":"E0",
                "name":"Move Gasket to W",
                "pre":[
                    part_with_conditions("caulked-gasket", [
                        is_part("gasket"),
                        has_property("caulked", True),
                        any_of([
                            part_location("T1"),
                            part_location("T2"),
                            part_location("T3"),
                            part_location("T4"),
                        ])
                    ])
                ],
                "post":[
                    part_with_conditions("caulked-gasket", [
                        part_location("W")
                    ])
                ],
                "primitives":[]
            },
            {
                "id":"E1",
                "name": "Apply Gasket to Core",
                "pre": [
                    part_with_conditions("default-gasket", [
                        is_part("gasket"),
                        has_property("caulked", True),
                        part_location("W")
                    ]),
                    part_with_conditions("cleaned-core", [
                        is_part("core"),
                        has_property("cleaned", True),
                        has_property("pans", 2),
                        has_property_ge("gaskets",0),
                        has_property_le("gaskets",3),
                        # Every gasket already applied must have its 4 rivets in place.
                        has_property("rivets", property_multiply(current_property("cleaned-core", "gaskets"), 4)),
                        part_location("W")
                    ]),
                ],
                "post": [
                    part_with_conditions("cleaned-core", [
                        has_property("gaskets", property_add(previous_property("cleaned-core", "gaskets"), 1)),
                    ]),
                    # Key must match the "default-gasket" bound in the precondition.
                    destroyed_part("default-gasket")
                ],
                "primitives":[]
            },
            {
                "id":"E2",
                "name":"Apply Pop-Rivets to Core",
                "pre":[
                    part_with_conditions("gasketed-core", [
                        is_part("core"),
                        has_property("pans", 2),
                        has_property("cleaned", True),
                        has_property_ge("gaskets",0),
                        has_property_le("gaskets",4),
                        # The most recently applied gasket has not been riveted yet.
                        has_property("rivets", property_multiply(property_subtract(current_property("gasketed-core", "gaskets"),1), 4)),
                        part_location("W")
                    ])
                ],
                "post":[
                    # Riveting adds 4 rivets to the core matched above; the gasket
                    # count is unchanged by this action.
                    part_with_conditions("gasketed-core", [
                        has_property("rivets", property_add(previous_property("gasketed-core", "rivets"), 4)),
                    ]),
                ],
                "primitives":[]
            },
        ]
    },
}

In [None]:
# Bare expression: shows the task specification as this cell's output.
tasks

In [36]:
def state_matches_preconditions(state,action):
    """Check whether every precondition of ``action`` is met by ``state``.

    Args:
        state: dict mapping entry names to entries (spawner dicts, plain
            part dicts, or agent specs).
        action: dict with a "pre" list of precondition specs.

    Returns:
        True when each precondition is satisfied by at least one entry of
        ``state`` (vacuously True when "pre" is empty), otherwise False.

    Note:
        Preconditions are matched independently; two preconditions may be
        satisfied by the same entry.
    """
    # Keys bound by earlier preconditions stay visible to later ones so that
    # current_property(...) references can be resolved.
    bindings = {}
    for pre in action["pre"]:
        if not any(entry_matches_precondition(pre, entry, bindings) for entry in state.values()):
            return False
    return True

def coerced_part(entry):
    """Return a flat part view of ``entry``.

    A spawner entry is expanded into the part it would produce: the part
    type (when the spec carries one), every property at its default value,
    and "location" set to the spec's "spawn" code. Any other entry is
    returned unchanged.
    """
    if entry.get("type") != "spawner":
        return entry
    spec = entry["part"]
    new_part = {}
    if "type" in spec:
        # "is-part" conditions compare against this field.
        new_part["type"] = spec["type"]
    for prop, prop_spec in spec["properties"].items():
        new_part[prop] = prop_spec["default"]
    new_part["location"] = spec["spawn"]
    return new_part

# Comparison performed by each "has-property*" relation: (actual, expected) -> bool.
_PROPERTY_COMPARATORS = {
    "has-property": lambda actual, expected: actual == expected,
    "has-property-ge": lambda actual, expected: actual >= expected,
    "has-property-gt": lambda actual, expected: actual > expected,
    "has-property-le": lambda actual, expected: actual <= expected,
    "has-property-lt": lambda actual, expected: actual < expected,
}

# Arithmetic performed by each binary "property-*" value expression.
_VALUE_OPERATORS = {
    "property-add": lambda left, right: left + right,
    "property-subtract": lambda left, right: left - right,
    "property-multiply": lambda left, right: left * right,
    "property-divide": lambda left, right: left / right,
}

def _resolve_value(value, state):
    """Evaluate a value expression against the entries bound in ``state``.

    Plain values (and dicts without a known value relation) are returned
    unchanged. Raises KeyError when a referenced key or property is not bound.
    """
    if not isinstance(value, dict) or "relation" not in value:
        return value
    relation = value["relation"]
    if relation in _VALUE_OPERATORS:
        left = _resolve_value(value["value1"], state)
        right = _resolve_value(value["value2"], state)
        return _VALUE_OPERATORS[relation](left, right)
    if relation == "property-not":
        return not _resolve_value(value["value"], state)
    if relation in ("current-property", "previous-property"):
        # While matching preconditions nothing has been updated yet, so
        # "previous" and "current" read the same bound entry.
        return coerced_part(state[value["key"]])[value["prop"]]
    return value

def entry_matches_precondition(condition,entry,state) -> bool:
    """Return whether ``entry`` satisfies ``condition``.

    Args:
        condition: relation dict (as built by the relation helper functions).
        entry: a state entry; spawners are evaluated as the part they produce.
        state: dict of key -> entry bindings. A matching "part-with-condition"
            leaves its key bound to ``entry``; a failed match restores the
            previous binding.

    Returns:
        True or False. Relations that are not implemented yet (agent
        relations, created/destroyed parts, value expressions used directly
        as conditions) evaluate to False.
    """
    relation = condition["relation"]

    if relation == "part-with-condition":
        key = condition["key"]
        had_binding = key in state
        previous_binding = state.get(key)
        # Bind first so inner conditions can reference this entry by key.
        state[key] = entry
        if all(entry_matches_precondition(inner, entry, state) for inner in condition["conditions"]):
            return True
        # Undo the tentative binding on failure.
        if had_binding:
            state[key] = previous_binding
        else:
            del state[key]
        return False

    if relation == "any-of":
        return any(entry_matches_precondition(inner, entry, state) for inner in condition["conditions"])

    if relation == "all-of":
        return all(entry_matches_precondition(inner, entry, state) for inner in condition["conditions"])

    if relation in _PROPERTY_COMPARATORS:
        part = coerced_part(entry)
        prop = condition["property"]
        if prop not in part:
            return False
        try:
            expected = _resolve_value(condition["value"], state)
            return bool(_PROPERTY_COMPARATORS[relation](part[prop], expected))
        except (KeyError, TypeError, ZeroDivisionError):
            # Unbound reference, incomparable values, or division by zero:
            # the condition cannot hold.
            return False

    if relation == "is-part":
        return coerced_part(entry).get("type") == condition["part"]

    # TODO: "agent-with-condition", "is-agent", "created-part" and
    # "destroyed-part" are not implemented yet.
    return False

def state_with_updates(state,action):
    """Return a new state with the post-conditions of ``action`` applied.

    Applying post-conditions is still a TODO, so the result currently equals
    ``state``. It is returned as an independent deep copy so that later
    changes to the result can never alter the caller's state.

    Args:
        state: dict mapping entry names to entries.
        action: dict with a "post" list of post-condition specs.

    Returns:
        A deep copy of ``state``.
    """
    new_state = copy.deepcopy(state)

    for post in action["post"]:
        # "destroyed-part" post-conditions carry no "conditions" list.
        for condition in post.get("conditions", []):
            # TODO add/update post condition to the state
            pass
    return new_state

In [28]:
# List of states
states = [] # This will probably be a database or lookup later

# Build the initial state: every agent is not busy and every part type is
# available through a spawner entry.
initial_state = {}
# Agents start as not busy
for a in agents:
    initial_state[a] = agent_with_conditions(a, [
        is_agent(a),
        has_property("busy", None)
    ])
# Spawn all the parts
for p in parts:
    # Tag a copy of the spec with its part type instead of mutating the
    # shared `parts` dict, so re-running this cell leaves `parts` untouched.
    part_info = {**parts[p], "type": p}
    initial_state[p+"_spawner"] = {"type":"spawner","part":part_info}
# Append the initial state
states.append(initial_state)
for state in states:
    for prop in state:
        print(prop+":")
        print(state[prop])


def agent_actions(a, task):
    """Build the three busy-state actions for agent ``a`` and ``task``.

    Returns a list of action dicts, in this order:
      0. "AgentNoneToSome": busy goes from None to ``task``.
      1. "AgentSomeToNone": busy goes from ``task`` to None.
      2. "AgentSomeToSome": busy is ``task`` and is set to its previous value.
    """
    def busy_action(action_id, busy_before, busy_after):
        # All three actions share the same shape; only the id and the
        # before/after "busy" values differ.
        return {
            "id": action_id,
            "name": "Toggles whether the agent is busy or not",
            "pre": [
                agent_with_conditions(a, [is_agent(a), has_property("busy", busy_before)])
            ],
            "post": [
                agent_with_conditions(a, [has_property("busy", busy_after)])
            ],
            "primitives": [],
        }

    rows = (
        ("AgentNoneToSome", None, task),
        ("AgentSomeToNone", task, None),
        ("AgentSomeToSome", task, previous_property(a, "busy")),
    )
    return [busy_action(*row) for row in rows]

# Mapping of transitions between states
# Example: states[3] can transition to states[7]
# So: transitions[3] = [7] or 7 in transitions[3]
transitions = {} # This will probably be a database or lookup later

def _state_index(known_states, candidate):
    """Return the index of `candidate` in `known_states`, appending it first if it is new."""
    try:
        return known_states.index(candidate)
    except ValueError:
        known_states.append(candidate)
        return len(known_states) - 1

def _add_transition(graph, source, target):
    """Record the edge source -> target in `graph`, ignoring duplicates."""
    targets = graph.setdefault(source, [])
    if target not in targets:
        targets.append(target)

# State exploration/expansion based on actions.
# `states` grows while it is being iterated, so newly discovered states are
# expanded in turn.
for ind, state in enumerate(states):
    for task in tasks:
        for action in tasks[task]["children"]:
            # Make sure we can use the action on the given state
            if not state_matches_preconditions(state, action):
                continue
            for a in agents:
                possible_actions = agent_actions(a, action)

                # Check if we can mark the current agent as busy
                if not state_matches_preconditions(state, possible_actions[0]):
                    continue

                # Mark agent as busy
                transition_state = state_with_updates(state, possible_actions[0])
                # Agent completes the action
                update_with_action = state_with_updates(transition_state, action)
                # Toggle the agent back to not busy
                new_state = state_with_updates(update_with_action, possible_actions[1])

                # Add agent busy state to state space, then the transition to it
                i = _state_index(states, transition_state)
                _add_transition(transitions, ind, i)

                # Add final state, then the final transition
                final_i = _state_index(states, new_state)
                _add_transition(transitions, i, final_i)
print(states)
# TODO: state expansion based on agents — for every s -> t, make it
# s -> i -> t where i is the state in which agent_i is busy doing that task.


Panda:
{'key': 'Panda', 'conditions': [{'agent': 'Panda', 'relation': 'is-agent'}, {'property': 'busy', 'value': None, 'relation': 'has-property'}], 'relation': 'agent-with-condition'}
Charlie:
{'key': 'Charlie', 'conditions': [{'agent': 'Charlie', 'relation': 'is-agent'}, {'property': 'busy', 'value': None, 'relation': 'has-property'}], 'relation': 'agent-with-condition'}
pan_spawner:
{'type': 'spawner', 'part': {'name': 'Pan', 'properties': {'taped': {'type': 'bool', 'default': False}, 'caulked': {'type': 'bool', 'default': False}}, 'spawn': 'P', 'key': 'pan'}}
gasket_spawner:
{'type': 'spawner', 'part': {'name': 'Gasket', 'properties': {'caulked': {'type': 'bool', 'default': False}}, 'spawn': 'G', 'key': 'gasket'}}
core_spawner:
{'type': 'spawner', 'part': {'name': 'Core', 'properties': {'pans': {'type': 'int', 'default': 0, 'max': 2}, 'gaskets': {'type': 'int', 'default': 0, 'max': 4}, 'rivets': {'type': 'int', 'default': 0, 'max': 16}, 'cleaned': {'type': 'bool', 'default': False}

NameError: name 'state_matches_preconditions' is not defined

In [246]:
class Condition(object):
    """
    A base class for all conditions
    """
    def __init__(self):
        # Zero-argument super(): the previous call named an undefined class,
        # which made every instantiation (including subclasses) fail.
        super().__init__()

class HasProperty(Condition):
    """
    A condition that specifies an entry has a certain property.

    Placeholder: defines no attributes or methods of its own yet.
    """

class HasType(Condition):
    """
    A condition that specifies an entry has a certain type.

    Placeholder: defines no attributes or methods of its own yet.
    """


class State(object):
    """
    A world state described by a list of z3 conditions.

    The state is considered valid when its conditions are jointly
    satisfiable (see `check`).
    """

    def __init__(self, conditions=None):
        """Create a state holding its own copy of `conditions` (empty by default)."""
        # Copy so that neither a shared default nor the caller's list is
        # modified by add_condition.
        self.conditions = list(conditions) if conditions is not None else []

    def add_condition(self,condition):
        """Append `condition` without checking consistency."""
        self.conditions.append(condition)

    def update_condition(self,condition):
        """
        Add `condition`, dropping existing conditions that conflict with it.

        If `condition` is consistent with the current conditions it is simply
        prepended. Otherwise the conditions are re-added one at a time, with
        `condition` first, and each one that makes the set unsatisfiable is
        discarded.
        """
        if self.check([condition]):
            self.conditions = [condition]+self.conditions
            return

        s = z3.Solver()
        new_conditions = []
        for candidate_condition in [condition]+self.conditions:
            # Tentatively assert the candidate; roll back only if it breaks
            # satisfiability.
            s.push()
            s.add(candidate_condition)
            if s.check() == z3.sat:
                new_conditions.append(candidate_condition)
            else:
                s.pop()
        self.conditions = new_conditions

    def check(self,conditions=None):
        """Return True when this state's conditions plus `conditions` are satisfiable."""
        extra = () if conditions is None else conditions
        s = z3.Solver()
        s.add(
            *extra,
            *self.conditions
        )
        return s.check()==z3.sat

    def transition(self,conditions):
        """Return a new State obtained by applying `update_condition` for each of `conditions`."""
        new_state = State(self.conditions)
        for condition in conditions:
            new_state.update_condition(condition)
        return new_state

    def __eq__(self,other):
        """Two states are equal when their simplified conditions match, regardless of order."""
        if not isinstance(other, State):
            return NotImplemented
        rules1 = sorted([str(z3.simplify(condition)) for condition in self.conditions])
        rules2 = sorted([str(z3.simplify(condition)) for condition in other.conditions])

        return rules1==rules2

    def __str__(self):
        """Render the simplified conjunction of all conditions."""
        return str(z3.simplify(z3.And(*self.conditions)))

    def __repr__(self):
        """Same rendering as `__str__`."""
        return str(self)
        


In [252]:
# Sort used for every identifier-valued term in this notebook.
IdentifierSort = z3.DeclareSort("IdentifierSort")
# Datatype with three single-field constructors: "bool", "int" and "id".
# NOTE(review): all three constructors name their field "value" — confirm
# z3 resolves that accessor the way you intend.
PropertyValue = z3.Datatype("PropertyValue")
PropertyValue.declare("bool",("value",z3.BoolSort()))
PropertyValue.declare("int",("value",z3.IntSort()))
PropertyValue.declare("id",("value",IdentifierSort))
# Finalize the declaration; the name now refers to the result of create().
PropertyValue = PropertyValue.create()

def spec_to_z3(part_spec,agent_spec):
    """
    Build a z3 datatype with one constructor per part type and per agent.

    Args:
        part_spec: dict of part key -> spec with a "properties" dict whose
            values carry a "type" of "int" or "bool".
        agent_spec: dict keyed by agent identifier (only the keys are used).

    Returns:
        The created z3 datatype. Each part constructor has one field per
        property named "<key>_<prop>" plus "<key>_location"; each agent
        constructor has "<key>_busy" and "<key>_location".

    Note:
        "int" properties are declared with z3.RealSort(); properties of any
        other type than "int"/"bool" are skipped.
    """
    t = z3.Datatype("Part")
    for key in part_spec:
        z3_props = []
        for prop in part_spec[key]["properties"]:
            prop_type = part_spec[key]["properties"][prop]["type"]
            if prop_type == "int":
                z3_props.append((key+"_"+prop,z3.RealSort()))
            elif prop_type == "bool":
                z3_props.append((key+"_"+prop,z3.BoolSort()))
        z3_props.append((key+"_location",IdentifierSort))
        t.declare(key,*z3_props)
    for key in agent_spec:
        # Same "<key>_<field>" naming as the part fields above.
        z3_props = [(key+"_busy",IdentifierSort),(key+"_location",IdentifierSort)]
        t.declare(key,*z3_props)
    t = t.create()
    return t


# Identifier constants named after the part spawn locations (not referenced
# in the rest of this cell).
P = z3.Const("P",IdentifierSort)
G = z3.Const("G",IdentifierSort)
C = z3.Const("C",IdentifierSort)
# One datatype covering every part type and agent, and a function mapping an
# identifier to the entry it names.
Entry = spec_to_z3(parts,agents)
entries = z3.Function("entries",IdentifierSort,Entry)

# Symbolic part instances; each field is a free z3 constant named after the instance.
pan1 = Entry.pan(z3.Bool("pan1_taped"),z3.Bool("pan1_caulked"),z3.Const("pan1_location",IdentifierSort))
pan2 = Entry.pan(z3.Bool("pan2_taped"),z3.Bool("pan2_caulked"),z3.Const("pan2_location",IdentifierSort))
gasket1 = Entry.gasket(z3.Bool("gasket1_caulked"),z3.Const("gasket1_location",IdentifierSort))
core1 = Entry.core(z3.Real("core1_pans"),z3.Real("core1_gaskets"),z3.Real("core1_rivets"),z3.Bool("core1_cleaned"),z3.Const("core1_location",IdentifierSort))
x, y, z = z3.Consts("x y z", IdentifierSort)
# Two untaped pans, stored under identifiers x and y.
initial_state = State([entries(x)==pan1,entries(y)==pan2,Entry.pan_taped(entries(x))==False,Entry.pan_taped(entries(y))==False])

print(initial_state)
print(initial_state.check())
# Taping pan y contradicts "y is untaped", so the transition must drop that condition.
new_state = initial_state.transition([Entry.pan_taped(entries(y))==True])

print(new_state)
print(new_state.check())
print(initial_state==new_state)

And(entries(x) ==
    pan(pan1_taped, pan1_caulked, pan1_location),
    entries(y) ==
    pan(pan2_taped, pan2_caulked, pan2_location),
    Not(pan_taped(entries(x))),
    Not(pan_taped(entries(y))))
True
And(pan_taped(entries(y)),
    entries(x) ==
    pan(pan1_taped, pan1_caulked, pan1_location),
    entries(y) ==
    pan(pan2_taped, pan2_caulked, pan2_location),
    Not(pan_taped(entries(x))))
True
False


In [181]:
# Two states holding the same conditions in a different order; State.__eq__
# sorts the rendered conditions before comparing them.
s1 = State([z3.Int("b")==2,z3.Int("a")==1])
s2 = State([z3.Int("a")==1,z3.Int("b")==2])
s1 == s2

True