Don't delay reload of active tasks. #1835

Merged 1 commit on May 16, 2016
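In outline, this change stops the scheduler from postponing the reload of task proxies that are in an active state (ready, submitted, or running). Instead of flagging each proxy with reconfigure_me and declaring the reload complete only after active tasks finish, every proxy is rebuilt from the new definition straight away, and the new proxy inherits runtime state from its predecessor via a pre_reload_inst argument. Below is a rough, self-contained sketch of that pattern; the class and function names are invented for illustration, and the real handling of orphans and state carry-over is in the diffs that follow.

# Illustrative sketch only, not cylc code: rebuild every task proxy
# immediately, carrying runtime state across from the pre-reload instance.
class Proxy(object):
    def __init__(self, tdef, pre_reload_inst=None):
        self.tdef = tdef          # task definition (possibly just updated)
        self.has_spawned = False
        self.submit_num = 0
        self.summary = {}
        if pre_reload_inst is not None:
            # Retain state from the predecessor so a live job is not
            # disturbed by the definition swap.
            self.has_spawned = pre_reload_inst.has_spawned
            self.submit_num = pre_reload_inst.submit_num
            self.summary = pre_reload_inst.summary


def reload_pool(pool, new_defs, orphans):
    """Swap every proxy in 'pool' (a dict of name -> Proxy) in one pass."""
    for name, itask in list(pool.items()):
        if name in orphans:
            # Definition no longer in the suite: keep any live instance
            # but stop it spawning successors.
            itask.has_spawned = True
        else:
            pool[name] = Proxy(new_defs[name], pre_reload_inst=itask)

Previously, proxies in an active state were skipped at this point and the reload was only declared complete once they had finished; the diffs below remove that special case.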
2 changes: 1 addition & 1 deletion bin/cylc-restart
@@ -426,7 +426,7 @@ class restart(Scheduler):
state,
has_spawned,
submit_num=submit_num,
is_reload=True,
is_reload_or_restart=True,
message_queue=self.pool.message_queue
)
except TaskNotDefinedError, x:
24 changes: 6 additions & 18 deletions lib/cylc/scheduler.py
@@ -244,8 +244,7 @@ def configure(self):

self.pool = TaskPool(
self.suite, self.pri_dao, self.pub_dao, self.final_point,
self.pyro, self.log, self.run_mode,
self._update_state_summary_callback)
self.pyro, self.log, self.run_mode)
self.state_dumper.pool = self.pool
self.request_handler = PyroRequestHandler(self.pyro)
self.request_handler.start()
@@ -331,9 +330,6 @@ def process_command_queue(self):
self.do_process_tasks = True
queue.task_done()

def _update_state_summary_callback(self):
self.do_update_state_summary = True

def _task_type_exists(self, name_or_id):
# does a task name or id match a known task type in this suite?
name = name_or_id
@@ -516,19 +512,14 @@ def command_nudge(self):

def command_reload_suite(self):
"""Reload suite configuration."""
print "RELOADING the suite definition"
self.log.info("Reloading the suite definition.")
self.configure_suite(reconfigure=True)

self.pool.reconfigure(self.final_point)

self.configure_suite_environment()

if self.gen_reference_log or self.reference_test_mode:
self.configure_reftest(recon=True)

# update state SuiteStateDumper state
# update SuiteStateDumper state
self.state_dumper.set_cts(self.initial_point, self.final_point)

self.do_update_state_summary = True

def command_set_runahead(self, *args):
@@ -657,9 +648,7 @@ def configure_suite(self, reconfigure=False):
# Initial and final cycle times - command line takes precedence.
# self.config already alters the 'initial cycle point' for CLI.
self.initial_point = self.config.initial_point

self.start_point = self.config.start_point

self.final_point = get_point(
self.options.final_point_string or
self.config.cfg['scheduling']['final cycle point']
@@ -1019,12 +1008,11 @@ def run(self):

t0 = time.time()

if self.pool.reconfiguring:
# suite definition reload still in progress
if self.pool.do_reload:
self.pool.reload_taskdefs()
self.do_update_state_summary = True

self.pool.release_runahead_tasks()

proc_pool.handle_results_async()

# External triggers must be matched now. If any are matched pflag
@@ -1157,7 +1145,7 @@ def update_state_summary(self):
self.pool.get_min_point(), self.pool.get_max_point(),
self.pool.get_max_point_runahead(), self.paused(),
self.will_pause_at(), self.shut_down_cleanly, self.will_stop_at(),
self.config.ns_defn_order, self.pool.reconfiguring)
self.config.ns_defn_order, self.pool.do_reload)

def check_suite_timer(self):
if self.already_timed_out:
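The run() hook above is now a single-shot operation: once do_reload is set, reload_taskdefs() does the whole job and clears the flag itself, which is why the reconfiguring and reload_warned bookkeeping and the state-summary callback can go. A minimal sketch of the resulting control flow follows; SchedulerLike and PoolLike are invented stand-ins rather than cylc classes.

# Minimal sketch of the simplified main-loop handling (invented names).
class PoolLike(object):
    def __init__(self):
        self.do_reload = False

    def reload_taskdefs(self):
        # ... rebuild all task proxies (see lib/cylc/task_pool.py below) ...
        self.do_reload = False  # the reload completes in one pass


class SchedulerLike(object):
    def __init__(self, pool):
        self.pool = pool
        self.do_update_state_summary = False

    def main_loop_pass(self):
        if self.pool.do_reload:
            self.pool.reload_taskdefs()
            # The GUI detects the end of a reload via the state summary.
            self.do_update_state_summary = True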
123 changes: 32 additions & 91 deletions lib/cylc/task_pool.py
@@ -59,11 +59,12 @@
from cylc.task_id import TaskID
from cylc.task_proxy import TaskProxy
from cylc.task_state import (
TASK_STATUSES_ACTIVE, TASK_STATUS_HELD, TASK_STATUS_WAITING,
TASK_STATUS_EXPIRED, TASK_STATUS_QUEUED, TASK_STATUS_READY,
TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUBMIT_RETRYING, TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED, TASK_STATUS_RETRYING)
TASK_STATUSES_ACTIVE, TASK_STATUSES_POLLABLE, TASK_STATUSES_KILLABLE,
TASK_STATUS_HELD, TASK_STATUS_WAITING, TASK_STATUS_EXPIRED,
TASK_STATUS_QUEUED, TASK_STATUS_READY, TASK_STATUS_SUBMITTED,
TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_SUBMIT_RETRYING,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED,
TASK_STATUS_RETRYING)


class TaskPool(object):
@@ -75,16 +76,15 @@ class TaskPool(object):
JOBS_SUBMIT = SuiteProcPool.JOBS_SUBMIT

def __init__(self, suite, pri_dao, pub_dao, stop_point, pyro, log,
run_mode, update_state_summary_callback):
run_mode):
self.suite_name = suite
self.pyro = pyro
self.run_mode = run_mode
self.log = log
self.stop_point = stop_point
self.reconfiguring = False
self.do_reload = False
self.pri_dao = pri_dao
self.pub_dao = pub_dao
self.update_state_summary_callback = update_state_summary_callback

config = SuiteConfig.get_inst()
self.custom_runahead_limit = config.get_custom_runahead_limit()
@@ -94,7 +94,6 @@ def __init__(self, suite, pri_dao, pub_dao, stop_point, pyro, log,
config.get_max_num_active_cycle_points())
self._prev_runahead_base_point = None
self._prev_runahead_sequence_points = None
self.reload_warned = False
self.message_queue = TaskMessageServer()

self.pyro.connect(self.message_queue, "task_pool")
@@ -665,7 +664,7 @@ def set_max_future_offset(self):

def reconfigure(self, stop_point):
"""Set the task pool to reload mode."""
self.reconfiguring = True
self.do_reload = True

config = SuiteConfig.get_inst()
self.custom_runahead_limit = config.get_custom_runahead_limit()
@@ -687,9 +686,6 @@ def reconfigure(self, stop_point):
new_queues[key][id_] = itask
self.queues = new_queues

for itask in self.get_all_tasks():
itask.reconfigure_me = True

# find any old tasks that have been removed from the suite
old_task_name_list = self.task_name_list
self.task_name_list = config.get_task_name_list()
@@ -704,82 +700,27 @@

def reload_taskdefs(self):
"""Reload task definitions."""
found = False

self.log.info("Reloading task definitions.")
for itask in self.get_all_tasks():
if itask.state.status in [TASK_STATUS_READY,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING]:
# do not reload active tasks as it would be possible to
# get a task proxy incompatible with the running task
if itask.reconfigure_me:
found = True
continue
if itask.reconfigure_me:
itask.reconfigure_me = False
if itask.tdef.name in self.orphans:
# orphaned task
if itask.state.status in [TASK_STATUS_WAITING,
TASK_STATUS_QUEUED,
TASK_STATUS_SUBMIT_RETRYING,
TASK_STATUS_RETRYING]:
# if not started running yet, remove it.
self.remove(itask, '(task orphaned by suite reload)')
else:
# set spawned so itask won't carry on into the future
itask.has_spawned = True
self.log.warning(
'orphaned task will not continue: ' +
itask.identity)
if itask.tdef.name in self.orphans:
if itask.state.status in [
TASK_STATUS_WAITING, TASK_STATUS_QUEUED,
TASK_STATUS_SUBMIT_RETRYING, TASK_STATUS_RETRYING]:
# Remove orphaned task if it hasn't started running yet.
self.remove(itask, '(task orphaned by suite reload)')
else:
self.log.info(
'RELOADING TASK DEFINITION FOR ' + itask.identity)
new_task = get_task_proxy(
itask.tdef.name,
itask.point,
itask.state.status,
itask.has_spawned,
stop_point=itask.stop_point,
submit_num=itask.submit_num,
is_reload=True
)
# set reloaded task's spawn status
new_task.has_spawned = itask.has_spawned

# succeeded tasks need their outputs set completed:
if itask.state.status == TASK_STATUS_SUCCEEDED:
new_task.state.reset_state(TASK_STATUS_SUCCEEDED)

# carry some task proxy state over to the new instance
new_task.summary = itask.summary
new_task.started_time = itask.started_time
new_task.submitted_time = itask.submitted_time
new_task.finished_time = itask.finished_time

# if currently retrying, retain the old retry delay
# list, to avoid extra retries (the next instance
# of the task will still be as newly configured)
new_task.run_try_state = itask.run_try_state
new_task.sub_try_state = itask.sub_try_state
new_task.submit_num = itask.submit_num
new_task.db_inserts_map = itask.db_inserts_map
new_task.db_updates_map = itask.db_updates_map

self.remove(itask, '(suite definition reload)')
self.add_to_runahead_pool(new_task)

if found:
if not self.reload_warned:
self.log.warning(
"Reload will complete once active tasks have finished.")
self.reload_warned = True
else:
self.log.info("Reload completed.")
self.reload_warned = False
# The GUI detects end of a reload via the suite state summary.
self.update_state_summary_callback()

self.reconfiguring = found
# Keep active orphaned task, but stop it from spawning.
itask.has_spawned = True
itask.log(WARNING, "last instance (orphaned by reload)")
else:
new_task = get_task_proxy(
itask.tdef.name, itask.point, itask.state.status,
stop_point=itask.stop_point, submit_num=itask.submit_num,
is_reload_or_restart=True, pre_reload_inst=itask)
self.remove(itask, '(suite definition reload)')
self.add_to_runahead_pool(new_task)
self.log.info("Reload completed.")
self.do_reload = False

def set_stop_point(self, stop_point):
"""Set the global suite stop point."""
@@ -826,7 +767,7 @@ def poll_task_jobs(self, items=None, compat=None):
itasks, n_warnings = self._filter_task_proxies(items, compat)
active_itasks = []
for itask in itasks:
if itask.state.status in TASK_STATUSES_ACTIVE:
if itask.state.status in TASK_STATUSES_POLLABLE:
if itask.job_conf is None:
try:
itask.prep_manip()
Expand All @@ -842,7 +783,7 @@ def poll_task_jobs(self, items=None, compat=None):
active_itasks.append(itask)
elif items: # and not active
self.log.warning(
'%s: skip poll, task not active' % itask.identity)
'%s: skip poll, task not pollable' % itask.identity)
self._run_job_cmd(
self.JOBS_POLL, active_itasks, self.poll_task_jobs_callback)
return n_warnings
@@ -868,7 +809,7 @@ def kill_task_jobs(self, items=None, compat=None):
itasks, n_warnings = self._filter_task_proxies(items, compat)
active_itasks = []
for itask in itasks:
is_active = itask.state.status in TASK_STATUSES_ACTIVE
is_active = itask.state.status in TASK_STATUSES_KILLABLE
if is_active and self.run_mode == 'simulation':
itask.state.reset_state(TASK_STATUS_FAILED)
elif is_active:
Expand All @@ -888,7 +829,7 @@ def kill_task_jobs(self, items=None, compat=None):
active_itasks.append(itask)
elif items: # and not active
self.log.warning(
'%s: skip kill, task not active' % itask.identity)
'%s: skip kill, task not killable' % itask.identity)
self._run_job_cmd(
self.JOBS_KILL, active_itasks, self.kill_task_jobs_callback)
return n_warnings
32 changes: 27 additions & 5 deletions lib/cylc/task_proxy.py
@@ -223,8 +223,8 @@ def get_job_log_dir(
def __init__(
self, tdef, start_point, status=TASK_STATUS_WAITING,
has_spawned=False, stop_point=None, is_startup=False,
validate_mode=False, submit_num=0, is_reload=False,
message_queue=None):
validate_mode=False, submit_num=0, is_reload_or_restart=False,
pre_reload_inst=None, message_queue=None):
self.tdef = tdef
if submit_num is None:
self.submit_num = 0
@@ -328,7 +328,8 @@ def __init__(

# An initial db state entry is created at task proxy init. On reloading
# or restarting the suite, the task proxies already have this db entry.
if not self.validate_mode and not is_reload and self.submit_num == 0:
if (not self.validate_mode and not is_reload_or_restart and
self.submit_num == 0):
self.db_inserts_map[self.TABLE_TASK_STATES].append({
"time_created": get_current_time_string(),
"time_updated": get_current_time_string(),
Expand All @@ -340,7 +341,6 @@ def __init__(
"time_updated": get_current_time_string(),
"status": status})

self.reconfigure_me = False
self.event_hooks = None
self.sim_mode_run_length = None
self.set_from_rtconfig()
@@ -368,6 +368,29 @@ def __init__(
self.cleanup_cutoff < p_next):
self.cleanup_cutoff = p_next

if is_reload_or_restart and pre_reload_inst is not None:
self.log(INFO, 'reloaded task definition')
if pre_reload_inst.state.status in TASK_STATUSES_ACTIVE:
self.log(WARNING, "job is active with pre-reload settings")
# Retain some state from my pre suite-reload predecessor.
self.has_spawned = pre_reload_inst.has_spawned
self.summary = pre_reload_inst.summary
self.started_time = pre_reload_inst.started_time
self.submitted_time = pre_reload_inst.submitted_time
self.finished_time = pre_reload_inst.finished_time
self.run_try_state = pre_reload_inst.run_try_state
self.sub_try_state = pre_reload_inst.sub_try_state
self.submit_num = pre_reload_inst.submit_num
self.db_inserts_map = pre_reload_inst.db_inserts_map
self.db_updates_map = pre_reload_inst.db_updates_map
# Retain status of outputs.
for msg, oid in pre_reload_inst.state.outputs.completed.items():
self.state.outputs.completed[msg] = oid
try:
del self.state.outputs.not_completed[msg]
except:
pass

def _get_events_conf(self, key, default=None):
"""Return an events setting from suite then global configuration."""
for getter in (
@@ -821,7 +844,6 @@ def job_submission_failed(self):
msg = "submission failed, %s (after %s)" % (delay_msg, timeout_str)
self.log(INFO, "job(%02d) " % self.submit_num + msg)
self.summary['latest_message'] = msg
self.summary['waiting for reload'] = self.reconfigure_me
self.db_events_insert(
event="submission failed", message=delay_msg)
# TODO - is this insert redundant with setup_event_handlers?
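One detail from the constructor change above worth noting: besides timestamps, retry state and the submit number, the completed outputs of the old proxy are copied onto its replacement, so dependencies already satisfied before the reload stay satisfied. A small sketch of that bookkeeping follows; the Outputs class is an invented minimal stand-in, and only the completed and not_completed dictionaries mirror the diff.

# Sketch of carrying completed outputs from the pre-reload proxy to the
# new one (invented Outputs class; dict names follow the diff above).
class Outputs(object):
    def __init__(self, messages):
        # Both dicts map an output message to an output identifier.
        self.completed = {}
        self.not_completed = dict((msg, i) for i, msg in enumerate(messages))


def carry_over_outputs(new_outputs, old_outputs):
    """Copy completed outputs from the old proxy's outputs to the new one's."""
    for msg, oid in old_outputs.completed.items():
        new_outputs.completed[msg] = oid
        # An output that is already complete must not stay listed as pending.
        new_outputs.not_completed.pop(msg, None)

For example, an output that was already completed before the reload will not be waited on again by the rebuilt proxy.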
2 changes: 1 addition & 1 deletion tests/reload/16-waiting/suite.rc
@@ -26,7 +26,7 @@ done
[[reloader]]
script = """
cylc reload "${CYLC_SUITE_NAME}"
while ! grep -q 'RELOADING TASK DEFINITION FOR waiter\.1' \
while ! grep -q '\[waiter\.1\] -reloaded task definition' \
"${CYLC_SUITE_LOG_DIR}/log"
do
sleep 1