In [1]:
from dataclasses import dataclass, field
from typing import TypeVar, Sequence, FrozenSet, Optional, Collection, Set, Union, Tuple

import more_itertools


In [8]:
_U = TypeVar('_U')
_ForwardFrozenRelation = TypeVar('_ForwardFrozenRelation', bound='FrozenRelation')


@dataclass(frozen=True)
class FrozenRelation:
    relation_set: FrozenSet[Tuple[_U, _U]] = field(default_factory=frozenset)

    def __getitem__(self, item):
        return self.get(item)

    def __str__(self):
        return "{}{}{}".format('{', ",".join(map(lambda e: "({},{})".format(e[0], e[1]), self.relation_set)), '}')

    def related(self, item1, item2) -> bool:
        return (item1, item2) in self.relation_set

    def get(self, item) -> Set[_U]:
        return {elem[1] for elem in self.relation_set if elem[0] == item}

    def get_inverse(self, item) -> Set[_U]:
        return {elem[0] for elem in self.relation_set if elem[1] == item}

    @staticmethod
    def from_ground_coll(ground_coll: Collection[Tuple[_U, _U]],
                         transitive: bool = False,
                         reflexive: bool = False,
                         symmetric: bool = False) -> _ForwardFrozenRelation:
        relation_set = set(ground_coll)
        add = {None}
        while add:
            add = set()
            if symmetric:
                for rel in relation_set:
                    item1, item2 = rel
                    if (item2, item1) not in relation_set:
                        add.add((item2, item1))
            if reflexive:
                for rel in relation_set:
                    item1, item2 = rel
                    if (item1, item1) not in relation_set:
                        add.add((item1, item1))
                    if (item2, item2) not in relation_set:
                        add.add((item2, item2))
            if transitive:
                for rel in relation_set:
                    item1, item2 = rel
                    for rel_ in relation_set:
                        item3, item4 = rel_
                        if item2 != item3:
                            continue
                        if (item1, item4) not in relation_set:
                            add.add((item1, item4))
            relation_set.update(add)
        return FrozenRelation(frozenset(relation_set))


In [2]:
_ForwardAtom = TypeVar('_ForwardAtom', bound='Atom')
_ForwardFormula = TypeVar('_ForwardFormula', bound='Formula')


@dataclass(order=True, frozen=True)
class Atom:
    symbol: str
    arguments: Sequence[Union[_ForwardAtom, _ForwardFormula]] = field(default_factory=tuple)

    @property
    def is_complement(self) -> bool:
        return self.symbol.startswith('-')

    @property
    def is_know(self) -> bool:
        return self.symbol == 'Know'

    def __invert__(self):
        if self.is_complement:
            return Atom(self.symbol[1:], self.arguments)
        else:
            return Atom('-{}'.format(self.symbol), self.arguments)

    def __str__(self):
        if self.arguments:
            return "{}({})".format(self.symbol, ','.join(map(str, self.arguments)))
        else:
            return "{}".format(self.symbol)


In [3]:
@dataclass(order=True, frozen=True)
class Literal:
    atom: Atom
    sign: bool = field(default=True)

    def __neg__(self):
        return Literal(self.atom, not self.sign)

    def __invert__(self):
        return Literal(~self.atom, self.sign)

    def __str__(self):
        if self.sign:
            return str(self.atom)
        else:
            return "¬{}".format(self.atom)

In [4]:
Action = Atom
Fluent = Atom
FluentLiteral = Literal

In [5]:
@dataclass(order=True, frozen=True)
class State:
    world: FrozenSet[FluentLiteral] = field(default_factory=frozenset)
    k_relation: FrozenRelation = field(default_factory=FrozenRelation)

    def __contains__(self, item):
        if not isinstance(item, FluentLiteral):
            raise TypeError("Unexpected type {} of item {}".format(type(item).__name__, item))
        if item in self.world:
            return True
        if -item in self.world:
            return False
        return not item.sign


In [6]:
_ForwardSituation = TypeVar('_ForwardSituation', bound='Situation')


@dataclass(order=True, frozen=True)
class Situation:
    state: State = field(default_factory=State)
    action: Optional[Action] = field(default=None)
    parent: Optional[_ForwardSituation] = field(default=None)

    @property
    def is_root(self) -> bool:
        return self.parent is not None

    @property
    def time(self) -> int:
        if self.is_root:
            return 0
        else:
            return self.parent.time + 1

    @property
    def root(self) -> _ForwardSituation:
        if self.is_root:
            return self
        else:
            return self.parent.root

In [7]:
_ForwardPath = TypeVar('_ForwardPath', bound='Path')


@dataclass(order=True, frozen=True)
class Path:
    states: Sequence[State] = field(default_factory=tuple)
    actions: Sequence[Action] = field(default_factory=tuple)

    @property
    def root(self) -> State:
        return self.states[0]

    def time(self, state: State) -> Optional[int]:
        for time, state_ in enumerate(self.states):
            if state_ == state:
                return time
        return None

    def starts(self, state: State) -> bool:
        return self.root == state

    def get_suffix(self, state: Optional[State] = None) -> _ForwardPath:
        if state is None:
            state = self.root
        if state not in self.states:
            return Path()
        offset = self.time(state)
        if offset + 1 < len(self.actions):
            return Path(self.states[offset:], self.actions[offset + 1:])
        else:
            return Path(self.states[offset:])


In [9]:
_ForwardCausalSetting = TypeVar('_ForwardCausalSetting', bound='CausalSetting')

In [10]:
class Formula:
    def __neg__(self):
        return Negation(self)

    def __and__(self, other):
        return Conjunction(self, other)

    def __or__(self, other):
        return Disjunction(self, other)

    def evaluate(self, causal_setting: _ForwardCausalSetting, elem: Union[Situation, Path]) -> bool:
        if isinstance(elem, Situation):
            return self.evaluate_situation(causal_setting, elem)
        else:
            if not isinstance(elem, Path):
                raise TypeError('Unexpected type {} for elem {}'.format(type(elem).__name__, elem))
            return self.evaluate_path(causal_setting, elem)

    def evaluate_situation(self, causal_setting: _ForwardCausalSetting, situation: Situation) -> bool:
        raise NotImplementedError

    def evaluate_path(self, causal_setting: _ForwardCausalSetting, path: Path) -> bool:
        raise NotImplementedError



In [11]:
class StateFormula(Formula):

    def evaluate_path(self, causal_setting: _ForwardCausalSetting, path: Path) -> bool:
        return self.evaluate_situation(causal_setting, path.root)


In [12]:
class PathFormula(Formula):
    pass

In [13]:
@dataclass(order=True, frozen=True)
class CausalSetting:
    fluent_alphabet: FrozenSet[Fluent] = field(default_factory=frozenset)  # without Poss, Know, Int
    action_alphabet: FrozenSet[Action] = field(default_factory=frozenset)
    initial_state: State = field(default_factory=State)
    k_relation: FrozenRelation = field(default_factory=FrozenRelation)
    max_time: int = field(default=0)

    def poss(self, action: Action, state: State) -> bool:
        raise NotImplementedError

    def do(self, action: Action, situation: Optional[Situation] = None) -> Situation:
        raise NotImplementedError

    def do_iter(self, actions: Sequence[Action], state: Optional[Situation] = None) -> Situation:
        for action in actions:
            state = self.do(action, state)
        return state

    def know(self, formula: Formula, situation: Situation) -> bool:
        return all(formula.evaluate(self, situation_) for situation_ in self.k_relation.get_inverse(situation))

    def causes_directly(self) -> bool:
        pass

    def causes_indirectly(self) -> bool:
        pass

    def causes(self) -> bool:
        return self.causes_directly() or self.causes_indirectly()

In [14]:
@dataclass(order=True, frozen=True)
class Predicate(StateFormula):
    predicate: FluentLiteral

    def evaluate_state(self, causal_setting: CausalSetting, state: State) -> bool:
        return self.predicate in state

In [15]:
@dataclass(order=True, frozen=True)
class Possible(StateFormula):
    action: Action

    def evaluate_state(self, causal_setting: CausalSetting, state: State) -> bool:
        return causal_setting.poss(self.action, state)


In [16]:
@dataclass(order=True, frozen=True)
class After(StateFormula):
    action: Action
    formula: Formula

    def evaluate_state(self, causal_setting: CausalSetting, state: State) -> bool:
        return self.formula.evaluate(causal_setting, causal_setting.do(self.action, state))

In [17]:
@dataclass(order=True, frozen=True)
class Negation(Formula):
    formula: Formula

    def evaluate(self, causal_setting: CausalSetting, elem: Union[State, Path]) -> bool:
        return not self.formula.evaluate(causal_setting, elem)

    def evaluate_state(self, causal_setting: CausalSetting, state: State) -> bool:
        return not self.formula.evaluate_situation(causal_setting, state)

    def evaluate_path(self, causal_setting: CausalSetting, path: Path) -> bool:
        return not self.formula.evaluate_path(causal_setting, path)

    def __neg__(self):
        return self.formula

In [18]:
@dataclass(order=True, frozen=True)
class Conjunction(Formula):
    left: Formula
    right: Formula

    def evaluate(self, causal_setting: CausalSetting, elem: Union[State, Path]) -> bool:
        return self.left.evaluate(causal_setting, elem) and self.right.evaluate(causal_setting, elem)

    def evaluate_state(self, causal_setting: CausalSetting, state: State) -> bool:
        return self.left.evaluate_situation(causal_setting, state) and self.right.evaluate_situation(causal_setting, state)

    def evaluate_path(self, causal_setting: CausalSetting, path: Path) -> bool:
        return self.left.evaluate_path(causal_setting, path) and self.right.evaluate_path(causal_setting, path)


In [19]:
@dataclass(order=True, frozen=True)
class Disjunction(Formula):
    left: Formula
    right: Formula

    def evaluate(self, causal_setting: CausalSetting, elem: Union[State, Path]) -> bool:
        return self.left.evaluate(causal_setting, elem) or self.right.evaluate(causal_setting, elem)

    def evaluate_state(self, causal_setting: CausalSetting, state: State) -> bool:
        return self.left.evaluate_situation(causal_setting, state) or self.right.evaluate_situation(causal_setting, state)

    def evaluate_path(self, causal_setting: CausalSetting, path: Path) -> bool:
        return self.left.evaluate_path(causal_setting, path) or self.right.evaluate_path(causal_setting, path)


In [20]:
@dataclass(order=True, frozen=True)
class Know(StateFormula):
    formula: Formula

    def evaluate_state(self, causal_setting: CausalSetting, state: State) -> bool:
        return causal_setting.know(self, state)

In [21]:
@dataclass(order=True, frozen=True)
class Intention(StateFormula):
    formula: Formula

    def evaluate_state(self, causal_setting: CausalSetting, state: State) -> bool:
        raise NotImplementedError


In [22]:
@dataclass(order=True, frozen=True)
class Next(PathFormula):
    formula: Formula

    def evaluate_path(self, causal_setting: _ForwardCausalSetting, path: Path) -> bool:
        for action in causal_setting.action_alphabet:
            if not causal_setting.poss(action, path.root):
                continue
            path_ = path.get_suffix(causal_setting.do(action, path.root))
            if self.formula.evaluate_path(causal_setting, path_):
                return True
        return False



In [23]:
@dataclass(order=True, frozen=True)
class Until(PathFormula):
    holds_formula: Formula
    until_formula: Formula

    def evaluate_path(self, causal_setting: CausalSetting, path: Path) -> bool:
        for s_ in path.states:
            p_ = path.get_suffix(s_)
            if not self.until_formula.evaluate_path(causal_setting, p_):
                continue
            valid = True
            for s_star in path.states:
                p_star = path.get_suffix(s_star)
                if not self.holds_formula.evaluate_path(causal_setting, p_star):
                    valid = False
                    break
            if valid:
                return True
        return False

In [24]:
@dataclass(order=True, frozen=True)
class TrivialTrue(Formula):

    def evaluate(self, causal_setting: CausalSetting, elem: Union[State, Path]) -> bool:
        return True

    def evaluate_state(self, causal_setting: CausalSetting, state: State) -> bool:
        return True

    def evaluate_path(self, causal_setting: CausalSetting, path: Path) -> bool:
        return True


In [25]:
@dataclass(order=True, frozen=True)
class Eventually(PathFormula):
    formula: Formula

    def evaluate_path(self, causal_setting: CausalSetting, path: Path) -> bool:
        return Until(TrivialTrue(), self.formula).evaluate_path(causal_setting, path)

In [26]:
@dataclass(order=True, frozen=True)
class Before(PathFormula):
    before_formula: Formula
    after_formula: Formula

    def evaluate_path(self, causal_setting: CausalSetting, path: Path) -> bool:
        return Negation(Until(Negation(self.before_formula), self.after_formula)).evaluate_path(causal_setting, path)

In [27]:
Ls = Atom('Ls')
Ld = Atom('Ld')
L1 = Atom('L1')
L1_ = Atom("L1'")

takeOff_Ls = Atom('takeOff', Ls)
takeOff_Ld = Atom('takeOff', Ld)
takeOff_L1 = Atom('takeOff', L1)
takeOff_L1_ = Atom('takeOff', L1_)

flyTo_Ls = Atom('flyTo', (Ls,))
flyTo_Ld = Atom('flyTo', (Ld,))
flyTo_L1 = Atom('flyTo', (L1,))
flyTo_L1_ = Atom('flyTo', (L1_,))

land_Ls = Atom('land', Ls)
land_Ld = Atom('land', Ld)
land_L1 = Atom('land', L1)
land_L1_ = Atom('land', L1_)

atom_At_Ls = Atom('At', (Ls,))
atom_At_Ld = Atom('At', (Ld,))
atom_At_L1 = Atom('At', (L1,))
atom_At_L1_ = Atom('At', (L1_,))

At_Ls = Literal(atom_At_Ls)
At_Ld = Literal(atom_At_Ld)
At_L1 = Literal(atom_At_L1)
At_L1_ = Literal(atom_At_L1_)

a_Flying = Atom('Flying')

Flying = Literal(a_Flying)

atom_Vis_Ls = Atom('Vis', (Ls,))
atom_Vis_Ld = Atom('Vis', (Ld,))
atom_Vis_L1 = Atom('Vis', (L1,))
atom_Vis_L1_ = Atom('Vis', (L1_,))

Vis_Ls = Literal(atom_Vis_Ls)
Vis_Ld = Literal(atom_Vis_Ld)
Vis_L1 = Literal(atom_Vis_L1)
Vis_L1_ = Literal(atom_Vis_L1_)

atom_TStrom_Ls = Atom('TStrom', (Ls,))
atom_TStrom_Ld = Atom('TStrom', (Ld,))
atom_TStrom_L1 = Atom('TStrom', (L1,))
atom_TStrom_L1_ = Atom('TStrom', (L1_,))

TStrom_Ls = Literal(atom_TStrom_Ls)
TStrom_Ld = Literal(atom_TStrom_Ld)
TStrom_L1 = Literal(atom_TStrom_L1)
TStrom_L1_ = Literal(atom_TStrom_L1_)

sense_TStrom_Ls = Atom('sense', (atom_TStrom_Ls,))
sense_TStrom_Ld = Atom('sense', (atom_TStrom_Ld,))
sense_TStrom_L1 = Atom('sense', (atom_TStrom_L1,))
sense_Tstrom_L1_ = Atom('sense', (atom_TStrom_L1_,))

req_Eventually_Vis_L1_ = Atom('req', (Eventually(Vis_L1_),))

route = {
    Ls: frozenset({L1, L1_}),
    L1: frozenset({Ld}),
    L1_: frozenset({Ld}),
    Ld: frozenset()
}

drone_example_actions = {takeOff_Ls, takeOff_L1, takeOff_L1_, takeOff_Ld,
                         land_Ls, land_L1, land_L1_, land_Ld,
                         flyTo_Ls, flyTo_L1, flyTo_L1_, flyTo_Ld,
                         sense_TStrom_Ls, sense_TStrom_Ld, sense_TStrom_L1, sense_Tstrom_L1_,
                         req_Eventually_Vis_L1_}

drone_example_fluents = {
    atom_At_Ls, atom_At_L1, atom_At_L1_, atom_At_Ld,
    atom_Vis_Ls, atom_At_L1, atom_At_L1_, atom_At_Ld,
    atom_TStrom_Ls, atom_TStrom_L1, atom_TStrom_L1_, atom_TStrom_Ld
}

In [28]:
@dataclass(order=True, frozen=True)
class DroneExample(CausalSetting):

    def poss(self, action: Action, state: State) -> bool:
        if action not in self.action_alphabet:
            return False
        l = action.arguments[0]
        atom_At_l = Atom('At', l)
        At_l = Literal(atom_At_l)
        if At_l not in state:
            return False
        if action.symbol == 'takeOff':
            return -Flying in state
        if action.symbol == 'land':
            return Flying in state
        l_ = action.arguments[1]
        if action.symbol == 'flyTo':
            return Flying in state and l_ in route[l]

        assert False, "Unexpected action {}".format(action)

    def do(self, action: Action, state: Optional[State] = None) -> State:
        if state is None:
            state = self.initial_state
        if action.symbol == 'takeOff':
            state_ = (state - {-Flying}) | {Flying}
            return state_
        if action.symbol == 'land':
            state_ = (state - {Flying}) | {-Flying}
            return state_
        l_ = action.arguments[1]
        atom_At_l_ = Atom('At', l_)
        At_l_ = Literal(atom_At_l_)
        if action.symbol == 'flyTo':
            state_ = state | {At_l_}
            return state_

        assert False, "Unexpected action {}".format(action)



In [29]:
drone_example_initial_state = State({
    Vis_Ls, At_Ls, TStrom_L1
})
print(drone_example_initial_state)

{At(Ls),TStrom(L1),Vis(Ls)}


In [30]:
-Flying

Literal(atom=Atom(symbol='Flying', arguments=()), sign=False)

In [31]:
(-Flying) in drone_example_initial_state

True

In [32]:
Vis_Ls in drone_example_initial_state

True

In [33]:
drone_example_k_states = {State(Literal(atom) for atom in atoms) for atoms in
                          more_itertools.powerset(drone_example_fluents)}
len(drone_example_k_states)

512

In [34]:
drone_example_valid_k_states = {
    k_state for k_state in drone_example_k_states if
    At_Ls in k_state and
    -At_L1 in k_state and
    -At_L1_ in k_state and
    -At_Ld in k_state and
    -Flying in k_state and
    Vis_Ls in k_state and
    -Vis_L1 in k_state and
    -Vis_L1_ in k_state and
    -Vis_Ld in k_state and
    -TStrom_Ls in k_state and
    -TStrom_L1_ in k_state and
    -TStrom_Ld in k_state
}
len(drone_example_valid_k_states)

2

In [35]:
for k_state in drone_example_valid_k_states:
    print(k_state)

{At(Ls),TStrom(L1),Vis(Ls)}
{At(Ls),Vis(Ls)}


In [36]:
drone_example_k_relation_ground_coll = {
    (drone_example_initial_state, k_state) for k_state in drone_example_valid_k_states
}
drone_example_k_relation = FrozenRelation.from_ground_coll(drone_example_k_relation_ground_coll)
print(drone_example_k_relation)

{({At(Ls),TStrom(L1),Vis(Ls)},{At(Ls),Vis(Ls)}),({At(Ls),TStrom(L1),Vis(Ls)},{At(Ls),TStrom(L1),Vis(Ls)})}


In [37]:
drone_example = DroneExample(
    drone_example_fluents,
    drone_example_actions,
    drone_example_initial_state,
    drone_example_k_relation,
    7,
)

In [38]:
drone_example_phi_0 = Eventually(At_Ld)
drone_example_phi_1 = Before(Vis_L1, Vis_Ld)

In [39]:
drone_example_s3 = drone_example.do_iter([takeOff_Ls, sense_TStrom_L1, req_Eventually_Vis_L1_])

IndexError: tuple index out of range