Skip to content

Commit

Permalink
GQL Add dynamic partition mutation (#12562)
Browse files Browse the repository at this point in the history
In Dagit, we want to be able to add a new partition to a dynamic
partitions definition through the UI.

This PR adds a mutation that allows for adding this new partition.
  • Loading branch information
clairelin135 committed Mar 1, 2023
1 parent c78992f commit 216c9fc
Show file tree
Hide file tree
Showing 11 changed files with 295 additions and 2 deletions.
1 change: 1 addition & 0 deletions js_modules/dagit/packages/core/src/app/Permissions.tsx
Expand Up @@ -18,6 +18,7 @@ export const EXPECTED_PERMISSIONS = {
wipe_assets: true,
launch_partition_backfill: true,
cancel_partition_backfill: true,
edit_dynamic_partitions: true,
};

export type PermissionResult = {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions js_modules/dagit/packages/core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

@@ -0,0 +1,80 @@
from typing import TYPE_CHECKING

from dagster._core.definitions.selector import (
RepositorySelector,
)
from dagster._core.workspace.permissions import Permissions

from dagster_graphql.schema.errors import GrapheneDuplicateDynamicPartitionError

from ..utils import UserFacingGraphQLError, assert_permission_for_location, capture_error

if TYPE_CHECKING:
from ...schema.inputs import GrapheneRepositorySelector
from ...schema.partition_sets import GrapheneAddDynamicPartitionSuccess


def _repository_contains_dynamic_partitions_def(
graphene_info, repository_selector: RepositorySelector, partitions_def_name: str
) -> bool:
from dagster._core.host_representation.external_data import (
ExternalDynamicPartitionsDefinitionData,
)

if graphene_info.context.has_repository_location(repository_selector.location_name):
repo_loc = graphene_info.context.get_repository_location(repository_selector.location_name)
if repo_loc.has_repository(repository_selector.repository_name):
repository = repo_loc.get_repository(repository_selector.repository_name)
matching_dynamic_partitions_defs = [
asset_node.partitions_def_data
for asset_node in repository.external_repository_data.external_asset_graph_data
if asset_node.partitions_def_data
and isinstance(
asset_node.partitions_def_data, ExternalDynamicPartitionsDefinitionData
)
and asset_node.partitions_def_data.name == partitions_def_name
]
return len(matching_dynamic_partitions_defs) > 0
return False


@capture_error
def add_dynamic_partition(
graphene_info,
repository_selector: "GrapheneRepositorySelector",
partitions_def_name: str,
partition_key: str,
) -> "GrapheneAddDynamicPartitionSuccess":
from dagster_graphql.schema.errors import GrapheneUnauthorizedError

from ...schema.partition_sets import GrapheneAddDynamicPartitionSuccess

unpacked_repository_selector = RepositorySelector.from_graphql_input(repository_selector)

assert_permission_for_location(
graphene_info,
Permissions.EDIT_DYNAMIC_PARTITIONS,
unpacked_repository_selector.location_name,
)

if not _repository_contains_dynamic_partitions_def(
graphene_info, unpacked_repository_selector, partitions_def_name
):
raise UserFacingGraphQLError(
GrapheneUnauthorizedError(
message=(
"The repository does not contain a dynamic partitions definition with the given"
" name."
)
)
)

if graphene_info.context.instance.has_dynamic_partition(partitions_def_name, partition_key):
raise UserFacingGraphQLError(
GrapheneDuplicateDynamicPartitionError(partitions_def_name, partition_key)
)

graphene_info.context.instance.add_dynamic_partitions(partitions_def_name, [partition_key])
return GrapheneAddDynamicPartitionSuccess(
partitionsDefName=partitions_def_name, partitionKey=partition_key
)
19 changes: 19 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/errors.py
Expand Up @@ -495,6 +495,24 @@ def __init__(self, message=None):
self.message = message if message else "Authorization failed"


class GrapheneDuplicateDynamicPartitionError(graphene.ObjectType):
class Meta:
interfaces = (GrapheneError,)
name = "DuplicateDynamicPartitionError"

partitions_def_name = graphene.NonNull(graphene.String)
partition_name = graphene.NonNull(graphene.String)

def __init__(self, partitions_def_name, partition_name):
super().__init__()
self.partitions_def_name = check.str_param(partitions_def_name, "partitions_def_name")
self.partition_name = check.str_param(partition_name, "partition_name")
self.message = (
f"Partition {self.partition_name} already exists in dynamic partitions definition"
f" {self.partitions_def_name}."
)


types = [
GrapheneAssetNotFoundError,
GrapheneConflictingExecutionParamsError,
Expand Down Expand Up @@ -525,4 +543,5 @@ def __init__(self, message=None):
GrapheneScheduleNotFoundError,
GrapheneSchedulerNotDefinedError,
GrapheneSensorNotFoundError,
GrapheneDuplicateDynamicPartitionError,
]
Expand Up @@ -27,9 +27,11 @@

from .backfill import GraphenePartitionBackfill
from .errors import (
GrapheneDuplicateDynamicPartitionError,
GraphenePartitionSetNotFoundError,
GraphenePipelineNotFoundError,
GraphenePythonError,
GrapheneUnauthorizedError,
)
from .inputs import GrapheneRunsFilter
from .pipelines.pipeline import GrapheneRun
Expand All @@ -39,6 +41,25 @@
from .util import ResolveInfo, non_null_list


class GrapheneAddDynamicPartitionSuccess(graphene.ObjectType):
partitionsDefName = graphene.NonNull(graphene.String)
partitionKey = graphene.NonNull(graphene.String)

class Meta:
name = "AddDynamicPartitionSuccess"


class GrapheneAddDynamicPartitionResult(graphene.Union):
class Meta:
types = (
GrapheneAddDynamicPartitionSuccess,
GrapheneUnauthorizedError,
GraphenePythonError,
GrapheneDuplicateDynamicPartitionError,
)
name = "AddDynamicPartitionResult"


class GraphenePartitionTags(graphene.ObjectType):
results = non_null_list(GraphenePipelineTag)

Expand Down
Expand Up @@ -11,6 +11,7 @@
create_and_launch_partition_backfill,
resume_partition_backfill,
)
from dagster_graphql.implementation.execution.dynamic_partitions import add_dynamic_partition
from dagster_graphql.implementation.execution.launch_execution import (
launch_pipeline_execution,
launch_pipeline_reexecution,
Expand Down Expand Up @@ -57,7 +58,9 @@
GrapheneExecutionParams,
GrapheneLaunchBackfillParams,
GrapheneReexecutionParams,
GrapheneRepositorySelector,
)
from ..partition_sets import GrapheneAddDynamicPartitionResult
from ..pipelines.pipeline import GrapheneRun
from ..runs import (
GrapheneLaunchRunReexecutionResult,
Expand Down Expand Up @@ -329,6 +332,33 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str):
return resume_partition_backfill(graphene_info, backfillId)


class GrapheneAddDynamicPartitionMutation(graphene.Mutation):
"""Adds a partition to a dynamic partition set."""

Output = graphene.NonNull(GrapheneAddDynamicPartitionResult)

class Arguments:
repositorySelector = graphene.NonNull(GrapheneRepositorySelector)
partitionsDefName = graphene.NonNull(graphene.String)
partitionKey = graphene.NonNull(graphene.String)

class Meta:
name = "AddDynamicPartitionMutation"

@capture_error
@require_permission_check(Permissions.EDIT_DYNAMIC_PARTITIONS)
def mutate(
self,
graphene_info: ResolveInfo,
repositorySelector: GrapheneRepositorySelector,
partitionsDefName: str,
partitionKey: str,
):
return add_dynamic_partition(
graphene_info, repositorySelector, partitionsDefName, partitionKey
)


@capture_error
def create_execution_params_and_launch_pipeline_reexec(graphene_info, execution_params_dict):
# refactored into a helper function here in order to wrap with @capture_error,
Expand Down Expand Up @@ -694,3 +724,4 @@ class Meta:
cancel_partition_backfill = GrapheneCancelBackfillMutation.Field()
log_telemetry = GrapheneLogTelemetryMutation.Field()
set_nux_seen = GrapheneSetNuxSeenMutation.Field()
add_dynamic_partition = GrapheneAddDynamicPartitionMutation.Field()

0 comments on commit 216c9fc

Please sign in to comment.