Skip to content

Commit

Permalink
add graphql fields for querying run tags (#12409)
Browse files Browse the repository at this point in the history
### Summary & Motivation
We want separate graphql endpoints to query run tag keys and filtered
run tag values.

### How I Tested These Changes
Added new graphql tests
  • Loading branch information
prha authored and clairelin135 committed Feb 22, 2023
1 parent 4c74851 commit 2214ad6
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 12 deletions.
3 changes: 2 additions & 1 deletion js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion js_modules/dagit/packages/core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,28 @@ def get_run_by_id(
return GrapheneRun(record)


def get_run_tags(graphene_info: "ResolveInfo") -> List["GraphenePipelineTagAndValues"]:
def get_run_tag_keys(graphene_info: "ResolveInfo") -> List[str]:
return [
tag_key
for tag_key in graphene_info.context.instance.get_run_tag_keys()
if get_tag_type(tag_key) != TagType.HIDDEN
]


def get_run_tags(
graphene_info: "ResolveInfo",
tag_keys: Optional[List[str]] = None,
value_prefix: Optional[str] = None,
limit: Optional[int] = None,
) -> List["GraphenePipelineTagAndValues"]:
from ..schema.tags import GraphenePipelineTagAndValues

instance = graphene_info.context.instance
return [
GraphenePipelineTagAndValues(key=key, values=values)
for key, values in instance.get_run_tags()
for key, values in instance.get_run_tags(
tag_keys=tag_keys, value_prefix=value_prefix, limit=limit
)
if get_tag_type(key) != TagType.HIDDEN
]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, Mapping, Optional, Sequence, cast
from typing import Any, Dict, List, Mapping, Optional, Sequence, cast

import dagster._check as check
import graphene
Expand Down Expand Up @@ -49,6 +49,7 @@
get_run_by_id,
get_run_group,
get_run_groups,
get_run_tag_keys,
get_run_tags,
validate_pipeline_config,
)
Expand Down Expand Up @@ -290,8 +291,14 @@ class Meta:
runId=graphene.NonNull(graphene.ID),
description="Retrieve a run by its run id.",
)
pipelineRunTags = graphene.Field(
runTagKeys = graphene.Field(
non_null_list(graphene.String), description="Retrieve the distinct tag keys from all runs."
)
runTags = graphene.Field(
non_null_list(GraphenePipelineTagAndValues),
tagKeys=graphene.Argument(graphene.List(graphene.NonNull(graphene.String))),
valuePrefix=graphene.String(),
limit=graphene.Int(),
description="Retrieve all the distinct key-value tags from all runs.",
)

Expand Down Expand Up @@ -629,8 +636,17 @@ def resolve_partitionSetOrError(
partitionSetName, # type: ignore
)

def resolve_pipelineRunTags(self, graphene_info: ResolveInfo):
return get_run_tags(graphene_info)
def resolve_runTagKeys(self, graphene_info: ResolveInfo):
return get_run_tag_keys(graphene_info)

def resolve_runTags(
self,
graphene_info: ResolveInfo,
tagKeys: Optional[List[str]] = None,
valuePrefix: Optional[str] = None,
limit: Optional[int] = None,
):
return get_run_tags(graphene_info, tagKeys, valuePrefix, limit)

def resolve_runGroupOrError(self, graphene_info: ResolveInfo, runId):
return get_run_group(graphene_info, runId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@

ALL_TAGS_QUERY = """
{
pipelineRunTags {
runTags {
... on PipelineTagAndValues {
key
values
Expand All @@ -89,6 +89,22 @@
}
"""

ALL_TAG_KEYS_QUERY = """
{
runTagKeys
}
"""

FILTERED_TAGS_QUERY = """
query FilteredRunTagsQuery($tagKeys: [String!]!) {
runTags(tagKeys: $tagKeys) {
key
values
}
}
"""


ALL_RUNS_QUERY = """
{
pipelineRunsOrError {
Expand Down Expand Up @@ -299,14 +315,26 @@ def test_get_runs_over_graphql(self, graphql_context):
runs = result.data["pipelineOrError"]["runs"]
assert len(runs) == 2

all_tags_result = execute_dagster_graphql(read_context, ALL_TAGS_QUERY)
tags = all_tags_result.data["pipelineRunTags"]
all_tag_keys_result = execute_dagster_graphql(read_context, ALL_TAG_KEYS_QUERY)
tag_keys = set(all_tag_keys_result.data["runTagKeys"])
# check presence rather than set equality since we might have extra tags in cloud
assert "fruit" in tag_keys
assert "veggie" in tag_keys

all_tags_result = execute_dagster_graphql(read_context, ALL_TAGS_QUERY)
tags = all_tags_result.data["runTags"]
tags_dict = {item["key"]: item["values"] for item in tags}

assert tags_dict["fruit"] == ["apple"]
assert tags_dict["veggie"] == ["carrot"]

filtered_tags_result = execute_dagster_graphql(
read_context, FILTERED_TAGS_QUERY, variables={"tagKeys": ["fruit"]}
)
tags = filtered_tags_result.data["runTags"]
tags_dict = {item["key"]: item["values"] for item in tags}
assert len(tags_dict) == 1
assert tags_dict["fruit"] == ["apple"]

# delete the second run
result = execute_dagster_graphql(
read_context, DELETE_RUN_MUTATION, variables={"runId": run_id_two}
Expand Down

0 comments on commit 2214ad6

Please sign in to comment.