In [12]:
from collections import namedtuple
from typing import List

import clingo
from clingox.backend import SymbolicBackend

In [13]:
inst1 = """

#program static.

node(1).
node(2).
node(3).

edge((1,2)).
edge((1,3)).
edge((2,1)).
edge((2,3)).
edge((3,1)).
edge((3,2)).

edge_weight((1,2), 10).
edge_weight((2,3), 10).
edge_weight((3,1), 10).
edge_weight((2,1), 100).
edge_weight((3,2), 100).
edge_weight((1,3), 100).

"""

In [14]:
reasoning = """

#program types(t).

action(walk(E,t)) :- edge(E).
fluent(tour(t)).
fluent(incomplete(t)).
fluent(goal(t)).

#program initial.

#program dynamic(t).

1 { walk((1,W),t) : edge((1,W)) } 1 :- t = 1.
1 { walk((V,W),t) : edge((V,W)), walk((U,V),t-1) } 1 :- t > 1.

tour(t) :- walk((W,1),t).
incomplete(t) :- edge((U,V)), edge((V,U)), not walk((U,V),_), not walk((V,U),_).

#program final(t).

goal(t) :- tour(t), not incomplete(t).

"""

In [15]:
program_types = clingo.Control()
program_types.configuration.solve.models = 0

action_list = []
fluent_list = []
initial_states = []

In [16]:
program_types.add("base", (), inst1)
program_types.add("base", (), reasoning)
program_types.ground([('initial', []),('types', [clingo.Number(0)]),('static', [])])
with program_types.solve(yield_ = True) as solve_handle:
    for model in solve_handle:
        initial_state = []
        for atom in model.symbols(atoms = True):
            if atom.match('action', 1):
                action_list.append(atom.arguments[0])
            elif atom.match('fluent', 1):
                fluent_list.append(atom.arguments[0])
            elif atom in fluent_list or atom in action_list:
                initial_state.append(atom)
        initial_states.append(initial_state)

print("Actions: {}".format(', '.join(map(str, sorted(action_list)))))
print("Fluents: {}".format(', '.join(map(str, sorted(fluent_list)))))
print("Initial States: {}\n{}\n{}".format('{', '\n'.join('[ ' + ', '.join(map(str, sorted(s))) for s in initial_states) + ' ]', '}'))

Actions: walk((1,2),0), walk((1,3),0), walk((2,1),0), walk((2,3),0), walk((3,1),0), walk((3,2),0)
Fluents: goal(0), incomplete(0), tour(0)
Initial States: {
[  ]
}


<block>:18:48-61: info: atom does not occur in any rule head:
  walk((#X0,#X1),#P2)



In [17]:
program_dynamic = clingo.Control()
program_dynamic.configuration.solve.models = 0

program_dynamic.add('base', (), inst1)
program_dynamic.add('base', (), reasoning)

with SymbolicBackend(program_dynamic.backend()) as sb:
    for atom in fluent_list:
        sb.add_external(atom)
    for atom in action_list:
        sb.add_external(atom)
program_dynamic.ground([('base', []), ('dynamic', [clingo.Number(1)]), ('static', []), ('final', [clingo.Number(1)])])

In [18]:
class SearchNode(namedtuple('SearchNode', ['state', 'parent', 'action', 'path_cost', 'depth', 'priority'])):
    pass

In [19]:
def successors(domain : clingo.Control, state, actions, fluents):
    priority = -1
    succ_states: List[tuple] = []
    old_ones = state[:]
    for atom in state: # type: clingo.Symbol
        domain.assign_external(atom, True)

    with domain.solve(yield_=True) as solve_handle:
        for model in solve_handle:
            priority = -1
            cost = -1
            atoms = []
            performed_actions = []
            for atom in model.symbols(atoms=True):
                if atom.match('cost', 1):
                    cost = atom.arguments[0].number
                t = atom.arguments[-1]
                if t.type is clingo.SymbolType.Number and t.number == 1:
                    reset = clingo.Function(atom.name, atom.arguments, atom.positive)
                    reset.arguments[-1] = clingo.Number(0)
                    if reset in fluents:
                        atoms.append()
                        old_ones.append(atom)
                    elif reset in actions:
                        performed_actions.append(reset)
                        old_ones.append(atom)
            succ_states.append((performed_actions, atoms, int(priority),int(cost)))

    for atom in old_ones:
        domain.assign_external(atom, False)
    return succ_states

In [20]:
def expand(domain, node: SearchNode, actions, fluents):
    succs = []
    cost_value = 0
    priority_value = 0
    for action, result, priority_value, cost in successors(domain, node.state, actions, fluents):
        cost_value = cost or node.path_cost + 1

        new = SearchNode(result, node, action, cost_value, node.depth + 1, priority_value)
        succs.append(new)
    return succs

In [21]:
class GraphSearch:
    def __init__(self):
        self.domain = None



    def search(self, initial, fringe:list, actions, fluents):
        """ Searches for a plan """

        max_size = 0 # Maximum number of nodes in the fringe
        explored = 0 # Number of explored nodes
        visited = set() # Set of visited nodes
        root_node = SearchNode(initial, None, [], 0, 0, 0) # Node which stores the initial state
        fringe.append(root_node)
        goal = clingo.Function("goal", [clingo.Number(0)]) # The goal predicate

        while True:
            max_size = max(max_size, len(fringe))
            if not fringe:
                print("UNSAT", "explored states:", explored, "max fringe size", max_size)
                return False

            node : SearchNode = fringe.pop()
            print(node)

            if goal in node.state:
                print("SAT", "explored states:", explored, "max fringe size", max_size)
                print(node)
                return True

            state = frozenset(node.state)
            if state not in visited:
                visited.add(state)
                explored = explored + 1
                for child in expand(self.domain, node, actions, fluents):
                    fringe.append(child)

In [22]:
search = GraphSearch()
search.domain = program_dynamic

search.search(initial_states[0], [], action_list, fluent_list)

SearchNode(state=[], parent=None, action=[], path_cost=0, depth=0, priority=0)
SearchNode(state=[], parent=SearchNode(state=[], parent=None, action=[], path_cost=0, depth=0, priority=0), action=[], path_cost=-1, depth=1, priority=-1)
SearchNode(state=[], parent=SearchNode(state=[], parent=None, action=[], path_cost=0, depth=0, priority=0), action=[], path_cost=-1, depth=1, priority=-1)
UNSAT explored states: 1 max fringe size 2


False