Skip to content

Commit

Permalink
scheduler, task_pool, task_proxy refactor
Browse files Browse the repository at this point in the history
Decouple logging and database update between task_proxy and task_state.
  • Loading branch information
matthewrmshin committed Mar 9, 2017
1 parent 2e893c4 commit b62e471
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 243 deletions.
10 changes: 5 additions & 5 deletions bin/cylc-monitor
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ from cylc.network.suite_state_client import (
from cylc.wallclock import get_time_string_from_unix_time
from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.task_state import (
TaskState, TASK_STATUS_RUNAHEAD, TASK_STATUSES_ORDERED,
TASK_STATUS_RUNAHEAD, TASK_STATUSES_ORDERED,
TASK_STATUSES_RESTRICTED)
from cylc.task_state_prop import get_status_prop


class SuiteMonitor(object):
Expand Down Expand Up @@ -101,7 +102,7 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",

legend = ''
for state in TASK_STATUSES_ORDERED:
legend += TaskState.get_status_prop(state, 'ascii_ctrl')
legend += get_status_prop(state, 'ascii_ctrl')
legend = legend.rstrip()

len_header = sum(len(s) for s in TASK_STATUSES_ORDERED)
Expand Down Expand Up @@ -158,7 +159,7 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",
name_list.add(name)
if point_string not in task_info:
task_info[point_string] = {}
task_info[point_string][name] = TaskState.get_status_prop(
task_info[point_string][name] = get_status_prop(
state, 'ascii_ctrl', subst=name)

# Sort the tasks in each cycle point.
Expand Down Expand Up @@ -204,8 +205,7 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",
state_totals[state] += 1
for state, tot in state_totals.items():
subst = " %d " % tot
summary += TaskState.get_status_prop(state,
'ascii_ctrl', subst)
summary += get_status_prop(state, 'ascii_ctrl', subst)
blit.append(summary)

# Print a divider line containing the suite status string.
Expand Down
9 changes: 5 additions & 4 deletions bin/cylc-scan
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ from cylc.network.port_scan import scan_all
from cylc.option_parsers import CylcOptionParser as COP
from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.owner import USER
from cylc.task_state import TaskState, TASK_STATUSES_ORDERED
from cylc.task_state import TASK_STATUSES_ORDERED
from cylc.task_state_prop import get_status_prop


NO_BOLD = False
Expand Down Expand Up @@ -201,7 +202,7 @@ def main():
if options.color:
n_states = len(TASK_STATUSES_ORDERED)
for index, state in enumerate(TASK_STATUSES_ORDERED):
state_legend += TaskState.get_status_prop(state, 'ascii_ctrl')
state_legend += get_status_prop(state, 'ascii_ctrl')
if index == n_states / 2:
state_legend += "\n"
state_legend = state_legend.rstrip()
Expand Down Expand Up @@ -305,7 +306,7 @@ def get_point_state_count_lines(state_count_totals, state_count_cycles,
for state, tot in sorted(state_count_totals.items()):
if use_color:
subst = " %d " % tot
line += TaskState.get_status_prop(state, 'ascii_ctrl', subst)
line += get_status_prop(state, 'ascii_ctrl', subst)
else:
line += '%s:%d ' % (state, tot)
yield ("", line.strip())
Expand All @@ -315,7 +316,7 @@ def get_point_state_count_lines(state_count_totals, state_count_cycles,
for state, tot in sorted(state_count_cycles[point_string].items()):
if use_color:
subst = " %d " % tot
line += TaskState.get_status_prop(state, 'ascii_ctrl', subst)
line += get_status_prop(state, 'ascii_ctrl', subst)
else:
line += '%s:%d ' % (state, tot)
yield (point_string, line.strip())
Expand Down
6 changes: 3 additions & 3 deletions lib/cylc/gui/app_gcylc.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@
from cylc.wallclock import (
get_current_time_string, get_time_string_from_unix_time)
from cylc.task_state import (
TaskState, TASK_STATUSES_ALL, TASK_STATUSES_RESTRICTED,
TASK_STATUSES_ALL, TASK_STATUSES_RESTRICTED,
TASK_STATUSES_WITH_JOB_SCRIPT, TASK_STATUSES_WITH_JOB_LOGS,
TASK_STATUSES_TRIGGERABLE, TASK_STATUSES_ACTIVE,
TASK_STATUS_WAITING, TASK_STATUS_HELD, TASK_STATUS_READY,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED)
from cylc.task_state import get_status_prop


def run_get_stdout(command, filter=False):
Expand Down Expand Up @@ -3130,8 +3131,7 @@ def create_task_filter_widgets(self):
pass
else:
icon = dotm.get_image(st)
cb = gtk.CheckButton(
TaskState.get_status_prop(st, 'gtk_label'))
cb = gtk.CheckButton(get_status_prop(st, 'gtk_label'))
cb.set_active(st not in self.filter_states_excl)
cb.connect('toggled', self.check_task_filter_buttons)
tooltip = gtk.Tooltips()
Expand Down
60 changes: 38 additions & 22 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ def __init__(self, is_restart, options, args):
self.contact_data = None

self.do_process_tasks = False
self.do_update_state_summary = True

# initialize some items in case of early shutdown
# (required in the shutdown() method)
Expand Down Expand Up @@ -621,6 +620,23 @@ def _load_task_action_timers(self, row_idx, row):
return
OUT.info("+ %s.%s %s" % (name, cycle, ctx_key))

def process_queued_task_messages(self):
"""Handle incoming task messages for each task proxy."""
queue = self.message_queue.get_queue()
task_id_messages = {}
while queue.qsize():
try:
task_id, priority, message = queue.get(block=False)
except Queue.Empty:
break
queue.task_done()
task_id_messages.setdefault(task_id, [])
task_id_messages[task_id].append((priority, message))
for itask in self.pool.get_tasks():
if itask.identity in task_id_messages:
for priority, message in task_id_messages[itask.identity]:
itask.process_incoming_message(priority, message)

def process_command_queue(self):
"""Process queued commands."""
queue = self.command_queue.get_queue()
Expand Down Expand Up @@ -661,7 +677,7 @@ def process_command_queue(self):
(n_warnings, cmdstr))
else:
self.log.info('Command succeeded: ' + cmdstr)
self.do_update_state_summary = True
cylc.flags.iflag = True
if name in self.PROC_CMDS:
self.do_process_tasks = True
queue.task_done()
Expand Down Expand Up @@ -900,7 +916,7 @@ def command_reload_suite(self):
self.final_point,
self.pool.is_held,
self.config.cfg['cylc']['cycle point format'])
self.do_update_state_summary = True
cylc.flags.iflag = True

def command_set_runahead(self, interval=None):
"""Set runahead limit."""
Expand Down Expand Up @@ -1230,6 +1246,7 @@ def run(self):
self.profiler.log_memory("scheduler.py: begin run while loop")

time_next_fs_check = None
cylc.flags.iflag = True

if self.options.profile_mode:
previous_profile_point = 0
Expand All @@ -1245,11 +1262,11 @@ def run(self):
if self.pool.do_reload:
self.pool.reload_taskdefs()
self.suite_db_mgr.checkpoint("reload-done")
self.do_update_state_summary = True
cylc.flags.iflag = True

self.process_command_queue()
if self.pool.release_runahead_tasks():
self.do_update_state_summary = True
cylc.flags.iflag = True
self.proc_pool.handle_results_async()

# External triggers must be matched now. If any are matched pflag
Expand All @@ -1267,7 +1284,7 @@ def run(self):
if self.stop_mode is None:
itasks = self.pool.get_ready_tasks()
if itasks:
self.do_update_state_summary = True
cylc.flags.iflag = True
if self.config.cfg['cylc']['log resolved dependencies']:
for itask in itasks:
if not itask.local_job_file_path:
Expand All @@ -1283,7 +1300,7 @@ def run(self):
self.pool.remove_spent_tasks,
self.pool.remove_suiciding_tasks]:
if meth():
self.do_update_state_summary = True
cylc.flags.iflag = True

BroadcastServer.get_inst().expire(self.pool.get_min_point())

Expand All @@ -1292,19 +1309,20 @@ def run(self):
"END TASK PROCESSING (took %s seconds)" %
(time() - time0))

self.pool.process_queued_task_messages(self.message_queue)
self.process_queued_task_messages()
self.process_command_queue()
self.task_events_mgr.event_timers_from_tasks(self.pool.get_tasks())
self.task_events_mgr.events_from_tasks(self.pool)
self.task_events_mgr.process_events(self)
has_changes = cylc.flags.iflag or self.do_update_state_summary
if has_changes:
self.suite_db_mgr.put_task_pool(
self.pool.get_all_tasks(),
self.task_events_mgr.event_timers)
self.update_state_summary()
self.suite_db_mgr.put_task_event_timers(self.task_events_mgr)
has_changes = cylc.flags.iflag
if cylc.flags.iflag:
self.suite_db_mgr.put_task_pool(self.pool)
self.update_state_summary() # Will reset cylc.flags.iflag
try:
self.suite_db_mgr.process_queued_ops(self.pool.get_all_tasks())
self.suite_db_mgr.process_queued_ops()
except OSError as err:
if cylc.flags.debug:
ERR.debug(traceback.format_exc())
raise SchedulerError(str(err))
# If public database is stuck, blast it away by copying the content
# of the private database into it.
Expand Down Expand Up @@ -1430,7 +1448,6 @@ def update_state_summary(self):
self.will_stop_at(), self.config.ns_defn_order,
self.pool.do_reload)
cylc.flags.iflag = False
self.do_update_state_summary = False
self.is_stalled = False
if self.suite_timer_active:
self.suite_timer_active = False
Expand Down Expand Up @@ -1544,11 +1561,10 @@ def shutdown(self, reason=None):
if self.pool is not None:
self.pool.warn_stop_orphans()
try:
itasks = self.pool.get_all_tasks()
self.task_events_mgr.event_timers_from_tasks(itasks)
self.suite_db_mgr.put_task_pool(
itasks, self.task_events_mgr.event_timers)
self.suite_db_mgr.process_queued_ops(itasks)
self.task_events_mgr.events_from_tasks(self.pool)
self.suite_db_mgr.put_task_event_timers(self.task_events_mgr)
self.suite_db_mgr.put_task_pool(self.pool)
self.suite_db_mgr.process_queued_ops()
except Exception as exc:
ERR.error(str(exc))

Expand Down

0 comments on commit b62e471

Please sign in to comment.