Skip to content

Commit

Permalink
Refactor cylc.task_proxy 1
Browse files Browse the repository at this point in the history
Reduce number of attributes.
`cylc.config.SuiteConfig` absorbs `get_task_proxy`.
  • Loading branch information
matthewrmshin committed Feb 21, 2017
1 parent 59b6c81 commit 04f7a49
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 214 deletions.
10 changes: 5 additions & 5 deletions bin/cylc-submit
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.config import SuiteConfig
from cylc.cycling.loader import get_point
import cylc.flags
from cylc.get_task_proxy import get_task_proxy
from cylc.job_file import JobFile
from cylc.job_host import RemoteJobHostManager
from cylc.mp_pool import SuiteProcPool
from cylc.option_parsers import CylcOptionParser as COP
from cylc.suite_srv_files_mgr import SuiteSrvFilesManager
from cylc.task_id import TaskID
from cylc.task_proxy import TaskProxy
from cylc.task_state import TASK_STATUS_SUBMIT_FAILED
from cylc.templatevars import load_template_vars
import cylc.version # Ensures '$CYLC_VERSION' is set.
Expand Down Expand Up @@ -105,10 +105,10 @@ def main():
})

task_name, point_string = TaskID.split(task_id)
point = get_point(point_string).standardise()
# Try to get a graphed task of the given name.
itask = get_task_proxy(task_name, point, is_startup=True)

itask = TaskProxy(
config.get_taskdef(task_name),
get_point(point_string).standardise(),
is_startup=True)
if itask.prep_submit(dry_run=options.dry_run) is None:
sys.exit(1)
if options.dry_run:
Expand Down
37 changes: 21 additions & 16 deletions lib/cylc/cfgspec/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ def _coerce_final_cycletime(value, keys, _):
return _strip_and_unquote(keys, value)


def _coerce_interval_x_2(value, keys, args):
"""Coerce interval list with 2 elements."""
values = coerce_interval_list(value, keys, args)
if len(values) != 2:
raise IllegalValueError("ISO 8601 interval * 2", keys, value)
return values


def _coerce_parameter_list(value, keys, _):
"""Coerce parameter list."""
value = _strip_and_unquote_list(keys, value)
Expand All @@ -162,6 +170,7 @@ def _coerce_parameter_list(value, keys, _):
coercers['final_cycletime'] = _coerce_final_cycletime
coercers['interval'] = coerce_interval
coercers['interval_list'] = coerce_interval_list
coercers['interval_x_2'] = _coerce_interval_x_2
coercers['parameter_list'] = _coerce_parameter_list


Expand Down Expand Up @@ -322,7 +331,7 @@ def _coerce_parameter_list(value, keys, _):
},
'simulation mode': {
'run time range': vdr(
vtype='interval_list',
vtype='interval_x_2',
default=[DurationFloat(1), DurationFloat(16)]),
'simulate failure': vdr(vtype='boolean', default=False),
'disable task event hooks': vdr(vtype='boolean', default=True),
Expand Down Expand Up @@ -373,21 +382,17 @@ def _coerce_parameter_list(value, keys, _):
'reset timer': vdr(vtype='boolean', default=None),
'submission timeout': vdr(vtype='interval'),

'expired handler': vdr(vtype='string_list', default=[]),
'submitted handler': vdr(vtype='string_list', default=[]),
'started handler': vdr(vtype='string_list', default=[]),
'succeeded handler': vdr(vtype='string_list', default=[]),
'failed handler': vdr(vtype='string_list', default=[]),
'submission failed handler': vdr(
vtype='string_list', default=[]),
'warning handler': vdr(vtype='string_list', default=[]),
'retry handler': vdr(vtype='string_list', default=[]),
'submission retry handler': vdr(
vtype='string_list', default=[]),
'execution timeout handler': vdr(
vtype='string_list', default=[]),
'submission timeout handler': vdr(
vtype='string_list', default=[]),
'expired handler': vdr(vtype='string_list'),
'submitted handler': vdr(vtype='string_list'),
'started handler': vdr(vtype='string_list'),
'succeeded handler': vdr(vtype='string_list'),
'failed handler': vdr(vtype='string_list'),
'submission failed handler': vdr(vtype='string_list'),
'warning handler': vdr(vtype='string_list'),
'retry handler': vdr(vtype='string_list'),
'submission retry handler': vdr(vtype='string_list'),
'execution timeout handler': vdr(vtype='string_list'),
'submission timeout handler': vdr(vtype='string_list'),
},
'suite state polling': {
'user': vdr(vtype='string'),
Expand Down
30 changes: 15 additions & 15 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from cylc.print_tree import print_tree
from cylc.taskdef import TaskDef, TaskDefError
from cylc.task_id import TaskID
from cylc.task_proxy import TaskProxy
from cylc.task_trigger import TaskTrigger
from cylc.wallclock import get_current_time_string

Expand Down Expand Up @@ -92,13 +93,6 @@ def __init__(self, msg):
def __str__(self):
return repr(self.msg)


class TaskNotDefinedError(SuiteConfigError):
"""A named task not defined."""

def __str__(self):
return "Task not defined: %s" % self.msg

# TODO: separate config for run and non-run purposes?


Expand Down Expand Up @@ -1405,12 +1399,7 @@ def generate_taskdefs(self, orig_expr, left_nodes, right, section, seq,
self.ns_defn_order.append(name)

# check task name legality and create the taskdef
if name not in self.taskdefs:
try:
self.taskdefs[name] = self.get_taskdef(name)
except TaskDefError as exc:
ERR.error(orig_expr)
raise SuiteConfigError(str(exc))
self.get_taskdef(name, orig_expr)

if name in self.suite_polling_tasks:
self.taskdefs[name].suite_polling_cfg = {
Expand Down Expand Up @@ -1861,13 +1850,24 @@ def _proc_triggers(self, triggers, original, section, seq):
self.generate_triggers(
expr, lefts, right, seq, suicide, base_interval)

def get_taskdef(self, name):
def get_taskdef(self, name, orig_expr=None):
"""Return an instance of TaskDef for task name."""
if name not in self.taskdefs:
try:
self.taskdefs[name] = self._get_taskdef(name)
except TaskDefError as exc:
if orig_expr:
ERR.error(orig_expr)
raise SuiteConfigError(str(exc))
return self.taskdefs[name]

def _get_taskdef(self, name):
"""Get the dense task runtime."""
# (TaskDefError caught above)
try:
rtcfg = self.cfg['runtime'][name]
except KeyError:
raise TaskNotDefinedError(name)
raise SuiteConfigError("Task not defined: %s" % name)
# We may want to put in some handling for cases of changing the
# initial cycle via restart (accidentally or otherwise).

Expand Down
30 changes: 0 additions & 30 deletions lib/cylc/get_task_proxy.py

This file was deleted.

26 changes: 10 additions & 16 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,12 @@
CHANGE_FMT as BROADCAST_LOAD_FMT,
CHANGE_PREFIX_SET as BROADCAST_LOAD_PREFIX)
from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.config import SuiteConfig, TaskNotDefinedError
from cylc.config import SuiteConfig, SuiteConfigError
from cylc.cycling import PointParsingError
from cylc.cycling.loader import get_point, standardise_point_string
from cylc.daemonize import daemonize
from cylc.exceptions import CylcError
import cylc.flags
from cylc.get_task_proxy import get_task_proxy
from cylc.job_file import JobFile
from cylc.job_host import RemoteJobHostManager, RemoteJobHostInitError
from cylc.log_diagnosis import LogSpec
Expand Down Expand Up @@ -465,14 +464,12 @@ def load_tasks_for_run(self):
# No start cycle point at which to load cycling tasks.
continue
try:
itask = get_task_proxy(
name, self.start_point, is_startup=True,
message_queue=self.pool.message_queue)
self.pool.add_to_runahead_pool(TaskProxy(
self.config.get_taskdef(name), self.start_point,
is_startup=True))
except TaskProxySequenceBoundsError as exc:
self.log.debug(str(exc))
continue
# Load task.
self.pool.add_to_runahead_pool(itask)

def load_tasks_for_restart(self):
"""Load tasks for restart."""
Expand Down Expand Up @@ -590,24 +587,22 @@ def _load_task_pool(self, row_idx, row):
(cycle, name, spawned, status, hold_swap, submit_num, try_num,
user_at_host) = row
try:
itask = get_task_proxy(
name,
itask = TaskProxy(
self.config.get_taskdef(name),
get_point(cycle),
status=status,
hold_swap=hold_swap,
has_spawned=bool(spawned),
submit_num=submit_num,
is_reload_or_restart=True,
message_queue=self.pool.message_queue)
except TaskNotDefinedError as exc:
is_reload_or_restart=True)
except SuiteConfigError as exc:
if cylc.flags.debug:
ERR.error(traceback.format_exc())
else:
ERR.error(str(exc))
ERR.warning((
"ignoring task %s from the suite run database file\n"
"(the task definition has probably been deleted from the "
"suite).") % name)
"ignoring task %s from the suite run database\n"
"(its task definition has probably been deleted).") % name)
except Exception:
ERR.error(traceback.format_exc())
ERR.error("could not load task %s" % name)
Expand Down Expand Up @@ -835,7 +830,6 @@ def command_stop_now(self, terminate=False):
def _set_stop(self, stop_mode=None):
"""Set shutdown mode."""
SuiteProcPool.get_inst().stop_job_submission()
TaskProxy.stop_sim_mode_job_submission = True
if stop_mode is None:
stop_mode = TaskPool.STOP_REQUEST_CLEAN
self.stop_mode = stop_mode
Expand Down
47 changes: 33 additions & 14 deletions lib/cylc/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import os
import pickle
import Queue
from random import randrange
from time import time
import traceback

Expand All @@ -48,13 +49,13 @@
get_interval, get_interval_cls, get_point, ISO8601_CYCLING_TYPE,
standardise_point_string)
import cylc.flags
from cylc.get_task_proxy import get_task_proxy
from cylc.mp_pool import SuiteProcPool, SuiteProcContext
from cylc.network.ext_trigger_server import ExtTriggerServer
from cylc.network.suite_broadcast_server import BroadcastServer
from cylc.owner import is_remote_user
from cylc.rundb import CylcSuiteDAO
from cylc.suite_host import is_remote_host
from cylc.task_proxy import TaskProxy
from cylc.task_state import (
TASK_STATUSES_ACTIVE, TASK_STATUSES_NOT_STALLED, TASK_STATUSES_FINAL,
TASK_STATUS_HELD, TASK_STATUS_WAITING, TASK_STATUS_EXPIRED,
Expand Down Expand Up @@ -227,9 +228,9 @@ def insert_tasks(self, items, stop_point_str, no_check=False):
if (name_str, point_str) in task_states_data:
submit_num = task_states_data[(name_str, point_str)].get(
"submit_num")
new_task = get_task_proxy(
name_str, get_point(point_str), stop_point=stop_point,
submit_num=submit_num, message_queue=self.message_queue)
new_task = TaskProxy(
config.get_taskdef(name_str), get_point(point_str),
stop_point=stop_point, submit_num=submit_num)
if new_task:
self.add_to_runahead_pool(new_task)
return n_warnings
Expand Down Expand Up @@ -723,6 +724,7 @@ def reload_taskdefs(self):
for task in self.orphans:
if task not in [tsk.tdef.name for tsk in self.get_all_tasks()]:
getLogger("log").log(WARNING, "Removed task: '%s'" % (task,))
config = SuiteConfig.get_inst()
for itask in self.get_all_tasks():
if itask.tdef.name in self.orphans:
if itask.state.status in [
Expand All @@ -737,10 +739,11 @@ def reload_taskdefs(self):
itask.has_spawned = True
itask.log(WARNING, "last instance (orphaned by reload)")
else:
new_task = get_task_proxy(
itask.tdef.name, itask.point, itask.state.status,
stop_point=itask.stop_point, submit_num=itask.submit_num,
is_reload_or_restart=True, pre_reload_inst=itask)
new_task = TaskProxy(
config.get_taskdef(itask.tdef.name), itask.point,
itask.state.status, stop_point=itask.stop_point,
submit_num=itask.submit_num, is_reload_or_restart=True,
pre_reload_inst=itask)
self.remove(itask, '(suite definition reload)')
self.add_to_runahead_pool(new_task)
self.log.info("Reload completed.")
Expand Down Expand Up @@ -1315,13 +1318,28 @@ def check_auto_shutdown(self):
return shutdown

def sim_time_check(self):
"""Simulation mode: simulate task run times and set states."""
sim_task_succeeded = False
for itask in self.get_tasks():
if itask.state.status == TASK_STATUS_RUNNING:
# Automatically set sim-mode tasks to TASK_STATUS_SUCCEEDED
# after their alotted run time.
if itask.sim_time_check():
sim_task_succeeded = True
if itask.state.status != TASK_STATUS_RUNNING:
continue
# Automatically set sim-mode tasks to TASK_STATUS_SUCCEEDED
# after their alotted run time.
rrange = itask.tdef.rtconfig['simulation mode']['run time range']
timeout = (itask.summary['started_time'] +
randrange(rrange[0], rrange[1]))
if time() > timeout:
if itask.tdef.rtconfig['simulation mode']['simulate failure']:
self.message_queue.put(
itask.identity, 'NORMAL', TASK_STATUS_SUBMITTED)
self.message_queue.put(
itask.identity, 'CRITICAL', TASK_STATUS_FAILED)
else:
self.message_queue.put(
itask.identity, 'NORMAL', TASK_STATUS_SUBMITTED)
self.message_queue.put(
itask.identity, 'NORMAL', TASK_STATUS_SUCCEEDED)
sim_task_succeeded = True
return sim_task_succeeded

def waiting_tasks_ready(self):
Expand Down Expand Up @@ -1398,7 +1416,8 @@ def get_task_requisites(self, items):
if itask.tdef.clocktrigger_offset is not None:
extras['Clock trigger time reached'] = (
itask.start_time_reached())
extras['Triggers at'] = itask.delayed_start_str
extras['Triggers at'] = get_time_string_from_unix_time(
itask.delayed_start)
for trig, satisfied in itask.state.external_triggers.items():
if satisfied:
state = 'satisfied'
Expand Down

0 comments on commit 04f7a49

Please sign in to comment.