Skip to content

Commit

Permalink
start/stop/pause/unpause runner
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandreDecan committed Jun 29, 2018
1 parent bfaddfa commit 0dc5e55
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
50 changes: 36 additions & 14 deletions sismic/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ class AsyncRunner:
"""
An asynchronous runner that repeatedly execute given interpreter.
The execution must be started with the "start" method, and can be stopped
with the "stop" method. These are non-blocking calls. A call to "wait"
is blocking until the execution of the underlying interpreter ends (i.e. when
it reaches a final configuration).
The runner tries to call its `execute` method every `interval` seconds, assuming
that a call to interpreter `execute` method takes less time than `interval`.
If not, subsequent call is queued and will occur as soon as possible with
no delay. The runner stops as soon as the underlying interpreter reaches
a final configuration.
The execution must be started with the `start` method, and can be (definitively)
stopped with the `stop` method. An execution can be temporarily suspended
using the `pause` and `unpause` methods. A call to `wait` blocks until
the statechart reaches a final configuration.
While this runner can be used "as is", it was designed to be subclassed and
as such, proposes several hooks to control the execution and additional
behaviours:
Expand All @@ -42,29 +42,49 @@ class AsyncRunner:
:param interval: interval between two calls to `execute`
"""
def __init__(self, interpreter: Interpreter, interval: float=0.1) -> None:
self._runnning = threading.Event()
self._running = threading.Event()
self._stop = threading.Event()

self.interpreter = interpreter
self.interval = interval
self._thread = threading.Thread(target=self._run)
self._thread.start()

def start(self):
"""
Start the execution.
"""
self._runnning.set()
if self._stop.isSet():
raise RuntimeError('Cannot restart a stopped runner.')
elif self._thread.isAlive():
raise RuntimeError('Runner is already started')
else:
self._running.set()
self._thread.start()

def stop(self):
"""
Stop the execution.
"""
self._runnning.clear()

self._stop.set()

def pause(self):
"""
Pause the execution.
"""
self._running.clear()

def unpause(self):
"""
Unpause the execution.
"""
self._running.set()

def wait(self):
"""
Wait for the execution to finish.
"""
self._thread.join()
if self._thread.isAlive():
self._thread.join()

def execute(self) -> List[MacroStep]:
"""
Expand Down Expand Up @@ -100,11 +120,10 @@ def after_run(self):
pass

def _run(self):
self._runnning.wait()
self.before_run()

while not self.interpreter.final:
self._runnning.wait()
while not self.interpreter.final and not self._stop.isSet():
self._running.wait()

starttime = time.time()
self.before_execute()
Expand All @@ -115,3 +134,6 @@ def _run(self):
time.sleep(max(0, self.interval - elapsed))

self.after_run()

def __del__(self):
self.stop()
35 changes: 31 additions & 4 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def test_is_not_started(self, runner):
sleep(self.INTERVAL * 2)
assert runner.interpreter.configuration == ['root', 's3']

runner.stop()

def test_start(self, runner):
runner.start()
sleep(self.INTERVAL * 2)
Expand All @@ -37,6 +39,23 @@ def test_start(self, runner):
sleep(self.INTERVAL * 2)
assert runner.interpreter.configuration == ['root', 's3']

runner.stop()

def test_start_stopped(self, runner):
runner.start()
runner.stop()

with pytest.raises(RuntimeError, match='Cannot restart'):
runner.start()

def test_start_again(self, runner):
runner.start()

with pytest.raises(RuntimeError, match='already started'):
runner.start()

runner.stop()

def test_hooks(self, runner, mocker):
hooks = ['before_run', 'after_run', 'before_execute', 'after_execute']
for hook in hooks:
Expand All @@ -53,6 +72,8 @@ def test_hooks(self, runner, mocker):
with pytest.raises(AssertionError, message=hook):
# Because assert_called requires python 3.6+
getattr(runner, hook).assert_not_called()

runner.stop()

def test_final(self, runner):
runner.start()
Expand All @@ -62,17 +83,23 @@ def test_final(self, runner):
assert runner.interpreter.final
assert not runner._thread.isAlive()

def test_stop(self, runner):
def test_pause(self, runner):
runner.start()
sleep(self.INTERVAL * 2)
runner.stop()
runner.pause()
assert runner.interpreter.configuration == ['root', 's1']

runner.interpreter.queue('goto s2')
sleep(self.INTERVAL * 2)
assert runner.interpreter.configuration == ['root', 's1']

runner.start()
runner.unpause()
sleep(self.INTERVAL * 2)
assert runner.interpreter.configuration == ['root', 's3']


runner.stop()

def test_join_stopped(self, runner):
runner.start()
runner.stop()
runner.wait()

0 comments on commit 0dc5e55

Please sign in to comment.