Skip to content

Commit

Permalink
[CDF-21357] add data workflows executions retry endpoint support (#1729)
Browse files Browse the repository at this point in the history
Adds Data Workflow execution retries endpoint support in the SDK. Retrying a terminal execution will retry the last failed task(s), enabling to retry and advance the execution where it left off.

Additionally adds a test for the cancel functionality, and updates the API docs references.
  • Loading branch information
VerstraeteBert authored Apr 22, 2024
1 parent 4b30d3c commit e21dbdb
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.38.0] - 2024-04-22
### Added
- Support for `workflows.executions.retry`

## [7.37.4] - 2024-04-22
### Improved
- Enabled automatic retries on Data Workflows POST endpoints
Expand Down
54 changes: 42 additions & 12 deletions cognite/client/_api/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class WorkflowTaskAPI(BetaWorkflowAPIClient):
def update(
self, task_id: str, status: Literal["completed", "failed"], output: dict | None = None
) -> WorkflowTaskExecution:
"""`Update status of async task. <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Tasks/operation/UpdateTaskStatus>`_
"""`Update status of async task. <https://api-docs.cognite.com/20230101-beta/tag/Tasks/operation/UpdateTaskStatus>`_
For tasks that has been marked with 'is_async = True', the status must be updated by calling this endpoint with either 'completed' or 'failed'.
Expand Down Expand Up @@ -112,7 +112,7 @@ class WorkflowExecutionAPI(BetaWorkflowAPIClient):
_RESOURCE_PATH = "/workflows/executions"

def retrieve_detailed(self, id: str) -> WorkflowExecutionDetailed | None:
"""`Retrieve a workflow execution with detailed information. <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflow-Execution/operation/ExecutionOfSpecificRunOfWorkflow>`_
"""`Retrieve a workflow execution with detailed information. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-executions/operation/ExecutionOfSpecificRunOfWorkflow>`_
Args:
id (str): The server-generated id of the workflow execution.
Expand Down Expand Up @@ -153,7 +153,7 @@ def trigger(
metadata: dict | None = None,
client_credentials: ClientCredentials | None = None,
) -> WorkflowExecution:
"""`Trigger a workflow execution. <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflow-Execution/operation/TriggerRunOfSpecificVersionOfWorkflow>`_
"""`Trigger a workflow execution. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-executions/operation/TriggerRunOfSpecificVersionOfWorkflow>`_
Args:
workflow_external_id (str): External id of the workflow.
Expand Down Expand Up @@ -215,7 +215,7 @@ def list(
created_time_end: int | None = None,
limit: int = DEFAULT_LIMIT_READ,
) -> WorkflowExecutionList:
"""`List workflow executions in the project. <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflow-Execution/operation/ListWorkflowExecutions>`_
"""`List workflow executions in the project. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-executions/operation/ListWorkflowExecutions>`_
Args:
workflow_version_ids (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None): Workflow version id or list of workflow version ids to filter on.
Expand Down Expand Up @@ -263,7 +263,7 @@ def list(
)

def cancel(self, executions: Sequence[CancelExecution]) -> Sequence[WorkflowExecution]:
"""`cancel a workflow execution. <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflow-Execution/operation/CancelationOfSpecificRunsOfWorkflow>`_
"""`cancel a workflow execution. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-executions/operation/WorkflowExecutionCancellation>`_
Args:
executions (Sequence[CancelExecution]): List of executions to cancel.
Expand Down Expand Up @@ -293,6 +293,36 @@ def cancel(self, executions: Sequence[CancelExecution]) -> Sequence[WorkflowExec

return [WorkflowExecution._load(execution) for execution in response.json()["items"]]

def retry(self, id: str, client_credentials: ClientCredentials | None = None) -> WorkflowExecution:
"""`Retry a workflow execution. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-executions/operation/WorkflowExecutionRetryn>`_
Args:
id (str): The server-generated id of the workflow execution.
client_credentials (ClientCredentials | None): Specific credentials that should be used to retry the workflow execution. When passed will take precedence over the current credentials.
Returns:
WorkflowExecution: The retried workflow execution.
Examples:
Retry a workflow execution that has been cancelled or failed:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import CancelExecution
>>> client = CogniteClient()
>>> res = client.workflows.executions.trigger("foo", "1")
>>> client.workflows.executions.cancel([CancelExecution(res.id, "test cancellation")])
>>> client.workflows.executions.retry(res.id)
"""
self._warning.warn()
nonce = create_session_and_return_nonce(
self._cognite_client, api_name="Workflow API", client_credentials=client_credentials
)
response = self._post(
url_path=f"{self._RESOURCE_PATH}/{id}/retry",
json={"authentication": {"nonce": nonce}},
)
return WorkflowExecution._load(response.json())


class WorkflowVersionAPI(BetaWorkflowAPIClient):
_RESOURCE_PATH = "/workflows/versions"
Expand All @@ -302,7 +332,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
self._DELETE_LIMIT = 100

def upsert(self, version: WorkflowVersionUpsert, mode: Literal["replace"] = "replace") -> WorkflowVersion:
"""`Create a workflow version. <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflows/operation/CreateOrUpdateWorkflow>`_
"""`Create a workflow version. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-versions/operation/CreateOrUpdateWorkflowVersion>`_
Note this is an upsert endpoint, so if a workflow with the same version external id already exists, it will be updated.
Expand Down Expand Up @@ -356,7 +386,7 @@ def delete(
workflow_version_id: WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier],
ignore_unknown_ids: bool = False,
) -> None:
"""`Delete a workflow version(s). <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflow-Version/operation/DeleteSpecificVersionsOfWorkflow>`_
"""`Delete a workflow version(s). <https://api-docs.cognite.com/20230101-beta/tag/Workflow-versions/operation/DeleteSpecificVersionsOfWorkflow>`_
Args:
workflow_version_id (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier]): Workflow version id or list of workflow version ids to delete.
Expand Down Expand Up @@ -387,7 +417,7 @@ def delete(
)

def retrieve(self, workflow_external_id: str, version: str) -> WorkflowVersion | None:
"""`Retrieve a workflow version. <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflow-Version/operation/GetSpecificVersion>`_
"""`Retrieve a workflow version. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-versions/operation/GetSpecificVersion>`_
Args:
workflow_external_id (str): External id of the workflow.
Expand Down Expand Up @@ -421,7 +451,7 @@ def list(
workflow_version_ids: WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None = None,
limit: int = DEFAULT_LIMIT_READ,
) -> WorkflowVersionList:
"""`List workflow versions in the project <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflow-Version/operation/ListWorkflowVersions>`_
"""`List workflow versions in the project <https://api-docs.cognite.com/20230101-beta/tag/Workflow-versions/operation/ListWorkflowVersions>`_
Args:
workflow_version_ids (WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None): Workflow version id or list of workflow version ids to filter on.
Expand Down Expand Up @@ -483,7 +513,7 @@ def __init__(
self._DELETE_LIMIT = 100

def upsert(self, workflow: WorkflowUpsert, mode: Literal["replace"] = "replace") -> Workflow:
"""`Create a workflow. <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflows/operation/CreateOrUpdateWorkflow>`_
"""`Create a workflow. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-versions/operation/CreateOrUpdateWorkflow>`_
Note this is an upsert endpoint, so if a workflow with the same external id already exists, it will be updated.
Expand Down Expand Up @@ -514,7 +544,7 @@ def upsert(self, workflow: WorkflowUpsert, mode: Literal["replace"] = "replace")
return Workflow._load(response.json()["items"][0])

def retrieve(self, external_id: str) -> Workflow | None:
"""`Retrieve a workflow. <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflows/operation/CreateOrUpdateWorkflow>`_
"""`Retrieve a workflow. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-versions/operation/CreateOrUpdateWorkflow>`_
Args:
external_id (str): Identifier for a Workflow. Must be unique for the project.
Expand Down Expand Up @@ -562,7 +592,7 @@ def delete(self, external_id: str | SequenceNotStr[str], ignore_unknown_ids: boo
)

def list(self, limit: int = DEFAULT_LIMIT_READ) -> WorkflowList:
"""`List all workflows in the project. <https://pr-2282.specs.preview.cogniteapp.com/20230101.json.html#tag/Workflows/operation/FetchAllWorkflows>`_
"""`List all workflows in the project. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-versions/operation/FetchAllWorkflows>`_
Args:
limit (int): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.37.4"
__version__ = "7.38.0"
__api_subversion__ = "20230101"
4 changes: 4 additions & 0 deletions docs/source/workflow_orchestration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ Cancel Workflow Execution
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.cancel

Retry Workflow Execution
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.retry

Workflow Tasks
------------------
Update Status of Async Task
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.37.4"
version = "7.38.0"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from cognite.client import CogniteClient
from cognite.client.data_classes import Function
from cognite.client.data_classes.workflows import (
CancelExecution,
CDFTaskParameters,
FunctionTaskParameters,
SubworkflowTaskParameters,
Expand Down Expand Up @@ -410,3 +411,23 @@ def test_trigger_retrieve_detailed_update_update_task(

async_task = cognite_client.workflows.tasks.update(async_task.id, "completed")
assert async_task.status == "completed"

@pytest.mark.usefixtures("clean_created_sessions")
def test_trigger_cancel_retry_workflow(
self, cognite_client: CogniteClient, add_multiply_workflow: WorkflowVersion
) -> None:
workflow_execution = cognite_client.workflows.executions.trigger(
add_multiply_workflow.workflow_external_id,
add_multiply_workflow.version,
)

cancelled_workflow_executions = cognite_client.workflows.executions.cancel(
[CancelExecution(id=workflow_execution.id, reason="test")]
)
assert len(cancelled_workflow_executions) == 1
cancelled_workflow_execution = cancelled_workflow_executions[0]
assert cancelled_workflow_execution.status == "terminated"
assert cancelled_workflow_execution.reason_for_incompletion == "test"

retried_workflow_execution = cognite_client.workflows.executions.retry(workflow_execution.id)
assert retried_workflow_execution.status == "running"

0 comments on commit e21dbdb

Please sign in to comment.