Skip to content

Commit

Permalink
Update GQL to expose StaleStatus and StaleStatusCause (#11952)
Browse files Browse the repository at this point in the history
### Summary & Motivation

This updates GQL to expose the new StaleStatus and StaleStatusCause
data.

### How I Tested These Changes

Unit tests (in progress)
  • Loading branch information
smackesey authored and clairelin135 committed Feb 22, 2023
1 parent f131a97 commit 1e4aa98
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 5 deletions.
14 changes: 14 additions & 0 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions js_modules/dagit/packages/core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, List, Optional, Sequence, Union, cast
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union, cast

import graphene
from dagster import (
Expand All @@ -8,6 +8,7 @@
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.definitions.logical_version import (
NULL_LOGICAL_VERSION,
StaleStatus,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.event_api import EventRecordsFilter
Expand Down Expand Up @@ -79,6 +80,17 @@
if TYPE_CHECKING:
from .external import GrapheneRepository

GrapheneAssetStaleStatus = graphene.Enum.from_enum(StaleStatus, name="StaleStatus")


class GrapheneAssetStaleStatusCause(graphene.ObjectType):
status = graphene.NonNull(GrapheneAssetStaleStatus)
key = graphene.NonNull(GrapheneAssetKey)
reason = graphene.NonNull(graphene.String)

class Meta:
name = "StaleStatusCause"


class GrapheneAssetDependency(graphene.ObjectType):
class Meta:
Expand Down Expand Up @@ -219,6 +231,8 @@ class GrapheneAssetNode(graphene.ObjectType):
projectedLogicalVersion = graphene.String()
repository = graphene.NonNull(lambda: external.GrapheneRepository)
required_resources = non_null_list(GrapheneResourceRequirement)
staleStatus = graphene.Field(GrapheneAssetStaleStatus)
staleStatusCauses = non_null_list(GrapheneAssetStaleStatusCause)
type = graphene.Field(GrapheneDagsterType)

class Meta:
Expand Down Expand Up @@ -542,7 +556,23 @@ def resolve_configField(self, _graphene_info: ResolveInfo) -> Optional[GrapheneC
def resolve_computeKind(self, _graphene_info: ResolveInfo) -> Optional[str]:
return self._external_asset_node.compute_kind

def resolve_currentLogicalVersion(self, _graphene_info: ResolveInfo) -> Optional[str]:
def resolve_staleStatus(self, graphene_info: ResolveInfo) -> Any: # (GrapheneAssetStaleStatus)
return self.stale_status_loader.get_status(self._external_asset_node.asset_key)

def resolve_staleStatusCauses(
self, graphene_info: ResolveInfo
) -> Sequence[GrapheneAssetStaleStatusCause]:
causes = self.stale_status_loader.get_status_causes(self._external_asset_node.asset_key)
return [
GrapheneAssetStaleStatusCause(
cause.status,
GrapheneAssetKey(path=cause.key.path),
cause.reason,
)
for cause in causes
]

def resolve_currentLogicalVersion(self, graphene_info: ResolveInfo) -> Optional[str]:
version = self.stale_status_loader.get_current_logical_version(
self._external_asset_node.asset_key
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@
}
currentLogicalVersion
projectedLogicalVersion
staleStatus
staleStatusCauses {
status
key { path }
reason
}
assetMaterializations {
tags {
key
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Mapping
from typing import Any, Mapping, Optional, Sequence

from dagster import (
AssetIn,
Expand All @@ -8,6 +8,7 @@
asset,
repository,
)
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.logical_version import LOGICAL_VERSION_TAG_KEY
from dagster._core.test_utils import instance_for_test, wait_for_runs_to_finish
from dagster._core.workspace.context import WorkspaceRequestContext
Expand Down Expand Up @@ -61,6 +62,29 @@ def test_dependencies_changed():
assert _fetch_logical_versions(context_v2, repo_v2)


def test_stale_status():
repo = get_repo_v1()

with instance_for_test() as instance:
with define_out_of_process_context(__file__, "get_repo_v1", instance) as context:
result = _fetch_logical_versions(context, repo)
foo = _get_asset_node("foo", result)
assert foo["currentLogicalVersion"] is None
assert foo["staleStatus"] == "STALE"
assert foo["staleStatusCauses"] == [
{"status": "STALE", "reason": "never materialized", "key": {"path": ["foo"]}}
]

assert _materialize_assets(context, repo)
wait_for_runs_to_finish(context.instance)

result = _fetch_logical_versions(context, repo)
foo = _get_asset_node("foo", result)
assert foo["currentLogicalVersion"] is not None
assert foo["staleStatus"] == "FRESH"
assert foo["staleStatusCauses"] == []


def test_logical_version_from_tags():
repo_v1 = get_repo_v1()
with instance_for_test() as instance:
Expand Down Expand Up @@ -110,8 +134,14 @@ def test_partitioned_self_dep():
assert _get_asset_node("b", result)["projectedLogicalVersion"] == "UNKNOWN"


def _materialize_assets(context: WorkspaceRequestContext, repo: RepositoryDefinition):
selector = infer_job_or_pipeline_selector(context, repo.get_implicit_asset_job_names()[0])
def _materialize_assets(
context: WorkspaceRequestContext,
repo: RepositoryDefinition,
asset_selection: Optional[Sequence[AssetKey]] = None,
):
selector = infer_job_or_pipeline_selector(
context, repo.get_implicit_asset_job_names()[0], asset_selection=asset_selection
)
return execute_dagster_graphql(
context,
LAUNCH_PIPELINE_EXECUTION_MUTATION,
Expand Down

0 comments on commit 1e4aa98

Please sign in to comment.