From c14c52366266167aa4353a2b4c31de7ef5c9f01f Mon Sep 17 00:00:00 2001
From: Riccardo Murri
Date: Tue, 8 May 2018 17:55:30 +0200
Subject: [PATCH] Ensure `Engine` obeys the "submit" and "in flight" limits.

---
 gc3libs/core.py              | 116 ++++++++++-------------------------
 gc3libs/tests/test_engine.py |  33 ++++++++++
 2 files changed, 67 insertions(+), 82 deletions(-)

diff --git a/gc3libs/core.py b/gc3libs/core.py
index 278c72d5..e347b51e 100755
--- a/gc3libs/core.py
+++ b/gc3libs/core.py
@@ -1112,6 +1112,7 @@ def first_come_first_serve(task_queue, resources, matchmaker=MatchMaker()):
                 gc3libs.log.debug(
                     "Scheduler ignored error in submitting task '%s': %s: %s",
                     task, err.__class__.__name__, err, exc_info=True)
+                continue
         else:
             # unsuccessful submission, push task back into queue
             task_queue.put(task)
@@ -1257,7 +1258,14 @@ def __init__(self, controller, tasks=[], store=None,
 
         # init counters/statistics
         self._counts = self._Counters(self)
-        self._counts.init_for(Task)  # always gather these
+        # always gather statistics about `Task` instances, as `Task`
+        # is the root of the GC3Pie task hierarchy
+        self._counts.init_for(Task)
+        # but also count `Application` instances to enforce submission
+        # and running limits (only real tasks that consume compute
+        # resources count for those -- "policy" tasks like
+        # `TaskCollection` should not)
+        self._counts.init_for(Application)
         TaskStateChange.connect(self._on_state_change)
 
         # Engine fully initialized, add all tasks
@@ -1631,9 +1639,6 @@ def progress(self):
         """
         gc3libs.log.debug("Engine.progress(): starting.")
 
-        # prepare
-        currently_submitted = 0
-        currently_in_flight = 0
         # pylint: disable=redefined-variable-type
         if self.max_in_flight > 0:
             limit_in_flight = self.max_in_flight
@@ -1683,16 +1688,6 @@ def progress(self):
 
                 self._managed.requeue(task)
 
-                # do book-keeping
-                state = task.execution.state
-                if old_state == Run.State.SUBMITTED:
-                    if isinstance(task, Application):
-                        currently_submitted -= 1
-                        currently_in_flight -= 1
-                elif old_state == Run.State.RUNNING:
-                    if isinstance(task, Application):
-                        currently_in_flight -= 1
-
         # update status of tasks before launching new ones
         queue = self._managed.to_update
         if queue:
@@ -1754,28 +1749,8 @@ def progress(self):
                 else:
                     self._managed.requeue(task)
 
-                if state == Run.State.SUBMITTED:
-                    # only real applications need to be counted
-                    # against the limit; policy tasks are exempt
-                    # (this applies to all similar clauses below)
-                    if isinstance(task, Application):
-                        if old_state != Run.State.SUBMITTED:
-                            currently_submitted += 1
-                        if old_state not in [
-                                Run.State.SUBMITTED,
-                                Run.State.RUNNING,
-                        ]:
-                            currently_in_flight += 1
-                elif state == Run.State.RUNNING:
-                    if isinstance(task, Application):
-                        if old_state == Run.State.SUBMITTED:
-                            currently_submitted -= 1
-                        if old_state not in [
-                                Run.State.SUBMITTED,
-                                Run.State.RUNNING,
-                        ]:
-                            currently_in_flight += 1
-                    if (self.retrieve_running and task.would_output
+                if self.retrieve_running:
+                    if (state == Run.State.RUNNING and task.would_output
                             and self.can_retrieve):
                         # try to get output
                         try:
@@ -1786,7 +1761,7 @@ def progress(self):
                         # pylint: disable=broad-except
                         except Exception as err:
                             self.__ignore_or_raise(
-                                err, "fecthing output", task,
+                                err, "fetching output", task,
                                 # context:
                                 # - module
                                 'core',
@@ -1800,45 +1775,23 @@ def progress(self):
                                 'RUNNING',
                                 'fetch_output',
                             )
-                # elif state == Run.State.NEW:
-                #     # can happen after a `.redo()`, requeue
-                #     assert not isinstance(task, Application)
-                # elif state in [
-                #         Run.State.STOPPED,
-                #         Run.State.TERMINATED,
-                #         Run.State.TERMINATING,
-                #         Run.State.UNKNOWN,
-                # ]:
-                #     # nothing to do, task has already been requeued
-                #     pass
-                # else:
-                #     # if we got to this point, state has an invalid value
-                #     gc3libs.log.error(
-                #         "Invalid state `%r` returned by task %s.",
-                #         state, task)
-                #     if not gc3libs.error_ignored(
-                #             # context:
-                #             # - module
-                #             'core',
-                #             # - class
-                #             'Engine',
-                #             # - method
-                #             'progress',
-                #             # - actual error class
-                #             'InternalError',
-                #             # - additional keywords
-                #             'state',
-                #             'update',
-                #     ):
-                #         # propagate exception to caller
-                #         raise gc3libs.exceptions.InternalError(
-                #             "Invalid state '{state!r}' returned by task {task}"
-                #             .format(state=state, task=task))
+
+        # reckon how many tasks are "live"; we are only interested in
+        # tasks that consume real compute resources (i.e.,
+        # `Application` instances) and exclude "policy" classes (e.g.,
+        # all `TaskCollections`)
+        app_counts = self.counts(Application)
+        currently_submitted = app_counts['SUBMITTED']
+        currently_in_flight = currently_submitted + app_counts['RUNNING']
 
         # now try to submit NEW tasks
         if (self.can_submit
                 and currently_submitted < limit_submitted
                 and currently_in_flight < limit_in_flight):
+            submit_allowance = min(
+                limit_submitted - currently_submitted,
+                limit_in_flight - currently_in_flight
+            )
             queue = self._managed.to_submit
             if queue:
                 gc3libs.log.debug(
@@ -1856,21 +1809,24 @@ def progress(self):
                 # ... in sched:` line
                 sched = gc3libs.utils.YieldAtNext(_sched)
                 for task, resource_name in sched:
+                    # enforce Engine limits
+                    if submit_allowance <= 0:
+                        # we need to put back the task in the queue
+                        # here as we won't go call back into scheduler
+                        self._managed.to_submit.put(task)
+                        break
                     resource = self._core.resources[resource_name]
                     try:
                         self._core.submit(task, targets=[resource])
-                        if self._store and task.changed:
-                            self._store.save(task)
                         # if we get to this point, we know state is
                         # either SUBMITTED or RUNNING
+                        if self._store and task.changed:
+                            self._store.save(task)
                         self._managed.to_update.put(task)
                         if isinstance(task, Application):
-                            currently_submitted += 1
-                            currently_in_flight += 1
-                        # do book-keeping
-                        state = task.execution.state
+                            submit_allowance -= 1
                         # notify scheduler
-                        sched.send(state)
+                        sched.send(task.execution.state)
                     # pylint: disable=broad-except
                     except Exception as err1:
                         # record the error in the task's history
@@ -1901,10 +1857,6 @@ def progress(self):
                             'scheduler',
                             'submit',
                         )
-                    # enforce Engine limits
-                    if (currently_submitted >= limit_submitted
-                            or currently_in_flight >= limit_in_flight):
-                        break
 
         # finally, retrieve output of finished tasks
         if self.can_retrieve:
diff --git a/gc3libs/tests/test_engine.py b/gc3libs/tests/test_engine.py
index a417566b..abfa7d2d 100755
--- a/gc3libs/tests/test_engine.py
+++ b/gc3libs/tests/test_engine.py
@@ -517,6 +517,39 @@ def test_engine_cannot_find_task_by_id_if_no_store():
         engine.find_task_by_id(task_id)
 
 
+@pytest.mark.parametrize("limit_submitted,limit_in_flight", [
+    (2, 10),
+    (10, 5),
+])
+def test_engine_limits(limit_submitted, limit_in_flight,
+                       num_jobs=30, max_iter=100):
+    """
+    Test that `Engine.max_in_flight` and `Engine.max_submitted` are honored.
+    """
+    with temporary_engine(max_cores=50) as engine:
+        # set limits from the test parameters (one case per limit pair)
+        engine.max_in_flight = limit_in_flight
+        engine.max_submitted = limit_submitted
+        # populate with test apps
+        apps = []
+        for n in range(num_jobs):
+            name = 'app{nr}'.format(nr=n)
+            app = SuccessfulApp(name)
+            engine.add(app)
+            apps.append(app)
+        stats = engine.counts()
+        iterations = 0
+        while stats['TERMINATED'] < num_jobs and iterations < max_iter:
+            iterations += 1
+            engine.progress()
+            stats = engine.counts()
+            submitted = stats['SUBMITTED']
+            assert submitted <= engine.max_submitted
+            in_flight = (stats['SUBMITTED'] + stats['RUNNING'])
+            assert in_flight <= engine.max_in_flight
+        assert stats["TERMINATED"] == num_jobs
+
+
 def test_engine_counts(num_jobs=100, max_iter=1000):
     """
     Test that `Engine.count()` returns correct results.