diff --git a/gc3libs/core.py b/gc3libs/core.py index 1c3ef786..8dc94a57 100755 --- a/gc3libs/core.py +++ b/gc3libs/core.py @@ -1546,7 +1546,8 @@ def progress(self): elif state == Run.State.TERMINATED: # task changed state, mark as to remove transitioned.append(index) - self._terminated.append(task) + if not self.forget_terminated: + self._terminated.append(task) else: # if we got to this point, state has an invalid value gc3libs.log.error( @@ -1621,7 +1622,8 @@ def progress(self): elif old_state == Run.State.RUNNING: if isinstance(task, Application): currently_in_flight -= 1 - self._terminated.append(task) + if not self.forget_terminated: + self._terminated.append(task) transitioned.append(index) # pylint: disable=broad-except except Exception as err: @@ -1676,7 +1678,8 @@ def progress(self): # task changed state, mark as to remove transitioned.append(index) elif state == Run.State.TERMINATED: - self._terminated.append(task) + if not self.forget_terminated: + self._terminated.append(task) # task changed state, mark as to remove transitioned.append(index) # pylint: disable=broad-except @@ -1879,9 +1882,11 @@ def progress(self): if self._store and task.changed: self._store.save(task) - # remove tasks for which final output has been retrieved - for index in reversed(transitioned): - del self._terminating[index] + if not self.forget_terminated: + # remove tasks for which final output has been retrieved + # (only if TERMINATED tasks have not been dropped already) + for index in reversed(transitioned): + del self._terminating[index] def redo(self, task, *args, **kwargs): diff --git a/gc3libs/tests/test_engine.py b/gc3libs/tests/test_engine.py index 4cacd244..c78ee18a 100644 --- a/gc3libs/tests/test_engine.py +++ b/gc3libs/tests/test_engine.py @@ -69,6 +69,27 @@ def test_engine_progress(num_jobs=1, transition_graph=None, max_iter=100): current_iter += 1 +def test_engine_forget_terminated(num_jobs=3, transition_graph=None, max_iter=100): + with temporary_engine() as engine: + engine.forget_terminated = True + + # generate some no-op tasks + for n in range(num_jobs): + name = 'app{nr}'.format(nr=n+1) + engine.add(SuccessfulApp(name)) + + # run them all + current_iter = 0 + done = engine.stats()[Run.State.TERMINATED] + while done < num_jobs and current_iter < max_iter: + engine.progress() + done = engine.stats()[Run.State.TERMINATED] + current_iter += 1 + + # check that they have been forgotten + assert not engine._terminated + + def test_engine_progress_collection(): with temporary_engine() as engine: seq = SimpleSequentialTaskCollection(3) @@ -81,6 +102,20 @@ def test_engine_progress_collection(): assert seq.stage().execution.state == 'TERMINATED' +def test_engine_progress_collection_and_forget_terminated(): + with temporary_engine() as engine: + engine.forget_terminated = True + + seq = SimpleSequentialTaskCollection(3) + engine.add(seq) + + # run through sequence + while seq.execution.state != 'TERMINATED': + engine.progress() + + assert not engine._terminated + + def test_engine_kill_SequentialTaskCollection(): with temporary_engine() as engine: seq = SimpleSequentialTaskCollection(3) diff --git a/gc3libs/workflow.py b/gc3libs/workflow.py index f499d0da..125f493e 100755 --- a/gc3libs/workflow.py +++ b/gc3libs/workflow.py @@ -151,7 +151,8 @@ def update_state(self, **extra_args): Update the running state of all managed tasks. """ for task in self.tasks: - self._controller.update_job_state(task, **extra_args) + if task.execution.state not in [Run.State.NEW, Run.State.TERMINATED]: + self._controller.update_job_state(task, **extra_args) def kill(self, **extra_args): # XXX: provide default implementation that kills all jobs? @@ -449,7 +450,8 @@ def update_state(self, **extra_args): # update state of current task task = self.tasks[self._current_task] - task.update_state(**extra_args) + if task.execution.state not in [Run.State. NEW, Run.State.TERMINATED]: + task.update_state(**extra_args) gc3libs.log.debug("Task `%s` in state %s", task, task.execution.state) # now set state based on the state of current task: @@ -862,7 +864,8 @@ def update_state(self, **extra_args): for task in self.tasks: # gc3libs.log.debug("Updating state of %s in collection %s ..." # % (task, self)) - task.update_state(**extra_args) + if task.execution.state not in [Run.State.NEW, Run.State.TERMINATED]: + task.update_state(**extra_args) self.execution.state = self._state() if self.execution.state == Run.State.TERMINATED: self.execution.returncode = (0, 0) @@ -1118,7 +1121,8 @@ def update_state(self): TERMINATED and `self.retry()` is `True`. """ own_state_old = self.execution.state - self.task.update_state() + if self.task.execution.state not in [Run.State.NEW, Run.State.TERMINATED]: + self.task.update_state() own_state_new = self._recompute_state() if (self.task.execution.state == Run.State.TERMINATED and own_state_old != Run.State.TERMINATED):