Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions src/conductor/client/automator/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,33 +168,35 @@ def __update_task(self, task_result: TaskResult):
task_definition_name=task_definition_name
)
)
try:
response = self.task_client.update_task(
body=task_result
)
except Exception as e:
if self.metrics_collector is not None:
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
for attempt in range(4):
if attempt > 0:
# Wait for [10s, 20s, 30s] before next attempt
time.sleep(attempt * 10)
try:
response = self.task_client.update_task(body=task_result)
logger.debug(
'Updated task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
task_definition_name=task_definition_name,
response=response
)
)
logger.info(
'Failed to update task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
task_definition_name=task_definition_name,
reason=traceback.format_exc()
return response
except Exception as e:
if self.metrics_collector is not None:
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
logger.debug(
'Failed to update task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, reason: {reason}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
task_definition_name=task_definition_name,
reason=traceback.format_exc()
)
)
)
return None
logger.debug(
'Updated task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format(
task_id=task_result.task_id,
workflow_instance_id=task_result.workflow_instance_id,
task_definition_name=task_definition_name,
response=response
)
)
return response
return None

def __wait_for_polling_interval(self) -> None:
polling_interval = self.worker.get_polling_interval_in_seconds()
Expand Down
95 changes: 79 additions & 16 deletions tests/integration/workflow/test_workflow_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,16 @@ def test_workflow_methods(
*start_workflow_requests
)
for workflow_id in workflow_ids:
_pause_workflow(workflow_executor, workflow_id)
_resume_workflow(workflow_executor, workflow_id)
_terminate_workflow(workflow_executor, workflow_id)
_restart_workflow(workflow_executor, workflow_id)
_terminate_workflow(workflow_executor, workflow_id)
_retry_workflow(workflow_executor, workflow_id)
failure_wf_id = _terminate_workflow_with_failure(workflow_executor, workflow_id, True)
_terminate_workflow(workflow_executor, failure_wf_id)
_rerun_workflow(workflow_executor, workflow_id)
__pause_workflow(workflow_executor, workflow_id)
__resume_workflow(workflow_executor, workflow_id)
__terminate_workflow(workflow_executor, workflow_id)
__restart_workflow(workflow_executor, workflow_id)
__terminate_workflow(workflow_executor, workflow_id)
__retry_workflow(workflow_executor, workflow_id)
failure_wf_id = __terminate_workflow_with_failure(
workflow_executor, workflow_id, True)
__terminate_workflow(workflow_executor, failure_wf_id)
__rerun_workflow(workflow_executor, workflow_id)
workflow_executor.remove_workflow(
workflow_id, archive_workflow=False
)
Expand Down Expand Up @@ -232,7 +233,17 @@ def generate_worker(execute_function: ExecuteTaskFunction) -> Worker:
)


def _pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
def __pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
_run_with_retry_attempt(
__validate_pause_workflow,
{
"workflow_executor": workflow_executor,
"workflow_id": workflow_id,
}
)


def __validate_pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
workflow_executor.pause(workflow_id)
workflow_status = workflow_executor.get_workflow_status(
workflow_id,
Expand All @@ -245,7 +256,17 @@ def _pause_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> No
)


def _resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
def __resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
_run_with_retry_attempt(
__validate_resume_workflow,
{
"workflow_executor": workflow_executor,
"workflow_id": workflow_id,
}
)


def __validate_resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
workflow_executor.resume(workflow_id)
workflow_status = workflow_executor.get_workflow_status(
workflow_id,
Expand All @@ -258,7 +279,17 @@ def _resume_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> N
)


def _terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
def __terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
_run_with_retry_attempt(
__validate_terminate_workflow,
{
"workflow_executor": workflow_executor,
"workflow_id": workflow_id,
}
)


def __validate_terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
workflow_executor.terminate(workflow_id)
workflow_status = workflow_executor.get_workflow_status(
workflow_id,
Expand All @@ -270,7 +301,8 @@ def _terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -
f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}'
)

def _terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workflow_id: str, trigger_failure_workflow: bool) -> str:

def __terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workflow_id: str, trigger_failure_workflow: bool) -> str:
workflow_executor.terminate(workflow_id, 'test', trigger_failure_workflow)
workflow_status = workflow_executor.get_workflow_status(
workflow_id,
Expand All @@ -283,7 +315,18 @@ def _terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workfl
)
return workflow_status.output.get('conductor.failure_workflow')

def _restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:

def __restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
_run_with_retry_attempt(
__validate_restart_workflow,
{
"workflow_executor": workflow_executor,
"workflow_id": workflow_id,
}
)


def __validate_restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
workflow_executor.restart(workflow_id)
workflow_status = workflow_executor.get_workflow_status(
workflow_id,
Expand All @@ -296,7 +339,17 @@ def _restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) ->
)


def _retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
def __retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
_run_with_retry_attempt(
__validate_retry_workflow,
{
"workflow_executor": workflow_executor,
"workflow_id": workflow_id,
}
)


def __validate_retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
workflow_executor.retry(workflow_id)
workflow_status = workflow_executor.get_workflow_status(
workflow_id,
Expand All @@ -309,7 +362,17 @@ def _retry_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> No
)


def _rerun_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
def __rerun_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
_run_with_retry_attempt(
__validate_rerun_workflow,
{
"workflow_executor": workflow_executor,
"workflow_id": workflow_id,
}
)


def __validate_rerun_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
workflow_executor.rerun(RerunWorkflowRequest(), workflow_id)
workflow_status = workflow_executor.get_workflow_status(
workflow_id,
Expand Down