Skip to content

Commit

Permalink
add flag argument to backfill all partitions in launch backfill graph…
Browse files Browse the repository at this point in the history
…ql mutation (#7499)
  • Loading branch information
prha committed Apr 20, 2022
1 parent a34a9e6 commit d57d83b
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 4 deletions.
3 changes: 2 additions & 1 deletion js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion js_modules/dagit/packages/core/src/types/globalTypes.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,18 @@ def create_and_launch_partition_backfill(graphene_info, backfill_params):

external_partition_set = next(iter(matches))

partition_names = backfill_params.get("partitionNames")
if backfill_params.get("allPartitions"):
result = graphene_info.context.get_external_partition_names(
repository.handle, external_partition_set.name
)
partition_names = result.partition_names
elif backfill_params.get("partitionNames"):
partition_names = backfill_params.get("partitionNames")
else:
raise Exception(
'Backfill requested without specifying either "allPartitions" or "partitionNames" '
"arguments"
)

backfill_id = make_new_backfill_id()
backfill = PartitionBackfill(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,10 @@ class Meta:

class GrapheneLaunchBackfillParams(graphene.InputObjectType):
selector = graphene.NonNull(GraphenePartitionSetSelector)
partitionNames = non_null_list(graphene.String)
partitionNames = graphene.List(graphene.NonNull(graphene.String))
reexecutionSteps = graphene.List(graphene.NonNull(graphene.String))
fromFailure = graphene.Boolean()
allPartitions = graphene.Boolean()
tags = graphene.List(graphene.NonNull(GrapheneExecutionTag))
forceSynchronousSubmission = graphene.Boolean()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,36 @@ def test_launch_from_failure(self, graphql_context):
assert result.data["partitionBackfillOrError"]["numRequested"] == 0
assert result.data["partitionBackfillOrError"]["numTotal"] == 2
assert result.data["partitionBackfillOrError"]["fromFailure"]

def test_launch_backfill_with_all_partitions_flag(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
partition_set_selector = {
"repositorySelector": repository_selector,
"partitionSetName": "chained_integer_partition",
}

result = execute_dagster_graphql_and_finish_runs(
graphql_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"selector": partition_set_selector,
"allPartitions": True,
}
},
)
assert not result.errors
assert result.data
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

result = execute_dagster_graphql(
graphql_context, PARTITION_PROGRESS_QUERY, variables={"backfillId": backfill_id}
)

assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numRequested"] == 0
assert result.data["partitionBackfillOrError"]["numTotal"] == 10

0 comments on commit d57d83b

Please sign in to comment.