Skip to content

Commit

Permalink
Auto infer multipartition <-> single dimension mapping (#12400)
Browse files Browse the repository at this point in the history
The existing `SingleDimensionDependencyMapping` class handles the
upstream single-dimension -> downstream multipartitioned dependency
relationship. This PR refactors this class to also handle the opposite
relationship (upstream multipartitioned -> downstream single-dimension),
renaming it to `MultiToSingleDimensionPartitionMapping`.

This class previously required a `partition_dimension_name` argument,
which this PR makes optional. The argument is still required when the
multipartitions definition has dimension partition defs that are
equivalent; in that case, the class throws an error if it is omitted.

The final change is that we now auto-infer when this partition mapping
should be used: when a single-dimension partitions def matches a
dimension of a multipartitions def, and the two assets have a dependency
relationship. This PR adds tests to ensure that we choose this partition
mapping in those cases, and otherwise fall back to the
`IdentityPartitionMapping` or `AllPartitionMapping`.
  • Loading branch information
clairelin135 committed Feb 17, 2023
1 parent 1884193 commit 2e2eca1
Show file tree
Hide file tree
Showing 15 changed files with 581 additions and 143 deletions.
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@
AllPartitionMapping as AllPartitionMapping,
IdentityPartitionMapping as IdentityPartitionMapping,
LastPartitionMapping as LastPartitionMapping,
MultiToSingleDimensionPartitionMapping as MultiToSingleDimensionPartitionMapping,
PartitionMapping as PartitionMapping,
StaticPartitionMapping as StaticPartitionMapping,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
AllPartitionMapping as AllPartitionMapping,
IdentityPartitionMapping as IdentityPartitionMapping,
LastPartitionMapping as LastPartitionMapping,
MultiToSingleDimensionPartitionMapping as MultiToSingleDimensionPartitionMapping,
PartitionMapping as PartitionMapping,
)
from .partitioned_schedule import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,12 @@ def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefiniti
def get_partition_mapping(
self, asset_key: AssetKey, in_asset_key: AssetKey
) -> PartitionMapping:
partitions_def = self.get_partitions_def(asset_key)
partition_mappings = self._partition_mappings_by_key.get(asset_key) or {}
return infer_partition_mapping(partition_mappings.get(in_asset_key), partitions_def)
return infer_partition_mapping(
partition_mappings.get(in_asset_key),
self.get_partitions_def(asset_key),
self.get_partitions_def(in_asset_key),
)

def is_partitioned(self, asset_key: AssetKey) -> bool:
return self.get_partitions_def(asset_key) is not None
Expand Down
10 changes: 7 additions & 3 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,12 +615,16 @@ def get_partition_mapping(self, in_asset_key: AssetKey) -> Optional[PartitionMap
def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]:
return self._partition_mappings.get(self._keys_by_input_name[input_name])

def infer_partition_mapping(self, in_asset_key: AssetKey) -> PartitionMapping:
def infer_partition_mapping(
self, upstream_asset_key: AssetKey, upstream_partitions_def: Optional[PartitionsDefinition]
) -> PartitionMapping:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)

partition_mapping = self._partition_mappings.get(in_asset_key)
return infer_partition_mapping(partition_mapping, self._partitions_def)
partition_mapping = self._partition_mappings.get(upstream_asset_key)
return infer_partition_mapping(
partition_mapping, self._partitions_def, upstream_partitions_def
)

def get_output_name_for_asset_key(self, key: AssetKey) -> str:
for output_name, asset_key in self.keys_by_output_name.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,9 +963,9 @@ def _validate_and_set_fan_out(self, node_input: NodeInput, node_output: NodeOutp
if not node_input.node.definition.input_supports_dynamic_output_dep(node_input.input_name):
raise DagsterInvalidDefinitionError(
f"{node_input.node.describe_node()} cannot be downstream of dynamic output"
f' "{node_output.describe()}" since input "{node_input.input_name}" maps to a node'
" that is already downstream of another dynamic output. Nodes cannot be downstream"
" of more than one dynamic output"
f' "{node_output.describe()}" since input "{node_input.input_name}" maps to a'
" node that is already downstream of another dynamic output. Nodes cannot be"
" downstream of more than one dynamic output"
)

if self._collect_index.get(node_input.node_name):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,9 @@ def get_downstream_partition_keys(
f"Asset key {from_asset_key} is not partitioned. Cannot get partition keys."
)

partition_mapping = to_asset.infer_partition_mapping(from_asset_key)
partition_mapping = to_asset.infer_partition_mapping(
from_asset_key, from_asset.partitions_def
)
downstream_partition_key_subset = (
partition_mapping.get_downstream_partitions_for_partitions(
from_asset.partitions_def.empty_subset().with_partition_keys([partition_key]),
Expand Down
5 changes: 0 additions & 5 deletions python_modules/dagster/dagster/_core/definitions/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,6 @@ def get_first_partition_key(
else:
return None

def get_default_partition_mapping(self):
from dagster._core.definitions.partition_mapping import IdentityPartitionMapping

return IdentityPartitionMapping()

def get_partition_keys_in_range(
self,
partition_key_range: PartitionKeyRange,
Expand Down

0 comments on commit 2e2eca1

Please sign in to comment.