In [321]:
import functools
from typing import Union, Callable

import clingo
from clingo import Function, Number
from clingox.backend import SymbolicBackend
from clingox.program import Program, ProgramObserver

In [322]:
static_rules = """

pos((X,Y)) :- X=1..3, Y=1..3, (X,Y) != (1,3), (X,Y) != (3,1).
adjacent((X1,Y1), (X2,Y2)) :- pos((X1,Y1)), pos((X2,Y2)), |X1 - X2| + |Y1 - Y2| = 1.

1 { observe(object(Type, Id), value(Property, Value), 0) : value(Property, Value) } 1 :- object(Type, Id).

value(at, Pos) :- pos(Pos).
action(move, Pos) :- pos(Pos).
action(wait, ()).

"""

In [323]:
effect_rules = """

observe(O, value(at, Pos2), T+1) :-
  time(T+1), time(T),
  occurs(O, action(move, Pos2), T),
  observe(O, value(at, Pos1), T),
  adjacent(Pos1, Pos2).

observe(O, value(at, Pos), T+1) :-
  time(T+1), time(T),
  occurs(O, action(wait, ()), T),
  observe(O, value(at, Pos), T).

observe(object(cell, Pos), value(collision, ()), T) :-
  observe(O1, value(at, Pos), T),
  observe(O2, value(at, Pos), T),
  O1 != O2.

observe(object(cell, Pos), value(collision, ()), T+1) :-
  O1 != O2,
  observe(O1, value(at, Pos), T),
  occurs(O2, action(move, Pos), T).

:- occurs(O, action(move, Pos2), T),                                 % Action
   time(T+1), time(T),
   observe(O, value(at, Pos1), T), adjacent(Pos1, Pos2),             % State
   observe(object(cell, Pos2), value(collision, ()), T+1).           % Precondition

"""

In [324]:
instance = """

object(robot, worker).
object(robot, charger).

init(object(robot, worker), value(at, (1,1))).
init(object(robot, charger), value(at, (2,2))).

goal(object(robot, worker), value(at, (3,3))).
goal(object(robot, charger), value(at, (2,2))).

"""

In [325]:
plan = """

time(0..k).

:- init(O, V), not observe(O, V, 0).
:- goal(O, V), not observe(O, V, k).

1 { occurs(object(ObjectType, Id), action(ActionType, Par), T) : action(ActionType, Par) } 1 :- object(ObjectType, Id), time(T), time(T+1).

"""

In [326]:
breakable = """

breakable(charger, base).
breakable(worker, base).

disables((charger, base), action(move, Pos), value(at, Pos)) :- pos(Pos).
disables((worker, base), action(move, Pos), value(at, Pos)) :- pos(Pos).

status(broken;working).

1 { observe(object(robot, R), value(Part, Status), T) : status(Status) } 1 :-
  time(T), object(robot, R), breakable(R, Part).

:- time(T), time(T+1), observe(object(robot, R), value(Part, broken), T), observe(object(robot, R), value(Part, working), T+1).

"""

In [327]:
diagnose_rules = """

observe(O, value(at, Pos), T+1) :-
  time(T+1), time(T),
  occurs(O, action(wait, ()), T),
  observe(O, value(at, Pos), T).

observe(object(robot, R), value(Property, Value), T+1) :-
  time(T+1), time(T),
  breakable(R, Part), disables((R, Part), action(ActionType, Parameter), value(Property, Value)),
  occurs(object(robot, R), action(ActionType, Parameter), T),
  observe(object(robot, R), value(Part, working), T).

observe(object(robot, R), value(Property, Value), T+1) :-
  time(T+1), time(T),
  %breakable(R, Part), disables((R, Part), action(ActionType, Parameter), value(Property, Value)),
  occurs(object(robot, R), action(ActionType, Parameter), T),
  observe(object(robot, R), value(Property, Value), T),
  observe(object(robot, R), value(Part, broken), T).

%observe(object(robot,worker),value(at,(1,1)),0)
%observe(object(robot,worker),value(base,broken),0)
%occurs(object(robot,worker),action(move,(2,1)),0)

"""

In [328]:
diagnose = """

time(0..k).

:- init(O, V), not observe(O, V, 0).

1 { occurs(object(ObjectType, Id), action(ActionType, Par), T) : action(ActionType, Par) } 1 :- object(ObjectType, Id), time(T), time(T+1).

{ faulty(Robot, Part) : breakable(Robot, Part) }.

:- breakable(Robot, Part),     faulty(Robot, Part), not observe(object(robot, Robot), value(Part, broken), k).   % X
:- breakable(Robot, Part), not faulty(Robot, Part), not observe(object(robot, Robot), value(Part, working), k).  % R - X

"""

In [329]:
ctl = clingo.Control()
ctl.configuration.solve.models = 10
prg = Program()
obs = ProgramObserver(prg)
ctl.register_observer(obs)

In [330]:
ctl.add('base', [], static_rules)
ctl.add('base', [], effect_rules)
ctl.add('base', [], instance)
ctl.add('plan', ['k'], plan)
ctl.add('base', [], '#show observe/3. #show occurs/3.')

In [331]:
k = 4
ctl.ground([("base", []), ("plan", [clingo.Number(k)])])

In [332]:
print("Facts:", len(prg.facts), "Rules:", len(prg.rules))

Facts: 0 Rules: 531


In [333]:
def get_time(symbol: clingo.Symbol, default: Union[Callable[[clingo.Symbol], int], int, None] = None):
    if symbol.type is clingo.SymbolType.Function and symbol.name in ('occurs', 'observe'):
        return symbol.arguments[-1].number
    if callable(default):
        return default(symbol)
    return default

In [334]:
def solve(control: clingo.Control):
    models = []
    with control.solve(yield_=True) as solve_handle:
        for model in solve_handle:
            symbols = sorted(sorted(model.symbols(shown=True)), key=functools.partial(get_time, default=-1))
            print("Answer {}:{}".format(model.number,'{'), '\n'.join(map(str, symbols)), '}', sep='\n')
            models.append(symbols)
        solve_result = solve_handle.get()
        print(solve_result, end='')
        if solve_result.satisfiable:
            print(' {}{}'.format(len(models), '' if solve_result.exhausted else '+'), end='')
        print()
    return models

In [335]:
plans = solve(ctl)

Answer 1:{
observe(object(robot,charger),value(at,(2,2)),0)
observe(object(robot,worker),value(at,(1,1)),0)
occurs(object(robot,charger),action(move,(1,2)),0)
occurs(object(robot,worker),action(move,(2,1)),0)
observe(object(robot,charger),value(at,(1,2)),1)
observe(object(robot,worker),value(at,(2,1)),1)
occurs(object(robot,charger),action(wait,()),1)
occurs(object(robot,worker),action(move,(2,2)),1)
observe(object(robot,charger),value(at,(1,2)),2)
observe(object(robot,worker),value(at,(2,2)),2)
occurs(object(robot,charger),action(wait,()),2)
occurs(object(robot,worker),action(move,(2,3)),2)
observe(object(robot,charger),value(at,(1,2)),3)
observe(object(robot,worker),value(at,(2,3)),3)
occurs(object(robot,charger),action(move,(2,2)),3)
occurs(object(robot,worker),action(move,(3,3)),3)
observe(object(robot,charger),value(at,(2,2)),4)
observe(object(robot,worker),value(at,(3,3)),4)
}
Answer 2:{
observe(object(robot,charger),value(at,(2,2)),0)
observe(object(robot,worker),value(at,(1,1))

In [336]:
observed = [
 Function('observe', [Function('object', [Function('robot', []), Function('charger', [])]), Function('value', [Function('at', []), Function('', [Number(2), Number(2)])]), Number(0)]),
 Function('observe', [Function('object', [Function('robot', []), Function('worker', [])] ), Function('value', [Function('at', []), Function('', [Number(1), Number(1)])]), Number(0)]),
 Function('observe', [Function('object', [Function('robot', []), Function('charger', [])]), Function('value', [Function('at', []), Function('', [Number(2), Number(2)])]), Number(1)]),
 Function('observe', [Function('object', [Function('robot', []), Function('worker', [])] ), Function('value', [Function('at', []), Function('', [Number(1), Number(1)])]), Number(1)]),
 Function('observe', [Function('object', [Function('robot', []), Function('charger', [])]), Function('value', [Function('at', []), Function('', [Number(2), Number(2)])]), Number(2)]),
 Function('observe', [Function('object', [Function('robot', []), Function('worker', [])] ), Function('value', [Function('at', []), Function('', [Number(1), Number(1)])]), Number(2)]),
 Function('observe', [Function('object', [Function('robot', []), Function('charger', [])]), Function('value', [Function('at', []), Function('', [Number(2), Number(2)])]), Number(3)]),
 Function('observe', [Function('object', [Function('robot', []), Function('worker', [])] ), Function('value', [Function('at', []), Function('', [Number(1), Number(1)])]), Number(3)]),
 Function('observe', [Function('object', [Function('robot', []), Function('charger', [])]), Function('value', [Function('at', []), Function('', [Number(2), Number(2)])]), Number(4)]),
 Function('observe', [Function('object', [Function('robot', []), Function('worker', [])] ), Function('value', [Function('at', []), Function('', [Number(1), Number(1)])]), Number(4)])]

This is done like in Algorithm 1

In [337]:
i = 0
plan = plans[0]
plan

[Function('observe', [Function('object', [Function('robot', [], True), Function('charger', [], True)], True), Function('value', [Function('at', [], True), Function('', [Number(2), Number(2)], True)], True), Number(0)], True),
 Function('observe', [Function('object', [Function('robot', [], True), Function('worker', [], True)], True), Function('value', [Function('at', [], True), Function('', [Number(1), Number(1)], True)], True), Number(0)], True),
 Function('occurs', [Function('object', [Function('robot', [], True), Function('charger', [], True)], True), Function('action', [Function('move', [], True), Function('', [Number(1), Number(2)], True)], True), Number(0)], True),
 Function('occurs', [Function('object', [Function('robot', [], True), Function('worker', [], True)], True), Function('action', [Function('move', [], True), Function('', [Number(2), Number(1)], True)], True), Number(0)], True),
 Function('observe', [Function('object', [Function('robot', [], True), Function('charger', [],

In [338]:
actions = [symbol for symbol in plan if symbol.match('occurs', 3)]

In [340]:
if i < k: # simulate a while loop by repeatedly running 'Run cell'
    o = [symbol for symbol in observed if symbol.match('observe', 3) and symbol.arguments[-1].number == i]
    e = [symbol for symbol in plan if symbol.match('observe', 3) and symbol.arguments[-1].number == i]
    os = set(o)
    es = set(e)
    discrepancy = os != es
    if discrepancy:
        print('Discrepancy in step', i)
        diag = clingo.Control()
        diag.configuration.solve.models = 6
        diag.add('base', [], static_rules)
        diag.add('base', [], diagnose_rules)
        diag.add('base', [], breakable)
        diag.add('base', [], instance)
        diag.add('diagnose', ['k'], diagnose)
        diag.add('base', [], '#show observe/3. #show occurs/3. #show faulty/2.')
        with SymbolicBackend(diag.backend()) as sb:
            for action in actions:
                if action.arguments[-1].number < i:
                    sb.add_rule(head=(),pos_body=(),neg_body=[action])
                    pass
            for obs in o:
                print(obs)
                sb.add_rule(head=(), pos_body=(), neg_body=[obs])

        diag.ground([('base', []), ('diagnose', [clingo.Number(i)])])
        diagnosis = solve(diag)
    i += 1
i

Discrepancy in step 1
observe(object(robot,charger),value(at,(2,2)),1)
observe(object(robot,worker),value(at,(1,1)),1)
Answer 1:{
faulty(charger,base)
faulty(worker,base)
observe(object(robot,charger),value(at,(2,2)),0)
observe(object(robot,charger),value(base,broken),0)
observe(object(robot,worker),value(at,(1,1)),0)
observe(object(robot,worker),value(base,broken),0)
occurs(object(robot,charger),action(move,(1,2)),0)
occurs(object(robot,worker),action(move,(2,1)),0)
observe(object(robot,charger),value(at,(2,2)),1)
observe(object(robot,charger),value(base,broken),1)
observe(object(robot,worker),value(at,(1,1)),1)
observe(object(robot,worker),value(base,broken),1)
}
SAT 1


<block>:1:34-49: info: no atoms over signature occur in program:
  faulty/2

<block>:1:1-17: info: no atoms over signature occur in program:
  observe/3

<block>:1:18-33: info: no atoms over signature occur in program:
  occurs/3



2