Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture db timeout errors for tag key queries #12596

Merged
merged 4 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

Large diffs are not rendered by default.

16 changes: 14 additions & 2 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 17 additions & 3 deletions js_modules/dagit/packages/core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 21 additions & 7 deletions js_modules/dagit/packages/core/src/runs/RunsFilterInput.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,11 @@ export const RunsFilterInput: React.FC<RunsFilterInputProps> = ({

const suggestions = searchSuggestionsForRuns(
options,
tagKeyData?.runTagKeys,
tagKeyData?.runTagKeysOrError?.__typename === 'RunTagKeys'
? tagKeyData.runTagKeysOrError.keys
: [],
selectedTagKey,
tagValueData?.runTags,
tagValueData?.runTagsOrError?.__typename === 'RunTags' ? tagValueData.runTagsOrError.tags : [],
enabledFilters,
);

Expand All @@ -225,7 +227,10 @@ export const RunsFilterInput: React.FC<RunsFilterInputProps> = ({
return;
}
const tagKeyText = text.slice(4);
if (tagKeyData?.runTagKeys && tagKeyData?.runTagKeys.includes(tagKeyText)) {
if (
tagKeyData?.runTagKeysOrError?.__typename === 'RunTagKeys' &&
tagKeyData.runTagKeysOrError.keys.includes(tagKeyText)
) {
setSelectedTagKey(tagKeyText);
}
};
Expand Down Expand Up @@ -272,15 +277,24 @@ export const RunsFilterInput: React.FC<RunsFilterInputProps> = ({

const RUN_TAG_KEYS_QUERY = gql`
query RunTagKeysQuery {
runTagKeys
runTagKeysOrError {
... on RunTagKeys {
keys
}
}
}
`;

const RUN_TAG_VALUES_QUERY = gql`
query RunTagValuesQuery($tagKeys: [String!]!) {
runTags(tagKeys: $tagKeys) {
key
values
runTagsOrError(tagKeys: $tagKeys) {
__typename
... on RunTags {
tags {
key
values
}
}
}
}
`;

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,14 @@
from .utils import capture_error

if TYPE_CHECKING:
from dagster_graphql.schema.tags import GraphenePipelineTagAndValues

from ..schema.asset_graph import GrapheneAssetLatestInfo, GrapheneAssetNode
from ..schema.errors import GrapheneRunNotFoundError
from ..schema.execution import GrapheneExecutionPlan
from ..schema.logs.events import GrapheneRunStepStats
from ..schema.pipelines.config import GraphenePipelineConfigValidationValid
from ..schema.pipelines.pipeline import GrapheneEventConnection, GrapheneRun
from ..schema.pipelines.pipeline_run_stats import GrapheneRunStatsSnapshot
from ..schema.runs import GrapheneRunGroup
from ..schema.runs import GrapheneRunGroup, GrapheneRunTagKeys, GrapheneRunTags
from ..schema.util import ResolveInfo


Expand All @@ -56,30 +54,39 @@ def get_run_by_id(
return GrapheneRun(record)


def get_run_tag_keys(graphene_info: "ResolveInfo") -> List[str]:
return [
tag_key
for tag_key in graphene_info.context.instance.get_run_tag_keys()
if get_tag_type(tag_key) != TagType.HIDDEN
]
@capture_error
def get_run_tag_keys(graphene_info: "ResolveInfo") -> "GrapheneRunTagKeys":
from ..schema.runs import GrapheneRunTagKeys

return GrapheneRunTagKeys(
keys=[
tag_key
for tag_key in graphene_info.context.instance.get_run_tag_keys()
if get_tag_type(tag_key) != TagType.HIDDEN
]
)


@capture_error
def get_run_tags(
graphene_info: "ResolveInfo",
tag_keys: Optional[List[str]] = None,
value_prefix: Optional[str] = None,
limit: Optional[int] = None,
) -> List["GraphenePipelineTagAndValues"]:
) -> "GrapheneRunTags":
from ..schema.runs import GrapheneRunTags
from ..schema.tags import GraphenePipelineTagAndValues

instance = graphene_info.context.instance
return [
GraphenePipelineTagAndValues(key=key, values=values)
for key, values in instance.get_run_tags(
tag_keys=tag_keys, value_prefix=value_prefix, limit=limit
)
if get_tag_type(key) != TagType.HIDDEN
]
return GrapheneRunTags(
tags=[
GraphenePipelineTagAndValues(key=key, values=values)
for key, values in instance.get_run_tags(
tag_keys=tag_keys, value_prefix=value_prefix, limit=limit
)
if get_tag_type(key) != TagType.HIDDEN
]
)


@capture_error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,12 @@
GrapheneRunGroupsOrError,
GrapheneRuns,
GrapheneRunsOrError,
GrapheneRunTagKeysOrError,
GrapheneRunTagsOrError,
parse_run_config_input,
)
from ..schedules import GrapheneScheduleOrError, GrapheneSchedulerOrError, GrapheneSchedulesOrError
from ..sensors import GrapheneSensorOrError, GrapheneSensorsOrError
from ..tags import GraphenePipelineTagAndValues
from ..test import GrapheneTestFields
from ..util import ResolveInfo, get_compute_log_manager, non_null_list
from .assets import GrapheneAssetOrError, GrapheneAssetsOrError
Expand Down Expand Up @@ -291,11 +292,11 @@ class Meta:
runId=graphene.NonNull(graphene.ID),
description="Retrieve a run by its run id.",
)
runTagKeys = graphene.Field(
non_null_list(graphene.String), description="Retrieve the distinct tag keys from all runs."
runTagKeysOrError = graphene.Field(
GrapheneRunTagKeysOrError, description="Retrieve the distinct tag keys from all runs."
)
runTags = graphene.Field(
non_null_list(GraphenePipelineTagAndValues),
runTagsOrError = graphene.Field(
GrapheneRunTagsOrError,
tagKeys=graphene.Argument(graphene.List(graphene.NonNull(graphene.String))),
valuePrefix=graphene.String(),
limit=graphene.Int(),
Expand Down Expand Up @@ -636,10 +637,10 @@ def resolve_partitionSetOrError(
partitionSetName, # type: ignore
)

def resolve_runTagKeys(self, graphene_info: ResolveInfo):
def resolve_runTagKeysOrError(self, graphene_info: ResolveInfo):
return get_run_tag_keys(graphene_info)

def resolve_runTags(
def resolve_runTagsOrError(
self,
graphene_info: ResolveInfo,
tagKeys: Optional[List[str]] = None,
Expand Down
27 changes: 27 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
GraphenePythonError,
GrapheneRunGroupNotFoundError,
)
from .tags import GraphenePipelineTagAndValues
from .util import ResolveInfo, non_null_list


Expand Down Expand Up @@ -137,6 +138,32 @@ class Meta:
name = "RunGroupsOrError"


class GrapheneRunTagKeys(graphene.ObjectType):
keys = non_null_list(graphene.String)

class Meta:
name = "RunTagKeys"


class GrapheneRunTagKeysOrError(graphene.Union):
class Meta:
types = (GraphenePythonError, GrapheneRunTagKeys)
name = "RunTagKeysOrError"


class GrapheneRunTags(graphene.ObjectType):
tags = non_null_list(GraphenePipelineTagAndValues)

class Meta:
name = "RunTags"


class GrapheneRunTagsOrError(graphene.Union):
class Meta:
types = (GraphenePythonError, GrapheneRunTags)
name = "RunTagsOrError"


class GrapheneRunConfigData(GenericScalar, graphene.Scalar):
class Meta:
description = """This type is used when passing in a configuration object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,41 @@

ALL_TAGS_QUERY = """
{
runTags {
... on PipelineTagAndValues {
key
values
runTagsOrError {
... on RunTags {
tags {
key
values
}
}
}
}
"""

ALL_TAG_KEYS_QUERY = """
{
runTagKeys
runTagKeysOrError {
__typename
... on RunTagKeys {
keys
}
... on PythonError {
message
stack
}
}
}
"""

FILTERED_TAGS_QUERY = """
query FilteredRunTagsQuery($tagKeys: [String!]!) {
runTags(tagKeys: $tagKeys) {
key
values
runTagsOrError(tagKeys: $tagKeys) {
... on RunTags {
tags {
key
values
}
}
}
}
"""
Expand Down Expand Up @@ -329,21 +344,21 @@ def test_get_runs_over_graphql(self, graphql_context):
assert len(runs) == 2

all_tag_keys_result = execute_dagster_graphql(read_context, ALL_TAG_KEYS_QUERY)
tag_keys = set(all_tag_keys_result.data["runTagKeys"])
tag_keys = set(all_tag_keys_result.data["runTagKeysOrError"]["keys"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test the PythonError path - maybe use mock to force throwing

# check presence rather than set equality since we might have extra tags in cloud
assert "fruit" in tag_keys
assert "veggie" in tag_keys

all_tags_result = execute_dagster_graphql(read_context, ALL_TAGS_QUERY)
tags = all_tags_result.data["runTags"]
tags = all_tags_result.data["runTagsOrError"]["tags"]
tags_dict = {item["key"]: item["values"] for item in tags}
assert tags_dict["fruit"] == ["apple"]
assert tags_dict["veggie"] == ["carrot"]

filtered_tags_result = execute_dagster_graphql(
read_context, FILTERED_TAGS_QUERY, variables={"tagKeys": ["fruit"]}
)
tags = filtered_tags_result.data["runTags"]
tags = filtered_tags_result.data["runTagsOrError"]["tags"]
tags_dict = {item["key"]: item["values"] for item in tags}
assert len(tags_dict) == 1
assert tags_dict["fruit"] == ["apple"]
Expand Down