Skip to content

Commit

Permalink
scheduler, task_pool, task_proxy refactor
Browse files Browse the repository at this point in the history
Remove broadcast server dependency from task_proxy.
Simplify (re)try timers logic.
  • Loading branch information
matthewrmshin committed Mar 23, 2017
1 parent a97d23c commit e573c0b
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 344 deletions.
28 changes: 19 additions & 9 deletions lib/cylc/network/https/suite_broadcast_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Server-side suite broadcast interface."""

import json
import re
import threading

from cylc.broadcast_report import (
CHANGE_FMT, CHANGE_PREFIX_SET,
get_broadcast_change_iter,
get_broadcast_change_report,
get_broadcast_bad_options_report)
from cylc.cycling.loader import get_point, standardise_point_string
from cylc.wallclock import get_current_time_string
from cylc.network import COMMS_BCAST_OBJ_NAME
from cylc.network.https.base_server import BaseCommsServer
from cylc.network.https.util import unicode_encode
from cylc.network import check_access_priv
from cylc.suite_logging import LOG
from cylc.suite_logging import LOG, OUT
from cylc.task_id import TaskID
from cylc.rundb import CylcSuiteDAO

Expand Down Expand Up @@ -147,7 +147,7 @@ def put(self, point_strings=None, namespaces=None, settings=None,
bad_point = False
try:
point_string = standardise_point_string(point_string)
except Exception as exc:
except Exception:
if point_string != '*':
bad_point_strings.append(point_string)
bad_point = True
Expand Down Expand Up @@ -316,20 +316,30 @@ def _settings_to_keys_list(settings):
keys_list.append(keys + [key])
return keys_list

def load_state(self, point, namespace, key, value):
def load_db_broadcast_states(self, row_idx, row):
"""Load broadcast variables from runtime DB broadcast states row."""
if row_idx == 0:
OUT.info("LOADING broadcast states")
point, namespace, key, value = row
sections = []
if "]" in key:
sections = self.REC_SECTION.findall(key)
key = key.rsplit(r"]", 1)[-1]
cur_key = key
if "]" in cur_key:
sections = self.REC_SECTION.findall(cur_key)
cur_key = cur_key.rsplit(r"]", 1)[-1]
with self.lock:
self.settings.setdefault(point, {})
self.settings[point].setdefault(namespace, {})
dict_ = self.settings[point][namespace]
for section in sections:
dict_.setdefault(section, {})
dict_ = dict_[section]
dict_[key] = value
dict_[cur_key] = value
OUT.info(CHANGE_FMT.strip() % {
"change": CHANGE_PREFIX_SET,
"point": point,
"namespace": namespace,
"key": key,
"value": value})

@classmethod
def _get_bad_options(
Expand Down
170 changes: 13 additions & 157 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
"""Cylc scheduler server."""

from copy import deepcopy
from logging import DEBUG, INFO
from logging import DEBUG
import os
import pickle
import Queue
from shutil import copytree, rmtree
from subprocess import Popen, PIPE
Expand All @@ -32,11 +31,8 @@
import isodatetime.parsers
from parsec.util import printcfg

from cylc.broadcast_report import (
CHANGE_FMT as BROADCAST_LOAD_FMT,
CHANGE_PREFIX_SET as BROADCAST_LOAD_PREFIX)
from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.config import SuiteConfig, SuiteConfigError
from cylc.config import SuiteConfig
from cylc.cycling import PointParsingError
from cylc.cycling.loader import get_point, standardise_point_string
from cylc.daemonize import daemonize
Expand Down Expand Up @@ -66,18 +62,11 @@
from cylc.suite_srv_files_mgr import (
SuiteSrvFilesManager, SuiteServiceFileError)
from cylc.taskdef import TaskDef
from cylc.task_action_timer import TaskActionTimer
from cylc.task_id import TaskID
from cylc.task_job_mgr import TaskJobManager, RemoteJobHostInitError
from cylc.task_outputs import TASK_OUTPUT_SUBMITTED
from cylc.task_pool import TaskPool
from cylc.task_proxy import TaskProxy, TaskProxySequenceBoundsError
from cylc.task_state import (
TASK_STATUSES_ACTIVE, TASK_STATUS_WAITING,
TASK_STATUS_QUEUED, TASK_STATUS_READY, TASK_STATUS_SUBMITTED,
TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_SUBMIT_RETRYING,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED,
TASK_STATUS_RETRYING)
from cylc.task_state import TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED
from cylc.templatevars import load_template_vars
from cylc.version import CYLC_VERSION
from cylc.wallclock import (
Expand Down Expand Up @@ -343,7 +332,8 @@ def configure(self):
LOG.info('Final point: ' + str(self.final_point))

self.pool = TaskPool(
self.final_point, self.suite_db_mgr, self.task_events_mgr)
self.config, self.final_point, self.suite_db_mgr,
self.task_events_mgr)
self.message_queue = TaskMessageServer(self.suite)
self.comms_daemon.connect(
self.message_queue, COMMS_TASK_MESSAGE_OBJ_NAME)
Expand Down Expand Up @@ -381,10 +371,6 @@ def configure(self):
self.already_timed_out = False
self.set_suite_timer()

# self.nudge_timer_start = None
# self.nudge_timer_on = False
# self.auto_nudge_interval = 5 # seconds

self.already_inactive = False
if self._get_events_conf(self.EVENT_INACTIVITY_TIMEOUT):
self.set_suite_inactivity_timer()
Expand Down Expand Up @@ -419,39 +405,26 @@ def load_tasks_for_restart(self):
self.suite_db_mgr.pri_dao.select_suite_params(
self._load_suite_params, self.options.checkpoint)
self.suite_db_mgr.pri_dao.select_broadcast_states(
self._load_broadcast_states, self.options.checkpoint)
BroadcastServer.get_inst().load_db_broadcast_states,
self.options.checkpoint)
self.suite_db_mgr.pri_dao.select_task_job_run_times(
self._load_task_run_times)
self.suite_db_mgr.pri_dao.select_task_pool_for_restart(
self._load_task_pool, self.options.checkpoint)
self.pool.load_db_task_pool_for_restart, self.options.checkpoint)
self.suite_db_mgr.pri_dao.select_task_action_timers(
self._load_task_action_timers)
self.pool.load_db_task_action_timers)
# Re-initialise run directory for user@host for each submitted and
# running tasks.
# Note: tasks should all be in the runahead pool at this point.
for itask in self.pool.get_rh_tasks():
if itask.state.status in [
TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING]:
if itask.state.status in TASK_STATUSES_ACTIVE:
try:
self.task_job_mgr.init_host(
self.suite, itask.task_host, itask.task_owner)
except RemoteJobHostInitError as exc:
LOG.error(str(exc))
self.command_poll_tasks()

def _load_broadcast_states(self, row_idx, row):
"""Load a setting in the previous broadcast states."""
if row_idx == 0:
OUT.info("LOADING broadcast states")
point, namespace, key, value = row
BroadcastServer.get_inst().load_state(point, namespace, key, value)
OUT.info(BROADCAST_LOAD_FMT.strip() % {
"change": BROADCAST_LOAD_PREFIX,
"point": point,
"namespace": namespace,
"key": key,
"value": value})

def _load_suite_params(self, row_idx, row):
"""Load previous initial/final cycle point."""
if row_idx == 0:
Expand Down Expand Up @@ -506,119 +479,6 @@ def _load_task_run_times(self, row_idx, row):
except (KeyError, ValueError, AttributeError):
return

def _load_task_pool(self, row_idx, row):
"""Load a task from previous task pool.
The state of task prerequisites (satisfied or not) and outputs
(completed or not) is determined by the recorded TASK_STATUS:
TASK_STATUS_WAITING - prerequisites and outputs unsatisified
TASK_STATUS_HELD - ditto (only waiting tasks can be held)
TASK_STATUS_QUEUED - prereqs satisfied, outputs not completed
(only tasks ready to run can get queued)
TASK_STATUS_READY - ditto
TASK_STATUS_SUBMITTED - ditto (but see *)
TASK_STATUS_SUBMIT_RETRYING - ditto
TASK_STATUS_RUNNING - ditto (but see *)
TASK_STATUS_FAILED - ditto (tasks must run in order to fail)
TASK_STATUS_RETRYING - ditto (tasks must fail in order to retry)
TASK_STATUS_SUCCEEDED - prerequisites satisfied, outputs completed
(*) tasks reloaded with TASK_STATUS_SUBMITTED or TASK_STATUS_RUNNING
are polled to determine what their true status is.
"""
if row_idx == 0:
OUT.info("LOADING task proxies")
(cycle, name, spawned, status, hold_swap, submit_num, _,
user_at_host) = row
try:
itask = TaskProxy(
self.config.get_taskdef(name),
get_point(cycle),
status=status,
hold_swap=hold_swap,
has_spawned=bool(spawned),
submit_num=submit_num)
except SuiteConfigError as exc:
if cylc.flags.debug:
ERR.error(traceback.format_exc())
else:
ERR.error(str(exc))
ERR.warning((
"ignoring task %s from the suite run database\n"
"(its task definition has probably been deleted).") % name)
except Exception:
ERR.error(traceback.format_exc())
ERR.error("could not load task %s" % name)
else:
if status in (TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING):
itask.state.set_prerequisites_all_satisfied()
# update the task proxy with user@host
try:
itask.task_owner, itask.task_host = user_at_host.split(
"@", 1)
except ValueError:
itask.task_owner = None
itask.task_host = user_at_host

elif status in (TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_FAILED):
itask.state.set_prerequisites_all_satisfied()

elif status in (TASK_STATUS_QUEUED, TASK_STATUS_READY):
# reset to waiting as these had not been submitted yet.
itask.state.reset_state(TASK_STATUS_WAITING)
itask.state.set_prerequisites_all_satisfied()

elif status in (TASK_STATUS_SUBMIT_RETRYING, TASK_STATUS_RETRYING):
itask.state.set_prerequisites_all_satisfied()

elif status == TASK_STATUS_SUCCEEDED:
itask.state.set_prerequisites_all_satisfied()
# TODO - just poll for outputs in the job status file.
itask.state.outputs.set_all_completed()

if user_at_host:
itask.summary['job_hosts'][int(submit_num)] = user_at_host
if hold_swap:
OUT.info("+ %s.%s %s (%s)" % (name, cycle, status, hold_swap))
else:
OUT.info("+ %s.%s %s" % (name, cycle, status))
self.pool.add_to_runahead_pool(itask, is_restart=True)

def _load_task_action_timers(self, row_idx, row):
"""Load a task action timer, e.g. event handlers, retry states."""
if row_idx == 0:
OUT.info("LOADING task action timers")
(
cycle, name, ctx_key_pickle, ctx_pickle, delays_pickle, num, delay,
timeout
) = row
id_ = TaskID.get(name, cycle)
itask = self.pool.get_task_by_id(id_)
if itask is None:
ERR.warning("%(id)s: task not found, skip" % {"id": id_})
return
ctx_key = "?"
try:
ctx_key = pickle.loads(str(ctx_key_pickle))
ctx = pickle.loads(str(ctx_pickle))
delays = pickle.loads(str(delays_pickle))
if ctx_key and ctx_key[0] in ["poll_timers", "try_timers"]:
getattr(itask, ctx_key[0])[ctx_key[1]] = TaskActionTimer(
ctx, delays, num, delay, timeout)
else:
key1, submit_num = ctx_key
key = (key1, cycle, name, submit_num)
self.task_events_mgr.event_timers[key] = TaskActionTimer(
ctx, delays, num, delay, timeout)
except (EOFError, TypeError, LookupError, ValueError):
ERR.warning(
"%(id)s: skip action timer %(ctx_key)s" %
{"id": id_, "ctx_key": ctx_key})
ERR.warning(traceback.format_exc())
return
OUT.info("+ %s.%s %s" % (name, cycle, ctx_key))

def process_queued_task_messages(self):
"""Handle incoming task messages for each task proxy."""
queue = self.message_queue.get_queue()
Expand Down Expand Up @@ -896,7 +756,7 @@ def command_reload_suite(self):
LOG.info("Reloading the suite definition.")
old_tasks = set(self.config.get_task_name_list())
self.configure_suite(reconfigure=True)
self.pool.reconfigure(self.final_point)
self.pool.reconfigure(self.config, self.final_point)
self.task_events_mgr.mail_interval = self._get_cylc_conf(
"task event mail interval")
self.task_events_mgr.mail_footer = self._get_events_conf("mail footer")
Expand Down Expand Up @@ -1291,12 +1151,8 @@ def run(self):
continue
deps = itask.state.get_resolved_dependencies()
LOG.info('triggered off %s' % deps, itask=itask)
if self.run_mode == 'simulation':
for itask in itasks:
self.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_SUBMITTED)
else:
self.task_job_mgr.submit_task_jobs(self.suite, itasks)
self.task_job_mgr.submit_task_jobs(
self.suite, itasks, self.run_mode == 'simulation')
for meth in [
self.pool.spawn_all_tasks,
self.pool.remove_spent_tasks,
Expand Down
13 changes: 9 additions & 4 deletions lib/cylc/task_action_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ class TaskActionTimer(object):

def __init__(self, ctx=None, delays=None, num=0, delay=None, timeout=None):
self.ctx = ctx
if delays is None:
self.delays = [float(0)]
else:
self.delays = [float(delay) for delay in delays]
self.delays = None
self.set_delays(delays)
self.num = int(num)
if delay is not None:
delay = float(delay)
Expand Down Expand Up @@ -85,6 +83,13 @@ def reset(self):
self.timeout = None
self.is_waiting = False

def set_delays(self, delays=None):
"""Set delays, ensuring that the values are floats."""
if delays is None:
self.delays = [float(0)]
else:
self.delays = [float(delay) for delay in delays]

def set_waiting(self):
"""Set waiting flag, while waiting for action to complete."""
self.delay = None
Expand Down

0 comments on commit e573c0b

Please sign in to comment.