Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow api's for schedule queries #280

Merged
merged 19 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions pyatlan/client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,16 +394,41 @@
INDEX_SEARCH = API(INDEX_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.ATLAS)
WORKFLOW_INDEX_API = "workflows/indexsearch"
WORKFLOW_INDEX_RUN_API = "runs/indexsearch"
SCHEDULE_QUERY_WORKFLOWS_SEARCH_API = "runs/cron/scheduleQueriesBetweenDuration"
SCHEDULE_QUERY_WORKFLOWS_MISSED_API = "runs/cron/missedScheduleQueriesBetweenDuration"

SCHEDULE_QUERY_WORKFLOWS_SEARCH = API(
SCHEDULE_QUERY_WORKFLOWS_SEARCH_API,
HTTPMethod.GET,
HTTPStatus.OK,
endpoint=EndPoint.HERACLES,
)

SCHEDULE_QUERY_WORKFLOWS_MISSED = API(
SCHEDULE_QUERY_WORKFLOWS_MISSED_API,
HTTPMethod.GET,
HTTPStatus.OK,
endpoint=EndPoint.HERACLES,
)

WORKFLOW_INDEX_SEARCH = API(
WORKFLOW_INDEX_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)
WORKFLOW_INDEX_RUN_SEARCH = API(
WORKFLOW_INDEX_RUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)
# triggers a workflow using the current user's credentials
WORKFLOW_RERUN_API = "workflows/submit"
WORKFLOW_RERUN = API(
WORKFLOW_RERUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)

# triggers a workflow using the workflow owner's credentials
WORKFLOW_OWNER_RERUN_API = "workflows/triggerAsOwner"
WORKFLOW_OWNER_RERUN = API(
WORKFLOW_OWNER_RERUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
)

WORKFLOW_RUN_API = "workflows?submit=true"
WORKFLOW_RUN = API(
WORKFLOW_RUN_API, HTTPMethod.POST, HTTPStatus.OK, endpoint=EndPoint.HERACLES
Expand Down Expand Up @@ -440,6 +465,14 @@
HTTPStatus.OK,
endpoint=EndPoint.HERACLES,
)

WORKFLOW_CHANGE_OWNER = API(
WORKFLOW_API + "/{workflow_name}" + "/changeownership",
HTTPMethod.POST,
HTTPStatus.OK,
endpoint=EndPoint.HERACLES,
)

CREDENTIALS_API = "credentials"
TEST_CREDENTIAL_API = CREDENTIALS_API + "/test"
TEST_CREDENTIAL = API(
Expand Down
166 changes: 134 additions & 32 deletions pyatlan/client/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
from pyatlan.client.constants import (
GET_ALL_SCHEDULE_RUNS,
GET_SCHEDULE_RUN,
SCHEDULE_QUERY_WORKFLOWS_MISSED,
SCHEDULE_QUERY_WORKFLOWS_SEARCH,
STOP_WORKFLOW_RUN,
WORKFLOW_ARCHIVE,
WORKFLOW_CHANGE_OWNER,
WORKFLOW_INDEX_RUN_SEARCH,
WORKFLOW_INDEX_SEARCH,
WORKFLOW_OWNER_RERUN,
WORKFLOW_RERUN,
WORKFLOW_RUN,
WORKFLOW_UPDATE,
Expand All @@ -23,6 +27,7 @@
from pyatlan.model.search import Bool, NestedQuery, Prefix, Query, Term
from pyatlan.model.workflow import (
ReRunRequest,
ScheduleQueriesSearchRequest,
Workflow,
WorkflowResponse,
WorkflowRunResponse,
Expand Down Expand Up @@ -56,14 +61,45 @@ def __init__(self, client: ApiCaller):
@staticmethod
def _parse_response(raw_json, response_type):
try:
if isinstance(raw_json, List):
if not raw_json:
return
elif isinstance(raw_json, list):
return parse_obj_as(List[response_type], raw_json)
return parse_obj_as(response_type, raw_json)
except ValidationError as err:
raise ErrorCode.JSON_ERROR.exception_with_parameters(
raw_json, 200, str(err)
) from err

@validate_arguments
def find_by_type(
self, prefix: WorkflowPackage, max_results: int = 10
) -> List[WorkflowSearchResult]:
"""
Find workflows based on their type (prefix). Note: Only workflows that have been run will be found.

:param prefix: name of the specific workflow to find (for example CONNECTION_DELETE)
:param max_results: the maximum number of results to retrieve
:returns: the list of workflows of the provided type, with the most-recently created first
:raises ValidationError: If the provided prefix is invalid workflow package
:raises AtlanError: on any API communication issue
"""
query = Bool(
filter=[
NestedQuery(
query=Prefix(field="metadata.name.keyword", value=prefix.value),
path="metadata",
)
]
)
request = WorkflowSearchRequest(query=query, size=max_results)
raw_json = self._client._call_api(
WORKFLOW_INDEX_SEARCH,
request_obj=request,
)
response = WorkflowSearchResponse(**raw_json)
return response.hits.hits or []

@validate_arguments
def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
"""
Expand Down Expand Up @@ -154,36 +190,6 @@ def _add_schedule(
}
)

@validate_arguments
def find_by_type(
self, prefix: WorkflowPackage, max_results: int = 10
) -> List[WorkflowSearchResult]:
"""
Find workflows based on their type (prefix).
Note: Only workflows that have been run will be found.

:param prefix: name of the specific workflow to find (for example CONNECTION_DELETE)
:param max_results: the maximum number of results to retrieve
:returns: the list of workflows of the provided type, with the most-recently created first
:raises ValidationError: If the provided prefix is invalid workflow package
:raises AtlanError: on any API communication issue
"""
query = Bool(
filter=[
NestedQuery(
query=Prefix(field="metadata.name.keyword", value=prefix.value),
path="metadata",
)
]
)
request = WorkflowSearchRequest(query=query, size=max_results)
raw_json = self._client._call_api(
WORKFLOW_INDEX_SEARCH,
request_obj=request,
)
response = WorkflowSearchResponse(**raw_json)
return response.hits.hits or []

def _handle_workflow_types(self, workflow):
if isinstance(workflow, WorkflowPackage):
if results := self.find_by_type(workflow):
Expand Down Expand Up @@ -302,6 +308,22 @@ def update(self, workflow: Workflow) -> WorkflowResponse:
)
return WorkflowResponse(**raw_json)

@validate_arguments
def update_owner(self, workflow_name: str, username: str) -> WorkflowResponse:
"""
Update the owner of the specified workflow.

:param workflow_name: name of the workflow to update.
:param username: username of the new owner.
:raises AtlanError: on any API communication issue.
:returns: updated workflow.
"""
raw_json = self._client._call_api(
WORKFLOW_CHANGE_OWNER.format_path({"workflow_name": workflow_name}),
query_params={"username": username},
)
return WorkflowResponse(**raw_json)

@validate_arguments(config=dict(arbitrary_types_allowed=True))
def monitor(
self, workflow_response: WorkflowResponse, logger: Optional[Logger] = None
Expand Down Expand Up @@ -339,7 +361,6 @@ def monitor(
def _get_run_details(self, name: str) -> Optional[WorkflowSearchResult]:
return self._find_latest_run(workflow_name=name)

@validate_arguments
def get_runs(
self,
workflow_name: str,
Expand Down Expand Up @@ -528,3 +549,84 @@ def get_scheduled_run(self, workflow_name: str) -> WorkflowScheduleResponse:
GET_SCHEDULE_RUN.format_path({"workflow_name": f"{workflow_name}-cron"}),
)
return self._parse_response(raw_json, WorkflowScheduleResponse)

@validate_arguments
def find_schedule_query(
self, saved_query_id: str, max_results: int = 10
) -> List[WorkflowSearchResult]:
"""
Find scheduled query workflows by their saved query identifier.

:param saved_query_id: identifier of the saved query.
:param max_results: maximum number of results to retrieve. Defaults to `10`.
:raises AtlanError: on any API communication issue.
:returns: a list of scheduled query workflows.
"""
query = Bool(
filter=[
NestedQuery(
path="metadata",
query=Prefix(
field="metadata.name.keyword", value=f"asq-{saved_query_id}"
),
),
NestedQuery(
path="metadata",
query=Term(
field="metadata.annotations.package.argoproj.io/name.keyword",
value="@atlan/schedule-query",
),
),
]
)
request = WorkflowSearchRequest(query=query, size=max_results)
raw_json = self._client._call_api(
WORKFLOW_INDEX_SEARCH,
request_obj=request,
)
response = WorkflowSearchResponse(**raw_json)
return response.hits.hits or []

@validate_arguments
def re_run_schedule_query(self, schedule_query_id: str) -> WorkflowRunResponse:
"""
Re-run the scheduled query workflow by its schedule query identifier.
NOTE: Scheduled query workflows are re-triggered using
or impersonating the workflow owner's credentials.

:param schedule_query_id: identifier of the schedule query.
:raises AtlanError: on any API communication issue.
:returns: details of the workflow run.
"""
request = ReRunRequest(namespace="default", resource_name=schedule_query_id)
raw_json = self._client._call_api(
WORKFLOW_OWNER_RERUN,
request_obj=request,
)
return WorkflowRunResponse(**raw_json)

@validate_arguments
def find_schedule_query_between(
self, request: ScheduleQueriesSearchRequest, missed: bool = False
) -> Optional[List[WorkflowRunResponse]]:
"""
Find scheduled query workflows within the specified duration.

:param request: a `ScheduleQueriesSearchRequest` object containing
start and end dates in ISO 8601 format (e.g: `2024-03-25T16:30:00.000+05:30`).
:param missed: if `True`, perform a search for missed
scheduled query workflows. Defaults to `False`.
:raises AtlanError: on any API communication issue.
:returns: a list of scheduled query workflows found within the specified duration.
"""
query_params = {
"startDate": request.start_date,
"endDate": request.end_date,
}
SEARCH_API = (
SCHEDULE_QUERY_WORKFLOWS_MISSED
if missed
else SCHEDULE_QUERY_WORKFLOWS_SEARCH
)
raw_json = self._client._call_api(SEARCH_API, query_params=query_params)
return self._parse_response(raw_json, WorkflowRunResponse)
5 changes: 5 additions & 0 deletions pyatlan/model/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ class WorkflowRunResponse(WorkflowResponse):
status: WorkflowSearchResultStatus


class ScheduleQueriesSearchRequest(AtlanObject):
start_date: str = Field(description="Start date in ISO 8601 format")
end_date: str = Field(description="End date in ISO 8601 format")


class WorkflowSchedule(AtlanObject):
timezone: str
cron_schedule: str
Expand Down
30 changes: 30 additions & 0 deletions tests/unit/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pyatlan.model.assets import AtlasGlossary
from pyatlan.model.enums import AtlanWorkflowPhase
from pyatlan.model.workflow import (
ScheduleQueriesSearchRequest,
WorkflowMetadata,
WorkflowResponse,
WorkflowSchedule,
Expand Down Expand Up @@ -501,6 +502,7 @@
],
}


TEST_WORKFLOW_CLIENT_METHODS = {
"run": [
(["abc"], "value is not a valid dict"),
Expand Down Expand Up @@ -564,6 +566,34 @@
([[123]], "str type expected"),
([None], "none is not an allowed value"),
],
"find_schedule_query": [
([[123], 10], "saved_query_id\n str type expected"),
([None, 10], "none is not an allowed value"),
(["test-query-id", [123]], "max_results\n value is not a valid integer"),
(["test-query-id", None], "none is not an allowed value"),
],
"re_run_schedule_query": [
([[123]], "schedule_query_id\n str type expected"),
([None], "none is not an allowed value"),
],
"find_schedule_query_between": [
([[123], True], "value is not a valid dict"),
([None, True], "none is not an allowed value"),
(
[ScheduleQueriesSearchRequest(start_date="start", end_date="end"), [123]],
"missed\n value could not be parsed to a boolean",
),
(
[ScheduleQueriesSearchRequest(start_date="start", end_date="end"), None],
"none is not an allowed value",
),
],
"update_owner": [
([[123], 10], "workflow_name\n str type expected"),
([None, 10], "none is not an allowed value"),
(["test-workflow", [123]], "username\n str type expected"),
(["test-workflow", None], "none is not an allowed value"),
],
}

APPLICABLE_GLOSSARIES = "applicable_glossaries"
Expand Down
Loading
Loading