Skip to content

Commit

Permalink
Merge pull request #6210 from cylc/8.3.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.3.x-sync into master
  • Loading branch information
MetRonnie committed Jul 9, 2024
2 parents a191d82 + 29d26cf commit 768ad61
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 126 deletions.
1 change: 1 addition & 0 deletions changes.d/6200.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug where a stalled paused workflow would be incorrectly reported as running, not paused
1 change: 1 addition & 0 deletions changes.d/6206.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes the spawning of multiple parentless tasks off the same sequential wall-clock xtrigger.
5 changes: 5 additions & 0 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,19 @@ async def stop(
schd.workflow_db_mgr.put_workflow_stop_cycle_point(
schd.options.stopcp
)
schd._update_workflow_state()
elif clock_time is not None:
# schedule shutdown after wallclock time passes provided time
parser = TimePointParser()
schd.set_stop_clock(
int(parser.parse(clock_time).seconds_since_unix_epoch)
)
schd._update_workflow_state()
elif task is not None:
# schedule shutdown after task succeeds
task_id = TaskID.get_standardised_taskid(task)
schd.pool.set_stop_task(task_id)
schd._update_workflow_state()
else:
# immediate shutdown
with suppress(KeyError):
Expand Down Expand Up @@ -229,6 +232,7 @@ async def release_hold_point(schd: 'Scheduler'):
yield
LOG.info("Releasing all tasks and removing hold cycle point.")
schd.pool.release_hold_point()
schd._update_workflow_state()


@_command('resume')
Expand Down Expand Up @@ -287,6 +291,7 @@ async def set_hold_point(schd: 'Scheduler', point: str):
"All tasks after this point will be held."
)
schd.pool.set_hold_point(cycle_point)
schd._update_workflow_state()


@_command('pause')
Expand Down
45 changes: 36 additions & 9 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@
pdeepcopy,
poverride
)
from cylc.flow.workflow_status import get_workflow_status
from cylc.flow.workflow_status import (
get_workflow_status,
get_workflow_status_msg,
)
from cylc.flow.task_job_logs import JOB_LOG_OPTS, get_task_job_log
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_state import (
Expand Down Expand Up @@ -789,8 +792,9 @@ def increment_graph_window(
source_tokens,
point,
flow_nums,
False,
itask
is_parent=False,
itask=itask,
replace_existing=True,
)

# Pre-populate from previous walks
Expand Down Expand Up @@ -1150,24 +1154,27 @@ def generate_ghost_task(
is_parent: bool = False,
itask: Optional['TaskProxy'] = None,
n_depth: int = 0,
replace_existing: bool = False,
) -> None:
"""Create task-point element populated with static data.
Args:
source_tokens
point
flow_nums
is_parent:
Used to determine whether to load DB state.
itask:
Update task-node from corresponding task proxy object.
is_parent: Used to determine whether to load DB state.
itask: Update task-node from corresponding task proxy object.
n_depth: n-window graph edge distance.
replace_existing: Replace any existing data for task as it may
be out of date (e.g. flow nums).
"""
tp_id = tokens.id
if (
tp_id in self.data[self.workflow_id][TASK_PROXIES]
or tp_id in self.added[TASK_PROXIES]
):
if replace_existing and itask is not None:
self.delta_from_task_proxy(itask)
return

name = tokens['task']
Expand Down Expand Up @@ -2174,8 +2181,8 @@ def update_workflow(self, reloaded=False):
w_delta.latest_state_tasks[state].task_proxies[:] = tp_queue

# Set status & msg if changed.
status, status_msg = map(
str, get_workflow_status(self.schd))
status = get_workflow_status(self.schd).value
status_msg = get_workflow_status_msg(self.schd)
if w_data.status != status or w_data.status_msg != status_msg:
w_delta.status = status
w_delta.status_msg = status_msg
Expand Down Expand Up @@ -2522,6 +2529,26 @@ def delta_task_xtrigger(self, sig, satisfied):
xtrigger.time = update_time
self.updates_pending = True

def delta_from_task_proxy(self, itask: TaskProxy) -> None:
"""Create delta from existing pool task proxy.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
update_time = time()
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
self._process_internal_task_proxy(itask, tp_delta)
self.updates_pending = True

# -----------
# Job Deltas
# -----------
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2000,7 +2000,7 @@ def update_data_store(self):
Call this method whenever the Scheduler's state has changed in a way
that requires a data store update.
See cylc.flow.workflow_status.get_workflow_status() for a
See cylc.flow.workflow_status.get_workflow_status_msg() for a
(non-exhaustive?) list of properties that if changed will require
this update.
Expand Down
13 changes: 0 additions & 13 deletions cylc/flow/tui/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,19 +384,6 @@ def get_task_status_summary(flow):
]


def get_workflow_status_str(flow):
"""Return a workflow status string for the header.
Arguments:
flow (dict):
GraphQL JSON response for this workflow.
Returns:
list - Text list for the urwid.Text widget.
"""


def _render_user(node, data):
return f'~{ME}'

Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/wallclock.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"""Wall clock related utilities."""

from calendar import timegm
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

from metomi.isodatetime.timezone import (
get_local_time_zone_format, get_local_time_zone, TimeZoneFormatMode)
Expand Down Expand Up @@ -209,7 +209,7 @@ def get_time_string_from_unix_time(unix_time, display_sub_seconds=False,
to use as the time zone designator.
"""
date_time = datetime.utcfromtimestamp(unix_time)
date_time = datetime.fromtimestamp(unix_time, timezone.utc)
return get_time_string(date_time,
display_sub_seconds=display_sub_seconds,
use_basic_format=use_basic_format,
Expand Down
111 changes: 57 additions & 54 deletions cylc/flow/workflow_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
"""Workflow status constants."""

from enum import Enum
from typing import Tuple, TYPE_CHECKING
from typing import TYPE_CHECKING, Optional, Union

from cylc.flow.cycling.loader import get_point
from cylc.flow.id import tokenise
from cylc.flow.wallclock import get_time_string_from_unix_time as time2str

if TYPE_CHECKING:
from optparse import Values

from cylc.flow.cycling import PointBase
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_pool import TaskPool

# Keys for identify API call
KEY_GROUP = "group"
Expand Down Expand Up @@ -143,62 +148,60 @@ class AutoRestartMode(Enum):
"""Workflow will stop immediately but *not* attempt to restart."""


def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]:
"""Return the status of the provided workflow.
This should be a short, concise description of the workflow state.
Args:
schd: The running workflow
Returns:
tuple - (state, state_msg)
state:
The WorkflowState.
state_msg:
Text describing the current state (may be an empty string).
def get_workflow_status(schd: 'Scheduler') -> WorkflowStatus:
"""Return the status of the provided workflow."""
if schd.stop_mode is not None:
return WorkflowStatus.STOPPING
if schd.is_paused or schd.reload_pending:
return WorkflowStatus.PAUSED
return WorkflowStatus.RUNNING

"""
status = WorkflowStatus.RUNNING
status_msg = ''

def get_workflow_status_msg(schd: 'Scheduler') -> str:
"""Return a short, concise status message for the provided workflow."""
if schd.stop_mode is not None:
status = WorkflowStatus.STOPPING
status_msg = f'stopping: {schd.stop_mode.explain()}'
elif schd.reload_pending:
status = WorkflowStatus.PAUSED
status_msg = f'reloading: {schd.reload_pending}'
elif schd.is_stalled:
status_msg = 'stalled'
elif schd.is_paused:
status = WorkflowStatus.PAUSED
status_msg = 'paused'
elif schd.pool.hold_point:
status_msg = (
WORKFLOW_STATUS_RUNNING_TO_HOLD %
schd.pool.hold_point)
elif schd.pool.stop_point:
status_msg = (
WORKFLOW_STATUS_RUNNING_TO_STOP %
schd.pool.stop_point)
elif schd.stop_clock_time is not None:
status_msg = (
WORKFLOW_STATUS_RUNNING_TO_STOP %
time2str(schd.stop_clock_time))
elif schd.pool.stop_task_id:
status_msg = (
WORKFLOW_STATUS_RUNNING_TO_STOP %
schd.pool.stop_task_id)
elif schd.config and schd.config.final_point:
status_msg = (
WORKFLOW_STATUS_RUNNING_TO_STOP %
schd.config.final_point)
else:
# fallback - running indefinitely
status_msg = 'running'

return (status.value, status_msg)
return f'stopping: {schd.stop_mode.explain()}'
if schd.reload_pending:
return f'reloading: {schd.reload_pending}'
if schd.is_stalled:
if schd.is_paused:
return 'stalled and paused'
return 'stalled'
if schd.is_paused:
return 'paused'
if schd.stop_clock_time is not None:
return WORKFLOW_STATUS_RUNNING_TO_STOP % time2str(
schd.stop_clock_time
)
stop_point_msg = _get_earliest_stop_point_status_msg(schd.pool)
if stop_point_msg is not None:
return stop_point_msg
if schd.config and schd.config.final_point:
return WORKFLOW_STATUS_RUNNING_TO_STOP % schd.config.final_point
# fallback - running indefinitely
return 'running'


def _get_earliest_stop_point_status_msg(pool: 'TaskPool') -> Optional[str]:
"""Return the status message for the earliest stop point in the pool,
if any."""
template = WORKFLOW_STATUS_RUNNING_TO_STOP
prop: Union[PointBase, str, None] = pool.stop_task_id
min_point: Optional[PointBase] = get_point(
tokenise(pool.stop_task_id, relative=True)['cycle']
if pool.stop_task_id else None
)
for point, tmpl in (
(pool.stop_point, WORKFLOW_STATUS_RUNNING_TO_STOP),
(pool.hold_point, WORKFLOW_STATUS_RUNNING_TO_HOLD)
):
if point is not None and (min_point is None or point < min_point):
template = tmpl
min_point = point
prop = point
if prop is None:
return None
return template % prop


class RunMode:
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,8 @@ def call_xtriggers_async(self, itask: 'TaskProxy'):
if sig in self.sat_xtrig:
# Already satisfied, just update the task
itask.state.xtriggers[label] = True
if self.all_task_seq_xtriggers_satisfied(itask):
self.sequential_spawn_next.add(itask.identity)
elif _wall_clock(*ctx.func_args, **ctx.func_kwargs):
# Newly satisfied
itask.state.xtriggers[label] = True
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/cylc-show/06-past-present-future.t
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TEST_NAME="${TEST_NAME_BASE}-show.present"
contains_ok "${WORKFLOW_RUN_DIR}/show-c.txt" <<__END__
state: running
prerequisites: ('⨯': not satisfied)
1/b succeeded
1/b succeeded
__END__

TEST_NAME="${TEST_NAME_BASE}-show.future"
Expand Down
Loading

0 comments on commit 768ad61

Please sign in to comment.