Skip to content

Commit

Permalink
add graphql fields to quickly query partition status (#7614)
Browse files Browse the repository at this point in the history
* add backfills by job, partitionRuns

* add tests for new graphql fields
  • Loading branch information
prha committed Apr 28, 2022
1 parent a83f387 commit 1cfcac3
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 39 deletions.
50 changes: 29 additions & 21 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export const BackfillTerminationDialog = ({backfill, onClose, onComplete}: Props
if (!backfill) {
return null;
}
const numUnscheduled = (backfill.numTotal || 0) - (backfill.numRequested || 0);
const numUnscheduled = (backfill.partitionNames.length || 0) - (backfill.numRequested || 0);
const cancelableRuns = backfill.runs.filter(
(run) => !doneStatuses.has(run?.status) && run.canTerminate,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,14 +446,15 @@ const getProgressCounts = (backfill: Backfill) => {
{numQueued: 0, numInProgress: 0, numSucceeded: 0, numFailed: 0},
);

const numTotal = backfill.partitionNames.length;
return {
numQueued,
numInProgress,
numSucceeded,
numFailed,
numUnscheduled: backfill.numTotal - backfill.numRequested,
numUnscheduled: numTotal - backfill.numRequested,
numSkipped: backfill.numRequested - latestPartitionRuns.length,
numTotal: backfill.numTotal,
numTotal,
};
};

Expand Down Expand Up @@ -596,7 +597,7 @@ const BACKFILLS_QUERY = gql`
backfillId
status
numRequested
numTotal
partitionNames
runs {
id
canTerminate
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ export const PartitionProgress = (props: Props) => {
numTotalRuns,
} = counts;
const numFinished = numSucceeded + numFailed;
const unscheduled = results.numTotal - results.numRequested;
const numTotal = results.partitionNames.length;
const unscheduled = numTotal - results.numRequested;

const skipped = results.numRequested - numPartitionRuns;
const numTotal = results.numTotal;

const table = (
<TooltipTable>
Expand Down Expand Up @@ -291,7 +291,7 @@ const PARTITION_PROGRESS_QUERY = gql`
backfillId
status
numRequested
numTotal
partitionNames
runs(limit: $limit) {
id
canTerminate
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,36 @@ def get_partition_set_partition_statuses(graphene_info, repository_handle, parti
for partition_name in result.partition_names
]
)


def get_partition_set_partition_runs(graphene_info, partition_set):
from ..schema.partition_sets import GraphenePartitionRun
from ..schema.pipelines.pipeline import GrapheneRun

result = graphene_info.context.get_external_partition_names(
partition_set.repository_handle, partition_set.name
)
run_records = graphene_info.context.instance.get_run_records(
RunsFilter(tags={PARTITION_SET_TAG: partition_set.name})
)

by_partition = {}
for record in run_records:
partition_name = record.pipeline_run.tags.get(PARTITION_NAME_TAG)
if not partition_name or partition_name in by_partition:
# all_partition_set_runs is in descending order by creation time, we should ignore
# runs for the same partition if we've already considered the partition
continue
by_partition[partition_name] = record

return [
GraphenePartitionRun(
id=f"{partition_set.name}:{partition_name}",
partitionName=partition_name,
run=GrapheneRun(by_partition[partition_name])
if partition_name in by_partition
else None,
)
# for partition_name, run_record in by_partition.items()
for partition_name in result.partition_names
]
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ class Meta:

backfillId = graphene.NonNull(graphene.String)
status = graphene.NonNull(GrapheneBulkActionStatus)
partitionNames = non_null_list(graphene.String)
numRequested = graphene.NonNull(graphene.Int)
numTotal = graphene.NonNull(graphene.Int)
fromFailure = graphene.NonNull(graphene.Boolean)
reexecutionSteps = non_null_list(graphene.String)
partitionSetName = graphene.NonNull(graphene.String)
Expand All @@ -107,9 +107,9 @@ def __init__(self, backfill_job):
backfillId=backfill_job.backfill_id,
partitionSetName=backfill_job.partition_set_origin.partition_set_name,
status=backfill_job.status,
numTotal=len(backfill_job.partition_names),
fromFailure=bool(backfill_job.from_failure),
reexecutionSteps=backfill_job.reexecution_steps,
partitionNames=backfill_job.partition_names,
timestamp=backfill_job.backfill_timestamp,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from dagster_graphql.implementation.fetch_partition_sets import (
get_partition_by_name,
get_partition_config,
get_partition_set_partition_runs,
get_partition_set_partition_statuses,
get_partition_tags,
get_partitions,
Expand All @@ -14,6 +15,7 @@
from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG
from dagster.utils import merge_dicts

from .backfill import GraphenePartitionBackfill
from .errors import (
GraphenePartitionSetNotFoundError,
GraphenePipelineNotFoundError,
Expand Down Expand Up @@ -56,6 +58,15 @@ class Meta:
name = "PartitionStatus"


class GraphenePartitionRun(graphene.ObjectType):
id = graphene.NonNull(graphene.String)
partitionName = graphene.NonNull(graphene.String)
run = graphene.Field(GrapheneRun)

class Meta:
name = "PartitionRun"


class GraphenePartitionStatuses(graphene.ObjectType):
results = non_null_list(GraphenePartitionStatus)

Expand Down Expand Up @@ -174,7 +185,13 @@ class GraphenePartitionSet(graphene.ObjectType):
)
partition = graphene.Field(GraphenePartition, partition_name=graphene.NonNull(graphene.String))
partitionStatusesOrError = graphene.NonNull(GraphenePartitionStatusesOrError)
partitionRuns = non_null_list(GraphenePartitionRun)
repositoryOrigin = graphene.NonNull(GrapheneRepositoryOrigin)
backfills = graphene.Field(
non_null_list(GraphenePartitionBackfill),
cursor=graphene.String(),
limit=graphene.Int(),
)

class Meta:
name = "PartitionSet"
Expand Down Expand Up @@ -215,6 +232,9 @@ def resolve_partition(self, graphene_info, partition_name):
partition_name,
)

def resolve_partitionRuns(self, graphene_info):
return get_partition_set_partition_runs(graphene_info, self._external_partition_set)

def resolve_partitionStatusesOrError(self, graphene_info):
return get_partition_set_partition_statuses(
graphene_info, self._external_repository_handle, self._external_partition_set.name
Expand All @@ -224,6 +244,18 @@ def resolve_repositoryOrigin(self, _):
origin = self._external_partition_set.get_external_origin().external_repository_origin
return GrapheneRepositoryOrigin(origin)

def resolve_backfills(self, graphene_info, **kwargs):
matching = [
backfill
for backfill in graphene_info.context.instance.get_backfills(
cursor=kwargs.get("cursor"),
)
if backfill.partition_set_origin.partition_set_name == self._external_partition_set.name
and backfill.partition_set_origin.external_repository_origin.repository_name
== self._external_repository_handle.repository_name
]
return [GraphenePartitionBackfill(backfill) for backfill in matching[: kwargs.get("limit")]]


class GraphenePartitionSetOrError(graphene.Union):
class Meta:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
backfillId
status
numRequested
numTotal
partitionNames
fromFailure
reexecutionSteps
}
Expand Down Expand Up @@ -62,6 +62,22 @@
}
"""

GET_PARTITION_BACKFILLS_QUERY = """
query PartitionBackfillsQuery($repositorySelector: RepositorySelector!, $partitionSetName: String!) {
partitionSetOrError(repositorySelector: $repositorySelector, partitionSetName: $partitionSetName) {
__typename
...on PartitionSet {
name
pipelineName
backfills {
backfillId
}
}
}
}
"""


class TestDaemonPartitionBackfill(ExecutingGraphQLContextTestMatrix):
def test_launch_full_pipeline_backfill(self, graphql_context):
Expand Down Expand Up @@ -94,7 +110,38 @@ def test_launch_full_pipeline_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numRequested"] == 0
assert result.data["partitionBackfillOrError"]["numTotal"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2

def test_get_partition_backfills(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
# launch a backfill for this partition set
launch_result = execute_dagster_graphql(
graphql_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"selector": {
"repositorySelector": repository_selector,
"partitionSetName": "integer_partition",
},
"partitionNames": ["2", "3"],
}
},
)
backfill_id = launch_result.data["launchPartitionBackfill"]["backfillId"]
result = execute_dagster_graphql(
graphql_context,
GET_PARTITION_BACKFILLS_QUERY,
variables={
"repositorySelector": repository_selector,
"partitionSetName": "integer_partition",
},
)
assert not result.errors
assert result.data
assert result.data["partitionSetOrError"]["__typename"] == "PartitionSet"
assert len(result.data["partitionSetOrError"]["backfills"]) == 1
assert result.data["partitionSetOrError"]["backfills"][0]["backfillId"] == backfill_id

def test_launch_partial_backfill(self, graphql_context):
# execute a full pipeline, without the failure environment variable
Expand Down Expand Up @@ -131,7 +178,7 @@ def test_launch_partial_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numRequested"] == 0
assert result.data["partitionBackfillOrError"]["numTotal"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2
assert result.data["partitionBackfillOrError"]["reexecutionSteps"] == ["after_failure"]

def test_cancel_backfill(self, graphql_context):
Expand Down Expand Up @@ -164,7 +211,7 @@ def test_cancel_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numRequested"] == 0
assert result.data["partitionBackfillOrError"]["numTotal"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2

result = execute_dagster_graphql(
graphql_context, CANCEL_BACKFILL_MUTATION, variables={"backfillId": backfill_id}
Expand Down Expand Up @@ -210,7 +257,7 @@ def test_resume_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numRequested"] == 0
assert result.data["partitionBackfillOrError"]["numTotal"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2

# manually mark as failed
backfill = graphql_context.instance.get_backfill(backfill_id)
Expand Down Expand Up @@ -289,7 +336,7 @@ def test_launch_from_failure(self, graphql_context):
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numRequested"] == 0
assert result.data["partitionBackfillOrError"]["numTotal"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2
assert result.data["partitionBackfillOrError"]["fromFailure"]

def test_launch_backfill_with_all_partitions_flag(self, graphql_context):
Expand Down Expand Up @@ -323,4 +370,4 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context):
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numRequested"] == 0
assert result.data["partitionBackfillOrError"]["numTotal"] == 10
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10

0 comments on commit 1cfcac3

Please sign in to comment.