Skip to content

Commit

Permalink
Merge pull request #280 from atlanhq/workflows-new-api
Browse files Browse the repository at this point in the history
Added support for scheduled queries to the `WorkflowClient`
  • Loading branch information
Aryamanz29 committed May 25, 2024
2 parents beea7e2 + 720395f commit 3b66bc7
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 41 deletions.
33 changes: 33 additions & 0 deletions pyatlan/client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,16 +394,41 @@
INDEX_SEARCH = API(INDEX_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.ATLAS)
# Relative paths of the workflow / workflow-run index search endpoints.
WORKFLOW_INDEX_API = "workflows/indexsearch"
WORKFLOW_INDEX_RUN_API = "runs/indexsearch"
# Relative paths used to look up scheduled-query workflows between two dates.
# The "missed" variant is the one chosen by
# `WorkflowClient.find_schedule_query_between(..., missed=True)`.
SCHEDULE_QUERY_WORKFLOWS_SEARCH_API = "runs/cron/scheduleQueriesBetweenDuration"
SCHEDULE_QUERY_WORKFLOWS_MISSED_API = "runs/cron/missedScheduleQueriesBetweenDuration"

# GET endpoint: scheduled-query workflows within a duration
# (called with `startDate` / `endDate` query parameters).
SCHEDULE_QUERY_WORKFLOWS_SEARCH = API(
SCHEDULE_QUERY_WORKFLOWS_SEARCH_API,
HTTPMethod.GET,
HTTPStatus.OK,
endpoint=EndPoint.HERACLES,
)

# GET endpoint: *missed* scheduled-query workflows within a duration
# (called with `startDate` / `endDate` query parameters).
SCHEDULE_QUERY_WORKFLOWS_MISSED = API(
SCHEDULE_QUERY_WORKFLOWS_MISSED_API,
HTTPMethod.GET,
HTTPStatus.OK,
endpoint=EndPoint.HERACLES,
)

# POST endpoint: search workflows (request body is a `WorkflowSearchRequest`).
WORKFLOW_INDEX_SEARCH = API(
WORKFLOW_INDEX_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)
# POST endpoint: search workflow runs.
WORKFLOW_INDEX_RUN_SEARCH = API(
WORKFLOW_INDEX_RUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)
# Re-run endpoint: triggers a workflow using the current user's credentials.
WORKFLOW_RERUN_API = "workflows/submit"
WORKFLOW_RERUN = API(
WORKFLOW_RERUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)

# Re-run endpoint: triggers a workflow using the workflow owner's credentials.
# `WorkflowClient.re_run_schedule_query` uses this to re-trigger scheduled queries.
WORKFLOW_OWNER_RERUN_API = "workflows/triggerAsOwner"
WORKFLOW_OWNER_RERUN = API(
WORKFLOW_OWNER_RERUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)

# Path for running a workflow; note the `submit=true` query string is embedded
# directly in the path rather than passed as a query parameter.
WORKFLOW_RUN_API = "workflows?submit=true"
WORKFLOW_RUN = API(
WORKFLOW_RUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
Expand Down Expand Up @@ -440,6 +465,14 @@
HTTPStatus.OK,
endpoint=EndPoint.HERACLES,
)

# POST endpoint for changing the owner of a workflow. The `{workflow_name}`
# placeholder is filled in by the caller via `format_path`, and the new owner
# is supplied as the `username` query parameter (see `WorkflowClient.update_owner`).
WORKFLOW_CHANGE_OWNER = API(
WORKFLOW_API + "/{workflow_name}" + "/changeownership",
HTTPMethod.POST,
HTTPStatus.OK,
endpoint=EndPoint.HERACLES,
)

CREDENTIALS_API = "credentials"
TEST_CREDENTIAL_API = CREDENTIALS_API + "/test"
TEST_CREDENTIAL = API(
Expand Down
166 changes: 134 additions & 32 deletions pyatlan/client/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
from pyatlan.client.constants import (
GET_ALL_SCHEDULE_RUNS,
GET_SCHEDULE_RUN,
SCHEDULE_QUERY_WORKFLOWS_MISSED,
SCHEDULE_QUERY_WORKFLOWS_SEARCH,
STOP_WORKFLOW_RUN,
WORKFLOW_ARCHIVE,
WORKFLOW_CHANGE_OWNER,
WORKFLOW_INDEX_RUN_SEARCH,
WORKFLOW_INDEX_SEARCH,
WORKFLOW_OWNER_RERUN,
WORKFLOW_RERUN,
WORKFLOW_RUN,
WORKFLOW_UPDATE,
Expand All @@ -23,6 +27,7 @@
from pyatlan.model.search import Bool, NestedQuery, Prefix, Query, Term
from pyatlan.model.workflow import (
ReRunRequest,
ScheduleQueriesSearchRequest,
Workflow,
WorkflowResponse,
WorkflowRunResponse,
Expand Down Expand Up @@ -56,14 +61,45 @@ def __init__(self, client: ApiCaller):
@staticmethod
def _parse_response(raw_json, response_type):
    """
    Parse a raw API payload into the requested response type.

    :param raw_json: decoded JSON payload returned by the API call
    :param response_type: type that the payload (or, for a list payload,
        each of its elements) should be parsed into
    :returns: `None` when the payload is empty / falsy, a list of
        `response_type` when the payload is a list, otherwise a single
        `response_type`
    :raises AtlanError: the `ErrorCode.JSON_ERROR` exception, chained to the
        underlying `ValidationError`, when the payload does not validate
    """
    # An empty payload (None, [], {}, "") carries nothing to parse.
    if not raw_json:
        return None
    # Use the builtin `list` for the runtime check; `List[...]` is only used
    # to describe the target type handed to pydantic.
    target_type = List[response_type] if isinstance(raw_json, list) else response_type
    try:
        return parse_obj_as(target_type, raw_json)
    except ValidationError as err:
        raise ErrorCode.JSON_ERROR.exception_with_parameters(
            raw_json, 200, str(err)
        ) from err

@validate_arguments
def find_by_type(
    self, prefix: WorkflowPackage, max_results: int = 10
) -> List[WorkflowSearchResult]:
    """
    Search for workflows whose name starts with the given package prefix.

    Note: Only workflows that have been run will be found.

    :param prefix: workflow package to look for (for example CONNECTION_DELETE)
    :param max_results: the maximum number of results to retrieve
    :returns: the list of workflows of the provided type, with the
        most-recently created first (empty list when nothing matches)
    :raises ValidationError: If the provided prefix is invalid workflow package
    :raises AtlanError: on any API communication issue
    """
    # Match on the workflow name, which lives under the nested `metadata` path.
    name_prefix = Prefix(field="metadata.name.keyword", value=prefix.value)
    name_filter = NestedQuery(query=name_prefix, path="metadata")
    search_request = WorkflowSearchRequest(
        query=Bool(filter=[name_filter]), size=max_results
    )
    raw_json = self._client._call_api(
        WORKFLOW_INDEX_SEARCH, request_obj=search_request
    )
    matches = WorkflowSearchResponse(**raw_json).hits.hits
    # Normalise a missing / empty hit list to an empty list.
    return matches if matches else []

@validate_arguments
def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
"""
Expand Down Expand Up @@ -154,36 +190,6 @@ def _add_schedule(
}
)

@validate_arguments
def find_by_type(
    self, prefix: WorkflowPackage, max_results: int = 10
) -> List[WorkflowSearchResult]:
    """
    Look up workflows by their type, i.e. by the prefix of their name.

    Note: Only workflows that have been run will be found.

    :param prefix: name of the specific workflow to find (for example CONNECTION_DELETE)
    :param max_results: the maximum number of results to retrieve
    :returns: the list of workflows of the provided type, with the
        most-recently created first
    :raises ValidationError: If the provided prefix is invalid workflow package
    :raises AtlanError: on any API communication issue
    """
    filters = [
        NestedQuery(
            query=Prefix(field="metadata.name.keyword", value=prefix.value),
            path="metadata",
        )
    ]
    payload = WorkflowSearchRequest(query=Bool(filter=filters), size=max_results)
    raw_json = self._client._call_api(WORKFLOW_INDEX_SEARCH, request_obj=payload)
    parsed = WorkflowSearchResponse(**raw_json)
    hits = parsed.hits.hits
    if not hits:
        # No (or empty) hit list: always hand back a list to the caller.
        return []
    return hits

def _handle_workflow_types(self, workflow):
if isinstance(workflow, WorkflowPackage):
if results := self.find_by_type(workflow):
Expand Down Expand Up @@ -302,6 +308,22 @@ def update(self, workflow: Workflow) -> WorkflowResponse:
)
return WorkflowResponse(**raw_json)

@validate_arguments
def update_owner(self, workflow_name: str, username: str) -> WorkflowResponse:
    """
    Change the owner of the named workflow.

    :param workflow_name: name of the workflow to update.
    :param username: username of the new owner.
    :raises AtlanError: on any API communication issue.
    :returns: updated workflow.
    """
    # The workflow name is part of the URL path; the new owner travels as a
    # query parameter rather than in a request body.
    endpoint = WORKFLOW_CHANGE_OWNER.format_path({"workflow_name": workflow_name})
    owner_params = {"username": username}
    response_json = self._client._call_api(endpoint, query_params=owner_params)
    return WorkflowResponse(**response_json)

@validate_arguments(config=dict(arbitrary_types_allowed=True))
def monitor(
self, workflow_response: WorkflowResponse, logger: Optional[Logger] = None
Expand Down Expand Up @@ -339,7 +361,6 @@ def monitor(
def _get_run_details(self, name: str) -> Optional[WorkflowSearchResult]:
    """
    Return the result of `_find_latest_run` for the workflow called `name`.

    :param name: name of the workflow whose latest run is wanted
    :returns: whatever `_find_latest_run` returns for that workflow
    """
    latest_run = self._find_latest_run(workflow_name=name)
    return latest_run

@validate_arguments
def get_runs(
self,
workflow_name: str,
Expand Down Expand Up @@ -528,3 +549,84 @@ def get_scheduled_run(self, workflow_name: str) -> WorkflowScheduleResponse:
GET_SCHEDULE_RUN.format_path({"workflow_name": f"{workflow_name}-cron"}),
)
return self._parse_response(raw_json, WorkflowScheduleResponse)

@validate_arguments
def find_schedule_query(
    self, saved_query_id: str, max_results: int = 10
) -> List[WorkflowSearchResult]:
    """
    Search for scheduled query workflows belonging to a saved query.

    :param saved_query_id: identifier of the saved query.
    :param max_results: maximum number of results to retrieve. Defaults to `10`.
    :raises AtlanError: on any API communication issue.
    :returns: a list of scheduled query workflows (empty when none match).
    """
    # Filter 1: workflow name starts with "asq-<saved query id>".
    by_name = NestedQuery(
        path="metadata",
        query=Prefix(field="metadata.name.keyword", value=f"asq-{saved_query_id}"),
    )
    # Filter 2: workflow is annotated as the schedule-query package.
    by_package = NestedQuery(
        path="metadata",
        query=Term(
            field="metadata.annotations.package.argoproj.io/name.keyword",
            value="@atlan/schedule-query",
        ),
    )
    search_request = WorkflowSearchRequest(
        query=Bool(filter=[by_name, by_package]), size=max_results
    )
    raw_json = self._client._call_api(
        WORKFLOW_INDEX_SEARCH, request_obj=search_request
    )
    found = WorkflowSearchResponse(**raw_json).hits.hits
    # Normalise a missing / empty hit list to an empty list.
    return found if found else []

@validate_arguments
def re_run_schedule_query(self, schedule_query_id: str) -> WorkflowRunResponse:
    """
    Trigger another run of a scheduled query workflow.

    NOTE: Scheduled query workflows are re-triggered using
    or impersonating the workflow owner's credentials.

    :param schedule_query_id: identifier of the schedule query.
    :raises AtlanError: on any API communication issue.
    :returns: details of the workflow run.
    """
    rerun_request = ReRunRequest(
        resource_name=schedule_query_id, namespace="default"
    )
    # Submitted to the "trigger as owner" endpoint, not the plain re-run one.
    return WorkflowRunResponse(
        **self._client._call_api(WORKFLOW_OWNER_RERUN, request_obj=rerun_request)
    )

@validate_arguments
def find_schedule_query_between(
    self, request: ScheduleQueriesSearchRequest, missed: bool = False
) -> Optional[List[WorkflowRunResponse]]:
    """
    Look up scheduled query workflows that fall inside a date range.

    :param request: a `ScheduleQueriesSearchRequest` object containing
        start and end dates in ISO 8601 format (e.g: `2024-03-25T16:30:00.000+05:30`).
    :param missed: if `True`, perform a search for missed
        scheduled query workflows. Defaults to `False`.
    :raises AtlanError: on any API communication issue.
    :returns: whatever `_parse_response` produces for the API payload, parsed
        as `WorkflowRunResponse`.
    """
    date_range = {"startDate": request.start_date, "endDate": request.end_date}
    # Same query parameters either way; only the endpoint differs.
    if missed:
        search_api = SCHEDULE_QUERY_WORKFLOWS_MISSED
    else:
        search_api = SCHEDULE_QUERY_WORKFLOWS_SEARCH
    raw_json = self._client._call_api(search_api, query_params=date_range)
    return self._parse_response(raw_json, WorkflowRunResponse)
5 changes: 5 additions & 0 deletions pyatlan/model/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ class WorkflowRunResponse(WorkflowResponse):
status: WorkflowSearchResultStatus


class ScheduleQueriesSearchRequest(AtlanObject):
    # Request model holding the date window for a scheduled-query search.
    # Both fields are required strings; `...` marks them as required explicitly.
    start_date: str = Field(
        ...,
        description="Start date in ISO 8601 format",
    )
    end_date: str = Field(
        ...,
        description="End date in ISO 8601 format",
    )


class WorkflowSchedule(AtlanObject):
timezone: str
cron_schedule: str
Expand Down
30 changes: 30 additions & 0 deletions tests/unit/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pyatlan.model.assets import AtlasGlossary
from pyatlan.model.enums import AtlanWorkflowPhase
from pyatlan.model.workflow import (
ScheduleQueriesSearchRequest,
WorkflowMetadata,
WorkflowResponse,
WorkflowSchedule,
Expand Down Expand Up @@ -501,6 +502,7 @@
],
}


TEST_WORKFLOW_CLIENT_METHODS = {
"run": [
(["abc"], "value is not a valid dict"),
Expand Down Expand Up @@ -564,6 +566,34 @@
([[123]], "str type expected"),
([None], "none is not an allowed value"),
],
"find_schedule_query": [
([[123], 10], "saved_query_id\n str type expected"),
([None, 10], "none is not an allowed value"),
(["test-query-id", [123]], "max_results\n value is not a valid integer"),
(["test-query-id", None], "none is not an allowed value"),
],
"re_run_schedule_query": [
([[123]], "schedule_query_id\n str type expected"),
([None], "none is not an allowed value"),
],
"find_schedule_query_between": [
([[123], True], "value is not a valid dict"),
([None, True], "none is not an allowed value"),
(
[ScheduleQueriesSearchRequest(start_date="start", end_date="end"), [123]],
"missed\n value could not be parsed to a boolean",
),
(
[ScheduleQueriesSearchRequest(start_date="start", end_date="end"), None],
"none is not an allowed value",
),
],
"update_owner": [
([[123], 10], "workflow_name\n str type expected"),
([None, 10], "none is not an allowed value"),
(["test-workflow", [123]], "username\n str type expected"),
(["test-workflow", None], "none is not an allowed value"),
],
}

APPLICABLE_GLOSSARIES = "applicable_glossaries"
Expand Down
Loading

0 comments on commit 3b66bc7

Please sign in to comment.