Skip to content

Commit

Permalink
fix merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
cw75 committed May 31, 2023
2 parents 6ef6a42 + c715756 commit 7fd18b6
Show file tree
Hide file tree
Showing 51 changed files with 1,151 additions and 629 deletions.
4 changes: 2 additions & 2 deletions integration_tests/backend/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ def test_endpoint_nodes_get(self):
resp = self.get_response(self.GET_NODES_TEMPLATE % (flow_id, dag_id)).json()

all_output_counts = []
for artifact in resp["operators"]:
result = GetNodeOperatorResponse(**artifact)
for operator in resp["operators"]:
result = GetNodeOperatorResponse(**operator)
all_output_counts.append(len(result.outputs))
assert sum(all_output_counts) == len(all_output_counts)
assert set(all_output_counts) == set([1])
Expand Down
101 changes: 94 additions & 7 deletions sdk/aqueduct/backend/api_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import io
import json
import uuid
Expand All @@ -16,22 +17,30 @@
UnprocessableEntityError,
)
from aqueduct.logger import logger
from aqueduct.models.dag import DAG
from aqueduct.models.operators import ParamSpec
from aqueduct.models.artifact import ArtifactMetadata
from aqueduct.models.dag import DAG, Metadata
from aqueduct.models.operators import Operator, ParamSpec
from aqueduct.models.resource import BaseResource, ResourceInfo
from aqueduct.models.response_models import (
DeleteWorkflowResponse,
DynamicEngineStatusResponse,
GetDagResponse,
GetDagResultResponse,
GetImageURLResponse,
GetNodeArtifactResponse,
GetNodeOperatorResponse,
GetVersionResponse,
GetWorkflowDagResultResponse,
GetWorkflowResponse,
GetWorkflowV1Response,
ListWorkflowResponseEntry,
ListWorkflowSavedObjectsResponse,
PreviewResponse,
RegisterAirflowWorkflowResponse,
RegisterWorkflowResponse,
SavedObjectUpdate,
WorkflowDagResponse,
WorkflowDagResultResponse,
)
from aqueduct.utils.serialization import deserialize
from pkg_resources import get_distribution, parse_version
Expand All @@ -56,6 +65,13 @@ class APIClient:
HTTP_PREFIX = "http://"
HTTPS_PREFIX = "https://"

# V2
GET_WORKFLOW_TEMPLATE = "/api/v2/workflow/%s"
GET_DAGS_TEMPLATE = "/api/v2/workflow/%s/dags"
GET_DAG_RESULTS_TEMPLATE = "/api/v2/workflow/%s/results"
GET_NODES_TEMPLATE = "/api/v2/workflow/%s/dag/%s/nodes"

# V1
GET_VERSION_ROUTE = "/api/version"
CONNECT_RESOURCE_ROUTE = "/api/resource/connect"
DELETE_RESOURCE_ROUTE_TEMPLATE = "/api/resource/%s/delete"
Expand Down Expand Up @@ -531,12 +547,83 @@ def delete_workflow(
self.raise_errors(response)
return DeleteWorkflowResponse(**response.json())

def get_workflow(self, flow_id: str) -> GetWorkflowResponse:
def get_workflow(self, flow_id: str) -> GetWorkflowV1Response:
headers = self._generate_auth_headers()
url = self.construct_full_url(self.GET_WORKFLOW_ROUTE_TEMPLATE % flow_id)
resp = requests.get(url, headers=headers)
self.raise_errors(resp)
return GetWorkflowResponse(**resp.json())

url = self.construct_full_url(self.GET_WORKFLOW_TEMPLATE % flow_id)
flow_response = requests.get(url, headers=headers)
self.raise_errors(flow_response)
resp_flow = GetWorkflowResponse(**flow_response.json())
metadata = Metadata(
name=resp_flow.name,
description=resp_flow.description,
schedule=resp_flow.schedule,
retention_policy=resp_flow.retention_policy,
)

url = self.construct_full_url(self.GET_DAGS_TEMPLATE % flow_id)
dags_response = requests.get(url, headers=headers)
self.raise_errors(dags_response)
resp_dags = {dag["id"]: GetDagResponse(**dag) for dag in dags_response.json()}
# Metadata from WorkflowResponse so it is the same for all DAG.
dags = {}
for dag in resp_dags.values():
url = self.construct_full_url(self.GET_NODES_TEMPLATE % (flow_id, dag.id))
nodes_resp = requests.get(url, headers=headers)
self.raise_errors(nodes_resp)
resp_nodes = nodes_resp.json()

ops = {}
for operator in resp_nodes["operators"]:
op = GetNodeOperatorResponse(**operator)
ops[str(op.id)] = Operator(
id=op.id,
name=op.name,
description=op.description,
spec=op.spec,
inputs=op.inputs,
outputs=op.outputs,
)

artfs = {}
for artifact in resp_nodes["artifacts"]:
artf = GetNodeArtifactResponse(**artifact)
artfs[str(artf.id)] = ArtifactMetadata(
id=artf.id,
name=artf.name,
type=artf.type,
)

dags[dag.id] = WorkflowDagResponse(
id=dag.id,
workflow_id=dag.workflow_id,
metadata=metadata,
operators=ops,
artifacts=artfs,
)

url = self.construct_full_url(self.GET_DAG_RESULTS_TEMPLATE % flow_id)
results_resp = requests.get(url, headers=headers)
self.raise_errors(results_resp)
dag_results = [GetDagResultResponse(**dag_result) for dag_result in results_resp.json()]
workflow_dag_results = [
WorkflowDagResultResponse(
id=dag_result.id,
created_at=int(
datetime.datetime.strptime(
resp_dags[str(dag_result.dag_id)].created_at[:-4],
"%Y-%m-%dT%H:%M:%S.%f"
if resp_dags[str(dag_result.dag_id)].created_at[-1] == "Z"
else "%Y-%m-%dT%H:%M:%S.%f%z",
).timestamp()
),
status=dag_result.exec_state.status,
exec_state=dag_result.exec_state,
workflow_dag_id=dag_result.dag_id,
)
for dag_result in dag_results
]
return GetWorkflowV1Response(workflow_dags=dags, workflow_dag_results=workflow_dag_results)

def get_workflow_dag_result(self, flow_id: str, result_id: str) -> GetWorkflowDagResultResponse:
headers = self._generate_auth_headers()
Expand Down
8 changes: 8 additions & 0 deletions sdk/aqueduct/constants/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ class ExecutionStatus(str, Enum, metaclass=MetaEnum):
CANCELED = "canceled"


class NotificationLogLevel(str, Enum, metaclass=MetaEnum):
SUCCESS = "success"
WARNING = "warning"
ERROR = "error"
INFO = "info"
NEUTRAL = "neutral"


class FailureType(Enum, metaclass=MetaEnum):
SYSTEM = 1
USER_FATAL = 2
Expand Down
4 changes: 2 additions & 2 deletions sdk/aqueduct/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from aqueduct.error import InvalidUserArgumentException
from aqueduct.flow_run import FlowRun
from aqueduct.models.response_models import (
GetWorkflowResponse,
GetWorkflowV1Response,
SavedObjectUpdate,
WorkflowDagResponse,
)
Expand Down Expand Up @@ -35,7 +35,7 @@ def id(self) -> uuid.UUID:
"""Returns the id of the flow."""
return uuid.UUID(self._id)

def _get_workflow_resp(self) -> GetWorkflowResponse:
def _get_workflow_resp(self) -> GetWorkflowV1Response:
resp = globals.__GLOBAL_API_CLIENT__.get_workflow(self._id)
return resp

Expand Down
42 changes: 41 additions & 1 deletion sdk/aqueduct/models/response_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
ArtifactType,
ExecutionStatus,
K8sClusterStatusType,
NotificationLogLevel,
SerializationType,
)
from aqueduct.models.artifact import ArtifactMetadata
Expand All @@ -23,6 +24,45 @@ class ArtifactResult(BaseModel):


# V2 Responses
class NotificationSettings(BaseModel):
"""Represents the notification settings associated with a workflow."""

settings: Optional[Dict[str, NotificationLogLevel]]


class GetWorkflowResponse(BaseModel):
"""Represents a single workflow.
Attributes:
id:
The id of the artifact node.
user_id:
The user id of the owner.
name:
The name of the workflow.
description:
The description of the workflow.
schedule:
The schedule of the workflow.
created_at:
When the workflow is created.
retention_policy:
Workflow retention policy regarding number of DAGs to save.
notification_settings:
Notification setting of workflow.
"""

id: uuid.UUID
user_id: uuid.UUID
name: str
description: str
schedule: Schedule
created_at: str
retention_policy: RetentionPolicy
notification_settings: NotificationSettings


class GetDagResponse(BaseModel):
id: uuid.UUID
workflow_id: uuid.UUID
Expand Down Expand Up @@ -336,7 +376,7 @@ def to_readable_dict(self) -> Dict[str, str]:
return readable


class GetWorkflowResponse(BaseModel):
class GetWorkflowV1Response(BaseModel):
"""This is the response object returned by api_client.get_workflow().
Attributes:
Expand Down
2 changes: 1 addition & 1 deletion sdk/aqueduct/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def generate_uuid() -> uuid.UUID:


WORKFLOW_UI_ROUTE_TEMPLATE = "/workflow/%s"
WORKFLOW_RUN_UI_ROUTE_TEMPLATE = "?workflowDagResultId=%s"
WORKFLOW_RUN_UI_ROUTE_TEMPLATE = "/result/%s"


def generate_ui_url(
Expand Down
2 changes: 1 addition & 1 deletion src/golang/lib/workflow/dag/workflow_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (dag *workflowDagImpl) Link() string {

func (dag *workflowDagImpl) ResultLink() string {
return fmt.Sprintf(
"%s/workflow/%s?workflowDagResultId=%s",
"%s/workflow/%s/result/%s",
dag.displayIP,
dag.ID(),
dag.ResultID(),
Expand Down
2 changes: 1 addition & 1 deletion src/terraform/eks/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ module "eks_kubernetes_addons" {

module "vpc" {
source = "terraform-aws-modules/vpc/aws"
version = "~> 3.0"
version = "~> 4.0"

name = local.name
cidr = local.vpc_cidr
Expand Down
20 changes: 5 additions & 15 deletions src/ui/common/src/components/pages/resource/id/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import { OperatorResponse } from '../../../../handlers/responses/node';
import { handleFetchAllWorkflowSummaries } from '../../../../reducers/listWorkflowSummaries';
import {
handleListResourceObjects,
handleLoadResourceOperators,
handleTestConnectResource,
resetEditStatus,
resetTestConnectStatus,
Expand All @@ -38,7 +37,6 @@ import {
isNotificationResource,
ResourceCategories,
resourceExecState,
SupportedResources,
} from '../../../../utils/resources';
import ExecutionStatus, {
isFailed,
Expand Down Expand Up @@ -107,18 +105,13 @@ const ResourceDetailsPage: React.FC<ResourceDetailsPageProps> = ({
);

const selectedResource = resources[resourceId];
const resourceClass = SupportedResources[selectedResource?.service];

// Using the ListResourcesRoute.
// ENG-1036: We should create a route where we can pass in the resourceId and get the associated metadata and switch to using that.
useEffect(() => {
dispatch(handleLoadResources({ apiKey: user.apiKey }));
dispatch(handleFetchAllWorkflowSummaries({ apiKey: user.apiKey }));
dispatch(
handleLoadResourceOperators({
apiKey: user.apiKey,
resourceId: resourceId,
})
);
}, [dispatch, resourceId, user.apiKey]);

useEffect(() => {
Expand Down Expand Up @@ -176,11 +169,7 @@ const ResourceDetailsPage: React.FC<ResourceDetailsPageProps> = ({
resourceId: resourceId,
});

const {
data: resourceOperators,
error: testOpsErr,
isLoading: testOpsIsLoading,
} = useResourceOperatorsGetQuery({
const { data: resourceOperators } = useResourceOperatorsGetQuery({
apiKey: user.apiKey,
resourceId: resourceId,
});
Expand Down Expand Up @@ -212,7 +201,7 @@ const ResourceDetailsPage: React.FC<ResourceDetailsPageProps> = ({
});
}

if (fetchWorkflowsIsLoading) {
if (fetchWorkflowsIsLoading || !selectedResource || !resourceClass) {
return null;
}

Expand All @@ -229,7 +218,6 @@ const ResourceDetailsPage: React.FC<ResourceDetailsPageProps> = ({
}

const selectedResourceExecState = resourceExecState(selectedResource);

return (
<Layout
breadcrumbs={[
Expand Down Expand Up @@ -423,6 +411,8 @@ const ResourceDetailsPage: React.FC<ResourceDetailsPageProps> = ({
dispatch(resetEditStatus());
}}
resourceToEdit={selectedResource}
dialogContent={resourceClass.dialog}
validationSchema={resourceClass.validationSchema(!!selectedResource)}
/>
)}

Expand Down
Loading

0 comments on commit 7fd18b6

Please sign in to comment.