Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
likawind committed Jun 6, 2023
1 parent 63a678f commit d0464bd
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 5 deletions.
5 changes: 5 additions & 0 deletions integration_tests/sdk/aqueduct_tests/flow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,22 +529,27 @@ def check_enabled(count):
assert (
latest_run.artifact("extract artifact").execution_state().status == ExecutionStatus.DELETED
)
assert latest_run.artifact("extract artifact").get() is None
assert (
latest_run.artifact("op_disabled_by_wf artifact").execution_state().status
== ExecutionStatus.DELETED
)
assert latest_run.artifact("op_disabled_by_wf artifact").get() is None
assert (
latest_run.artifact("metric_enabled artifact").execution_state().status
== ExecutionStatus.SUCCEEDED
)
assert latest_run.artifact("metric_enabled artifact").get() == 100
assert (
latest_run.artifact("metric_disabled artifact").execution_state().status
== ExecutionStatus.DELETED
)
assert latest_run.artifact("metric_disabled artifact").get() is None
assert (
latest_run.artifact("check_enabled artifact").execution_state().status
== ExecutionStatus.SUCCEEDED
)
assert latest_run.artifact("check_enabled artifact").get()


def test_artifact_set_name(client, flow_name, engine):
Expand Down
9 changes: 8 additions & 1 deletion sdk/aqueduct/artifacts/base_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import numpy as np
from aqueduct.constants.enums import ArtifactType, OperatorType
from aqueduct.models.dag import DAG
from aqueduct.models.execution_state import ExecutionState
from aqueduct.models.execution_state import ExecutionState, ExecutionStatus
from aqueduct.type_annotations import Number
from aqueduct.utils.naming import sanitize_artifact_name

Expand Down Expand Up @@ -38,6 +38,13 @@ def __init__(
# It stays 'None' when the artifact runs in previews.
self._execution_state = execution_state

def _is_content_deleted(self) -> bool:
if self._from_flow_run:
if self._execution_state and self._execution_state.status == ExecutionStatus.DELETED:
return True

return False

def id(self) -> uuid.UUID:
"""Fetch the id associated with this artifact.
Expand Down
5 changes: 4 additions & 1 deletion sdk/aqueduct/artifacts/bool_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(
):
super().__init__(dag, artifact_id, content, from_flow_run, execution_state)

def get(self, parameters: Optional[Dict[str, Any]] = None) -> Union[bool, np.bool_]:
def get(self, parameters: Optional[Dict[str, Any]] = None) -> Optional[Union[bool, np.bool_]]:
"""Materializes a BoolArtifact into a boolean.
Returns:
Expand All @@ -60,6 +60,9 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> Union[bool, np.boo
InternalServerError:
An unexpected error occurred in the server.
"""
if self._is_content_deleted():
return None

self._dag.must_get_artifact(self._artifact_id)

if self._from_flow_run:
Expand Down
3 changes: 3 additions & 0 deletions sdk/aqueduct/artifacts/generic_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> Any:
InternalServerError:
An unexpected error occurred in the server.
"""
if self._is_content_deleted():
return None

self._dag.must_get_artifact(self._artifact_id)

if self._from_flow_run:
Expand Down
4 changes: 3 additions & 1 deletion sdk/aqueduct/artifacts/numeric_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def __init__(
):
super().__init__(dag, artifact_id, content, from_flow_run, execution_state)

def get(self, parameters: Optional[Dict[str, Any]] = None) -> Number:
def get(self, parameters: Optional[Dict[str, Any]] = None) -> Optional[Number]:
"""Materializes a NumericArtifact into its immediate float value.
Returns:
Expand All @@ -86,6 +86,8 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> Number:
An unexpected error occurred within the Aqueduct cluster.
"""
self._dag.must_get_artifact(self._artifact_id)
if self._is_content_deleted():
return None

if self._from_flow_run:
if self._get_content() is None:
Expand Down
11 changes: 9 additions & 2 deletions sdk/aqueduct/artifacts/table_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(
):
super().__init__(dag, artifact_id, content, from_flow_run, execution_state)

def get(self, parameters: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
def get(self, parameters: Optional[Dict[str, Any]] = None) -> Optional[pd.DataFrame]:
"""Materializes TableArtifact into an actual dataframe.
Args:
Expand All @@ -85,6 +85,8 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
An unexpected error occurred within the Aqueduct cluster.
"""
self._dag.must_get_artifact(self._artifact_id)
if self._is_content_deleted():
return None

if self._from_flow_run:
if self._get_content() is None:
Expand All @@ -110,7 +112,9 @@ def get(self, parameters: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
assert isinstance(content, pd.DataFrame)
return content

def head(self, n: int = 5, parameters: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
def head(
self, n: int = 5, parameters: Optional[Dict[str, Any]] = None
) -> Optional[pd.DataFrame]:
"""Returns a preview of the table artifact.
>>> db = client.resource(name="demo/")
Expand All @@ -125,6 +129,9 @@ def head(self, n: int = 5, parameters: Optional[Dict[str, Any]] = None) -> pd.Da
A dataframe containing the table contents of this artifact.
"""
df = self.get(parameters=parameters)
if df is None:
return None

return df.head(n)

PRESET_METRIC_LIST = ["number_of_missing_values", "number_of_rows", "max", "min", "mean", "std"]
Expand Down

0 comments on commit d0464bd

Please sign in to comment.