From 03ce2e2cf31f9171c5eb904c1ceecf85eb4c2b5d Mon Sep 17 00:00:00 2001 From: Riccardo Murri Date: Sun, 4 Feb 2018 21:11:31 +0100 Subject: [PATCH] Rewrite `Engine.progress()`. Try to make the code more readable and move specific concerns (e.g., counting) into own sub-class. In particular, tasks are now kept in queues so execution order should be predictable. --- gc3libs/__init__.py | 6 - gc3libs/core.py | 1038 +++++++++++++++++++--------------- gc3libs/tests/test_engine.py | 94 +-- 3 files changed, 621 insertions(+), 517 deletions(-) diff --git a/gc3libs/__init__.py b/gc3libs/__init__.py index 0ca0f873..6f915f29 100755 --- a/gc3libs/__init__.py +++ b/gc3libs/__init__.py @@ -1898,12 +1898,6 @@ def fset(self, value): self.history.append( "Transition from state {0} to state {1}" .format(self._state, value)) - # update stats on controller -- we need to do this - # *after* the `.terminated()` method has gotten a - # chance to run and set the final exitcode. - if self._ref is not None and self._ref._attached: - self._ref._controller._update_task_counts(self._ref, self._state, -1) - self._ref._controller._update_task_counts(self._ref, value, +1) # finally, update state self._state = value diff --git a/gc3libs/core.py b/gc3libs/core.py index e1de1d9f..7b110b3b 100755 --- a/gc3libs/core.py +++ b/gc3libs/core.py @@ -19,7 +19,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 2110-1301 USA # -from collections import defaultdict +from collections import defaultdict, deque from fnmatch import fnmatch import functools import itertools @@ -44,7 +44,6 @@ class MatchMaker(object): - """ Select and sort resources for attempting submission of a `Task`. @@ -907,17 +906,8 @@ def remove(self, task): """ pass - def _update_task_counts(self, task, state, increment): - """ - No-op, implemented for compatibility with `Engine`. - - This method is here just to allow `Core` and `Engine` objects - to be used interchangeably. 
- """ - pass class Scheduler(object): - """ Instances of the `Scheduler` class are used in `Engine.progress`:meth: to determine what tasks (among those in @@ -1071,36 +1061,40 @@ def __exit__(self, *excinfo): @scheduler -def first_come_first_serve(tasks, resources, matchmaker=MatchMaker()): - """First-come first-serve scheduling policy. +def first_come_first_serve(task_queue, resources, matchmaker=MatchMaker()): + """ + First-come first-serve scheduling policy. Tasks are submitted to resources in the order they appear in the - `tasks` list. Each task is submitted to resources according to + `task_queue` queue. Each task is submitted to resources according to the order they are sorted by `Application.rank_resources` (if that method exists). This is the default scheduling policy in GC3Pie's `Engine`:class. - """ assert resources, "No execution resources available!" # make a copy of the `resources` argument, so we can modify it # when e.g. disabling resources that are full resources = list(resources) - for task_idx, task in enumerate(tasks): + total = len(task_queue) + for done in xrange(total): + task = task_queue.get() # keep only compatible resources compatible_resources = matchmaker.filter(task, resources) if not compatible_resources: gc3libs.log.warning( "No compatible resources for task '%s'" " - cannot submit it", task) + task_queue.put(task) continue # sort them according to the Task's preference targets = matchmaker.rank(task, compatible_resources) # now try submission of the task to each resource until one succeeds for target in targets: try: - # result = yield (task_idx, target.name) - yield (task_idx, target.name) + yield (task, target.name) + # submission successful, continue with next task + break except (gc3libs.exceptions.ResourceNotReady, gc3libs.exceptions.MaximumCapacityReached) as exc: # this is not a real error: the resource is adapting @@ -1113,33 +1107,21 @@ def first_come_first_serve(tasks, resources, matchmaker=MatchMaker()): continue # 
pylint: disable=broad-except except Exception as err: - # note error condition but continue with next resource + # note error condition but continue with next target resource gc3libs.log.debug( "Scheduler ignored error in submitting task '%s': %s: %s", - task, err.__class__.__name__, str(err), exc_info=True) - else: - # submission successful, continue with next task - break + task, err.__class__.__name__, err, exc_info=True) + else: + # unsuccessful submission, push task back into queue + task_queue.put(task) if not resources: gc3libs.log.debug( "No more resources available," " aborting scheduling cycle with %d tasks remaining.", - len(tasks) - task_idx) + total - done) return -# Work around infinite recursion error when trying to compare -# `UserDict` instances which can contain each other. We know -# that two identical tasks are the same object by -# construction, so let's use this to check. -def _contained(elt, lst): - i = id(elt) - for item in lst: - if i == id(item): - return True - return False - - class Engine(object): # pylint: disable=too-many-instance-attributes """ Manage a collection of tasks, until a terminal state is reached. 
@@ -1255,12 +1237,7 @@ def __init__(self, controller, tasks=[], store=None, """ # internal-use attributes - self._new = [] - self._in_flight = [] - self._stopped = [] - self._terminating = [] - self._terminated = [] - self._to_kill = [] + self._managed = self._TaskQueueManager(self) self._core = controller self._store = store self._tasks_by_id = {} @@ -1278,53 +1255,228 @@ def __init__(self, controller, tasks=[], store=None, self.forget_terminated = forget_terminated # init counters/statistics - self._counts = {} - self.init_counts_for(Task) # always gather these + self._counts = self._Counters(self) + self._counts.init_for(Task) # always gather these # Engine fully initialized, add all tasks for task in tasks: self.add(task) - def _update_task_counts(self, task, state, increment): - """ - Update the counts relative to `task`'s state by `increment`. + class TaskQueue(object): + def __init__(self): + # the actual data structure used for keeping tasks + self._queue = deque() + + def __iter__(self): + return iter(self._queue) + + def __len__(self): + return len(self._queue) + + def add(self, task): + """ + Add task in front of queue. + + Does *not* check if already present. + """ + self._queue.appendleft(task) + + def get(self): + """ + Pop the first task from the front of queue. + """ + task = self._queue.popleft() + return task + + def put(self, task): + """ + Add task to back of queue. + + Does *not* check if already present. + """ + self._queue.append(task) + + def remove(self, task): + """ + Remove the given task from the queue. + """ + # FIXME: this relies on `collections.deque` which supports + # removal of an element by value; Python's `queue.Queue` + # only provides put/get operations and would make this + # difficult to do. We would need to find a different + # solution or data structure before we can generalize the + # Engine to span multiple processes... 
+ self._queue.remove(task) + + + class _TaskQueueManager(object): + + def __init__(self, engine): + # action-based queues + self.to_kill = engine.TaskQueue() + self.to_update = engine.TaskQueue() + self.to_submit = engine.TaskQueue() + self.to_fetch_output = engine.TaskQueue() + self.to_cleanup = engine.TaskQueue() + self.done = engine.TaskQueue() + + # map action names to queues + self._actions = { + 'kill': self.to_kill, + 'update': self.to_update, + 'submit': self.to_submit, + 'fetch_output': self.to_fetch_output, + 'cleanup': self.to_cleanup, + 'done': self.done, + } + + # map task ID to action + self._index = {} + + def __contains__(self, task): + """ + Return ``True`` if `task` is present in some queue. + """ + return id(task) in self._index + + def get_queue(self, task, _override_state=None): + """ + Return the "queue" object to which `task` should be added or removed. + """ + state = _override_state or task.execution.state + if Run.State.NEW == state: + return self.to_submit + elif state in [ + Run.State.RUNNING, + Run.State.STOPPED, + Run.State.SUBMITTED, + Run.State.UNKNOWN, + ]: + return self.to_update + elif Run.State.TERMINATING == state: + return self.to_fetch_output + elif Run.State.TERMINATED == state: + # FIXME: we should never add here when `self.forget_terminated == True` + return self.done + else: + raise AssertionError( + "Unhandled state '%s' in gc3libs.core.Engine." % state) + + def add(self, task, action=None): + """ + Append a task to the back of the appropriate action queue. + If the task is already managed, no change is made. + """ + queue = (self._actions[action] if action else self.get_queue(task)) + queue.put(task) + self._index[id(task)] = queue + + def remove(self, task): + """ + Delete the given task from the queue it is in. + After calling this method, `task in this` will be ``False``. 
+ """ + queue = self._index.pop(id(task)) + queue.remove(task) - The task state is passed as an independent argument, in order - to allow us to decrease counters on the old task state. - """ - for cls in self._counts: - if isinstance(task, cls): - self._counts[cls]['total'] += increment - self._counts[cls][state] += increment - if Run.State.TERMINATED == state: + def requeue(self, task, action=None): + """ + Move an already-managed task to the back of the queue + that is appropriate for its state. + """ + try: + queue = self._index[id(task)] + queue.remove(task) + except (KeyError, ValueError): + pass + self.add(task, action) + + + class _Counters(object): + """ + Keep count of how many tasks of a class are in a given state. + """ + def __init__(self, engine): + self._engine = engine + self.totals = {} + + def init_for(self, cls): + """ + Initialize counters for tasks of class `cls`. + + All statistics are initially computed starting from the + current collection of tasks managed by the parent `Engine` + instance; they will be kept up-to-date during task + addition/removal/progress. + """ + # FIXME: rewrite using `collections.Counter` when we drop + # support for Py 2.6? + counter = self.totals[cls] = defaultdict(int) + for task in self._engine.iter_tasks(cls): + counter['total'] += 1 + state = task.execution.state + counter[state] += 1 + if state == Run.State.TERMINATED: if task.execution.returncode == 0: - self._counts[cls]['ok'] += increment + counter['ok'] += 1 else: - self._counts[cls]['failed'] += increment - - - # pylint: disable=too-many-arguments,dangerous-default-value - def __get_task_queue(self, task, _override_state=None): - """ - Return the "queue" object to which `task` should be added or removed. 
- """ - state = _override_state or task.execution.state - if Run.State.NEW == state: - return self._new - elif state in [Run.State.SUBMITTED, - Run.State.RUNNING, - Run.State.UNKNOWN]: - return self._in_flight - elif Run.State.STOPPED == state: - return self._stopped - elif Run.State.TERMINATING == state: - return self._terminating - elif Run.State.TERMINATED == state: - return self._terminated - else: - raise AssertionError( - "Unhandled state '%s' in gc3libs.core.Engine." % state) + counter['failed'] += 1 + return counter + + def _update(self, task, state, increment): + """ + Update the counts relative to `task`'s state by `increment`. + + The task state is passed as an independent argument, in order + to allow us to decrease counters on the old task state. + """ + stats_to_update = ['total', state] + if state == 'TERMINATED': + if task.execution.returncode == 0: + stats_to_update.append('ok') + else: + stats_to_update.append('failed') + for cls in self.totals: + if isinstance(task, cls): + for stat in stats_to_update: + self.totals[cls][stat] += increment + + def add(self, task): + """ + Adjust totals to include `task`. + """ + self._update(task, task.execution.state, +1) + + def remove(self, task): + """ + Adjust totals following the removal of `task`. + """ + self._update(task, task.execution.state, -1) + + def transitioned(self, task, from_state, to_state): + """ + Update the counts, following a transition of `task` + from one state to another. The counters relative + to `from_state` are decremented by unit, and correspondingly + the counters for `to_state` are incremented. 
+ + This is functionally equivalent to, but more efficient than:: + + self._update(task, from_state, -1) + self._update(task, to_state, +1) + """ + stats_to_increment = ['total', to_state] + if to_state == 'TERMINATED': + if task.execution.returncode == 0: + stats_to_increment.append('ok') + else: + stats_to_increment.append('failed') + for cls in self.totals: + if isinstance(task, cls): + self.totals[cls][from_state] -= 1 + for stat in stats_to_increment: + self.totals[cls][stat] += 1 def add(self, task): @@ -1333,21 +1485,17 @@ def add(self, task): Adding a task that has already been added to this `Engine` instance results in a no-op. """ - queue = self.__get_task_queue(task) - if _contained(task, queue): - # no-op if the task has already been added - return - # add task to internal data structures - queue.append(task) - if self._store: - try: - self._tasks_by_id[task.persistent_id] = task - except AttributeError: - gc3libs.log.debug( - "Task %s added to Engine %s with no persistent ID!", - task, self) - task.attach(self) - self._update_task_counts(task, task.execution.state, +1) + if task not in self._managed: + self._managed.add(task) + self._counts.add(task) + if self._store: + try: + self._tasks_by_id[task.persistent_id] = task + except AttributeError: + gc3libs.log.debug( + "Task %s added to Engine %s with no persistent ID!", + task, self) + task.attach(self) def remove(self, task, _override_queue=None): @@ -1357,27 +1505,27 @@ def remove(self, task, _override_queue=None): Removing a task that is not managed (i.e., already removed or never added) is a no-op. 
""" - queue = _override_queue or self.__get_task_queue(task) - if _contained(task, queue): - queue.remove(task) - if self._store: - try: - del self._tasks_by_id[task.persistent_id] - except KeyError: - # already removed - pass - except AttributeError: - gc3libs.log.debug( - "Task %s added to Engine %s with no persistent ID!", - task, self) - task.detach() - self._update_task_counts(task, task.execution.state, -1) + if task not in self._managed: + return + self._managed.remove(task) + self._counts.remove(task) + if self._store: + try: + del self._tasks_by_id[task.persistent_id] + except KeyError: + # already removed + pass + except AttributeError: + gc3libs.log.debug( + "Task %s added to Engine %s with no persistent ID!", + task, self) + task.detach() def find_task_by_id(self, task_id): """ - Return the task with the given persistent ID added to this `Engine` instance. - If no task has that ID, raise a `KeyError`. + Return the task with the given persistent ID added to this + `Engine` instance. If no task has that ID, raise a `KeyError`. 
""" return self._tasks_by_id[task_id] @@ -1395,12 +1543,12 @@ def iter_tasks(self, only_cls=None): else: select = self.__iter_only return itertools.chain( - select(self._new, only_cls), - select(self._in_flight, only_cls), - select(self._stopped, only_cls), - select(self._to_kill, only_cls), - select(self._terminating, only_cls), - select(self._terminated, only_cls), + select(self._managed.to_submit, only_cls), + select(self._managed.to_update, only_cls), + select(self._managed.to_kill, only_cls), + select(self._managed.to_fetch_output, only_cls), + select(self._managed.to_cleanup, only_cls), + select(self._managed.done, only_cls), ) # helper methods for `iter_tasks`; they are created as @@ -1416,7 +1564,6 @@ def __iter_only(queue, cls): (lambda task: isinstance(task, cls)), iter(queue)) - # FIXME: rewrite using `collections.Counter` when we drop support for Py 2.6 def init_counts_for(self, cls): """ Initialize counters for tasks of class `cls`. @@ -1433,16 +1580,7 @@ def init_counts_for(self, cls): state, or the counts for ``TERMINATED``, ``ok``, and ``failed`` jobs will be incorrectly initialized to 0. """ - counter = self._counts[cls] = defaultdict(int) - for task in self.iter_tasks(cls): - counter['total'] += 1 - state = task.execution.state - counter[state] += 1 - if state == Run.State.TERMINATED: - if task.execution.returncode == 0: - counter['ok'] += 1 - else: - counter['failed'] += 1 + counter = self._counts.init_for(cls) if Run.State.TERMINATED in counter and counter[Run.State.TERMINATED] > 0: warn("The Engine class will forget TERMINATED tasks in the near future." "In order to get correct results, `init_counts_for`" @@ -1467,6 +1605,8 @@ def progress(self): The `max_in_flight` and `max_submitted` limits (if >0) are taken into account when attempting submission of tasks. 
""" + gc3libs.log.debug("Engine.progress(): starting.") + # prepare currently_submitted = 0 currently_in_flight = 0 @@ -1488,85 +1628,147 @@ def progress(self): raise gc3libs.exceptions.NoResources( "No resources available for running jobs.") - # update status of SUBMITTED/RUNNING tasks before launching - # new ones, otherwise we would be checking the status of - # some tasks twice... - if self._in_flight: - gc3libs.log.debug("Engine %s about to update status of SUBMITTED+RUNNING tasks ...", self) - transitioned = [] - for index, task in enumerate(self._in_flight): + # execute kills and update count of submitted/in-flight tasks + queue = self._managed.to_kill + if queue: + gc3libs.log.debug("Engine %s about to kill jobs ...", self) + for _ in xrange(len(queue)): + task = queue.get() + old_state = task.execution.state + + try: + self._core.kill(task) + if self._store: + self._store.save(task) + # pylint: disable=broad-except + except Exception as err: + self.__ignore_or_raise( + err, "killing", task, + # context: + # - module + 'core', + # - class + 'Engine', + # - method + 'progress', + # - actual error class + err.__class__.__name__, + # - additional keywords + 'kill' + ) + + self._managed.requeue(task) + + state = task.execution.state + if state != old_state: + self._counts.transitioned(task, old_state, state) + + # do book-keeping + if old_state == Run.State.SUBMITTED: + if isinstance(task, Application): + currently_submitted -= 1 + currently_in_flight -= 1 + elif old_state == Run.State.RUNNING: + if isinstance(task, Application): + currently_in_flight -= 1 + + # update status of tasks before launching new ones + queue = self._managed.to_update + if queue: + gc3libs.log.debug( + "Engine %s about to update status of in-flight tasks ...", + self) + for _ in xrange(len(queue)): + task = queue.get() + + # ensure pre-condition on state is met + old_state = task.execution.state + if old_state not in [ + Run.State.RUNNING, + Run.State.STOPPED, + Run.State.SUBMITTED, + 
Run.State.UNKNOWN, + ]: + # task changed state outside of the Engine, requeue + self._managed.requeue(task) + continue + try: - old_state = task.execution.state self._core.update_job_state(task) if self._store and task.changed: self._store.save(task) - state = task.execution.state - if state == Run.State.SUBMITTED: - # only real applications need to be counted - # against the limit; policy tasks are exempt - # (this applies to all similar clauses below) - if isinstance(task, Application): + except gc3libs.exceptions.ConfigurationError: + # Unrecoverable; no sense in continuing -- pass + # immediately on to client code and let it handle + # this... + raise + # pylint: disable=broad-except + except Exception as err: + self.__ignore_or_raise( + err, "updating state", task, + # context: + # - module + 'core', + # - class + 'Engine', + # - method + 'progress', + # - actual error class + err.__class__.__name__, + # - additional keywords + 'state', + 'update', + ) + + # do book-keeping + state = task.execution.state + + if state != old_state: + self._counts.transitioned(task, old_state, state) + + if state in [ + Run.State.RUNNING, + Run.State.STOPPED, + Run.State.SUBMITTED, + Run.State.UNKNOWN, + ]: + queue.put(task) + else: + self._managed.requeue(task) + + if state == Run.State.SUBMITTED: + # only real applications need to be counted + # against the limit; policy tasks are exempt + # (this applies to all similar clauses below) + if isinstance(task, Application): + if old_state != Run.State.SUBMITTED: currently_submitted += 1 + if old_state not in [ + Run.State.SUBMITTED, + Run.State.RUNNING, + ]: currently_in_flight += 1 - elif state == Run.State.RUNNING: - if isinstance(task, Application): - if old_state == Run.State.SUBMITTED: - currently_submitted -= 1 - # currently_in_flight does not change - else: - currently_in_flight += 1 - if self.can_retrieve and self.retrieve_running: - # try to get output - try: - self._core.fetch_output( - task, - 
overwrite=self.retrieve_overwrites, - changed_only=self.retrieve_changed_only) - # pylint: disable=broad-except - except Exception as err: - if gc3libs.error_ignored( - # context: - # - module - 'core', - # - class - 'Engine', - # - method - 'progress', - # - actual error class - err.__class__.__name__, - # - additional keywords - 'RUNNING', - 'fetch_output', - ): - gc3libs.log.error( - "Ignored error in fetching output of" - " RUNNING task '%s': %s: %s", - task, err.__class__.__name__, err) - gc3libs.log.debug( - "(Original traceback follows.)", - exc_info=True) - else: - # propagate exceptions for debugging purposes - raise - elif state == Run.State.NEW: - # can happen only with TaskCollections - assert not isinstance(task, Application) - elif state in [ - Run.State.STOPPED, - Run.State.TERMINATED, - Run.State.TERMINATING, - Run.State.UNKNOWN, - ]: - # task changed state, mark as to remove - transitioned.append(index) - queue = self.__get_task_queue(task) - queue.append(task) - else: - # if we got to this point, state has an invalid value - gc3libs.log.error( - "Invalid state `%r` returned by task %s.", - state, task) - if not gc3libs.error_ignored( + elif state == Run.State.RUNNING: + if isinstance(task, Application): + if old_state == Run.State.SUBMITTED: + currently_submitted -= 1 + if old_state not in [ + Run.State.SUBMITTED, + Run.State.RUNNING, + ]: + currently_in_flight += 1 + if (self.retrieve_running and task.would_output + and self.can_retrieve): + # try to get output + try: + self._core.fetch_output( + task, + overwrite=self.retrieve_overwrites, + changed_only=self.retrieve_changed_only) + # pylint: disable=broad-except + except Exception as err: + self.__ignore_or_raise( + err, "fecthing output", task, # context: # - module 'core', @@ -1575,338 +1777,226 @@ def progress(self): # - method 'progress', # - actual error class - 'InternalError', + err.__class__.__name__, # - additional keywords - 'state', - 'update', - ): - # propagate exception to caller 
- raise gc3libs.exceptions.InternalError( - "Invalid state '{state!r}' returned by task {task}" - .format(state=state, task=task)) - except gc3libs.exceptions.ConfigurationError: - # Unrecoverable; no sense in continuing -- pass - # immediately on to client code and let it handle - # this... - raise - # pylint: disable=broad-except - except Exception as err: - if gc3libs.error_ignored( - # context: - # - module - 'core', - # - class - 'Engine', - # - method - 'progress', - # - actual error class - err.__class__.__name__, - # - additional keywords - 'state', - 'update', - ): - gc3libs.log.error( - "Ignoring error in updating state of task '%s':" - " %s: %s", - task, - err.__class__.__name__, - err, - exc_info=True) - else: - # propagate exception to caller - raise - # remove tasks that transitioned to other states - for index in reversed(transitioned): - del self._in_flight[index] - - # execute kills and update count of submitted/in-flight tasks - if self._to_kill: - gc3libs.log.debug("Engine %s about to kill jobs ...", self) - transitioned = [] - for index, task in enumerate(self._to_kill): - try: - old_state = task.execution.state - self._core.kill(task) - if self._store: - self._store.save(task) - state = task.execution.state - if old_state == Run.State.SUBMITTED: - if isinstance(task, Application): - currently_submitted -= 1 - currently_in_flight -= 1 - elif old_state == Run.State.RUNNING: - if isinstance(task, Application): - currently_in_flight -= 1 - self._terminated.append(task) - transitioned.append(index) - # pylint: disable=broad-except - except Exception as err: - if gc3libs.error_ignored( - # context: - # - module - 'core', - # - class - 'Engine', - # - method - 'progress', - # - actual error class - err.__class__.__name__, - # - additional keywords - 'kill' - ): - gc3libs.log.error( - "Ignored error in killing task '%s': %s: %s", - task, err.__class__.__name__, err) - # print again with traceback info at a higher log level - gc3libs.log.debug( - 
"(Original traceback follows.)", - exc_info=True) - else: - # propagate exceptions for debugging purposes - raise - # remove tasks that transitioned to other states - for index in reversed(transitioned): - del self._to_kill[index] - - # update state of STOPPED tasks; again need to make before new - # submissions, because it can alter the count of in-flight - # tasks. - if self._stopped: - gc3libs.log.debug("Engine %s about to update state of STOPPED+UNKNOWN jobs ...", self) - transitioned = [] - for index, task in enumerate(self._stopped): - try: - old_state = task.execution.state - self._core.update_job_state(task) - if self._store and task.changed: - self._store.save(task) - state = task.execution.state - if state in [ - Run.State.SUBMITTED, - Run.State.RUNNING, - ]: - if isinstance(task, Application): - currently_in_flight += 1 - if task.execution.state == Run.State.SUBMITTED: - currently_submitted += 1 - self._in_flight.append(task) - # task changed state, mark as to remove - transitioned.append(index) - elif state in [ - Run.State.TERMINATING, - Run.State.TERMINATED, - Run.State.UNKNOWN, - ]: - queue = self.__get_task_queue(task) - queue.append(task) - # task changed state, mark as to remove - transitioned.append(index) - # pylint: disable=broad-except - except Exception as err: - if gc3libs.error_ignored( - # context: - # - module - 'core', - # - class - 'Engine', - # - method - 'progress', - # - actual error class - err.__class__.__name__, - # - additional keywords - 'state', - 'update', - 'STOPPED', - ): - gc3libs.log.error( - "Ignoring error in updating state of" - " STOPPED task '%s': %s: %s", - task, err.__class__.__name__, err, - exc_info=True) - else: - # propagate exception to caller - raise - # remove tasks that transitioned to other states - for index in reversed(transitioned): - del self._stopped[index] + 'RUNNING', + 'fetch_output', + ) + # elif state == Run.State.NEW: + # # can happen after a `.redo()`, requeue + # assert not isinstance(task, 
Application) + # elif state in [ + # Run.State.STOPPED, + # Run.State.TERMINATED, + # Run.State.TERMINATING, + # Run.State.UNKNOWN, + # ]: + # # nothing to do, task has already been requeued + # pass + # else: + # # if we got to this point, state has an invalid value + # gc3libs.log.error( + # "Invalid state `%r` returned by task %s.", + # state, task) + # if not gc3libs.error_ignored( + # # context: + # # - module + # 'core', + # # - class + # 'Engine', + # # - method + # 'progress', + # # - actual error class + # 'InternalError', + # # - additional keywords + # 'state', + # 'update', + # ): + # # propagate exception to caller + # raise gc3libs.exceptions.InternalError( + # "Invalid state '{state!r}' returned by task {task}" + # .format(state=state, task=task)) # now try to submit NEW tasks - if self._new: - gc3libs.log.debug("Engine %s about to submit NEW tasks ...", self) - transitioned = [] if (self.can_submit and currently_submitted < limit_submitted and currently_in_flight < limit_in_flight): + queue = self._managed.to_submit + if queue: + gc3libs.log.debug( + "Engine %s about to submit NEW tasks ...", self) # update state of all enabled resources, to give a chance to # all to get a new job; for a complete discussion, see: # https://github.com/uzh/gc3pie/issues/485 self._core.update_resources() # now try to submit - with self.scheduler(self._new, + with self.scheduler(queue, self._core.resources.values()) as _sched: # wrap the original generator object so that `send` # and `throw` do not yield a value -- we only get new # stuff from the call to the `next` method in the `for - # ... in schedule` line. + # ... 
in sched:` line sched = gc3libs.utils.YieldAtNext(_sched) - for task_index, resource_name in sched: - task = self._new[task_index] + for task, resource_name in sched: resource = self._core.resources[resource_name] - # try to submit; go to SUBMITTED if successful, - # FAILED if not try: self._core.submit(task, targets=[resource]) if self._store: self._store.save(task) - # XXX: can remove the following assert when - # we're sure Issue 419 is fixed - assert task_index not in transitioned - self._in_flight.append(task) - transitioned.append(task_index) + # if we get to this point, we know state is + # either SUBMITTED or RUNNING + self._managed.to_update.put(task) if isinstance(task, Application): currently_submitted += 1 currently_in_flight += 1 - # if we get to this point, we know state is not NEW anymore + # do book-keeping state = task.execution.state + self._counts.transitioned(task, 'NEW', state) + # notify scheduler sched.send(task.execution.state) - # pylint: disable=broad-except except Exception as err1: # record the error in the task's history task.execution.history( - "Submission to resource '%s' failed: %s: %s" % - (resource.name, - err1.__class__.__name__, - str(err1))) + "Submission to resource '%s' failed: %s: %s" + % (resource.name, err1.__class__.__name__, err1)) gc3libs.log.error( - "Got error in submitting task '%s', informing" - " scheduler: %s: %s", - task, - err1.__class__.__name__, - str(err1)) + "Got error in submitting task '%s'," + " informing scheduler: %s: %s", + task, err1.__class__.__name__, err1) # inform scheduler and let it handle it try: sched.throw(* sys.exc_info()) # pylint: disable=broad-except except Exception as err2: - if gc3libs.error_ignored( - # context: - # - module - 'core', - # - class - 'Engine', - # - method - 'progress', - # - actual error class - err2.__class__.__name__, - # - additional keywords - 'scheduler', - 'submit', - ): - gc3libs.log.debug( - "Ignored error in submitting task '%s':" - " %s: %s", - task, - 
err2.__class__.__name__, - err2, - exc_info=True) - else: - # propagate exceptions for debugging purposes - raise + self.__ignore_or_raise( + err2, "submission", task, + # context: + # - module + 'core', + # - class + 'Engine', + # - method + 'progress', + # - actual error class + err2.__class__.__name__, + # - additional keywords + 'scheduler', + 'submit', + ) # enforce Engine limits if (currently_submitted >= limit_submitted or currently_in_flight >= limit_in_flight): break - # remove tasks that transitioned to SUBMITTED state - for index in reversed(transitioned): - del self._new[index] # finally, retrieve output of finished tasks if self.can_retrieve: - if self._terminating: - gc3libs.log.debug("Engine %s about to retrieve output of TERMINATING tasks ...", self) - transitioned = [] - for index, task in enumerate(self._terminating): + queue = self._managed.to_fetch_output + if queue: + gc3libs.log.debug( + "Engine %s about to retrieve output of TERMINATING tasks ...", + self) + for _ in xrange(len(queue)): + task = queue.get() # try to get output try: self._core.fetch_output( task, overwrite=self.retrieve_overwrites, changed_only=self.retrieve_changed_only) - except gc3libs.exceptions.UnrecoverableDataStagingError as ex: + except gc3libs.exceptions.UnrecoverableDataStagingError as err: gc3libs.log.error( "Error in fetching output of task '%s'," " will mark it as TERMINATED" " (with error exit code %d): %s: %s", task, posix.EX_IOERR, - ex.__class__.__name__, str(ex), exc_info=True) + err.__class__.__name__, err, exc_info=True) task.execution.returncode = ( Run.Signals.DataStagingFailure, posix.EX_IOERR) task.execution.state = Run.State.TERMINATED task.changed = True # pylint: disable=broad-except - except Exception as ex: - if gc3libs.error_ignored( - # context: - # - module - 'core', - # - class - 'Engine', - # - method - 'progress', - # - actual error class - ex.__class__.__name__, - # - additional keywords - 'fetch_output', - ): - gc3libs.log.debug( - 
"Ignored error in fetching output of task '%s':" - " %s: %s", - task, - ex.__class__.__name__, - ex) - gc3libs.log.debug( - "(Original traceback follows.)", - exc_info=True) - else: - # propagate exceptions for debugging purposes - raise + except Exception as err: + self.__ignore_or_raise( + err, "fetching output", task, + # context: + # - module + 'core', + # - class + 'Engine', + # - method + 'progress', + # - actual error class + err.__class__.__name__, + # - additional keywords + 'fetch_output', + ) - for index, task in enumerate(self._terminating): if task.execution.state == Run.State.TERMINATED: - transitioned.append(index) - try: - self._core.free(task) - # pylint: disable=broad-except - except Exception as err: - gc3libs.log.error( - "Got error freeing up resources used by task '%s': %s: %s." - " (For cloud-based resources, it's possible that the VM" - " has been destroyed already.)", - task, err.__class__.__name__, err) - self._terminated.append(task) - if self._store and task.changed: - self._store.save(task) - # remove tasks for which final output has been retrieved - for index in reversed(transitioned): - del self._terminating[index] + self._counts.transitioned(task, 'TERMINATING', 'TERMINATED') + self._managed.requeue(task, 'cleanup') + else: + assert task.execution.state == 'TERMINATING' + queue.put(task) # retry next time + + # clean up terminated tasks + queue = self._managed.to_cleanup + if queue: + gc3libs.log.debug( + "Engine %s about to clean up TERMINATED tasks ...", self) + for _ in xrange(len(queue)): + task = queue.get() + try: + self._core.free(task) + self._managed.requeue(task, 'done') + # pylint: disable=broad-except + except Exception as err: + queue.put(task) # retry next time + gc3libs.log.error( + "Got error freeing up resources used by task '%s': %s: %s." 
+ " (For cloud-based resources, it's possible that the VM" + " has been destroyed already.)", + task, err.__class__.__name__, err) + + if self._store and task.changed: + self._store.save(task) # now remove all terminated tasks if self.forget_terminated: - for task in self._terminated: - self._drop_terminated_task(task) + queue = self._managed.done + if queue: + gc3libs.log.debug( + "Engine %s now dropping TERMINATED tasks ...", self) + # the whole purpose of this loop is to call + # `self.remove()` on each item in the queue, so we cannot + # use the dequeue/requeue mechanism used in other loops, + # otherwise `self.remove()` fails since the task is not in + # `self._managed.done` ... + for task in list(queue): + try: + self.remove(task) + gc3libs.log.debug( + "Dropped TERMINATED task %s", task) + except Exception as err: # pylint: disable=broad-except + gc3libs.log.debug( + "Could not forget TERMINATED task '%s': %s: %s", + task, err.__class__.__name__, err) - def _drop_terminated_task(self, task): - assert task.execution.state == Run.State.TERMINATED - try: - self.remove(task, self._terminated) + gc3libs.log.debug("Engine.progress(): done.") + + + def __ignore_or_raise(self, err, action, task, *ctx): + if gc3libs.error_ignored(*ctx): gc3libs.log.debug( - "Dropped TERMINATED task %s (was: %s)", task, old_state) - except Exception as err: # pylint: disable=broad-except + "Ignored error in %s of task '%s': %s: %s", + action, task, err.__class__.__name__, err) gc3libs.log.debug( - "Could not forget TERMINATED task '%s': %s: %s", - task, err.__class__.__name__, err) + "(Original traceback follows.)", + exc_info=True) + else: + # propagate exceptions for debugging purposes + raise def redo(self, task, *args, **kwargs): @@ -1967,6 +2057,17 @@ def stats(self, only=None): This is deprecated since GC3Pie version 2.5. 
""" + result = defaultdict(int) + for task in self.iter_tasks(only): + state = task.execution.state + result[state] += 1 + result['total'] += 1 + if state == 'TERMINATED': + if task.execution.exitcode == 0: + result['ok'] += 1 + else: + result['failed'] += 1 + return result warn("Deprecated method `Engine.stats()` called" " -- please use `Engine.counts()` instead", DeprecationWarning, stacklevel=2) @@ -2002,8 +2103,7 @@ def submit(self, task, resubmit=False, targets=None, **extra_args): if resubmit: # since we are going to change the task's state, we need # to expunge it from the queues ... - queue = self.__get_task_queue(task) - if _contained(task, queue): + if task in self._managed: self.remove(task) task.redo() # ... and then add it again with the (possibly) new state @@ -2034,7 +2134,7 @@ def kill(self, task, **extra_args): """ Schedule a task for killing on the next `progress` run. """ - self._to_kill.append(task) + self._managed.requeue(task, 'kill') def peek(self, task, what='stdout', offset=0, size=None, **extra_args): """ diff --git a/gc3libs/tests/test_engine.py b/gc3libs/tests/test_engine.py index ede1893e..d99c7c8a 100644 --- a/gc3libs/tests/test_engine.py +++ b/gc3libs/tests/test_engine.py @@ -1,7 +1,7 @@ # test_engine.py # -*- coding: utf-8 -*- # -# Copyright (C) 2015 S3IT, Zentrale Informatik, University of Zurich +# Copyright (C) 2015, 2018 S3IT, Zentrale Informatik, University of Zurich # # # This program is free software; you can redistribute it and/or modify it @@ -52,13 +52,16 @@ def test_engine_resources(): assert test_rsc.max_walltime == 8*hours -def test_engine_progress(num_jobs=1, transition_graph=None, max_iter=100): +def test_engine_progress(num_jobs=5, transition_graph=None, max_iter=100): with temporary_engine() as engine: # generate some no-op tasks + tasks = [] for n in range(num_jobs): name = 'app{nr}'.format(nr=n+1) - engine.add(SuccessfulApp(name)) + app = SuccessfulApp(name) + engine.add(app) + tasks.append(app) # run them all 
current_iter = 0 @@ -68,6 +71,10 @@ def test_engine_progress(num_jobs=1, transition_graph=None, max_iter=100): done = engine.stats()[Run.State.TERMINATED] current_iter += 1 + # check state + for task in tasks: + assert task.execution.state == 'TERMINATED' + def test_engine_forget_terminated(num_jobs=3, transition_graph=None, max_iter=100): with temporary_engine() as engine: @@ -90,8 +97,9 @@ def test_engine_forget_terminated(num_jobs=3, transition_graph=None, max_iter=10 current_iter += 1 # check that they have been forgotten - assert not engine._terminated + assert 0 == len(engine._managed.done) for task in tasks: + assert task.execution.state == 'TERMINATED' assert not task._attached @@ -118,7 +126,7 @@ def test_engine_progress_collection_and_forget_terminated(): while seq.execution.state != 'TERMINATED': engine.progress() - assert not engine._terminated + assert 0 == len(engine._managed.done) assert not seq._attached for task in seq.tasks: assert not task._attached @@ -132,26 +140,25 @@ def test_engine_kill_SequentialTaskCollection(): while seq.execution.state != 'RUNNING': engine.progress() - # Because of our noop engine, as soon as the sequential is in - # running we will have a job in TERMINATED and the others in - # NEW. - assert ( - ['TERMINATED', 'NEW', 'NEW'] == - [i.execution.state for i in seq.tasks]) + # When the sequence is in RUNNING state, so must the first app + assert seq.tasks[0].execution.state == 'RUNNING' + assert seq.tasks[1].execution.state == 'NEW' + assert seq.tasks[2].execution.state == 'NEW' # Killing a sequential should put all the applications in # TERMINATED state. However, we will need an extra run of # engine.progress() to update the status of all the jobs. 
engine.kill(seq) - assert ( - ['TERMINATED', 'NEW', 'NEW'] == - [i.execution.state for i in seq.tasks]) + assert seq.tasks[0].execution.state == 'RUNNING' + assert seq.tasks[1].execution.state == 'NEW' + assert seq.tasks[2].execution.state == 'NEW' + assert seq.execution.state == 'RUNNING' engine.progress() - assert ( - ['TERMINATED', 'TERMINATED', 'TERMINATED'] == - [i.execution.state for i in seq.tasks]) + assert seq.tasks[0].execution.state == 'TERMINATED' + assert seq.tasks[1].execution.state == 'TERMINATED' + assert seq.tasks[2].execution.state == 'TERMINATED' assert seq.execution.state == 'TERMINATED' @@ -163,37 +170,37 @@ def test_engine_kill_redo_SequentialTaskCollection(): while seq.execution.state != 'RUNNING': engine.progress() - # Because of our noop engine, as soon as the sequential is in - # running we will have a job in TERMINATED and the others in - # NEW. - assert ( - ['TERMINATED', 'NEW', 'NEW'] == - [i.execution.state for i in seq.tasks]) + # When the sequence is in RUNNING state, so must the first app + assert seq.tasks[0].execution.state == 'RUNNING' + assert seq.tasks[1].execution.state == 'NEW' + assert seq.tasks[2].execution.state == 'NEW' # Killing a sequential should put all the applications in # TERMINATED state. However, we will need an extra run of # engine.progress() to update the status of all the jobs. 
engine.kill(seq) - assert ( - ['TERMINATED', 'NEW', 'NEW'] == - [i.execution.state for i in seq.tasks]) + assert seq.tasks[0].execution.state == 'RUNNING' + assert seq.tasks[1].execution.state == 'NEW' + assert seq.tasks[2].execution.state == 'NEW' + assert seq.execution.state == 'RUNNING' engine.progress() - - assert ( - ['TERMINATED', 'TERMINATED', 'TERMINATED'] == - [i.execution.state for i in seq.tasks]) + assert seq.tasks[0].execution.state == 'TERMINATED' + assert seq.tasks[1].execution.state == 'TERMINATED' + assert seq.tasks[2].execution.state == 'TERMINATED' assert seq.execution.state == 'TERMINATED' engine.redo(seq) - assert ( - ['NEW', 'NEW', 'NEW'] == - [i.execution.state for i in seq.tasks]) + assert seq.tasks[0].execution.state == 'NEW' + assert seq.tasks[1].execution.state == 'NEW' + assert seq.tasks[2].execution.state == 'NEW' assert seq.execution.state == 'NEW' + engine.progress() - assert ( - ['SUBMITTED', 'NEW', 'NEW'] == - [i.execution.state for i in seq.tasks]) + assert seq.tasks[0].execution.state == 'SUBMITTED' + assert seq.tasks[1].execution.state == 'NEW' + assert seq.tasks[2].execution.state == 'NEW' + def test_engine_kill_ParallelTaskCollection(): # Creates an engine with 2 cores. @@ -201,24 +208,27 @@ def test_engine_kill_ParallelTaskCollection(): par = SimpleParallelTaskCollection(3) engine.add(par) - while par.execution.state != 'RUNNING': + for _ in range(20): engine.progress() + if par.execution.state == 'RUNNING': + break # Because of our noop engine, as soon as the parallel is in # running we will have all jobs in SUBMITTED and the others in # NEW. assert ( - ['TERMINATED', 'SUBMITTED', 'NEW'] == + ['SUBMITTED', 'SUBMITTED', 'NEW'] == [i.execution.state for i in par.tasks]) # Killing a parallel should put all the applications in - # TERMINATED state. However, we need a run of + # TERMINATED state. 
However, we need two runs of # engine.progress() to update the status of all the jobs engine.kill(par) - assert ( - ['TERMINATED', 'SUBMITTED', 'NEW'] == + ['SUBMITTED', 'SUBMITTED', 'NEW'] == [i.execution.state for i in par.tasks]) + + engine.progress() engine.progress() assert ( @@ -400,7 +410,7 @@ def test_engine_submit_to_multiple_resources(num_resources=3, num_jobs=50): for n in range(num_resources) ] num_jobs_per_resource = [ - len([task for task in engine._in_flight + len([task for task in engine._managed.to_update if task.execution.resource_name == rsc.name]) for rsc in rscs ]