From 10ba9a7b8112a3ecc35c593148526387dff4ac22 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Mon, 8 Jul 2024 21:09:52 +1200 Subject: [PATCH 1/4] sequential clock spawning fix --- cylc/flow/xtrigger_mgr.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 4f33ec7f3c..5ba90738a0 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -644,6 +644,8 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): if sig in self.sat_xtrig: # Already satisfied, just update the task itask.state.xtriggers[label] = True + if self.all_task_seq_xtriggers_satisfied(itask): + self.sequential_spawn_next.add(itask.identity) elif _wall_clock(*ctx.func_args, **ctx.func_kwargs): # Newly satisfied itask.state.xtriggers[label] = True From 3c846c0c2917ac61ea677e01820202c318c70022 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Mon, 8 Jul 2024 21:27:42 +1200 Subject: [PATCH 2/4] change log entry --- changes.d/6206.fix.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes.d/6206.fix.md diff --git a/changes.d/6206.fix.md b/changes.d/6206.fix.md new file mode 100644 index 0000000000..fef5fb1ec2 --- /dev/null +++ b/changes.d/6206.fix.md @@ -0,0 +1 @@ +Fixes the spawning of multiple parentless tasks off the same sequential wall-clock xtrigger. \ No newline at end of file From b25735e3c72e923146d8e14a49dc963cfc9d066e Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Mon, 8 Jul 2024 15:56:40 +0100 Subject: [PATCH 3/4] Tidy --- tests/integration/test_xtrigger_mgr.py | 31 +++++--------------------- 1 file changed, 5 insertions(+), 26 deletions(-) diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py index 3bf425650c..e551929835 100644 --- a/tests/integration/test_xtrigger_mgr.py +++ b/tests/integration/test_xtrigger_mgr.py @@ -38,9 +38,6 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch): lambda: ten_years_ahead - 1 ) id_ = flow({ - 'scheduler': { - 'allow implicit tasks': True - }, 'scheduling': { 'initial cycle point': '2020-05-05', 'xtriggers': { @@ -72,31 +69,19 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch): } -async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker): +async def test_1_xtrigger_2_tasks(flow, start, scheduler, mocker): """ If multiple tasks depend on the same satisfied xtrigger, the DB mgr method - put_xtriggers should only be called once - when the xtrigger gets satisfied. + put_xtriggers should only be called once - when the xtrigger gets satisfied See [GitHub #5908](https://github.com/cylc/cylc-flow/pull/5908) """ - task_point = 1588636800 # 2020-05-05 - ten_years_ahead = 1904169600 # 2030-05-05 - monkeypatch.setattr( - 'cylc.flow.xtriggers.wall_clock.time', - lambda: ten_years_ahead - 1 - ) id_ = flow({ - 'scheduler': { - 'allow implicit tasks': True - }, 'scheduling': { - 'initial cycle point': '2020-05-05', - 'xtriggers': { - 'clock_1': 'wall_clock()', - }, + 'initial cycle point': '2020', 'graph': { - 'R1': '@clock_1 => foo & bar' + 'R1': '@wall_clock => foo & bar' } } }) @@ -112,7 +97,7 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker): schd.xtrigger_mgr.call_xtriggers_async(task) # It should now be satisfied. - assert task.state.xtriggers == {'clock_1': True} + assert task.state.xtriggers == {'wall_clock': True} # Check one put_xtriggers call only, not two. assert spy.call_count == 1 @@ -128,9 +113,6 @@ async def test_xtriggers_restart(flow, start, scheduler, db_select): """It should write xtrigger results to the DB and load them on restart.""" # define a workflow which uses a custom xtrigger id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True' - }, 'scheduling': { 'xtriggers': { 'mytrig': 'mytrig()' @@ -194,9 +176,6 @@ async def test_error_in_xtrigger(flow, start, scheduler): """Failure in an xtrigger is handled nicely. """ id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True' - }, 'scheduling': { 'xtriggers': { 'mytrig': 'mytrig()' From 00784eed9c8f0666f5a7ac61506c57367b6bc909 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Mon, 8 Jul 2024 15:59:43 +0100 Subject: [PATCH 4/4] Add test for sequential clock trigger bugfix Test that all tasks dependent on a sequential clock trigger continue to spawn after the first cycle --- tests/integration/test_xtrigger_mgr.py | 39 ++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py index e551929835..d04c202bd4 100644 --- a/tests/integration/test_xtrigger_mgr.py +++ b/tests/integration/test_xtrigger_mgr.py @@ -18,8 +18,14 @@ import asyncio from pathlib import Path from textwrap import dedent +from typing import Set from cylc.flow.pathutil import get_workflow_run_dir +from cylc.flow.scheduler import Scheduler + + +def get_task_ids(schd: Scheduler) -> Set[str]: + return {task.identity for task in schd.pool.get_tasks()} async def test_2_xtriggers(flow, start, scheduler, monkeypatch): @@ -210,3 +216,36 @@ def mytrig(*args, **kwargs): error = log.messages[-1].split('\n') assert error[-2] == 'Exception: This Xtrigger is broken' assert error[0] == 'ERROR in xtrigger mytrig()' + + +async def test_1_seq_clock_trigger_2_tasks(flow, start, scheduler): + """Test that all tasks dependent on a sequential clock trigger continue to + spawn after the first cycle. + + See https://github.com/cylc/cylc-flow/issues/6204 + """ + id_ = flow({ + 'scheduler': { + 'cycle point format': 'CCYY', + }, + 'scheduling': { + 'initial cycle point': '1990', + 'graph': { + 'P1Y': '@wall_clock => foo & bar', + }, + }, + }) + schd: Scheduler = scheduler(id_) + + async with start(schd): + start_task_pool = get_task_ids(schd) + assert start_task_pool == {'1990/foo', '1990/bar'} + + for _ in range(3): + await schd._main_loop() + + assert get_task_ids(schd) == start_task_pool.union( + f'{year}/{name}' + for year in range(1991, 1994) + for name in ('foo', 'bar') + )