diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index a7efb57d6..7998efab6 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -168,33 +168,35 @@ def __update_task(self, task_result: TaskResult): task_definition_name=task_definition_name ) ) - try: - response = self.task_client.update_task( - body=task_result - ) - except Exception as e: - if self.metrics_collector is not None: - self.metrics_collector.increment_task_update_error( - task_definition_name, type(e) + for attempt in range(4): + if attempt > 0: + # Wait for [10s, 20s, 30s] before next attempt + time.sleep(attempt * 10) + try: + response = self.task_client.update_task(body=task_result) + logger.debug( + 'Updated task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format( + task_id=task_result.task_id, + workflow_instance_id=task_result.workflow_instance_id, + task_definition_name=task_definition_name, + response=response + ) ) - logger.info( - 'Failed to update task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format( - task_id=task_result.task_id, - workflow_instance_id=task_result.workflow_instance_id, - task_definition_name=task_definition_name, - reason=traceback.format_exc() + return response + except Exception as e: + if self.metrics_collector is not None: + self.metrics_collector.increment_task_update_error( + task_definition_name, type(e) + ) + logger.debug( + 'Failed to update task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format( + task_id=task_result.task_id, + workflow_instance_id=task_result.workflow_instance_id, + task_definition_name=task_definition_name, + reason=traceback.format_exc() + ) ) - ) - return None - logger.debug( - 'Updated task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format( - task_id=task_result.task_id, - workflow_instance_id=task_result.workflow_instance_id, - task_definition_name=task_definition_name, - response=response - ) - ) - return response + return None def __wait_for_polling_interval(self) -> None: polling_interval = self.worker.get_polling_interval_in_seconds() diff --git a/tests/integration/workflow/test_workflow_execution.py b/tests/integration/workflow/test_workflow_execution.py index 947c72c7c..c38689917 100644 --- a/tests/integration/workflow/test_workflow_execution.py +++ b/tests/integration/workflow/test_workflow_execution.py @@ -136,15 +136,16 @@ def test_workflow_methods( *start_workflow_requests ) for workflow_id in workflow_ids: - _pause_workflow(workflow_executor, workflow_id) - _resume_workflow(workflow_executor, workflow_id) - _terminate_workflow(workflow_executor, workflow_id) - _restart_workflow(workflow_executor, workflow_id) - _terminate_workflow(workflow_executor, workflow_id) - _retry_workflow(workflow_executor, workflow_id) - failure_wf_id = _terminate_workflow_with_failure(workflow_executor, workflow_id, True) - _terminate_workflow(workflow_executor, failure_wf_id) - _rerun_workflow(workflow_executor, workflow_id) + __pause_workflow(workflow_executor, workflow_id) + __resume_workflow(workflow_executor, workflow_id) + __terminate_workflow(workflow_executor, workflow_id) + __restart_workflow(workflow_executor, workflow_id) + __terminate_workflow(workflow_executor, workflow_id) + __retry_workflow(workflow_executor, workflow_id) + failure_wf_id = __terminate_workflow_with_failure( + workflow_executor, workflow_id, True) + __terminate_workflow(workflow_executor, failure_wf_id) + __rerun_workflow(workflow_executor, workflow_id) workflow_executor.remove_workflow( workflow_id, archive_workflow=False ) @@ -232,7 +233,17 @@ def generate_worker(execute_function: ExecuteTaskFunction) -> Worker: ) -def _pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: +def __pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: + _run_with_retry_attempt( + __validate_pause_workflow, + { + "workflow_executor": workflow_executor, + "workflow_id": workflow_id, + } + ) + + +def __validate_pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: workflow_executor.pause(workflow_id) workflow_status = workflow_executor.get_workflow_status( workflow_id, @@ -245,7 +256,17 @@ def _pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> No ) -def _resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: +def __resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: + _run_with_retry_attempt( + __validate_resume_workflow, + { + "workflow_executor": workflow_executor, + "workflow_id": workflow_id, + } + ) + + +def __validate_resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: workflow_executor.resume(workflow_id) workflow_status = workflow_executor.get_workflow_status( workflow_id, @@ -258,7 +279,17 @@ def _resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> N ) -def _terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: +def __terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: + _run_with_retry_attempt( + __validate_terminate_workflow, + { + "workflow_executor": workflow_executor, + "workflow_id": workflow_id, + } + ) + + +def __validate_terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: workflow_executor.terminate(workflow_id) workflow_status = workflow_executor.get_workflow_status( workflow_id, @@ -270,7 +301,8 @@ def _terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) - f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}' ) -def _terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workflow_id: str, trigger_failure_workflow: bool) -> str: + +def __terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workflow_id: str, trigger_failure_workflow: bool) -> str: workflow_executor.terminate(workflow_id, 'test', trigger_failure_workflow) workflow_status = workflow_executor.get_workflow_status( workflow_id, @@ -283,7 +315,18 @@ def _terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workfl ) return workflow_status.output.get('conductor.failure_workflow') -def _restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: + +def __restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: + _run_with_retry_attempt( + __validate_restart_workflow, + { + "workflow_executor": workflow_executor, + "workflow_id": workflow_id, + } + ) + + +def __validate_restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: workflow_executor.restart(workflow_id) workflow_status = workflow_executor.get_workflow_status( workflow_id, @@ -296,7 +339,17 @@ def _restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> ) -def _retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: +def __retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: + _run_with_retry_attempt( + __validate_retry_workflow, + { + "workflow_executor": workflow_executor, + "workflow_id": workflow_id, + } + ) + + +def __validate_retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: workflow_executor.retry(workflow_id) workflow_status = workflow_executor.get_workflow_status( workflow_id, @@ -309,7 +362,17 @@ def _retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> No ) -def _rerun_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: +def __rerun_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: + _run_with_retry_attempt( + __validate_rerun_workflow, + { + "workflow_executor": workflow_executor, + "workflow_id": workflow_id, + } + ) + + +def __validate_rerun_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None: workflow_executor.rerun(RerunWorkflowRequest(), workflow_id) workflow_status = workflow_executor.get_workflow_status( workflow_id,