diff --git a/.github/workflows/mypy.yml b/.github/workflows/mypy.yml index 2beca9cd3..f861c759e 100644 --- a/.github/workflows/mypy.yml +++ b/.github/workflows/mypy.yml @@ -28,7 +28,7 @@ jobs: - name: Install Type Stub Libraries run: | - python3 -m pip install types-croniter types-requests types-PyYAML types-setuptools types-PyMySQL + python3 -m pip install types-croniter types-requests types-PyYAML types-setuptools types-PyMySQL types-python-dateutil - name: mypy sdk working-directory: sdk diff --git a/integration_tests/sdk/aqueduct_tests/flow_test.py b/integration_tests/sdk/aqueduct_tests/flow_test.py index 0283e3d88..9d7ac8c19 100644 --- a/integration_tests/sdk/aqueduct_tests/flow_test.py +++ b/integration_tests/sdk/aqueduct_tests/flow_test.py @@ -495,6 +495,63 @@ def noop(): assert len(flow.list_saved_objects()) == 0 +def test_flow_with_disabled_snapshots(client, flow_name, data_resource, engine): + @op + def op_disabled_by_wf(df): + return df + + @metric + def metric_enabled(df): + return df.shape[0] + + @metric + def metric_disabled(df): + return df.shape[0] + + @check + def check_enabled(count): + return count > 10 + + reviews = extract(data_resource, DataObject.SENTIMENT, output_name="extract artifact") + op_artf = op_disabled_by_wf(reviews) + metric_enabled(op_artf) + metric_disabled_artf = metric_disabled(op_artf) + metric_disabled_artf.disable_snapshot() + check_enabled(metric_disabled_artf) + + op_artf.enable_snapshot() + save(data_resource, op_artf) + + flow = publish_flow_test( + client, op_artf, name=flow_name(), engine=engine, disable_snapshots=True + ) + latest_run = flow.latest() + assert ( + latest_run.artifact("extract artifact").execution_state().status == ExecutionStatus.DELETED + ) + assert latest_run.artifact("extract artifact").get() is None + assert ( + latest_run.artifact("op_disabled_by_wf artifact").execution_state().status + == ExecutionStatus.DELETED + ) + assert latest_run.artifact("op_disabled_by_wf artifact").get() is None + assert ( + latest_run.artifact("metric_enabled artifact").execution_state().status + == ExecutionStatus.SUCCEEDED + ) + assert latest_run.artifact("metric_enabled artifact").get() == 100 + assert ( + latest_run.artifact("metric_disabled artifact").execution_state().status + == ExecutionStatus.DELETED + ) + assert latest_run.artifact("metric_disabled artifact").get() is None + assert ( + latest_run.artifact("check_enabled artifact").execution_state().status + == ExecutionStatus.SUCCEEDED + ) + assert latest_run.artifact("check_enabled artifact").get() + + def test_artifact_set_name(client, flow_name, engine): @op def foo(): diff --git a/integration_tests/sdk/shared/flow_helpers.py b/integration_tests/sdk/shared/flow_helpers.py index 5b34fc38b..4078ef38e 100644 --- a/integration_tests/sdk/shared/flow_helpers.py +++ b/integration_tests/sdk/shared/flow_helpers.py @@ -32,6 +32,7 @@ def publish_flow_test( source_flow: Optional[Union[Flow, str, uuid.UUID]] = None, should_block: bool = True, use_local: bool = False, + disable_snapshots: bool = False, ) -> Flow: """Publishes a flow and waits for a specified number of runs with specified statuses to complete. @@ -89,6 +90,7 @@ def publish_flow_test( engine=engine, source_flow=source_flow, use_local=use_local, + disable_snapshots=disable_snapshots, ) print("Workflow registration succeeded. Workflow ID %s. 
Name: %s" % (flow.id(), name)) diff --git a/manual_qa_tests/initialize.py b/manual_qa_tests/initialize.py index 9d6c761fe..b9791df35 100644 --- a/manual_qa_tests/initialize.py +++ b/manual_qa_tests/initialize.py @@ -14,6 +14,7 @@ succeed_dag_layout_test, succeed_march_madness_dag_layout_test, succeed_parameters, + succeed_with_snapshots_disabled, warning_bad_check, ) @@ -31,6 +32,7 @@ succeed_march_madness_dag_layout_test, fail_bad_operator, no_run, + succeed_with_snapshots_disabled, ] DEMO_NOTEBOOKS_PATHS = [ diff --git a/manual_qa_tests/workflows/succeed_with_snapshots_disabled.py b/manual_qa_tests/workflows/succeed_with_snapshots_disabled.py new file mode 100644 index 000000000..11bb89fed --- /dev/null +++ b/manual_qa_tests/workflows/succeed_with_snapshots_disabled.py @@ -0,0 +1,50 @@ +import aqueduct as aq + +NAME = "succeed_with_snapshots_disabled" +DESCRIPTION = """* Workflows Page: should succeed. +* Workflow Details Page: + * There artifacts are shown deleted: op_disabled artf, Demo query artf + * Both checks should show 'passed'. +""" + + +@aq.op(requirements=[]) +def op_disabled(df): + return df + + +@aq.metric(requirements=[]) +def metric(df): + return df.shape[0] + + +@aq.check(requirements=[]) +def check(count): + return count > 10 + + +@aq.check(requirements=[]) +def check_disabled(count): + return count > 10 + + +def deploy(client, resource_name): + resource = client.resource(resource_name) + reviews = resource.sql("SELECT * FROM hotel_reviews") + op_artf = op_disabled(reviews) + metric_artf = metric(op_artf) + check(metric_artf) + check_disabled_artf = check_disabled(metric_artf) + + op_artf.enable_snapshot() + check_disabled_artf.disable_snapshot() + + resource.save(op_artf, "succeed_with_snapshots_disabled_tmp", aq.LoadUpdateMode.REPLACE) + + client.publish_flow( + artifacts=[op_artf], + name=NAME, + description=DESCRIPTION, + schedule="", + disable_snapshots=True, + ) diff --git a/scripts/install_local.py b/scripts/install_local.py index 918cd6fea..83eef5006 100644 --- a/scripts/install_local.py +++ b/scripts/install_local.py @@ -26,7 +26,7 @@ ui_directory = join(os.environ["HOME"], ".aqueduct", "ui") # Make sure to update this if there is any schema change we want to include in the upgrade. 
-SCHEMA_VERSION = "27" +SCHEMA_VERSION = "28" def execute_command(args, cwd=None): diff --git a/sdk/aqueduct/artifacts/base_artifact.py b/sdk/aqueduct/artifacts/base_artifact.py index 289cb8edd..685f38267 100644 --- a/sdk/aqueduct/artifacts/base_artifact.py +++ b/sdk/aqueduct/artifacts/base_artifact.py @@ -1,10 +1,13 @@ import json import uuid from abc import ABC, abstractmethod -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union +import numpy as np from aqueduct.constants.enums import ArtifactType, OperatorType from aqueduct.models.dag import DAG +from aqueduct.models.execution_state import ExecutionState, ExecutionStatus +from aqueduct.type_annotations import Number from aqueduct.utils.naming import sanitize_artifact_name @@ -14,6 +17,33 @@ class BaseArtifact(ABC): _content: Any _from_flow_run: bool _from_operator_type: Optional[OperatorType] = None + _execution_state: Optional[ExecutionState] = None + + def __init__( + self, + dag: DAG, + artifact_id: uuid.UUID, + content: Optional[Union[bool, np.bool_, Number]] = None, + from_flow_run: bool = False, + execution_state: Optional[ExecutionState] = None, + ) -> None: + self._dag = dag + self._artifact_id = artifact_id + + # This parameter indicates whether the artifact is fetched from flow-run or not. + self._from_flow_run = from_flow_run + self._set_content(content) + + # For now, the execution_state is only relevant when it's fetched from a flow run. + # It stays 'None' when the artifact runs in previews. + self._execution_state = execution_state + + def _is_content_deleted(self) -> bool: + if self._from_flow_run: + if self._execution_state and self._execution_state.status == ExecutionStatus.DELETED: + return True + + return False def id(self) -> uuid.UUID: """Fetch the id associated with this artifact. 
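With the constructor and _is_content_deleted() above (plus the execution_state() accessor added in the next hunk), an artifact fetched from a flow run whose snapshot was erased reports ExecutionStatus.DELETED and materializes to None. A minimal sketch of how calling code might guard for this; it is illustrative only and not part of the diff, and it assumes `flow` is the Flow handle returned by client.publish_flow() and that the run contains an artifact named "extract artifact":

    from aqueduct.constants.enums import ExecutionStatus

    run = flow.latest()
    artf = run.artifact("extract artifact")

    state = artf.execution_state()
    if state is not None and state.status == ExecutionStatus.DELETED:
        # The operator succeeded, but the snapshot content was erased,
        # so there is nothing to materialize.
        assert artf.get() is None
    else:
        content = artf.get()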
@@ -29,6 +59,12 @@ def name(self) -> str: def type(self) -> ArtifactType: return self._dag.must_get_artifact(artifact_id=self._artifact_id).type + def snapshot_enabled(self) -> bool: + return self._dag.must_get_artifact(artifact_id=self._artifact_id).should_persist + + def execution_state(self) -> Optional[ExecutionState]: + return self._execution_state + def _get_content(self) -> Any: return self._content @@ -41,6 +77,12 @@ def set_operator_type(self, operator_type: OperatorType) -> None: def set_name(self, name: str) -> None: self._dag.update_artifact_name(self._artifact_id, sanitize_artifact_name(name)) + def enable_snapshot(self) -> None: + self._dag.update_artifact_should_persist(self._artifact_id, True) + + def disable_snapshot(self) -> None: + self._dag.update_artifact_should_persist(self._artifact_id, False) + def _describe(self) -> Dict[str, Any]: input_operator = self._dag.must_get_operator(with_output_artifact_id=self._artifact_id) return { diff --git a/sdk/aqueduct/artifacts/bool_artifact.py b/sdk/aqueduct/artifacts/bool_artifact.py index 6987f028f..86cc4828f 100644 --- a/sdk/aqueduct/artifacts/bool_artifact.py +++ b/sdk/aqueduct/artifacts/bool_artifact.py @@ -10,6 +10,7 @@ from aqueduct.constants.enums import OperatorType from aqueduct.error import ArtifactNeverComputedException from aqueduct.models.dag import DAG +from aqueduct.models.execution_state import ExecutionState from aqueduct.models.operators import get_operator_type from aqueduct.utils.describe import get_readable_description_for_check from aqueduct.utils.utils import format_header_for_print @@ -43,15 +44,11 @@ def __init__( artifact_id: uuid.UUID, content: Optional[Union[bool, np.bool_]] = None, from_flow_run: bool = False, + execution_state: Optional[ExecutionState] = None, ): - self._dag = dag - self._artifact_id = artifact_id + super().__init__(dag, artifact_id, content, from_flow_run, execution_state) - # This parameter indicates whether the artifact is fetched from flow-run or not. - self._from_flow_run = from_flow_run - self._set_content(content) - - def get(self, parameters: Optional[Dict[str, Any]] = None) -> Union[bool, np.bool_]: + def get(self, parameters: Optional[Dict[str, Any]] = None) -> Optional[Union[bool, np.bool_]]: """Materializes a BoolArtifact into a boolean. Returns: @@ -63,6 +60,9 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> Union[bool, np.boo InternalServerError: An unexpected error occurred in the server. """ + if self._is_content_deleted(): + return None + self._dag.must_get_artifact(self._artifact_id) if self._from_flow_run: diff --git a/sdk/aqueduct/artifacts/generic_artifact.py b/sdk/aqueduct/artifacts/generic_artifact.py index 574fad443..d4dad34cf 100644 --- a/sdk/aqueduct/artifacts/generic_artifact.py +++ b/sdk/aqueduct/artifacts/generic_artifact.py @@ -11,6 +11,7 @@ from aqueduct.constants.enums import ArtifactType, ExecutionStatus from aqueduct.error import ArtifactNeverComputedException from aqueduct.models.dag import DAG +from aqueduct.models.execution_state import ExecutionState from aqueduct.utils.utils import format_header_for_print from aqueduct import globals @@ -30,22 +31,13 @@ def __init__( artifact_type: ArtifactType = ArtifactType.UNTYPED, content: Optional[Any] = None, from_flow_run: bool = False, - execution_status: Optional[ExecutionStatus] = None, + execution_state: Optional[ExecutionState] = None, ): # Cannot initialize a generic artifact's content without also setting its type. 
if content is not None: assert artifact_type != ArtifactType.UNTYPED - self._dag = dag - self._artifact_id = artifact_id - - # This parameter indicates whether the artifact is fetched from flow-run or not. - self._from_flow_run = from_flow_run - self._set_content(content) - # This is only relevant to generic artifact produced from flow_run.artifact(). - # We need this to distinguish between when an artifact's content is None versus - # when it fails to compute successfully. - self._execution_status = execution_status + super().__init__(dag, artifact_id, content, from_flow_run, execution_state) def get(self, parameters: Optional[Dict[str, Any]] = None) -> Any: """Materializes the artifact. @@ -59,10 +51,17 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> Any: InternalServerError: An unexpected error occurred in the server. """ + if self._is_content_deleted(): + return None + self._dag.must_get_artifact(self._artifact_id) if self._from_flow_run: - if self._execution_status != ExecutionStatus.SUCCEEDED: + if ( + not self._execution_state + or self._execution_state.status != ExecutionStatus.SUCCEEDED + ): + # DELETED case is already covered. raise ArtifactNeverComputedException( "This artifact was part of an existing flow run but was never computed successfully!", ) diff --git a/sdk/aqueduct/artifacts/numeric_artifact.py b/sdk/aqueduct/artifacts/numeric_artifact.py index f1b11c7d3..c2becd265 100644 --- a/sdk/aqueduct/artifacts/numeric_artifact.py +++ b/sdk/aqueduct/artifacts/numeric_artifact.py @@ -20,6 +20,7 @@ from aqueduct.error import AqueductError, ArtifactNeverComputedException from aqueduct.models.artifact import ArtifactMetadata from aqueduct.models.dag import DAG +from aqueduct.models.execution_state import ExecutionState from aqueduct.models.operators import ( CheckSpec, FunctionSpec, @@ -68,15 +69,11 @@ def __init__( artifact_id: uuid.UUID, content: Optional[Number] = None, from_flow_run: bool = False, + execution_state: Optional[ExecutionState] = None, ): - self._dag = dag - self._artifact_id = artifact_id + super().__init__(dag, artifact_id, content, from_flow_run, execution_state) - # This parameter indicates whether the artifact is fetched from flow-run or not. - self._from_flow_run = from_flow_run - self._set_content(content) - - def get(self, parameters: Optional[Dict[str, Any]] = None) -> Number: + def get(self, parameters: Optional[Dict[str, Any]] = None) -> Optional[Number]: """Materializes a NumericArtifact into its immediate float value. Returns: @@ -89,9 +86,12 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> Number: An unexpected error occurred within the Aqueduct cluster. """ self._dag.must_get_artifact(self._artifact_id) + if self._is_content_deleted(): + return None if self._from_flow_run: if self._get_content() is None: + # DELETED case is already covered. 
raise ArtifactNeverComputedException( "This artifact was part of an existing flow run but was never computed successfully!", ) diff --git a/sdk/aqueduct/artifacts/table_artifact.py b/sdk/aqueduct/artifacts/table_artifact.py index d3b9bf7c6..e1beec61d 100644 --- a/sdk/aqueduct/artifacts/table_artifact.py +++ b/sdk/aqueduct/artifacts/table_artifact.py @@ -20,6 +20,7 @@ ) from aqueduct.error import AqueductError, ArtifactNeverComputedException from aqueduct.models.dag import DAG +from aqueduct.models.execution_state import ExecutionState from aqueduct.models.operators import CheckSpec, FunctionSpec, MetricSpec, OperatorSpec from aqueduct.utils.dag_deltas import RemoveCheckOperatorDelta, apply_deltas_to_dag from aqueduct.utils.describe import ( @@ -62,15 +63,11 @@ def __init__( artifact_id: uuid.UUID, content: Optional[pd.DataFrame] = None, from_flow_run: bool = False, + execution_state: Optional[ExecutionState] = None, ): - self._dag = dag - self._artifact_id = artifact_id + super().__init__(dag, artifact_id, content, from_flow_run, execution_state) - # This parameter indicates whether the artifact is fetched from flow-run or not. - self._from_flow_run = from_flow_run - self._set_content(content) - - def get(self, parameters: Optional[Dict[str, Any]] = None) -> pd.DataFrame: + def get(self, parameters: Optional[Dict[str, Any]] = None) -> Optional[pd.DataFrame]: """Materializes TableArtifact into an actual dataframe. Args: @@ -88,9 +85,12 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> pd.DataFrame: An unexpected error occurred within the Aqueduct cluster. """ self._dag.must_get_artifact(self._artifact_id) + if self._is_content_deleted(): + return None if self._from_flow_run: if self._get_content() is None: + # DELETED case is already covered. raise ArtifactNeverComputedException( "This artifact was part of an existing flow run but was never computed successfully!", ) @@ -113,7 +113,9 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> pd.DataFrame: assert isinstance(content, pd.DataFrame) return content - def head(self, n: int = 5, parameters: Optional[Dict[str, Any]] = None) -> pd.DataFrame: + def head( + self, n: int = 5, parameters: Optional[Dict[str, Any]] = None + ) -> Optional[pd.DataFrame]: """Returns a preview of the table artifact. >>> db = client.resource(name="demo/") @@ -128,7 +130,8 @@ def head(self, n: int = 5, parameters: Optional[Dict[str, Any]] = None) -> pd.Da A dataframe containing the table contents of this artifact. 
""" df = self.get(parameters=parameters) - return df.head(n) + + return df.head(n) if df is not None else None PRESET_METRIC_LIST = ["number_of_missing_values", "number_of_rows", "max", "min", "mean", "std"] diff --git a/sdk/aqueduct/backend/api_client.py b/sdk/aqueduct/backend/api_client.py index e2b8ad442..a06a0b75e 100644 --- a/sdk/aqueduct/backend/api_client.py +++ b/sdk/aqueduct/backend/api_client.py @@ -19,6 +19,7 @@ from aqueduct.logger import logger from aqueduct.models.artifact import ArtifactMetadata from aqueduct.models.dag import DAG, Metadata +from aqueduct.models.execution_state import ExecutionState from aqueduct.models.operators import Operator, ParamSpec from aqueduct.models.resource import BaseResource, ResourceInfo from aqueduct.models.response_models import ( @@ -43,6 +44,7 @@ WorkflowDagResultResponse, ) from aqueduct.utils.serialization import deserialize +from dateutil import parser as datetime_parser from pkg_resources import get_distribution, parse_version from ..resources.connect_config import DynamicK8sConfig, ResourceConfig @@ -610,12 +612,7 @@ def get_workflow(self, flow_id: str) -> GetWorkflowV1Response: WorkflowDagResultResponse( id=dag_result.id, created_at=int( - datetime.datetime.strptime( - resp_dags[str(dag_result.dag_id)].created_at[:-4], - "%Y-%m-%dT%H:%M:%S.%f" - if resp_dags[str(dag_result.dag_id)].created_at[-1] == "Z" - else "%Y-%m-%dT%H:%M:%S.%f%z", - ).timestamp() + datetime_parser.parse(resp_dags[str(dag_result.dag_id)].created_at).timestamp() ), status=dag_result.exec_state.status, exec_state=dag_result.exec_state, @@ -650,8 +647,11 @@ def list_workflows(self) -> List[ListWorkflowResponseEntry]: def get_artifact_result_data( self, dag_result_id: str, artifact_id: str - ) -> Tuple[Optional[Any], ExecutionStatus]: - """Returns an empty string if the operator was not successfully executed.""" + ) -> Tuple[Optional[Any], Optional[ExecutionState]]: + """ + Returns the artifact's result and execution state if available. + Prints non-blocking warning if the value if either is not available. 
+ """ headers = self._generate_auth_headers() url = self.construct_full_url( self.GET_ARTIFACT_RESULT_TEMPLATE % (dag_result_id, artifact_id) @@ -660,7 +660,9 @@ def get_artifact_result_data( self.raise_errors(resp) parsed_response = _parse_artifact_result_response(resp) - execution_status = parsed_response["metadata"]["exec_state"]["status"] + execution_state = None + if "exec_state" in parsed_response["metadata"]: + execution_state = ExecutionState(**parsed_response["metadata"]["exec_state"]) serialization_type = parsed_response["metadata"]["serialization_type"] artifact_type = parsed_response["metadata"]["artifact_type"] @@ -669,10 +671,13 @@ def get_artifact_result_data( if "data" in parsed_response: return_value = deserialize(serialization_type, artifact_type, parsed_response["data"]) - if execution_status != ExecutionStatus.SUCCEEDED: - logger().warning("Artifact result unavailable due to unsuccessful execution.") + if not execution_state: + logger().warning("Artifact execution state is unavailable.") + + if return_value is None: + logger().warning("Artifact result is unavailable.") - return (return_value, execution_status) + return (return_value, execution_state) def get_image_url( self, diff --git a/sdk/aqueduct/client.py b/sdk/aqueduct/client.py index ace2eb95c..88c180f83 100644 --- a/sdk/aqueduct/client.py +++ b/sdk/aqueduct/client.py @@ -553,6 +553,7 @@ def publish_flow( source_flow: Optional[Union[Flow, str, uuid.UUID]] = None, run_now: Optional[bool] = None, use_local: Optional[bool] = False, + disable_snapshots: Optional[bool] = False, ) -> Flow: """Uploads and kicks off the given flow in the system. @@ -602,6 +603,10 @@ def publish_flow( behavior is 'True'. use_local: Must be set if any artifact in the flow is derived from local data. + disable_snapshots: + If set to 'True', this option disables snapshots for all artifacts + that are not generated from parameters, metrics, or checks. + It achieves this by calling the `.disable_snapshot()` method on each of these artifacts. Raises: InvalidUserArgumentException: @@ -734,6 +739,9 @@ def publish_flow( dag.validate_and_resolve_artifact_names() + if disable_snapshots: + dag.disable_op_output_snapshots() + if dag.engine_config.type == RuntimeType.AIRFLOW: if run_now is not None: raise InvalidUserArgumentException( diff --git a/sdk/aqueduct/constants/enums.py b/sdk/aqueduct/constants/enums.py index bca84172f..6099bcc64 100644 --- a/sdk/aqueduct/constants/enums.py +++ b/sdk/aqueduct/constants/enums.py @@ -111,6 +111,7 @@ class ExecutionStatus(str, Enum, metaclass=MetaEnum): PENDING = "pending" REGISTERED = "registered" CANCELED = "canceled" + DELETED = "deleted" class NotificationLogLevel(str, Enum, metaclass=MetaEnum): diff --git a/sdk/aqueduct/flow_run.py b/sdk/aqueduct/flow_run.py index 7b173dcd1..0140d3c79 100644 --- a/sdk/aqueduct/flow_run.py +++ b/sdk/aqueduct/flow_run.py @@ -98,12 +98,12 @@ def describe(self) -> None: for param_op in param_operators: ( param_content, - execution_status, + execution_state, ) = globals.__GLOBAL_API_CLIENT__.get_artifact_result_data( self._id, str(param_op.outputs[0]) ) - if execution_status != ExecutionStatus.SUCCEEDED: + if not execution_state or execution_state.status != ExecutionStatus.SUCCEEDED: param_content = "Parameter not successfully initialized." 
print("* " + param_op.name + ": " + str(param_content)) @@ -166,7 +166,7 @@ def artifact(self, name: str) -> Optional[base_artifact.BaseArtifact]: if artifact_from_dag is None: return None - content, execution_status = globals.__GLOBAL_API_CLIENT__.get_artifact_result_data( + content, execution_state = globals.__GLOBAL_API_CLIENT__.get_artifact_result_data( self._id, str(artifact_from_dag.id) ) @@ -179,6 +179,7 @@ def artifact(self, name: str) -> Optional[base_artifact.BaseArtifact]: artifact_from_dag.id, content=content, from_flow_run=True, + execution_state=execution_state, ) elif artifact_from_dag.type is ArtifactType.NUMERIC: return numeric_artifact.NumericArtifact( @@ -186,6 +187,7 @@ def artifact(self, name: str) -> Optional[base_artifact.BaseArtifact]: artifact_from_dag.id, content=content, from_flow_run=True, + execution_state=execution_state, ) elif artifact_from_dag.type is ArtifactType.BOOL: return bool_artifact.BoolArtifact( @@ -193,6 +195,7 @@ def artifact(self, name: str) -> Optional[base_artifact.BaseArtifact]: artifact_from_dag.id, content=content, from_flow_run=True, + execution_state=execution_state, ) else: return generic_artifact.GenericArtifact( @@ -201,5 +204,5 @@ def artifact(self, name: str) -> Optional[base_artifact.BaseArtifact]: artifact_from_dag.type, content=content, from_flow_run=True, - execution_status=execution_status, + execution_state=execution_state, ) diff --git a/sdk/aqueduct/models/artifact.py b/sdk/aqueduct/models/artifact.py index bf0dbfc81..7c38adf92 100644 --- a/sdk/aqueduct/models/artifact.py +++ b/sdk/aqueduct/models/artifact.py @@ -14,6 +14,9 @@ class ArtifactMetadata(BaseModel): # If true, this artifact name is expected to be unique in the DAG. explicitly_named: bool = False from_local_data: bool = False + # Whether the content of this artifact will be persisted + # after workflow execution. + should_persist: bool = True class Config: fields = {"explicitly_named": {"exclude": ...}, "from_local_data": {"exclude": ...}} diff --git a/sdk/aqueduct/models/dag.py b/sdk/aqueduct/models/dag.py index c7ec6a7a8..c8408ebb9 100644 --- a/sdk/aqueduct/models/dag.py +++ b/sdk/aqueduct/models/dag.py @@ -457,6 +457,22 @@ def update_artifact_name(self, artifact_id: uuid.UUID, new_name: str) -> None: artifact.name = new_name artifact.explicitly_named = True + def update_artifact_should_persist(self, artifact_id: uuid.UUID, should_persist: bool) -> None: + self.must_get_artifact(artifact_id).should_persist = should_persist + + def disable_op_output_snapshots(self) -> None: + # Disable snapshots that may contain sensitive user data. + # For now, this means snapshots not from param, check, and metrics. + for op in self.list_operators(): + if get_operator_type(op) not in [ + OperatorType.CHECK, + OperatorType.METRIC, + OperatorType.SYSTEM_METRIC, + OperatorType.PARAM, + ]: + for artf_id in op.outputs: + self.update_artifact_should_persist(artf_id, False) + def update_param_spec(self, name: str, new_spec: OperatorSpec) -> None: """Checks that: 1) The parameter already exists, and there is not more than one with the same name. 
diff --git a/sdk/aqueduct/tests/serialization_test.py b/sdk/aqueduct/tests/serialization_test.py index 88b2ae1a0..229a91790 100644 --- a/sdk/aqueduct/tests/serialization_test.py +++ b/sdk/aqueduct/tests/serialization_test.py @@ -63,6 +63,7 @@ def test_artifact_serialization(): "id": str(artifact_id), "name": artifact_name, "type": ArtifactType.TABLE, + "should_persist": True, } ) diff --git a/sdk/requirements/python-3-10.txt b/sdk/requirements/python-3-10.txt index af7b89854..126440350 100644 --- a/sdk/requirements/python-3-10.txt +++ b/sdk/requirements/python-3-10.txt @@ -14,4 +14,5 @@ ruamel.yaml<=0.17.17 Pillow<=9.4.0 multipart<=0.2.4 requests_toolbelt<=0.10.1 -pymongo<=4.3.3 \ No newline at end of file +pymongo<=4.3.3 +python-dateutil<=2.8.2 \ No newline at end of file diff --git a/sdk/requirements/python-3-7.txt b/sdk/requirements/python-3-7.txt index 45e2fc0b2..92a44a0b2 100644 --- a/sdk/requirements/python-3-7.txt +++ b/sdk/requirements/python-3-7.txt @@ -14,4 +14,5 @@ ruamel.yaml<=0.17.17 Pillow<=9.4.0 multipart<=0.2.4 requests_toolbelt<=0.10.1 -pymongo<=4.3.3 \ No newline at end of file +pymongo<=4.3.3 +python-dateutil<=2.8.2 \ No newline at end of file diff --git a/sdk/requirements/python-3-8.txt b/sdk/requirements/python-3-8.txt index af7b89854..126440350 100644 --- a/sdk/requirements/python-3-8.txt +++ b/sdk/requirements/python-3-8.txt @@ -14,4 +14,5 @@ ruamel.yaml<=0.17.17 Pillow<=9.4.0 multipart<=0.2.4 requests_toolbelt<=0.10.1 -pymongo<=4.3.3 \ No newline at end of file +pymongo<=4.3.3 +python-dateutil<=2.8.2 \ No newline at end of file diff --git a/sdk/requirements/python-3-9.txt b/sdk/requirements/python-3-9.txt index af7b89854..126440350 100644 --- a/sdk/requirements/python-3-9.txt +++ b/sdk/requirements/python-3-9.txt @@ -14,4 +14,5 @@ ruamel.yaml<=0.17.17 Pillow<=9.4.0 multipart<=0.2.4 requests_toolbelt<=0.10.1 -pymongo<=4.3.3 \ No newline at end of file +pymongo<=4.3.3 +python-dateutil<=2.8.2 \ No newline at end of file diff --git a/src/golang/cmd/migrator/migrator/register.go b/src/golang/cmd/migrator/migrator/register.go index 23a56b5ab..db3f2301e 100644 --- a/src/golang/cmd/migrator/migrator/register.go +++ b/src/golang/cmd/migrator/migrator/register.go @@ -30,6 +30,7 @@ import ( _000025 "github.com/aqueducthq/aqueduct/cmd/migrator/versions/000025_add_storage_migration_table" _000026 "github.com/aqueducthq/aqueduct/cmd/migrator/versions/000026_drop_integration_validated_column" _000027 "github.com/aqueducthq/aqueduct/cmd/migrator/versions/000027_rename_integrations_table" + _000028 "github.com/aqueducthq/aqueduct/cmd/migrator/versions/000028_add_artifact_should_persist_column" "github.com/aqueducthq/aqueduct/lib/database" ) @@ -198,4 +199,10 @@ func init() { downPostgres: _000027.DownPostgres, name: "rename integration table to resource", } + + registeredMigrations[28] = &migration{ + upPostgres: _000028.UpPostgres, upSqlite: _000028.UpSqlite, + downPostgres: _000028.DownPostgres, + name: "add should_persist column to artifact table", + } } diff --git a/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/down_postgres.go b/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/down_postgres.go new file mode 100644 index 000000000..f12231579 --- /dev/null +++ b/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/down_postgres.go @@ -0,0 +1,5 @@ +package _000028_add_artifact_should_persist_column + +const downPostgresScript = ` +ALTER TABLE artifact DROP COLUMN IF EXISTS should_persist; +` diff --git 
a/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/main.go b/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/main.go new file mode 100644 index 000000000..de1d589df --- /dev/null +++ b/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/main.go @@ -0,0 +1,19 @@ +package _000028_add_artifact_should_persist_column + +import ( + "context" + + "github.com/aqueducthq/aqueduct/lib/database" +) + +func UpPostgres(ctx context.Context, db database.Database) error { + return db.Execute(ctx, upPostgresScript) +} + +func UpSqlite(ctx context.Context, db database.Database) error { + return db.Execute(ctx, upSqliteScript) +} + +func DownPostgres(ctx context.Context, db database.Database) error { + return db.Execute(ctx, downPostgresScript) +} diff --git a/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/up_postgres.go b/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/up_postgres.go new file mode 100644 index 000000000..0acfe7752 --- /dev/null +++ b/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/up_postgres.go @@ -0,0 +1,6 @@ +package _000028_add_artifact_should_persist_column + +const upPostgresScript = ` +ALTER TABLE artifact +ADD COLUMN should_persist BOOL DEFAULT TRUE NOT NULL; +` diff --git a/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/up_sqlite.go b/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/up_sqlite.go new file mode 100644 index 000000000..412ce50f1 --- /dev/null +++ b/src/golang/cmd/migrator/versions/000028_add_artifact_should_persist_column/up_sqlite.go @@ -0,0 +1,6 @@ +package _000028_add_artifact_should_persist_column + +const upSqliteScript = ` +ALTER TABLE artifact +ADD COLUMN should_persist BOOL DEFAULT TRUE NOT NULL; +` diff --git a/src/golang/lib/engine/aq_engine.go b/src/golang/lib/engine/aq_engine.go index 0dee5c6f5..44ff0c029 100644 --- a/src/golang/lib/engine/aq_engine.go +++ b/src/golang/lib/engine/aq_engine.go @@ -316,6 +316,8 @@ func (eng *aqEngine) ExecuteWorkflow( return shared.FailedExecutionStatus, errors.Wrap(err, "Unable to create NewWorkflowDag.") } + defer dag_utils.DeleteTemporaryArtifactContents(ctx, dag) + opToDependencyCount := make(map[uuid.UUID]int, len(dag.Operators())) for _, op := range dag.Operators() { inputs, err := dag.OperatorInputs(op) diff --git a/src/golang/lib/models/artifact.go b/src/golang/lib/models/artifact.go index 66387ec15..931ae647a 100644 --- a/src/golang/lib/models/artifact.go +++ b/src/golang/lib/models/artifact.go @@ -12,18 +12,20 @@ const ( ArtifactTable = "artifact" // Artifact column names - ArtifactID = "id" - ArtifactName = "name" - ArtifactDescription = "description" - ArtifactType = "type" + ArtifactID = "id" + ArtifactName = "name" + ArtifactDescription = "description" + ArtifactType = "type" + ArtifactShouldPersist = "should_persist" ) // An Artifact maps to the artifact table. type Artifact struct { - ID uuid.UUID `db:"id" json:"id"` - Name string `db:"name" json:"name"` - Description string `db:"description" json:"description"` - Type shared.ArtifactType `db:"type" json:"type"` + ID uuid.UUID `db:"id" json:"id"` + Name string `db:"name" json:"name"` + Description string `db:"description" json:"description"` + Type shared.ArtifactType `db:"type" json:"type"` + ShouldPersist bool `db:"should_persist" json:"should_persist"` } // ArtifactCols returns a comma-separated string of all Artifact columns. 
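Because the new should_persist column is created with DEFAULT TRUE NOT NULL, existing artifacts keep their current snapshot behavior after migration 28; on the SDK side, that default is what snapshot_enabled() reports until disable_snapshot() is called. A small illustrative sketch, not part of the diff; the passthrough operator and the reviews artifact are hypothetical:

    import aqueduct as aq

    @aq.op
    def passthrough(df):
        return df

    out = passthrough(reviews)  # `reviews` is any upstream artifact

    assert out.snapshot_enabled()      # should_persist defaults to true
    out.disable_snapshot()
    assert not out.snapshot_enabled()  # recorded as should_persist = false when published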
@@ -37,6 +39,7 @@ func AllArtifactCols() []string { ArtifactName, ArtifactDescription, ArtifactType, + ArtifactShouldPersist, } } diff --git a/src/golang/lib/models/schema_version.go b/src/golang/lib/models/schema_version.go index 295dd7229..b7ea62193 100644 --- a/src/golang/lib/models/schema_version.go +++ b/src/golang/lib/models/schema_version.go @@ -9,7 +9,7 @@ const ( // This is the source of truth for the required schema version // for both the server and executor. This value MUST be updated // when a new schema change is added. - CurrentSchemaVersion = 27 + CurrentSchemaVersion = 28 SchemaVersionTable = "schema_version" diff --git a/src/golang/lib/models/shared/execution_state.go b/src/golang/lib/models/shared/execution_state.go index cc5fc2fa3..1c0f603ed 100644 --- a/src/golang/lib/models/shared/execution_state.go +++ b/src/golang/lib/models/shared/execution_state.go @@ -19,7 +19,10 @@ type ExecutionState struct { } func (e ExecutionState) Terminated() bool { - return e.Status == FailedExecutionStatus || e.Status == SucceededExecutionStatus || e.Status == CanceledExecutionStatus + return e.Status == FailedExecutionStatus || + e.Status == SucceededExecutionStatus || + e.Status == CanceledExecutionStatus || + e.Status == DeletedExecutionStatus } func (e *ExecutionState) HasBlockingFailure() bool { diff --git a/src/golang/lib/models/shared/execution_status.go b/src/golang/lib/models/shared/execution_status.go index bf62bf544..b8b27227d 100644 --- a/src/golang/lib/models/shared/execution_status.go +++ b/src/golang/lib/models/shared/execution_status.go @@ -15,15 +15,21 @@ const ( type ExecutionStatus string const ( - // Registered is a special state that indicates a object has been registered - // but has no runs yet. This is typically used in workflows. + // Typical lifecycle of an object: + // Registered -> Pending -> Running -> Canceled + // |-> Failed + // |-> Succeeded -> (Optional) Deleted RegisteredExecutionStatus ExecutionStatus = "registered" PendingExecutionStatus ExecutionStatus = "pending" RunningExecutionStatus ExecutionStatus = "running" CanceledExecutionStatus ExecutionStatus = "canceled" FailedExecutionStatus ExecutionStatus = "failed" SucceededExecutionStatus ExecutionStatus = "succeeded" - UnknownExecutionStatus ExecutionStatus = "unknown" + // 'deleted' refers to 'erased after success'. + // Caller should consider 'deleted' as success for error handling, + // but should not expect any non-metadata content to be available. 
+ DeletedExecutionStatus ExecutionStatus = "deleted" + UnknownExecutionStatus ExecutionStatus = "unknown" ) type NullExecutionStatus struct { diff --git a/src/golang/lib/models/shared/execution_timestamps.go b/src/golang/lib/models/shared/execution_timestamps.go index d273f5b16..e1a917c62 100644 --- a/src/golang/lib/models/shared/execution_timestamps.go +++ b/src/golang/lib/models/shared/execution_timestamps.go @@ -11,6 +11,7 @@ type ExecutionTimestamps struct { PendingAt *time.Time `json:"pending_at"` RunningAt *time.Time `json:"running_at"` FinishedAt *time.Time `json:"finished_at"` + DeletedAt *time.Time `json:"deleted_at"` } // ExecutionTimestampsJsonFieldByStatus returns the json_field @@ -38,5 +39,9 @@ func ExecutionTimestampsJsonFieldByStatus( return "registered_at", nil } + if status == DeletedExecutionStatus { + return "deleted_at", nil + } + return "", errors.Newf("Execution status %s is not valid in timestamps", status) } diff --git a/src/golang/lib/models/views/artifact_node.go b/src/golang/lib/models/views/artifact_node.go index 393f234f4..2c141f10e 100644 --- a/src/golang/lib/models/views/artifact_node.go +++ b/src/golang/lib/models/views/artifact_node.go @@ -17,11 +17,12 @@ const ( ) type ArtifactNode struct { - ID uuid.UUID `db:"id" json:"id"` - DagID uuid.UUID `db:"dag_id" json:"dag_id"` - Name string `db:"name" json:"name"` - Description string `db:"description" json:"description"` - Type shared.ArtifactType `db:"type" json:"type"` + ID uuid.UUID `db:"id" json:"id"` + DagID uuid.UUID `db:"dag_id" json:"dag_id"` + Name string `db:"name" json:"name"` + Description string `db:"description" json:"description"` + Type shared.ArtifactType `db:"type" json:"type"` + ShouldPersist bool `db:"should_persist" json:"should_persist"` Input uuid.UUID `db:"input" json:"input"` Outputs shared.NullableIndexedList[uuid.UUID] `db:"outputs" json:"outputs"` diff --git a/src/golang/lib/models/views/merged_node.go b/src/golang/lib/models/views/merged_node.go index 5cf2687b6..de0107c43 100644 --- a/src/golang/lib/models/views/merged_node.go +++ b/src/golang/lib/models/views/merged_node.go @@ -10,26 +10,28 @@ import ( ) const ( - OperatorWithArtifactNodeView = "merged_node" - OperatorWithArtifactNodeID = "id" - OperatorWithArtifactNodeDagID = "dag_id" - OperatorWithArtifactNodeArtifactID = "artifact_id" - OperatorWithArtifactNodeName = "name" - OperatorWithArtifactNodeDescription = "description" - OperatorWithArtifactNodeSpec = "spec" - OperatorWithArtifactNodeType = "type" - OperatorWithArtifactNodeInputs = "inputs" - OperatorWithArtifactNodeOutputs = "outputs" + OperatorWithArtifactNodeView = "merged_node" + OperatorWithArtifactNodeID = "id" + OperatorWithArtifactNodeDagID = "dag_id" + OperatorWithArtifactNodeArtifactID = "artifact_id" + OperatorWithArtifactNodeName = "name" + OperatorWithArtifactNodeDescription = "description" + OperatorWithArtifactNodeSpec = "spec" + OperatorWithArtifactNodeType = "type" + OperatorWithArtifactNodeInputs = "inputs" + OperatorWithArtifactNodeOutputs = "outputs" + OperatorWithArtifactNodeShouldPersist = "should_persist" ) type OperatorWithArtifactNode struct { - ID uuid.UUID `db:"id" json:"id"` - DagID uuid.UUID `db:"dag_id" json:"dag_id"` - ArtifactID uuid.UUID `db:"artifact_id" json:"artifact_id"` - Name string `db:"name" json:"name"` - Description string `db:"description" json:"description"` - Spec operator.Spec `db:"spec" json:"spec"` - Type shared.ArtifactType `db:"type" json:"type"` + ID uuid.UUID `db:"id" json:"id"` + DagID uuid.UUID `db:"dag_id" 
json:"dag_id"` + ArtifactID uuid.UUID `db:"artifact_id" json:"artifact_id"` + Name string `db:"name" json:"name"` + Description string `db:"description" json:"description"` + Spec operator.Spec `db:"spec" json:"spec"` + Type shared.ArtifactType `db:"type" json:"type"` + ShouldPersist bool `db:"should_persist" json:"should_persist"` Inputs shared.NullableIndexedList[uuid.UUID] `db:"inputs" json:"inputs"` Outputs shared.NullableIndexedList[uuid.UUID] `db:"outputs" json:"outputs"` @@ -62,5 +64,6 @@ func allOperatorWithArtifactNodeCols() []string { OperatorWithArtifactNodeType, OperatorWithArtifactNodeInputs, OperatorWithArtifactNodeOutputs, + OperatorWithArtifactNodeShouldPersist, } } diff --git a/src/golang/lib/repos/artifact.go b/src/golang/lib/repos/artifact.go index c6ae629a2..36556b2a1 100644 --- a/src/golang/lib/repos/artifact.go +++ b/src/golang/lib/repos/artifact.go @@ -66,6 +66,7 @@ type artifactWriter interface { name string, description string, artifactType shared.ArtifactType, + shouldPersist bool, DB database.Database, ) (*models.Artifact, error) diff --git a/src/golang/lib/repos/sqlite/artifact.go b/src/golang/lib/repos/sqlite/artifact.go index 2e62d47ef..831461016 100644 --- a/src/golang/lib/repos/sqlite/artifact.go +++ b/src/golang/lib/repos/sqlite/artifact.go @@ -22,6 +22,7 @@ const artifactNodeViewSubQuery = ` workflow_dag.id AS dag_id, artifact.name AS name, artifact.description AS description, + artifact.should_persist AS should_persist, artifact.type as type, CAST( json_group_array( -- Group to_ids and idx into one array json_object( @@ -43,6 +44,7 @@ const artifactNodeViewSubQuery = ` workflow_dag.id AS dag_id, artifact.name AS name, artifact.description AS description, + artifact.should_persist AS should_persist, artifact.type as type, workflow_dag_edge.from_id AS input FROM @@ -56,6 +58,7 @@ const artifactNodeViewSubQuery = ` artf_with_input.dag_id AS dag_id, artf_with_input.name AS name, artf_with_input.description AS description, + artf_with_input.should_persist AS should_persist, artf_with_input.type AS type, artf_with_outputs.outputs AS outputs, artf_with_input.input AS input @@ -209,11 +212,12 @@ func (*artifactReader) GetMetricsByUpstreamArtifactBatch( type artifactWithUpstreamID struct { // copy of artifact - ID uuid.UUID `db:"id"` - Name string `db:"name"` - Description string `db:"description"` - Type shared.ArtifactType `db:"type"` - UpstreamID uuid.UUID `db:"upstream_id"` + ID uuid.UUID `db:"id"` + Name string `db:"name"` + Description string `db:"description"` + Type shared.ArtifactType `db:"type"` + ShouldPersist bool `db:"should_persist"` + UpstreamID uuid.UUID `db:"upstream_id"` } var queryRows []artifactWithUpstreamID @@ -225,10 +229,11 @@ func (*artifactReader) GetMetricsByUpstreamArtifactBatch( results := make(map[uuid.UUID][]models.Artifact, len(queryRows)) for _, queryRow := range queryRows { results[queryRow.UpstreamID] = append(results[queryRow.UpstreamID], models.Artifact{ - ID: queryRow.ID, - Name: queryRow.Name, - Description: queryRow.Description, - Type: queryRow.Type, + ID: queryRow.ID, + Name: queryRow.Name, + Description: queryRow.Description, + Type: queryRow.Type, + ShouldPersist: queryRow.ShouldPersist, }) } @@ -257,6 +262,7 @@ func (*artifactWriter) Create( name string, description string, artifactType shared.ArtifactType, + shouldPersist bool, DB database.Database, ) (*models.Artifact, error) { cols := []string{ @@ -264,6 +270,7 @@ func (*artifactWriter) Create( models.ArtifactName, models.ArtifactDescription, models.ArtifactType, + 
models.ArtifactShouldPersist, } query := DB.PrepareInsertWithReturnAllStmt(models.ArtifactTable, cols, models.ArtifactCols()) @@ -272,7 +279,13 @@ func (*artifactWriter) Create( return nil, err } - args := []interface{}{ID, name, description, artifactType} + args := []interface{}{ + ID, + name, + description, + artifactType, + shouldPersist, + } return getArtifact(ctx, DB, query, args...) } diff --git a/src/golang/lib/repos/sqlite/operator.go b/src/golang/lib/repos/sqlite/operator.go index ba102aebc..12f65a6c7 100644 --- a/src/golang/lib/repos/sqlite/operator.go +++ b/src/golang/lib/repos/sqlite/operator.go @@ -107,6 +107,7 @@ var mergedNodeViewSubQuery = fmt.Sprintf(` operator_node.inputs AS inputs, artifact_node.id AS artifact_id, artifact_node.type AS type, + artifact_node.should_persist AS should_persist, artifact_node.outputs AS outputs FROM operator_node LEFT JOIN diff --git a/src/golang/lib/repos/tests/artifact.go b/src/golang/lib/repos/tests/artifact.go index 2c84cf86b..23a5f8681 100644 --- a/src/golang/lib/repos/tests/artifact.go +++ b/src/golang/lib/repos/tests/artifact.go @@ -113,12 +113,20 @@ func (ts *TestSuite) TestArtifact_Create() { artifactType := randArtifactType() expectedArtifact := &models.Artifact{ - Name: name, - Description: description, - Type: artifactType, + Name: name, + Description: description, + Type: artifactType, + ShouldPersist: true, } - actualArtifact, err := ts.artifact.Create(ts.ctx, name, description, artifactType, ts.DB) + actualArtifact, err := ts.artifact.Create( + ts.ctx, + name, + description, + artifactType, + true, + ts.DB, + ) require.Nil(ts.T(), err) require.NotEqual(ts.T(), uuid.Nil, actualArtifact.ID) diff --git a/src/golang/lib/repos/tests/seed.go b/src/golang/lib/repos/tests/seed.go index 65d13b840..def0fbac8 100644 --- a/src/golang/lib/repos/tests/seed.go +++ b/src/golang/lib/repos/tests/seed.go @@ -122,7 +122,14 @@ func (ts *TestSuite) seedArtifact(count int) []models.Artifact { name := randString(10) description := randString(15) artifactType := randArtifactType() - artifact, err := ts.artifact.Create(ts.ctx, name, description, artifactType, ts.DB) + artifact, err := ts.artifact.Create( + ts.ctx, + name, + description, + artifactType, + true, + ts.DB, + ) require.Nil(ts.T(), err) artifacts = append(artifacts, *artifact) @@ -828,6 +835,7 @@ func (ts *TestSuite) seedComplexWorkflow() ( artfName, randString(15), shared.UntypedArtifact, // for now it's fine to have all artifacts untyped. + true, ts.DB, ) require.Nil(ts.T(), err) diff --git a/src/golang/lib/workflow/artifact/artifact.go b/src/golang/lib/workflow/artifact/artifact.go index 8751fc7e8..24517c671 100644 --- a/src/golang/lib/workflow/artifact/artifact.go +++ b/src/golang/lib/workflow/artifact/artifact.go @@ -3,6 +3,7 @@ package artifact import ( "context" "encoding/json" + "time" "github.com/aqueducthq/aqueduct/lib/database" "github.com/aqueducthq/aqueduct/lib/models" @@ -19,19 +20,25 @@ import ( const sampleTableRow = 500 // Artifact is an interface for managing and inspect the lifecycle of an artifact -// produced by a workflow run. +// produced by one single workflow run. type Artifact interface { ID() uuid.UUID Signature() uuid.UUID Type() shared.ArtifactType Name() string + ShouldPersistContent() bool + + // DeleteContent removes the artifact content from storage if it exists. + // This does not update the database, which should be handled + // by the caller. + DeleteContent(ctx context.Context) error // InitializeResult initializes the artifact in the database. 
InitializeResult(ctx context.Context, dagResultID uuid.UUID) error // PersistResult updates the artifact result in the database. // Errors if InitializeResult() hasn't been called yet. - PersistResult(ctx context.Context, execState *shared.ExecutionState) error + PersistResult(ctx context.Context) error // Finish is an end-of-lifecycle hook meant to do any final cleanup work. Finish(ctx context.Context) @@ -57,6 +64,13 @@ type Artifact interface { // For now, it's primarily used for table artifact to limit // the number of rows sent to client. SampleContent(ctx context.Context) ([]byte, bool, error) + + // SetExecState updates the execution state of the artifact. + // For now, it doesn't try to 'merge' the incoming state with the current state + // e.g. if current exec state has a 'pending' status with 'PendingAt' timestamp, + // and the incoming state has a 'succeeded' status with 'SucceededAt' timestamp, + // the incoming state will completely replace the current state. + SetExecState(execState shared.ExecutionState) } type ArtifactImpl struct { @@ -69,11 +83,13 @@ type ArtifactImpl struct { // data, which is why it is used as the key in the preview artifact cache. signature uuid.UUID - name string - description string - artifactType shared.ArtifactType + name string + description string + artifactType shared.ArtifactType + shouldPersistContent bool execPaths *utils.ExecPaths + execState *shared.ExecutionState repo repos.Artifact resultRepo repos.ArtifactResult @@ -103,21 +119,29 @@ func NewArtifact( return nil, errors.Newf("An Artifact signature must be provided for a cache-aware artifact.") } + now := time.Now() return &ArtifactImpl{ - id: dbArtifact.ID, - signature: signature, - name: dbArtifact.Name, - description: dbArtifact.Description, - artifactType: dbArtifact.Type, - execPaths: execPaths, - repo: artifactRepo, - resultRepo: artifactResultRepo, - resultID: uuid.Nil, - resultMetadata: nil, - previewCacheManager: previewCacheManager, - resultsPersisted: false, - storageConfig: storageConfig, - db: db, + id: dbArtifact.ID, + signature: signature, + name: dbArtifact.Name, + description: dbArtifact.Description, + artifactType: dbArtifact.Type, + execPaths: execPaths, + repo: artifactRepo, + resultRepo: artifactResultRepo, + resultID: uuid.Nil, + resultMetadata: nil, + previewCacheManager: previewCacheManager, + resultsPersisted: false, + storageConfig: storageConfig, + shouldPersistContent: dbArtifact.ShouldPersist, + execState: &shared.ExecutionState{ + Status: shared.RegisteredExecutionStatus, + Timestamps: &shared.ExecutionTimestamps{ + RegisteredAt: &now, + }, + }, + db: db, }, nil } @@ -141,14 +165,16 @@ func NewArtifactFromDBObjects( } return &ArtifactImpl{ - id: dbArtifact.ID, - signature: signature, - name: dbArtifact.Name, - description: dbArtifact.Description, - artifactType: dbArtifact.Type, + id: dbArtifact.ID, + signature: signature, + name: dbArtifact.Name, + description: dbArtifact.Description, + artifactType: dbArtifact.Type, + shouldPersistContent: dbArtifact.ShouldPersist, execPaths: &utils.ExecPaths{ ArtifactContentPath: contentPath, }, + execState: &dbArtifactResult.ExecState.ExecutionState, repo: artifactRepo, resultRepo: artifactResultRepo, resultID: artifactResultId, @@ -186,6 +212,10 @@ func (a *ArtifactImpl) Computed(ctx context.Context) bool { return res } +func (a *ArtifactImpl) ShouldPersistContent() bool { + return a.shouldPersistContent +} + func (a *ArtifactImpl) InitializeResult(ctx context.Context, dagResultID uuid.UUID) error { if a.resultRepo ==
nil { return errors.New("Artifact's result writer cannot be nil.") @@ -206,14 +236,11 @@ func (a *ArtifactImpl) InitializeResult(ctx context.Context, dagResultID uuid.UU return nil } -func (a *ArtifactImpl) updateArtifactResultAfterComputation( - ctx context.Context, - execState *shared.ExecutionState, -) { +func (a *ArtifactImpl) updateArtifactResultAfterComputation(ctx context.Context) { changes := map[string]interface{}{ models.ArtifactResultMetadata: nil, - models.ArtifactResultStatus: execState.Status, - models.ArtifactResultExecState: execState, + models.ArtifactResultStatus: a.execState.Status, + models.ArtifactResultExecState: a.execState, } metadataExists := utils.ObjectExistsInStorage(ctx, a.storageConfig, a.execPaths.ArtifactMetadataPath) @@ -282,7 +309,7 @@ func (a *ArtifactImpl) updateArtifactTypeAfterComputation( } } -func (a *ArtifactImpl) PersistResult(ctx context.Context, execState *shared.ExecutionState) error { +func (a *ArtifactImpl) PersistResult(ctx context.Context) error { if a.previewCacheManager != nil { return errors.Newf("Artifact %s is cache-aware, so it cannot be persisted.", a.Name()) } @@ -290,17 +317,57 @@ func (a *ArtifactImpl) PersistResult(ctx context.Context, execState *shared.Exec if a.resultsPersisted { return errors.Newf("Artifact %s was already persisted!", a.name) } - if !execState.Terminated() { - return errors.Newf("Artifact %s has unexpected execution state: %s", a.Name(), execState.Status) + + if a.execState == nil { + return errors.Newf("Artifact %s doesn't have an execution state.", a.name) } - a.updateArtifactResultAfterComputation(ctx, execState) + a.updateArtifactResultAfterComputation(ctx) a.updateArtifactTypeAfterComputation(ctx) a.resultsPersisted = true return nil } +func (a *ArtifactImpl) DeleteContent(ctx context.Context) error { + storageObj := storage.NewStorage(a.storageConfig) + + if a.Computed(ctx) { + now := time.Now() + if a.execState == nil { + a.SetExecState(shared.ExecutionState{ + Status: shared.DeletedExecutionStatus, + Timestamps: &shared.ExecutionTimestamps{ + DeletedAt: &now, + }, + }) + } else { + // Ideally we can omit this by making SetExecState merge + // the current execState with the incoming one. + a.execState.Status = shared.DeletedExecutionStatus + a.execState.Timestamps.DeletedAt = &now + } + + err := storageObj.Delete(ctx, a.execPaths.ArtifactContentPath) + if err != nil { + return err + } + + _, err = a.resultRepo.Update( + ctx, + a.resultID, + map[string]interface{}{ + models.ArtifactResultStatus: a.execState.Status, + models.ArtifactResultExecState: a.execState, + }, + a.db, + ) + return err + } + + return nil +} + func (a *ArtifactImpl) Finish(ctx context.Context) { // There is nothing to do if the artifact was never even computed. if !a.Computed(ctx) { @@ -323,11 +390,6 @@ func (a *ArtifactImpl) Finish(ctx context.Context) { func (a *ArtifactImpl) GetMetadata(ctx context.Context) (*shared.ArtifactResultMetadata, error) { if a.resultMetadata == nil { - if !a.Computed(ctx) { - // metadata is not ready yet. - return nil, nil - } - // If the path is not available, we assume the data is not available. 
if !utils.ObjectExistsInStorage(ctx, a.storageConfig, a.execPaths.ArtifactMetadataPath) { return nil, nil @@ -345,9 +407,6 @@ func (a *ArtifactImpl) GetMetadata(ctx context.Context) (*shared.ArtifactResultM } func (a *ArtifactImpl) GetContent(ctx context.Context) ([]byte, error) { - if !a.Computed(ctx) { - return nil, errors.Newf("Cannot get content of Artifact %s, it has not yet been computed.", a.Name()) - } content, err := storage.NewStorage(a.storageConfig).Get(ctx, a.execPaths.ArtifactContentPath) if err != nil { return nil, err @@ -355,15 +414,17 @@ func (a *ArtifactImpl) GetContent(ctx context.Context) ([]byte, error) { return content, nil } +func (a *ArtifactImpl) SetExecState(execState shared.ExecutionState) { + a.execState = &execState +} + func (a *ArtifactImpl) SampleContent(ctx context.Context) ([]byte, bool, error) { metadata, err := a.GetMetadata(ctx) if err != nil { return nil, false, err } - // Ignore if artifact is not computed. - // Ideally we should use a.Computed() but that involves a potential API call to storage. - // So here we use metadata which is also sufficient. + // Ignore if artifact metadata is not available. if metadata == nil { return nil, false, nil } diff --git a/src/golang/lib/workflow/dag/artifacts.go b/src/golang/lib/workflow/dag/artifacts.go new file mode 100644 index 000000000..dec0d5483 --- /dev/null +++ b/src/golang/lib/workflow/dag/artifacts.go @@ -0,0 +1,17 @@ +package dag + +import ( + "context" + + log "github.com/sirupsen/logrus" +) + +func DeleteTemporaryArtifactContents(ctx context.Context, dag WorkflowDag) { + for _, artf := range dag.Artifacts() { + if !artf.ShouldPersistContent() { + if err := artf.DeleteContent(ctx); err != nil { + log.Errorf("Error deleting temporary artifact result. Artf: %s, Err: %v", artf.Name(), err) + } + } + } +} diff --git a/src/golang/lib/workflow/dag/workflow_dag.go b/src/golang/lib/workflow/dag/workflow_dag.go index 8f2a2aa7b..c58045011 100644 --- a/src/golang/lib/workflow/dag/workflow_dag.go +++ b/src/golang/lib/workflow/dag/workflow_dag.go @@ -21,6 +21,7 @@ import ( ) type WorkflowDag interface { + // WorkflowDag tracks the life cycle of one single dag execution. ID() uuid.UUID UserID() uuid.UUID ResultID() uuid.UUID diff --git a/src/golang/lib/workflow/operator/base.go b/src/golang/lib/workflow/operator/base.go index 1881fd9bc..e3e41f7ce 100644 --- a/src/golang/lib/workflow/operator/base.go +++ b/src/golang/lib/workflow/operator/base.go @@ -372,7 +372,8 @@ func (bo *baseOperator) PersistResult(ctx context.Context) error { artifactExecState.Status = shared.CanceledExecutionStatus } - err := outputArtifact.PersistResult(ctx, &artifactExecState) + outputArtifact.SetExecState(artifactExecState) + err := outputArtifact.PersistResult(ctx) if err != nil { log.Errorf("Error occurred when persisting artifact %s.", outputArtifact.Name()) } diff --git a/src/golang/lib/workflow/operator/operator.go b/src/golang/lib/workflow/operator/operator.go index d66906dae..5f3ce2da3 100644 --- a/src/golang/lib/workflow/operator/operator.go +++ b/src/golang/lib/workflow/operator/operator.go @@ -22,7 +22,7 @@ import ( ) // Operator is an interface for managing and inspecting the lifecycle of an operator -// used by a workflow run. +// used by one single workflow run. type Operator interface { // Property getters. Retrieve property of the operator without making any changes.
Type() operator.Type diff --git a/src/golang/lib/workflow/utils/database.go b/src/golang/lib/workflow/utils/database.go index 499b0ed44..b61b13fff 100644 --- a/src/golang/lib/workflow/utils/database.go +++ b/src/golang/lib/workflow/utils/database.go @@ -81,6 +81,7 @@ func WriteDAGToDatabase( artifact.Name, artifact.Description, artifact.Type, + artifact.ShouldPersist, DB, ) if err != nil { diff --git a/src/golang/lib/workflow/utils/storage.go b/src/golang/lib/workflow/utils/storage.go index 64f04c11c..dd91e319c 100644 --- a/src/golang/lib/workflow/utils/storage.go +++ b/src/golang/lib/workflow/utils/storage.go @@ -15,8 +15,9 @@ func CleanupStorageFile(ctx context.Context, storageConfig *shared.StorageConfig } func CleanupStorageFiles(ctx context.Context, storageConfig *shared.StorageConfig, keys []string) { + storageObj := storage.NewStorage(storageConfig) for _, key := range keys { - err := storage.NewStorage(storageConfig).Delete(ctx, key) + err := storageObj.Delete(ctx, key) if err != nil { log.Errorf("Unable to clean up storage file with key: %s. %v. \n %s", key, err, errors.New("").GetStack()) } diff --git a/src/python/bin/aqueduct b/src/python/bin/aqueduct index f7a13a947..77b253225 100755 --- a/src/python/bin/aqueduct +++ b/src/python/bin/aqueduct @@ -20,7 +20,7 @@ import yaml from packaging.version import parse as parse_version from tqdm import tqdm -SCHEMA_VERSION = "27" +SCHEMA_VERSION = "28" CHUNK_SIZE = 4096 # Connector Package Version Bounds diff --git a/src/ui/common/src/components/pages/artifact/id/components/Preview.tsx b/src/ui/common/src/components/pages/artifact/id/components/Preview.tsx index eaed5b9fa..2383ff055 100644 --- a/src/ui/common/src/components/pages/artifact/id/components/Preview.tsx +++ b/src/ui/common/src/components/pages/artifact/id/components/Preview.tsx @@ -1,9 +1,10 @@ -import { Alert, Box, Divider, Typography } from '@mui/material'; +import { Alert, Box, Typography } from '@mui/material'; import React from 'react'; import ArtifactContent from '../../../../../components/workflows/artifact/content'; import { ArtifactResultResponse } from '../../../../../handlers/responses/node'; import { NodeArtifactResultContentGetResponse } from '../../../../../handlers/v2/NodeArtifactResultContentGet'; +import ExecutionStatus from '../../../../../utils/shared'; type PreviewProps = { upstreamPending: boolean; @@ -22,57 +23,49 @@ export const Preview: React.FC = ({ contentLoading, contentError, }) => { - let preview = ( - <> - - - - - An upstream operator failed, causing this artifact to not be created. - - - - ); - if (upstreamPending) { - preview = ( - <> - + return ( + + An upstream operator is in progress so this artifact is not yet created. + + ); + } + if (previewAvailable) { + if (artifactResult?.exec_state?.status === ExecutionStatus.Deleted) { + return ( - - An upstream operator is in progress so this artifact is not yet - created. + + This artifact has succeeded, but the snapshot has been deleted. - - ); - } else if (previewAvailable) { - preview = ( - <> - - - - Preview - - - - - - + ); + } + return ( + + + Preview + + + ); } - return preview; + + return ( + + An upstream operator failed, causing this artifact to not be created. 
+ + ); }; export default Preview; diff --git a/src/ui/common/src/components/pages/artifact/id/index.tsx b/src/ui/common/src/components/pages/artifact/id/index.tsx index 233fa0465..131c874e5 100644 --- a/src/ui/common/src/components/pages/artifact/id/index.tsx +++ b/src/ui/common/src/components/pages/artifact/id/index.tsx @@ -1,4 +1,5 @@ import { CircularProgress } from '@mui/material'; +import { Divider } from '@mui/material'; import Box from '@mui/material/Box'; import React from 'react'; import { useLocation, useParams } from 'react-router-dom'; @@ -141,6 +142,8 @@ const ArtifactDetailsPage: React.FC = ({ )} + + = ({ contentError={contentError ? (contentError as string) : ''} /> + + diff --git a/src/ui/common/src/components/pages/workflows/index.tsx b/src/ui/common/src/components/pages/workflows/index.tsx index 6dad40faa..6011301b5 100644 --- a/src/ui/common/src/components/pages/workflows/index.tsx +++ b/src/ui/common/src/components/pages/workflows/index.tsx @@ -259,10 +259,9 @@ const WorkflowsPage: React.FC = ({ user, Layout = DefaultLayout }) => { checkId: op.id, name: op.name, status: - nodesResults.artifacts[artifactId]?.exec_state?.status ?? + nodesResults.operators[op.id]?.exec_state?.status ?? ExecutionStatus.Registered, level: op.spec.check.level, - value: nodesResults.artifacts[artifactId]?.content_serialized, timestamp: nodesResults.artifacts[artifactId]?.exec_state?.timestamps ?.finished_at, diff --git a/src/ui/common/src/components/tables/CheckTableItem.tsx b/src/ui/common/src/components/tables/CheckTableItem.tsx index d6f2e1c06..c2ecd550b 100644 --- a/src/ui/common/src/components/tables/CheckTableItem.tsx +++ b/src/ui/common/src/components/tables/CheckTableItem.tsx @@ -1,71 +1,27 @@ -import { - faCircleCheck, - faCircleExclamation, - faMinus, - faTriangleExclamation, -} from '@fortawesome/free-solid-svg-icons'; -import { FontAwesomeIcon } from '@fortawesome/react-fontawesome'; -import Box from '@mui/material/Box'; import React from 'react'; -import { theme } from '../../styles/theme/theme'; -import { stringToExecutionStatus } from '../../utils/shared'; +import ExecutionStatus from '../../utils/shared'; import { StatusIndicator } from '../workflows/workflowStatus'; interface CheckTableItemProps { - checkValue: string; - status?: string; + status: ExecutionStatus; + value?: string; } export const CheckTableItem: React.FC = ({ - checkValue, status, + value, }) => { - let iconColor = theme.palette.black; - let checkIcon = faMinus; - - if (checkValue) { - switch (checkValue.toLowerCase()) { - case 'true': { - checkIcon = faCircleCheck; - iconColor = theme.palette.Success; - break; - } - case 'false': { - checkIcon = faCircleExclamation; - iconColor = theme.palette.Error; - break; - } - case 'warning': { - checkIcon = faTriangleExclamation; - iconColor = theme.palette.Warning; - break; - } - case 'none': { - checkIcon = faMinus; - iconColor = theme.palette.black; - break; - } - default: { - // None of the icon cases met, just fall through and render table value. - return <>{checkValue}; - } - } - } else { - // Check value not found, render the status indicator for this check. 
- return ( - - ); + if (value) { + return <>{value}; } return ( - - - + ); }; diff --git a/src/ui/common/src/components/tables/OperatorExecStateTable.tsx b/src/ui/common/src/components/tables/OperatorExecStateTable.tsx index bbb2e38bb..aba9a4c74 100644 --- a/src/ui/common/src/components/tables/OperatorExecStateTable.tsx +++ b/src/ui/common/src/components/tables/OperatorExecStateTable.tsx @@ -61,7 +61,7 @@ export const OperatorExecStateTable: React.FC = ({ // Send off to the MetricTableItem component. return ; } else if (tableType === OperatorExecStateTableType.Check) { - return ; + return ; } // Default case, code here shouldn't get hit assuming this table is just used to render metrics and cheecks. @@ -103,19 +103,18 @@ export const OperatorExecStateTable: React.FC = ({ tabIndex={-1} key={`tableBody-${rowIndex}`} > - {schema.fields.map((column, columnIndex) => { - const columnName = column.name.toLowerCase(); - const value = row[columnName]; - - return ( - - {getTableItem(tableType, columnName, value, row.status)} - - ); - })} + + {getTableItem(tableType, 'title', row['title'], undefined)} + + + {getTableItem(tableType, 'status', '', row['status'])} + ); })} diff --git a/src/ui/common/src/components/workflows/artifact/content.tsx b/src/ui/common/src/components/workflows/artifact/content.tsx index 47d50c89d..5cf372ef1 100644 --- a/src/ui/common/src/components/workflows/artifact/content.tsx +++ b/src/ui/common/src/components/workflows/artifact/content.tsx @@ -45,7 +45,7 @@ const ArtifactContent: React.FC = ({ ); } - if (!content || !artifactResult) { + if (!content?.content || !artifactResult) { return ( No result to show for this artifact. diff --git a/src/ui/common/src/components/workflows/artifact/metricsAndChecksOverview.tsx b/src/ui/common/src/components/workflows/artifact/metricsAndChecksOverview.tsx index 7df1d5c16..ef195323a 100644 --- a/src/ui/common/src/components/workflows/artifact/metricsAndChecksOverview.tsx +++ b/src/ui/common/src/components/workflows/artifact/metricsAndChecksOverview.tsx @@ -7,6 +7,8 @@ import { OperatorResponse, } from '../../../handlers/responses/node'; import { DataSchema } from '../../../utils/data'; +import { CheckLevel } from '../../../utils/operators'; +import ExecutionStatus from '../../../utils/shared'; import OperatorExecStateTable, { OperatorExecStateTableType, } from '../../tables/OperatorExecStateTable'; @@ -86,10 +88,19 @@ export const ChecksOverview: React.FC = ({ schema: schema, data: checks.map((checkOp) => { const name = checkOp.name; - const artfResult = (nodeResults?.artifacts ?? {})[checkOp.outputs[0]]; + const opResult = (nodeResults?.operators ?? 
{})[checkOp.id]; + let status = opResult?.exec_state?.status; + if ( + status === ExecutionStatus.Failed && + checkOp.spec?.check?.level === CheckLevel.Warning + ) { + status = ExecutionStatus.Warning; + } + return { title: name, - value: artfResult?.content_serialized, + value: status, + status: status, }; }), }; diff --git a/src/ui/common/src/components/workflows/nodes/Node.tsx b/src/ui/common/src/components/workflows/nodes/Node.tsx index 68a7f5afc..2f0645f61 100644 --- a/src/ui/common/src/components/workflows/nodes/Node.tsx +++ b/src/ui/common/src/components/workflows/nodes/Node.tsx @@ -40,6 +40,7 @@ const artifactNodeStatusLabels = { [ExecutionStatus.Running]: 'Running', [ExecutionStatus.Warning]: 'Warning', [ExecutionStatus.Unknown]: 'Unknown', + [ExecutionStatus.Deleted]: 'Deleted', }; const operatorNodeStatusLabels = { @@ -51,6 +52,7 @@ const operatorNodeStatusLabels = { [ExecutionStatus.Running]: 'Running', [ExecutionStatus.Warning]: 'Warning', [ExecutionStatus.Unknown]: 'Unknown', + [ExecutionStatus.Deleted]: 'Deleted', }; const checkNodeStatusLabels = { @@ -62,6 +64,7 @@ const checkNodeStatusLabels = { [ExecutionStatus.Running]: 'Running', [ExecutionStatus.Warning]: 'Warning', [ExecutionStatus.Unknown]: 'Unknown', + [ExecutionStatus.Deleted]: 'Deleted', }; export const artifactTypeToIconMapping = { @@ -163,6 +166,7 @@ export const Node: React.FC = ({ data, isConnectable }) => { let backgroundColor; switch (status) { case ExecutionStatus.Succeeded: + case ExecutionStatus.Deleted: backgroundColor = theme.palette.green[100]; break; case ExecutionStatus.Warning: diff --git a/src/ui/common/src/components/workflows/workflowStatus.tsx b/src/ui/common/src/components/workflows/workflowStatus.tsx index e6d22a177..1140f5a17 100644 --- a/src/ui/common/src/components/workflows/workflowStatus.tsx +++ b/src/ui/common/src/components/workflows/workflowStatus.tsx @@ -7,6 +7,7 @@ import { faTriangleExclamation, faX, } from '@fortawesome/free-solid-svg-icons'; +import { faTrash } from '@fortawesome/free-solid-svg-icons'; import { FontAwesomeIcon } from '@fortawesome/react-fontawesome'; import { Tooltip } from '@mui/material'; import Box from '@mui/material/Box'; @@ -130,6 +131,10 @@ export const StatusIndicator: React.FC = ({ icon = faTriangleExclamation; break; + case ExecutionStatus.Deleted: + icon = faTrash; + break; + default: return null; // This can never happen. } diff --git a/src/ui/common/src/utils/shared.ts b/src/ui/common/src/utils/shared.ts index e690c159b..e00d09098 100644 --- a/src/ui/common/src/utils/shared.ts +++ b/src/ui/common/src/utils/shared.ts @@ -46,6 +46,7 @@ export enum ExecutionStatus { Running = 'running', // Checks can have a warning status. Warning = 'warning', + Deleted = 'deleted', } export const getArtifactResultTableRow = ( @@ -117,6 +118,9 @@ export const stringToExecutionStatus = (status: string): ExecutionStatus => { case 'warning': executionStatus = ExecutionStatus.Warning; break; + case 'deleted': + executionStatus = ExecutionStatus.Deleted; + break; default: executionStatus = ExecutionStatus.Unknown; break;
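Reviewer aside: the call site for the new cleanup helper is not part of this section, so the sketch below only illustrates the intended wiring after a run finishes. The engine package name, the finishRun function, and the github.com/aqueducthq/aqueduct/lib/workflow/dag import path are assumptions for illustration; dag.DeleteTemporaryArtifactContents, the WorkflowDag interface, ShouldPersistContent, and DeleteContent are the pieces introduced or relied on by this diff.

package engine // hypothetical package name; the real call site is not shown in this section

import (
	"context"

	"github.com/aqueducthq/aqueduct/lib/workflow/dag" // assumed import path based on the src/golang layout
)

// finishRun sketches where DeleteTemporaryArtifactContents is meant to run:
// after operator results have been persisted, artifacts whose snapshots are
// disabled (ShouldPersistContent() == false) have their stored content
// removed, and their results surface in the UI with the new "deleted" status.
func finishRun(ctx context.Context, workflowDag dag.WorkflowDag) {
	// Best-effort cleanup: a failed per-artifact deletion is logged inside
	// the helper rather than returned, so it cannot fail the workflow run.
	dag.DeleteTemporaryArtifactContents(ctx, workflowDag)
}

The helper is deliberately fire-and-forget: snapshot cleanup is a storage-space optimization, so a deletion error should be visible in the logs but should never turn a succeeded run into a failed one.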