In [1]:
import numpy as np
import pytest
!uv pip install pytest ipytest

Using Python 3.10.12 environment at: /opt/.venv
Audited 2 packages in 3ms


In [2]:
import copy
from queue import PriorityQueue


class Fluent():
    """A named predicate over string arguments, e.g. ``Fluent('at', 'robot1', 'bedroom')``.

    A negated fluent is represented by prefixing the name with ``'not '``;
    ``~fluent`` toggles between a fluent and its negation.

    Identity (hashing and equality) is defined by the string form
    ``"<name> <arg1> <arg2> ..."``.
    """

    def __init__(self, name, *args):
        """Store the predicate ``name`` and its positional ``args`` (strings)."""
        self.name = name
        self.args = args

    def __str__(self):
        # Arguments must be strings: they are joined with single spaces.
        args_str = " ".join(self.args)
        return f"{self.name} {args_str}"

    def __repr__(self):
        return f"Fluent<{self}>"

    def __hash__(self):
        return hash(str(self))

    def __eq__(self, other):
        # Compare the actual string form rather than the hashes: comparing
        # hashes would treat colliding values, and even a plain string with
        # the same text, as an equal fluent.
        if not isinstance(other, Fluent):
            return NotImplemented
        return str(self) == str(other)

    def __invert__(self):
        """Return the negation: add the ``'not '`` name prefix, or strip it if present."""
        if self.name.startswith('not '):
            return Fluent(self.name[4:], *self.args)
        return Fluent(f"not {self.name}", *self.args)
    

class State():
    """World state: a time stamp, the active fluents, and the pending effects.

    ``upcoming_effects`` is a ``PriorityQueue`` of ``(scheduled_time, effect)``
    tuples. Hashing and equality only consider ``time`` and ``active_fluents``;
    the pending effects are ignored.

    NOTE: instances are mutable but hashable, so a state must not be modified
    while it is used as a dict key or set member.
    NOTE(review): two queue entries with the same scheduled time fall back to
    comparing the effect objects, which define no ordering here.
    """

    def __init__(self, time=0, active_fluents=None, upcoming_effects=None):
        self.time = time
        # Fresh containers per instance when none are supplied.
        self.active_fluents = set() if active_fluents is None else active_fluents
        self.upcoming_effects = (
            PriorityQueue() if upcoming_effects is None else upcoming_effects
        )

    def satisfies_precondition(self, action):
        """Return True when every precondition of ``action`` is an active fluent."""
        return all(
            precondition in self.active_fluents
            for precondition in action.preconditions
        )

    def transition(self, action):
        """Apply ``action`` to a copy of this state and return its outcomes.

        Returns:
            The dict filled in by ``advance_state``; this state is not modified.

        Raises:
            ValueError: if the action's preconditions are not all satisfied.
        """
        if not self.satisfies_precondition(action):
            raise ValueError("Precondition not satisfied for applying action")

        state = self.copy()
        outcome_states = {}

        # Schedule each effect at its absolute time (offset from the current time).
        for effect in action.effects:
            state.upcoming_effects.put((effect.time + state.time, effect))

        advance_state(state, outcome_states)
        return outcome_states

    def __repr__(self):
        return f"State<time={self.time}, active_fluents={self.active_fluents}>"

    def __hash__(self):
        # Order-independent over the fluents and consistent with __eq__.
        return hash((self.time, frozenset(self.active_fluents)))

    def __eq__(self, other):
        # Compare the contents, not the hashes: distinct states (or unrelated
        # objects) can share a hash value.
        if not isinstance(other, State):
            return NotImplemented
        return (
            self.time == other.time
            and frozenset(self.active_fluents) == frozenset(other.active_fluents)
        )

    def copy(self):
        """Return a state with its own fluent set and its own effect queue.

        The fluent and effect objects themselves are shared, not duplicated.
        """
        upcoming_effects = PriorityQueue()
        # Copying the underlying heap list keeps its ordering intact.
        upcoming_effects.queue = list(self.upcoming_effects.queue)
        return State(
            time=self.time,
            active_fluents=set(self.active_fluents),
            upcoming_effects=upcoming_effects
        )
            

def advance_state(state, outcome_states, prob=1.0):
    """Apply the pending effects of ``state`` in time order and collect outcomes.

    Effects are popped from ``state.upcoming_effects``; each one moves the
    state's clock to the effect's scheduled time and merges its resulting
    fluents into the active ones. Whenever a fluent named ``'free'`` is active
    after an effect, a snapshot of the state is recorded in ``outcome_states``
    as ``snapshot -> (prob, time)``.

    A probabilistic effect forks the state: every ``(p, effects)`` branch gets
    its own copy (including the effects still pending), has its branch effects
    scheduled relative to the current time, and is advanced recursively with
    probability ``prob * p``.

    Args:
        state: the state to advance; it is modified in place.
        outcome_states: dict that receives the recorded outcomes.
        prob: probability of reaching ``state``.
    """
    while not state.upcoming_effects.empty():
        scheduled_time, effect = state.upcoming_effects.get()
        # The queue key already is the absolute time of the effect. Adding the
        # effect's relative offset to the clock again would over-count as soon
        # as several effects were scheduled from the same starting time.
        state.time = scheduled_time
        state.active_fluents = add_fluents(state.active_fluents,
                                           effect.resulting_fluents)
        if any(f.name == 'free' for f in state.active_fluents):
            # Record a snapshot: `state` keeps changing below, and mutating an
            # object that is in use as a dict key corrupts the lookup.
            outcome_states[state.copy()] = (prob, state.time)

        if isinstance(effect, Effect):
            continue

        branched = False
        for p, branch_effects in effect.prob_effects:
            branch_state = state.copy()
            for branch_effect in branch_effects:
                branch_state.upcoming_effects.put(
                    (branch_effect.time + branch_state.time, branch_effect))
            advance_state(branch_state, outcome_states, prob=prob * p)
            branched = True

        if branched:
            # Every branch inherited the effects that were still pending, so
            # continuing here would replay them on a state that belongs to no
            # branch and record outcomes with an unscaled probability.
            return
            

class Effect():
    """Deterministic effect of an action.

    Effect(time: float, resulting_fluents: Set[Fluent])

    ``time`` is the offset added to the state's time when the effect is
    scheduled; ``resulting_fluents`` are the fluents merged into the state's
    active fluents when the effect is applied.
    """

    def __init__(self, time, resulting_fluents):
        # Both values are stored as given (no copy is made).
        self.time, self.resulting_fluents = time, resulting_fluents


class ProbEffects():
    """Effect with several probabilistic follow-up branches.

    ProbEffects(
        time=5,
        prob_effects=[(0.8, List[Effect]),
                      (0.2, List[Effect])
        ],
        resulting_fluents=Set[Fluent]
    )

    Args:
        time: offset added to the state's time when the effect is scheduled.
        prob_effects: list of ``(probability, effects)`` pairs, one per branch.
        resulting_fluents: fluents applied before branching, hence present in
            every branch. Defaults to a new empty set.
    """

    def __init__(self, time, prob_effects, resulting_fluents=None):
        self.time = time
        self.prob_effects = prob_effects
        # A fresh set per instance: a mutable default argument would be shared
        # by every ProbEffects created without explicit fluents.
        if resulting_fluents is None:
            resulting_fluents = set()
        self.resulting_fluents = resulting_fluents  # Propagate into all prob_effects

class Action():
    """An action: the fluents it requires and the effects it schedules.

    ``preconditions`` is the collection of fluents that must all be active
    for the action to apply; ``effects`` is the sequence of effects queued
    when it is applied.
    """

    def __init__(self, preconditions, effects):
        # Both values are stored as given (no copy is made).
        self.preconditions, self.effects = preconditions, effects


In [3]:
def search(robot, loc_from, loc_to, object):
    """Build the action in which ``robot`` leaves ``loc_from`` to search ``loc_to`` for ``object``.

    Preconditions: the robot is at ``loc_from`` and free, ``loc_to`` has not
    been searched for the object, and the object has not been found.

    Effects:
        * offset 0: the robot is no longer free (object still not found).
        * offset 5: the robot is at ``loc_to`` and the location counts as
          searched; then, with probability 0.8 the object is found there and,
          3 time units later, held by the robot, which becomes free again;
          with probability 0.2 the object is not at ``loc_to`` and the robot
          becomes free immediately.
    """
    required = {
        Fluent('at', robot, loc_from),
        ~Fluent('searched', loc_to, object),
        Fluent('free', robot),
        ~Fluent('found', object),
    }

    # Starting the search makes the robot busy.
    start = Effect(
        time=0,
        resulting_fluents={~Fluent('free', robot), ~Fluent('found', object)},
    )

    # Branch 1: the object is found at the location, then picked up.
    object_found = [
        Effect(
            time=0,
            resulting_fluents={Fluent('at', loc_to, object),
                               Fluent('found', object)},
        ),
        Effect(
            time=3,
            resulting_fluents={Fluent('holding', robot, object),
                               ~Fluent('at', loc_to, object),
                               Fluent('free', robot)},
        ),
    ]

    # Branch 2: the object is not at the location.
    object_missing = [
        Effect(
            time=0,
            resulting_fluents={Fluent('free', robot),
                               ~Fluent('at', loc_to, object)},
        ),
    ]

    arrival = ProbEffects(
        time=5,
        prob_effects=[(0.8, object_found), (0.2, object_missing)],
        resulting_fluents={Fluent('at', robot, loc_to),
                           ~Fluent('at', robot, loc_from),
                           Fluent('searched', loc_to, object)},
    )

    return Action(preconditions=required, effects=[start, arrival])


def add_fluents(active_fluents, upcoming_fluents):
    """Return a new set: ``active_fluents`` updated with ``upcoming_fluents``.

    Each upcoming fluent is added; if its negation (``~fluent``) is currently
    present, the negation is dropped first. The input sets are left untouched.
    """
    merged = set(active_fluents)
    for incoming in upcoming_fluents:
        # discard() is a no-op when the negation is absent.
        merged.discard(~incoming)
        merged.add(incoming)
    return merged


In [4]:
import ipytest
# Apply ipytest's default configuration before running the tests defined in this notebook.
ipytest.autoconfig()

# @pytest.mark.skip()
def test_invert_fluent():
    """``~`` adds the ``'not '`` name prefix, keeps the args, and undoes itself."""
    fluent = Fluent('at', 'robot1', 'bedroom')
    negated = ~fluent

    assert negated.name == 'not at'
    assert negated.args == fluent.args
    assert str(negated) == 'not at robot1 bedroom'
    assert negated != fluent

    # Inverting twice gives back the original fluent.
    assert ~negated == fluent
    assert (~negated).name == 'at'

# @pytest.mark.skip()
def test_add_fluents():
    """add_fluents swaps a fluent for its negation (both ways) and keeps unrelated ones."""
    robot_free = Fluent('free', 'robot1')
    robot_in_bedroom = Fluent('at', 'robot1', 'bedroom')
    robot_in_kitchen = Fluent('at', 'robot1', 'kitchen')
    fork_found = Fluent('found', 'fork')
    fork_in_livingroom = Fluent('at', 'livingroom', 'fork')

    before = {robot_in_bedroom, robot_free, ~fork_in_livingroom}
    incoming = {~robot_free, ~robot_in_bedroom, robot_in_kitchen, ~fork_found}

    # First update: positive fluents are replaced by their negations.
    after_first = add_fluents(before, incoming)
    assert after_first == {
        ~robot_free,
        ~robot_in_bedroom,
        robot_in_kitchen,
        ~fork_found,
        ~fork_in_livingroom,
    }

    # Second update: a negated fluent is replaced by its positive form.
    after_second = add_fluents(after_first, {robot_free})
    assert after_second == {
        robot_free,
        ~robot_in_bedroom,
        robot_in_kitchen,
        ~fork_found,
        ~fork_in_livingroom,
    }

    
# @pytest.mark.skip()
def test_state_action_outcomes():
    """Searching the kitchen for the fork yields a success and a failure outcome."""
    active_fluents = {
        Fluent('at', 'robot1', 'bedroom'),
        ~Fluent('searched', 'kitchen', 'fork'),
        Fluent('free', 'robot1'),
        ~Fluent('found', 'fork')
    }

    init_state = State(active_fluents=active_fluents)
    action = search('robot1', 'bedroom', 'kitchen', 'fork')
    outcome_states = init_state.transition(action)

    # Fork found (p=0.8): the robot holds it 3 time units after arriving at t=5.
    success_fluents = {
        Fluent('free', 'robot1'),
        Fluent('at', 'robot1', 'kitchen'),
        ~Fluent('at', 'robot1', 'bedroom'),
        Fluent('holding', 'robot1', 'fork'),
        Fluent('searched', 'kitchen', 'fork'),
        ~Fluent('at', 'kitchen', 'fork'),
        Fluent('found', 'fork'),
    }
    # Fork not in the kitchen (p=0.2): the robot is free again on arrival.
    failure_fluents = {
        Fluent('free', 'robot1'),
        ~Fluent('found', 'fork'),
        Fluent('at', 'robot1', 'kitchen'),
        ~Fluent('at', 'robot1', 'bedroom'),
        Fluent('searched', 'kitchen', 'fork'),
        ~Fluent('at', 'kitchen', 'fork')
    }
    expected_outcome_states = {
        State(time=8, active_fluents=success_fluents): (0.8, 8),
        State(time=5, active_fluents=failure_fluents): (0.2, 5)
    }

    assert outcome_states == expected_outcome_states

    # The outcome probabilities must cover the whole distribution.
    total_probability = sum(p for p, _ in outcome_states.values())
    assert abs(total_probability - 1.0) < 1e-9

    # Applying the action must not modify the initial state.
    assert init_state.time == 0
    assert init_state.active_fluents == active_fluents
        
# Run the tests collected from this notebook, passing pytest's `-vv` flag for per-test output.
ipytest.run('-vv')


platform linux -- Python 3.10.12, pytest-8.3.5, pluggy-1.5.0 -- /opt/.venv/bin/python3
cachedir: .pytest_cache
metadata: {'Python': '3.10.12', 'Platform': 'Linux-6.8.0-52-generic-x86_64-with-glibc2.35', 'Packages': {'pytest': '8.3.5', 'pluggy': '1.5.0'}, 'Plugins': {'html': '4.1.1', 'timeout': '2.3.1', 'metadata': '3.1.1', 'anyio': '4.9.0'}}
rootdir: /notebooks
plugins: html-4.1.1, timeout-2.3.1, metadata-3.1.1, anyio-4.9.0
collecting ... collected 3 items

t_53d18e84d7364427bed661fc3403adc3.py::test_invert_fluent PASSED                             [ 33%]
t_53d18e84d7364427bed661fc3403adc3.py::test_add_fluents PASSED                               [ 66%]
t_53d18e84d7364427bed661fc3403adc3.py::test_state_action_outcomes PASSED                     [100%]



<ExitCode.OK: 0>

In [5]:
    # NOTE: earlier string-based sketch of the search action, kept for reference only;
    # the Fluent-based `search` function defined earlier in this notebook appears to supersede it.
    # return Action(
    #     preconditions={f'not searched {location} {object}', f'free {robot}', f'not found {object}'},
    #     effects=[Effect(0, {f"not free {robot}", f"not found {object}"}),
    #              ProbEffects(time=5,
    #                          prob_effects = [
    #                               (0.8, [Effect(0, {f"at {location} {object}"}),
    #                                      Effect(3, {f"holding {robot} {object}", f"free {robot}"})]),
    #                               (0.2, [Effect(0, {f"free {robot}"})])
    #                          ],
    #                          resulting_fluents={f'at {robot} {location}'})
    #             ]
    # )