Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/conductor/client/http/api/workflow_bulk_resource_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ def terminate(self, body, **kwargs): # noqa: E501
:param async_req bool
:param list[str] body: (required)
:param str reason:
:param bool trigger_failure_workflow:
:return: BulkResponse
If the method is called asynchronously,
returns the request thread.
Expand All @@ -445,12 +446,13 @@ def terminate_with_http_info(self, body, **kwargs): # noqa: E501
:param async_req bool
:param list[str] body: (required)
:param str reason:
:param bool trigger_failure_workflow:
:return: BulkResponse
If the method is called asynchronously,
returns the request thread.
"""

all_params = ['body', 'reason'] # noqa: E501
all_params = ['body', 'reason', 'triggerFailureWorkflow'] # noqa: E501
all_params.append('async_req')
all_params.append('_return_http_data_only')
all_params.append('_preload_content')
Expand Down Expand Up @@ -478,6 +480,9 @@ def terminate_with_http_info(self, body, **kwargs): # noqa: E501
if 'reason' in params:
query_params.append(('reason', params['reason'])) # noqa: E501

if 'triggerFailureWorkflow' in params:
query_params.append(('triggerFailureWorkflow', params['triggerFailureWorkflow'])) # noqa: E501

header_params = {}

form_params = []
Expand Down
7 changes: 6 additions & 1 deletion src/conductor/client/http/api/workflow_resource_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2390,6 +2390,7 @@ def terminate1(self, workflow_id, **kwargs): # noqa: E501
:param async_req bool
:param str workflow_id: (required)
:param str reason:
:param bool trigger_failure_workflow:
:return: None
If the method is called asynchronously,
returns the request thread.
Expand All @@ -2412,12 +2413,13 @@ def terminate1_with_http_info(self, workflow_id, **kwargs): # noqa: E501
:param async_req bool
:param str workflow_id: (required)
:param str reason:
:param bool trigger_failure_workflow:
:return: None
If the method is called asynchronously,
returns the request thread.
"""

all_params = ['workflow_id', 'reason'] # noqa: E501
all_params = ['workflow_id', 'reason', 'triggerFailureWorkflow'] # noqa: E501
all_params.append('async_req')
all_params.append('_return_http_data_only')
all_params.append('_preload_content')
Expand Down Expand Up @@ -2447,6 +2449,9 @@ def terminate1_with_http_info(self, workflow_id, **kwargs): # noqa: E501
if 'reason' in params:
query_params.append(('reason', params['reason'])) # noqa: E501

if 'triggerFailureWorkflow' in params:
query_params.append(('triggerFailureWorkflow', params['triggerFailureWorkflow'])) # noqa: E501

header_params = {}

form_params = []
Expand Down
4 changes: 3 additions & 1 deletion src/conductor/client/workflow/executor/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,13 @@ def resume(self, workflow_id: str) -> None:
workflow_id=workflow_id
)

def terminate(self, workflow_id: str, reason: str = None) -> None:
def terminate(self, workflow_id: str, reason: str = None, trigger_failure_workflow: bool = None) -> None:
"""Terminate workflow execution"""
kwargs = {}
if reason is not None:
kwargs['reason'] = reason
if trigger_failure_workflow is not None:
kwargs['triggerFailureWorkflow'] = trigger_failure_workflow
return self.workflow_client.terminate1(
workflow_id=workflow_id,
**kwargs
Expand Down
20 changes: 18 additions & 2 deletions tests/integration/workflow/test_workflow_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def test_workflow_methods(
version=1234,
).add(
task
)
).failure_workflow(workflow_name)
workflow_executor.register_workflow(
workflow.to_workflow_def(),
overwrite=True,
Expand All @@ -142,11 +142,15 @@ def test_workflow_methods(
_restart_workflow(workflow_executor, workflow_id)
_terminate_workflow(workflow_executor, workflow_id)
_retry_workflow(workflow_executor, workflow_id)
_terminate_workflow(workflow_executor, workflow_id)
failure_wf_id = _terminate_workflow_with_failure(workflow_executor, workflow_id, True)
_terminate_workflow(workflow_executor, failure_wf_id)
_rerun_workflow(workflow_executor, workflow_id)
workflow_executor.remove_workflow(
workflow_id, archive_workflow=False
)
workflow_executor.remove_workflow(
failure_wf_id, archive_workflow=False
)


def test_workflow_registration(workflow_executor: WorkflowExecutor):
Expand Down Expand Up @@ -266,6 +270,18 @@ def _terminate_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -
f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}'
)

def _terminate_workflow_with_failure(workflow_executor: WorkflowExecutor, workflow_id: str, trigger_failure_workflow: bool) -> str:
workflow_executor.terminate(workflow_id, 'test', trigger_failure_workflow)
workflow_status = workflow_executor.get_workflow_status(
workflow_id,
include_output=True,
include_variables=False,
)
if workflow_status.status != 'TERMINATED':
raise Exception(
f'workflow expected to be TERMINATED, but received {workflow_status.status}, workflow_id: {workflow_id}'
)
return workflow_status.output.get('conductor.failure_workflow')

def _restart_workflow(workflow_executor: WorkflowExecutor, workflow_id: str) -> None:
workflow_executor.restart(workflow_id)
Expand Down