diff --git a/cadence/_internal/workflow/deterministic_event_loop.py b/cadence/_internal/workflow/deterministic_event_loop.py index 8cc5dca..da9774f 100644 --- a/cadence/_internal/workflow/deterministic_event_loop.py +++ b/cadence/_internal/workflow/deterministic_event_loop.py @@ -27,6 +27,16 @@ def __init__(self): self._stopping = False self._closed = False + def run_until_yield(self): + """Run until stop() is called.""" + self._run_forever_setup() + try: + while self._ready: + self._run_once() + finally: + self._run_forever_cleanup() + + # Event Loop APIs def call_soon( self, callback: Callable[[Unpack[_Ts]], object], diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index 5fee196..a90a0aa 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -225,10 +225,7 @@ def _execute_workflow_once( self._workflow_instance.run(workflow_input) ) - # signal the loop to stop after the first run - self._loop.stop() - # this starts the loop and runs once then stops with cleanup - self._loop.run_forever() + self._loop.run_until_yield() except Exception as e: logger.error( diff --git a/tests/cadence/_internal/workflow/test_deterministic_event_loop.py b/tests/cadence/_internal/workflow/test_deterministic_event_loop.py index d555148..bbca5b0 100644 --- a/tests/cadence/_internal/workflow/test_deterministic_event_loop.py +++ b/tests/cadence/_internal/workflow/test_deterministic_event_loop.py @@ -76,3 +76,17 @@ def test_create_task(self): size = 10000 results = self.loop.run_until_complete(coro_await_task(size)) assert results == list(range(size)) + + def test_run_once(self): + # run once won't clear the read queue + self.loop.create_task(coro_await_task(10)) + self.loop.stop() + self.loop.run_forever() + assert len(self.loop._ready) == 10 + + def test_run_until_yield(self): + # run until yield will clear the read queue + task = self.loop.create_task(coro_await_task(3)) + self.loop.run_until_yield() + assert len(self.loop._ready) == 0 + assert task.done() is True