Skip to content

Commit

Permalink
AsyncRunner uses execute_once
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandreDecan committed Jul 6, 2018
1 parent a54d890 commit 72ea6e3
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 34 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ Queued events can be delayed when they are added to an interpreter event queue.
- (Added) Delayed events can be sent from within a statechart by specifying a ``delay`` parameter to the ``sent`` function.
- (Added) An ``EventQueue`` class (in ``sismic.interpreter.queue``) that controls how (delayed) events are handled by an interpreter.

Several new interpreter runners that benefit from the clock-based handling of time and delayed events:
A new interpreter runner that benefit from the clock-based handling of time and delayed events:

- (Added) An ``AsyncRunner`` in the newly added ``runner`` module to asynchronously run an interpreter at regular interval.
- (TODO) EventBasedRunner
- (Changed) ``helpers.run_in_background`` no longer synchronizes the interpreter clock.
Use the ``start()`` method of ``interpreter.clock`` or an ``UtcClock`` instance instead.
- (Deprecated) ``helpers.run_in_background`` is deprecated, use ``runner.AsyncRunner`` instead.
Expand Down
5 changes: 1 addition & 4 deletions sismic/runner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
from .runner import AsyncRunner


__all__ = ['AsyncRunner']
from .runner import *
23 changes: 12 additions & 11 deletions sismic/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,29 @@ class AsyncRunner:
- before_run: called (only once !) when the runner is started.
- after_run: called (only once !) when the interpreter reaches a final configuration.
configuration of the underlying interpreter is reached.
- execute: called at each step of the run. By default, call the `execute()`
method of the underlying interpreter.
- execute: called at each step of the run. By default, call the `execute_once`
method of the underlying interpreter and returns a *list* of macro steps.
- before_execute: called right before the call to `execute()`;
- after_execute: called right after the call to `execute()`.
This method is called with the returned value of `execute()`.
:param interpreter: interpreter instance to run.
:param interval: interval between two calls to `execute`
:param execute_once: If set, call interpreter's `execute_once` method instead of `execute`.
:param execute_all: If set, repeatedly call interpreter's `execute_once` method.
"""
def __init__(self, interpreter: Interpreter, interval: float=0.1, execute_once=False) -> None:
def __init__(self, interpreter: Interpreter, interval: float=0.1, execute_all=False) -> None:
self._unpaused = threading.Event()
self._stop = threading.Event()

self.interpreter = interpreter
self.interval = interval
self._execute_once = execute_once
self._execute_all = execute_all
self._thread = threading.Thread(target=self._run)

@property
def running(self):
"""
Holds if execution is currently running (event if it's paused).
Holds if execution is currently running (even if it's paused).
"""
return self._thread.is_alive()

Expand All @@ -84,8 +84,8 @@ def stop(self):
"""
Stop the execution.
"""
self._unpaused.set()
self._stop.set()
self._unpaused.set()
self.wait()

def pause(self):
Expand Down Expand Up @@ -118,7 +118,7 @@ def execute(self) -> List[MacroStep]:
steps.append(step)
step = self.interpreter.execute_once()

if self._execute_once:
if not self._execute_all:
break

return steps
Expand Down Expand Up @@ -152,17 +152,17 @@ def after_run(self):

def _run(self):
self.before_run()
self._unpaused.wait()

while not self.interpreter.final and not self._stop.is_set():
self._unpaused.wait()

while not self.interpreter.final and not self._stop.is_set():
starttime = time.time()
self.before_execute()
r = self.execute()
self.after_execute(r)

elapsed = time.time() - starttime
time.sleep(max(0, self.interval - elapsed))
self._unpaused.wait()

# Ensure that self._stop is set if self.interpreter.final holds
self._stop.set()
Expand All @@ -171,3 +171,4 @@ def _run(self):

def __del__(self):
self.stop()

34 changes: 17 additions & 17 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@

from time import sleep

from sismic.runner import AsyncRunner
from sismic.interpreter import Interpreter
from sismic.runner import AsyncRunner, EventBasedRunner
from sismic.interpreter import Interpreter, DelayedEvent


class TestAsyncRunner:
INTERVAL = 0.05
INTERVAL = 0.02

@pytest.fixture()
def interpreter(self, simple_statechart):
return Interpreter(simple_statechart)

@pytest.fixture()
def runner(self, interpreter):
r = AsyncRunner(interpreter, interval=self.INTERVAL)
r = AsyncRunner(interpreter, interval=0)
yield r
r.stop()

Expand All @@ -27,31 +27,30 @@ class MockedRunner(AsyncRunner):
after_execute = mocker.MagicMock()
after_run = mocker.MagicMock()

r = MockedRunner(interpreter, interval=self.INTERVAL)
r = MockedRunner(interpreter, interval=0)
yield r
r.stop()

def test_not_yet_started(self, runner):
assert runner.interpreter.configuration == []

runner.interpreter.queue('goto s2')
sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
assert runner.interpreter.configuration == []

runner.start()
sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
assert runner.interpreter.configuration == ['root', 's3']

def test_start(self, runner):
runner.start()
sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
assert runner.interpreter.configuration == ['root', 's1']

runner.interpreter.queue('goto s2')
sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
assert runner.interpreter.configuration == ['root', 's3']


def test_restart_stopped(self, runner):
runner.start()
runner.stop()
Expand All @@ -73,13 +72,13 @@ def test_start_again(self, runner):

def test_hooks(self, mocked_runner):
mocked_runner.start()
sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
with pytest.raises(AssertionError, message='before_run not called'):
mocked_runner.before_run.assert_not_called()

mocked_runner.pause()
mocked_runner.unpause()
sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
mocked_runner.pause()

with pytest.raises(AssertionError, message='before_execute not called'):
Expand All @@ -89,15 +88,16 @@ def test_hooks(self, mocked_runner):
mocked_runner.after_execute.assert_not_called()

mocked_runner.stop()
sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
with pytest.raises(AssertionError, message='after_run not called'):
mocked_runner.after_run.assert_not_called()

def test_final(self, runner):
runner.start()
runner.interpreter.queue('goto s2')
runner.interpreter.queue('goto final')
sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
sleep(self.INTERVAL)
assert runner.interpreter.final
assert not runner.running
sleep(self.INTERVAL) # Wait for the thread to finish
Expand All @@ -107,20 +107,20 @@ def test_pause(self, runner):
runner.start()
assert not runner.paused

sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
runner.pause()
assert runner.paused
assert runner.running
assert runner.interpreter.configuration == ['root', 's1']

runner.interpreter.queue('goto s2')
sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
assert runner.interpreter.configuration == ['root', 's1']

runner.unpause()
assert not runner.paused
assert runner.running
sleep(self.INTERVAL * 2)
sleep(self.INTERVAL)
assert runner.interpreter.configuration == ['root', 's3']


Expand Down

0 comments on commit 72ea6e3

Please sign in to comment.