From fab841e29e39602e3576705c619f3628fb9e0a72 Mon Sep 17 00:00:00 2001 From: Nathan Hammond Date: Fri, 27 Apr 2018 18:49:09 -0700 Subject: [PATCH] fix retries (#513) --- .../api/models/task_attempts.py | 24 ++++++++++++------- server/loomengine_server/api/models/tasks.py | 8 ++++--- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/server/loomengine_server/api/models/task_attempts.py b/server/loomengine_server/api/models/task_attempts.py index f07faf66..013c54d9 100644 --- a/server/loomengine_server/api/models/task_attempts.py +++ b/server/loomengine_server/api/models/task_attempts.py @@ -103,6 +103,11 @@ def has_terminal_status(self): or self.status_is_failed \ or self.status_is_killed + def might_succeed(self): + return self.status_is_initializing \ + or self.status_is_finished \ + or self.status_is_running + def finish(self): if self.has_terminal_status(): return @@ -476,10 +481,16 @@ def _run_execute_task_attempt_playbook(task_attempt): env.update(new_vars) - p = subprocess.Popen(cmd_list, - env=env, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) + try: + p = subprocess.Popen(cmd_list, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + except Exception as e: + logger.error(str(e)) + task_attempt.system_error(detail=str(e)) + return + terminal_output = '' for line in iter(p.stdout.readline, ''): terminal_output += line @@ -491,7 +502,4 @@ def _run_execute_task_attempt_playbook(task_attempt): % (task_attempt.uuid, p.returncode)) msg = "Failed to launch worker process for TaskAttempt %s" \ % task_attempt.uuid - task_attempt.add_event(msg, - detail=terminal_output, - is_error=True) - task_attempt.fail(detail="Failed to launch worker process") + task_attempt.system_error(detail=terminal_output) diff --git a/server/loomengine_server/api/models/tasks.py b/server/loomengine_server/api/models/tasks.py index bf56a1dd..02160b74 100644 --- a/server/loomengine_server/api/models/tasks.py +++ b/server/loomengine_server/api/models/tasks.py @@ -104,6 +104,8 @@ def status(self): return 'Unknown' def is_responsive(self): + if self.task_attempt and not self.task_attempt.might_succeed(): + return False heartbeat = int(get_setting('TASKRUNNER_HEARTBEAT_INTERVAL_SECONDS')) timeout = int(get_setting('TASKRUNNER_HEARTBEAT_TIMEOUT_SECONDS')) try: @@ -463,9 +465,9 @@ def _execute_task(task_uuid, delay=0, force_rerun=False): if not force_rerun: # By skipping this, a new TaskAttempt will always be created. # Use existing TaskAttempt if a valid one exists with the same fingerprint - if fingerprint.active_task_attempt: - task_attempt = fingerprint.active_task_attempt - task.activate_task_attempt(task_attempt) + if fingerprint.active_task_attempt \ + and fingerprint.active_task_attempt.might_succeed(): + task.activate_task_attempt(fingerprint.active_task_attempt) return task_attempt = task.create_and_activate_task_attempt()