Skip to content

Commit

Permalink
[1/n] Add should persist column to artf table (#1390)
Browse files Browse the repository at this point in the history
  • Loading branch information
likawind committed Jun 6, 2023
1 parent ca3b9c0 commit 1c55306
Show file tree
Hide file tree
Showing 58 changed files with 626 additions and 276 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/mypy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Install Type Stub Libraries
run: |
python3 -m pip install types-croniter types-requests types-PyYAML types-setuptools types-PyMySQL
python3 -m pip install types-croniter types-requests types-PyYAML types-setuptools types-PyMySQL types-python-dateutil
- name: mypy sdk
working-directory: sdk
Expand Down
57 changes: 57 additions & 0 deletions integration_tests/sdk/aqueduct_tests/flow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,63 @@ def noop():
assert len(flow.list_saved_objects()) == 0


def test_flow_with_disabled_snapshots(client, flow_name, data_resource, engine):
@op
def op_disabled_by_wf(df):
return df

@metric
def metric_enabled(df):
return df.shape[0]

@metric
def metric_disabled(df):
return df.shape[0]

@check
def check_enabled(count):
return count > 10

reviews = extract(data_resource, DataObject.SENTIMENT, output_name="extract artifact")
op_artf = op_disabled_by_wf(reviews)
metric_enabled(op_artf)
metric_disabled_artf = metric_disabled(op_artf)
metric_disabled_artf.disable_snapshot()
check_enabled(metric_disabled_artf)

op_artf.enable_snapshot()
save(data_resource, op_artf)

flow = publish_flow_test(
client, op_artf, name=flow_name(), engine=engine, disable_snapshots=True
)
latest_run = flow.latest()
assert (
latest_run.artifact("extract artifact").execution_state().status == ExecutionStatus.DELETED
)
assert latest_run.artifact("extract artifact").get() is None
assert (
latest_run.artifact("op_disabled_by_wf artifact").execution_state().status
== ExecutionStatus.DELETED
)
assert latest_run.artifact("op_disabled_by_wf artifact").get() is None
assert (
latest_run.artifact("metric_enabled artifact").execution_state().status
== ExecutionStatus.SUCCEEDED
)
assert latest_run.artifact("metric_enabled artifact").get() == 100
assert (
latest_run.artifact("metric_disabled artifact").execution_state().status
== ExecutionStatus.DELETED
)
assert latest_run.artifact("metric_disabled artifact").get() is None
assert (
latest_run.artifact("check_enabled artifact").execution_state().status
== ExecutionStatus.SUCCEEDED
)
assert latest_run.artifact("check_enabled artifact").get()


def test_artifact_set_name(client, flow_name, engine):
@op
def foo():
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/sdk/shared/flow_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def publish_flow_test(
source_flow: Optional[Union[Flow, str, uuid.UUID]] = None,
should_block: bool = True,
use_local: bool = False,
disable_snapshots: bool = False,
) -> Flow:
"""Publishes a flow and waits for a specified number of runs with specified statuses to complete.
Expand Down Expand Up @@ -89,6 +90,7 @@ def publish_flow_test(
engine=engine,
source_flow=source_flow,
use_local=use_local,
disable_snapshots=disable_snapshots,
)
print("Workflow registration succeeded. Workflow ID %s. Name: %s" % (flow.id(), name))

Expand Down
2 changes: 2 additions & 0 deletions manual_qa_tests/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
succeed_dag_layout_test,
succeed_march_madness_dag_layout_test,
succeed_parameters,
succeed_with_snapshots_disabled,
warning_bad_check,
)

Expand All @@ -31,6 +32,7 @@
succeed_march_madness_dag_layout_test,
fail_bad_operator,
no_run,
succeed_with_snapshots_disabled,
]

DEMO_NOTEBOOKS_PATHS = [
Expand Down
50 changes: 50 additions & 0 deletions manual_qa_tests/workflows/succeed_with_snapshots_disabled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import aqueduct as aq

NAME = "succeed_with_snapshots_disabled"
DESCRIPTION = """* Workflows Page: should succeed.
* Workflow Details Page:
* There artifacts are shown deleted: op_disabled artf, Demo query artf
* Both checks should show 'passed'.
"""


@aq.op(requirements=[])
def op_disabled(df):
return df


@aq.metric(requirements=[])
def metric(df):
return df.shape[0]


@aq.check(requirements=[])
def check(count):
return count > 10


@aq.check(requirements=[])
def check_disabled(count):
return count > 10


def deploy(client, resource_name):
resource = client.resource(resource_name)
reviews = resource.sql("SELECT * FROM hotel_reviews")
op_artf = op_disabled(reviews)
metric_artf = metric(op_artf)
check(metric_artf)
check_disabled_artf = check_disabled(metric_artf)

op_artf.enable_snapshot()
check_disabled_artf.disable_snapshot()

resource.save(op_artf, "succeed_with_snapshots_disabled_tmp", aq.LoadUpdateMode.REPLACE)

client.publish_flow(
artifacts=[op_artf],
name=NAME,
description=DESCRIPTION,
schedule="",
disable_snapshots=True,
)
2 changes: 1 addition & 1 deletion scripts/install_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
ui_directory = join(os.environ["HOME"], ".aqueduct", "ui")

# Make sure to update this if there is any schema change we want to include in the upgrade.
SCHEMA_VERSION = "27"
SCHEMA_VERSION = "28"


def execute_command(args, cwd=None):
Expand Down
44 changes: 43 additions & 1 deletion sdk/aqueduct/artifacts/base_artifact.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import json
import uuid
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union

import numpy as np
from aqueduct.constants.enums import ArtifactType, OperatorType
from aqueduct.models.dag import DAG
from aqueduct.models.execution_state import ExecutionState, ExecutionStatus
from aqueduct.type_annotations import Number
from aqueduct.utils.naming import sanitize_artifact_name


Expand All @@ -14,6 +17,33 @@ class BaseArtifact(ABC):
_content: Any
_from_flow_run: bool
_from_operator_type: Optional[OperatorType] = None
_execution_state: Optional[ExecutionState] = None

def __init__(
self,
dag: DAG,
artifact_id: uuid.UUID,
content: Optional[Union[bool, np.bool_, Number]] = None,
from_flow_run: bool = False,
execution_state: Optional[ExecutionState] = None,
) -> None:
self._dag = dag
self._artifact_id = artifact_id

# This parameter indicates whether the artifact is fetched from flow-run or not.
self._from_flow_run = from_flow_run
self._set_content(content)

# For now, the execution_state is only relevant when it's fetched from a flow run.
# It stays 'None' when the artifact runs in previews.
self._execution_state = execution_state

def _is_content_deleted(self) -> bool:
if self._from_flow_run:
if self._execution_state and self._execution_state.status == ExecutionStatus.DELETED:
return True

return False

def id(self) -> uuid.UUID:
"""Fetch the id associated with this artifact.
Expand All @@ -29,6 +59,12 @@ def name(self) -> str:
def type(self) -> ArtifactType:
return self._dag.must_get_artifact(artifact_id=self._artifact_id).type

def snapshot_enabled(self) -> bool:
return self._dag.must_get_artifact(artifact_id=self._artifact_id).should_persist

def execution_state(self) -> Optional[ExecutionState]:
return self._execution_state

def _get_content(self) -> Any:
return self._content

Expand All @@ -41,6 +77,12 @@ def set_operator_type(self, operator_type: OperatorType) -> None:
def set_name(self, name: str) -> None:
self._dag.update_artifact_name(self._artifact_id, sanitize_artifact_name(name))

def enable_snapshot(self) -> None:
self._dag.update_artifact_should_persist(self._artifact_id, True)

def disable_snapshot(self) -> None:
self._dag.update_artifact_should_persist(self._artifact_id, False)

def _describe(self) -> Dict[str, Any]:
input_operator = self._dag.must_get_operator(with_output_artifact_id=self._artifact_id)
return {
Expand Down
14 changes: 7 additions & 7 deletions sdk/aqueduct/artifacts/bool_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from aqueduct.constants.enums import OperatorType
from aqueduct.error import ArtifactNeverComputedException
from aqueduct.models.dag import DAG
from aqueduct.models.execution_state import ExecutionState
from aqueduct.models.operators import get_operator_type
from aqueduct.utils.describe import get_readable_description_for_check
from aqueduct.utils.utils import format_header_for_print
Expand Down Expand Up @@ -43,15 +44,11 @@ def __init__(
artifact_id: uuid.UUID,
content: Optional[Union[bool, np.bool_]] = None,
from_flow_run: bool = False,
execution_state: Optional[ExecutionState] = None,
):
self._dag = dag
self._artifact_id = artifact_id
super().__init__(dag, artifact_id, content, from_flow_run, execution_state)

# This parameter indicates whether the artifact is fetched from flow-run or not.
self._from_flow_run = from_flow_run
self._set_content(content)

def get(self, parameters: Optional[Dict[str, Any]] = None) -> Union[bool, np.bool_]:
def get(self, parameters: Optional[Dict[str, Any]] = None) -> Optional[Union[bool, np.bool_]]:
"""Materializes a BoolArtifact into a boolean.
Returns:
Expand All @@ -63,6 +60,9 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> Union[bool, np.boo
InternalServerError:
An unexpected error occurred in the server.
"""
if self._is_content_deleted():
return None

self._dag.must_get_artifact(self._artifact_id)

if self._from_flow_run:
Expand Down
23 changes: 11 additions & 12 deletions sdk/aqueduct/artifacts/generic_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from aqueduct.constants.enums import ArtifactType, ExecutionStatus
from aqueduct.error import ArtifactNeverComputedException
from aqueduct.models.dag import DAG
from aqueduct.models.execution_state import ExecutionState
from aqueduct.utils.utils import format_header_for_print

from aqueduct import globals
Expand All @@ -30,22 +31,13 @@ def __init__(
artifact_type: ArtifactType = ArtifactType.UNTYPED,
content: Optional[Any] = None,
from_flow_run: bool = False,
execution_status: Optional[ExecutionStatus] = None,
execution_state: Optional[ExecutionState] = None,
):
# Cannot initialize a generic artifact's content without also setting its type.
if content is not None:
assert artifact_type != ArtifactType.UNTYPED

self._dag = dag
self._artifact_id = artifact_id

# This parameter indicates whether the artifact is fetched from flow-run or not.
self._from_flow_run = from_flow_run
self._set_content(content)
# This is only relevant to generic artifact produced from flow_run.artifact().
# We need this to distinguish between when an artifact's content is None versus
# when it fails to compute successfully.
self._execution_status = execution_status
super().__init__(dag, artifact_id, content, from_flow_run, execution_state)

def get(self, parameters: Optional[Dict[str, Any]] = None) -> Any:
"""Materializes the artifact.
Expand All @@ -59,10 +51,17 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> Any:
InternalServerError:
An unexpected error occurred in the server.
"""
if self._is_content_deleted():
return None

self._dag.must_get_artifact(self._artifact_id)

if self._from_flow_run:
if self._execution_status != ExecutionStatus.SUCCEEDED:
if (
not self._execution_state
or self._execution_state.status != ExecutionStatus.SUCCEEDED
):
# DELETED case is already covered.
raise ArtifactNeverComputedException(
"This artifact was part of an existing flow run but was never computed successfully!",
)
Expand Down
14 changes: 7 additions & 7 deletions sdk/aqueduct/artifacts/numeric_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from aqueduct.error import AqueductError, ArtifactNeverComputedException
from aqueduct.models.artifact import ArtifactMetadata
from aqueduct.models.dag import DAG
from aqueduct.models.execution_state import ExecutionState
from aqueduct.models.operators import (
CheckSpec,
FunctionSpec,
Expand Down Expand Up @@ -68,15 +69,11 @@ def __init__(
artifact_id: uuid.UUID,
content: Optional[Number] = None,
from_flow_run: bool = False,
execution_state: Optional[ExecutionState] = None,
):
self._dag = dag
self._artifact_id = artifact_id
super().__init__(dag, artifact_id, content, from_flow_run, execution_state)

# This parameter indicates whether the artifact is fetched from flow-run or not.
self._from_flow_run = from_flow_run
self._set_content(content)

def get(self, parameters: Optional[Dict[str, Any]] = None) -> Number:
def get(self, parameters: Optional[Dict[str, Any]] = None) -> Optional[Number]:
"""Materializes a NumericArtifact into its immediate float value.
Returns:
Expand All @@ -89,9 +86,12 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> Number:
An unexpected error occurred within the Aqueduct cluster.
"""
self._dag.must_get_artifact(self._artifact_id)
if self._is_content_deleted():
return None

if self._from_flow_run:
if self._get_content() is None:
# DELETED case is already covered.
raise ArtifactNeverComputedException(
"This artifact was part of an existing flow run but was never computed successfully!",
)
Expand Down

0 comments on commit 1c55306

Please sign in to comment.