Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow-specific hold/release #5698

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2035,6 +2035,11 @@ class Meta:
''')
resolver = mutator

class Arguments(TaskMutation.Arguments):
# Extends the task-mutation arguments with a flow number for the hold
# mutation. The `cylc hold` client sends it as the `flowNum` variable
# (see cylc/flow/scripts/hold.py in this change).
flow_num = Int(
description='Number of flow to hold.'
)


class Release(Mutation, TaskMutation):
class Meta:
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ class CylcWorkflowDAO:
TABLE_TASKS_TO_HOLD: [
["name"],
["cycle"],
["flow"],
],
}

Expand Down Expand Up @@ -939,11 +940,11 @@ def select_task_prerequisites(
stmt_args = [cycle, name, flow_nums]
return list(self.connect().execute(stmt, stmt_args))

def select_tasks_to_hold(self) -> List[Tuple[str, str]]:
def select_tasks_to_hold(self) -> List[Tuple[str, str, str]]:
"""Return all tasks to hold stored in the DB."""
stmt = rf'''
SELECT
name, cycle
name, cycle, flow
FROM
{self.TABLE_TASKS_TO_HOLD}
''' # nosec (table name is code constant)
Expand Down
23 changes: 12 additions & 11 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,8 @@ def get_command_method(self, command_name: str) -> Callable:
def queue_command(self, command: str, kwargs: dict) -> None:
"""Put a command on the scheduler command queue.

The queued item is a ``(command, args, kwargs)`` tuple. With this change
the positional ``args`` list is empty and ``kwargs`` is forwarded as-is,
instead of being flattened into a tuple of its values.

NOTE(review): presumably the consumer calls the command method with
``**kwargs``, which is why command method parameters must match the
mutation argument names -- confirm in process_command_queue.
"""
self.command_queue.put((
command,
tuple(kwargs.values()), {}
[],
kwargs,
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
))

async def process_command_queue(self) -> None:
Expand Down Expand Up @@ -1011,9 +1012,9 @@ def _set_stop(self, stop_mode: Optional[StopMode] = None) -> None:
self.stop_mode = stop_mode
self.update_data_store()

def command_release(self, task_globs: Iterable[str]) -> int:
def command_release(self, tasks: Iterable[str]) -> int:
"""Release held tasks.

Args:
    tasks: Task identifiers, handed unchanged to
        ``self.pool.release_held_tasks``.

Returns:
    The value returned by ``self.pool.release_held_tasks``
    (presumably the number of unmatched items -- confirm).
"""
return self.pool.release_held_tasks(task_globs)
return self.pool.release_held_tasks(tasks)

def command_release_hold_point(self) -> None:
"""Release all held tasks and unset workflow hold after cycle point,
Expand All @@ -1025,17 +1026,17 @@ def command_resume(self) -> None:
"""Resume paused workflow."""
self.resume_workflow()

def command_poll_tasks(self, items: List[str]) -> int:
def command_poll_tasks(self, tasks: List[str]) -> int:
"""Poll pollable tasks or a task or family if options are provided.

Args:
    tasks: Task identifiers, matched against the pool with
        ``self.pool.filter_task_proxies``.

Returns:
    0 in simulation mode (nothing is polled); otherwise the number of
    items that did not match (``len(bad_items)``).
"""
if self.config.run_mode('simulation'):
return 0
itasks, _, bad_items = self.pool.filter_task_proxies(items)
itasks, _, bad_items = self.pool.filter_task_proxies(tasks)
self.task_job_mgr.poll_task_jobs(self.workflow, itasks)
return len(bad_items)

def command_kill_tasks(self, items: List[str]) -> int:
def command_kill_tasks(self, tasks: List[str]) -> int:
"""Kill all tasks or a task/family if options are provided."""
itasks, _, bad_items = self.pool.filter_task_proxies(items)
itasks, _, bad_items = self.pool.filter_task_proxies(tasks)
if self.config.run_mode('simulation'):
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
Expand All @@ -1045,9 +1046,9 @@ def command_kill_tasks(self, items: List[str]) -> int:
self.task_job_mgr.kill_task_jobs(self.workflow, itasks)
return len(bad_items)

def command_hold(self, task_globs: Iterable[str]) -> int:
def command_hold(self, tasks: Iterable[str], flow_num=None) -> int:
"""Hold specified tasks.

Args:
    tasks: Task identifiers to hold.
    flow_num: Optional flow number; ``None`` (the default) applies no
        flow restriction. Passed through to ``self.pool.hold_tasks``.

Returns:
    The value returned by ``self.pool.hold_tasks`` (the number of
    unmatched items).
"""
return self.pool.hold_tasks(task_globs)
return self.pool.hold_tasks(tasks, flow_num)

def command_set_hold_point(self, point: str) -> None:
"""Hold all tasks after the specified cycle point."""
Expand All @@ -1073,9 +1074,9 @@ def command_set_verbosity(lvl: Union[int, str]) -> None:
raise CommandFailedError(exc)
cylc.flow.flags.verbosity = log_level_to_verbosity(lvl)

def command_remove_tasks(self, items) -> int:
def command_remove_tasks(self, tasks) -> int:
"""Remove tasks.

Args:
    tasks: Task identifiers, handed unchanged to
        ``self.pool.remove_tasks``.

Returns:
    The value returned by ``self.pool.remove_tasks``.
"""
return self.pool.remove_tasks(items)
return self.pool.remove_tasks(tasks)

async def command_reload_workflow(self) -> None:
"""Reload workflow configuration."""
Expand Down
12 changes: 10 additions & 2 deletions cylc/flow/scripts/hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@
mutation (
$wFlows: [WorkflowID]!,
$tasks: [NamespaceIDGlob]!
$flowNum: Int,
) {
hold (
workflows: $wFlows,
tasks: $tasks
tasks: $tasks,
flowNum: $flowNum,
) {
result
}
Expand Down Expand Up @@ -114,6 +116,11 @@ def get_option_parser() -> COP:
help="Hold all tasks after this cycle point.",
metavar="CYCLE_POINT", action="store", dest="hold_point_string")

parser.add_option(
"--flow",
help="Hold tasks that belong to a specific flow.",
metavar="INT", action="store", dest="flow_num")
Comment on lines +123 to +126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably --flow=all is the default?


return parser


Expand Down Expand Up @@ -144,7 +151,8 @@ async def run(options, workflow_id, *tokens_list):
'tasks': [
id_.relative_id_with_selectors
for id_ in tokens_list
]
],
'flowNum': options.flow_num
}

mutation_kwargs = {
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scripts/set_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def get_option_parser() -> COP:
action="append", default=None, dest="outputs")

parser.add_option(
"-f", "--flow", metavar="FLOW",
"-f", "--flow", metavar="INT",
help="Number of the flow to attribute the outputs.",
action="store", default=None, dest="flow_num")

Expand Down
34 changes: 23 additions & 11 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(
self.task_name_list,
self.config.runtime['descendants']
)
self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set()
self.tasks_to_hold: Set[Tuple[str, 'PointBase', Optional[int]]] = set()

def set_stop_task(self, task_id):
"""Set stop after a task."""
Expand Down Expand Up @@ -673,7 +673,7 @@ def load_db_tasks_to_hold(self):
"""Update the tasks_to_hold set with the tasks stored in the
database.

Each stored row is ``(name, cycle, flow)``. The cycle string is converted
to a point object, and the flow is converted to ``int`` (or left as
``None`` for holds that are not flow-specific) so that entries compare
equal to those added at run time.
"""
# All three columns must be unpacked: the DAO now selects
# "name, cycle, flow", and flow_num is needed to build the set entry.
# The DAO is annotated as returning the flow as a string, whereas
# tasks_to_hold stores Optional[int]; normalise here.
self.tasks_to_hold.update(
    (
        name,
        get_point(cycle),
        None if flow_num is None else int(flow_num),
    )
    for name, cycle, flow_num in
    self.workflow_db_mgr.pri_dao.select_tasks_to_hold()
)

Expand Down Expand Up @@ -1169,10 +1169,12 @@ def is_stalled(self) -> bool:
unsatisfied = self.log_unsatisfied_prereqs()
return (incomplete or unsatisfied)

def hold_active_task(self, itask: TaskProxy) -> None:
def hold_active_task(
self, itask: TaskProxy, flow_num: Optional[int]=None
) -> None:
"""Hold a task proxy and record the hold.

Sets the task's held state, notifies the data store of the held task,
adds ``(name, point, flow_num)`` to ``self.tasks_to_hold`` and writes the
set to the workflow database.

Args:
    itask: The task proxy to hold.
    flow_num: Flow number stored with the hold entry; ``None`` (the
        default) records a hold that is not flow-specific.
"""
if itask.state_reset(is_held=True):
self.data_store_mgr.delta_task_held(itask)
self.tasks_to_hold.add((itask.tdef.name, itask.point))
self.tasks_to_hold.add((itask.tdef.name, itask.point, flow_num))
self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold)

def release_held_active_task(self, itask: TaskProxy) -> None:
Expand All @@ -1191,7 +1193,7 @@ def set_hold_point(self, point: 'PointBase') -> None:
self.hold_active_task(itask)
self.workflow_db_mgr.put_workflow_hold_cycle_point(point)

def hold_tasks(self, items: Iterable[str]) -> int:
def hold_tasks(self, items: Iterable[str], flow_num=None) -> int:
"""Hold tasks with IDs matching the specified items."""
# Hold active tasks:
itasks, future_tasks, unmatched = self.filter_task_proxies(
Expand All @@ -1200,11 +1202,15 @@ def hold_tasks(self, items: Iterable[str]) -> int:
future=True,
)
for itask in itasks:
if flow_num is not None and flow_num not in itask.flow_nums:
continue
self.hold_active_task(itask)
# Set future tasks to be held:
to_hold = set()
for name, cycle in future_tasks:
self.data_store_mgr.delta_task_held((name, cycle, True))
self.tasks_to_hold.update(future_tasks)
to_hold.add((name, cycle, flow_num))
self.tasks_to_hold.update(to_hold)
self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold)
LOG.debug(f"Tasks to hold: {self.tasks_to_hold}")
return len(unmatched)
Expand Down Expand Up @@ -1529,16 +1535,22 @@ def spawn_task(
is_manual_submit=is_manual_submit,
flow_wait=flow_wait,
)
if (name, point) in self.tasks_to_hold:
LOG.info(f"[{itask}] holding (as requested earlier)")
self.hold_active_task(itask)
elif self.hold_point and itask.point > self.hold_point:
# Hold if beyond the workflow hold point
if self.hold_point and itask.point > self.hold_point:
LOG.info(
f"[{itask}] holding (beyond workflow "
f"hold point: {self.hold_point})"
)
self.hold_active_task(itask)
else:
for (h_name, h_point, h_flow) in self.tasks_to_hold:
if (
h_name == name
and h_point == point
and (h_flow is None or h_flow in flow_nums)
):
LOG.info(f"[{itask}] holding (as requested earlier)")
self.hold_active_task(itask, h_flow)
break

if self.stop_point and itask.point <= self.stop_point:
future_trigger_overrun = False
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ def put_task_pool(self, pool: 'TaskPool') -> None:
itask.state.time_updated = None

def put_tasks_to_hold(
self, tasks: Set[Tuple[str, 'PointBase']]
self, tasks: Set[Tuple[str, 'PointBase', Optional[int]]]
) -> None:
"""Replace the tasks in the tasks_to_hold table."""
# There isn't that much cost in calling this multiple times between
Expand All @@ -528,8 +528,8 @@ def put_tasks_to_hold(
# whole table each time the queue is processed is a bit inefficient.
self.db_deletes_map[self.TABLE_TASKS_TO_HOLD] = [{}]
self.db_inserts_map[self.TABLE_TASKS_TO_HOLD] = [
{"name": name, "cycle": str(point)}
for name, point in tasks
{"name": name, "cycle": str(point), "flow": flow_num}
for name, point, flow_num in tasks
]

def put_insert_task_events(self, itask, args):
Expand Down