Skip to content

Commit

Permalink
sequential clock spawning fix (#6206)
Browse files Browse the repository at this point in the history
sequential clock spawning fix

---------

Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
  • Loading branch information
dwsutherland and MetRonnie committed Jul 9, 2024
1 parent bb64e9f commit ae6a70d
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 26 deletions.
1 change: 1 addition & 0 deletions changes.d/6206.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes the spawning of multiple parentless tasks off the same sequential wall-clock xtrigger.
2 changes: 2 additions & 0 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,8 @@ def call_xtriggers_async(self, itask: 'TaskProxy'):
if sig in self.sat_xtrig:
# Already satisfied, just update the task
itask.state.xtriggers[label] = True
if self.all_task_seq_xtriggers_satisfied(itask):
self.sequential_spawn_next.add(itask.identity)
elif _wall_clock(*ctx.func_args, **ctx.func_kwargs):
# Newly satisfied
itask.state.xtriggers[label] = True
Expand Down
70 changes: 44 additions & 26 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
import asyncio
from pathlib import Path
from textwrap import dedent
from typing import Set

from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.scheduler import Scheduler


def get_task_ids(schd: Scheduler) -> Set[str]:
return {task.identity for task in schd.pool.get_tasks()}


async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
Expand All @@ -38,9 +44,6 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
lambda: ten_years_ahead - 1
)
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'initial cycle point': '2020-05-05',
'xtriggers': {
Expand Down Expand Up @@ -72,31 +75,19 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
}


async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker):
async def test_1_xtrigger_2_tasks(flow, start, scheduler, mocker):
"""
If multiple tasks depend on the same satisfied xtrigger, the DB mgr method
put_xtriggers should only be called once - when the xtrigger gets satisfied.
put_xtriggers should only be called once - when the xtrigger gets satisfied
See [GitHub #5908](https://github.com/cylc/cylc-flow/pull/5908)
"""
task_point = 1588636800 # 2020-05-05
ten_years_ahead = 1904169600 # 2030-05-05
monkeypatch.setattr(
'cylc.flow.xtriggers.wall_clock.time',
lambda: ten_years_ahead - 1
)
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'initial cycle point': '2020-05-05',
'xtriggers': {
'clock_1': 'wall_clock()',
},
'initial cycle point': '2020',
'graph': {
'R1': '@clock_1 => foo & bar'
'R1': '@wall_clock => foo & bar'
}
}
})
Expand All @@ -112,7 +103,7 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker):
schd.xtrigger_mgr.call_xtriggers_async(task)

# It should now be satisfied.
assert task.state.xtriggers == {'clock_1': True}
assert task.state.xtriggers == {'wall_clock': True}

# Check one put_xtriggers call only, not two.
assert spy.call_count == 1
Expand All @@ -128,9 +119,6 @@ async def test_xtriggers_restart(flow, start, scheduler, db_select):
"""It should write xtrigger results to the DB and load them on restart."""
# define a workflow which uses a custom xtrigger
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True'
},
'scheduling': {
'xtriggers': {
'mytrig': 'mytrig()'
Expand Down Expand Up @@ -194,9 +182,6 @@ async def test_error_in_xtrigger(flow, start, scheduler):
"""Failure in an xtrigger is handled nicely.
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True'
},
'scheduling': {
'xtriggers': {
'mytrig': 'mytrig()'
Expand Down Expand Up @@ -231,3 +216,36 @@ def mytrig(*args, **kwargs):
error = log.messages[-1].split('\n')
assert error[-2] == 'Exception: This Xtrigger is broken'
assert error[0] == 'ERROR in xtrigger mytrig()'


async def test_1_seq_clock_trigger_2_tasks(flow, start, scheduler):
"""Test that all tasks dependent on a sequential clock trigger continue to
spawn after the first cycle.
See https://github.com/cylc/cylc-flow/issues/6204
"""
id_ = flow({
'scheduler': {
'cycle point format': 'CCYY',
},
'scheduling': {
'initial cycle point': '1990',
'graph': {
'P1Y': '@wall_clock => foo & bar',
},
},
})
schd: Scheduler = scheduler(id_)

async with start(schd):
start_task_pool = get_task_ids(schd)
assert start_task_pool == {'1990/foo', '1990/bar'}

for _ in range(3):
await schd._main_loop()

assert get_task_ids(schd) == start_task_pool.union(
f'{year}/{name}'
for year in range(1991, 1994)
for name in ('foo', 'bar')
)

0 comments on commit ae6a70d

Please sign in to comment.