Skip to content

Commit

Permalink
fix loading partitioned source assets in non-partitioned non-asset jo…
Browse files Browse the repository at this point in the history
…bs (#12586)

### Summary & Motivation

Fixes #12575

The context passed to `load_input` will look like the context passed to
`load_input` when a non-partitioned asset depends on a partitioned
asset. I.e., by default, it will load all partitions of the partitioned
asset.

In the future, we might want to enable specifying a PartitionMapping in
some way, but that's out of the scope of this PR

### How I Tested These Changes
  • Loading branch information
sryza committed Feb 28, 2023
1 parent c8bd24a commit a41fecf
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
Expand Up @@ -932,12 +932,10 @@ def wipe_input_asset_record(self, key: AssetKey) -> None:

def has_asset_partitions_for_input(self, input_name: str) -> bool:
asset_layer = self.pipeline_def.asset_layer
assets_def = asset_layer.assets_def_for_node(self.node_handle)
upstream_asset_key = asset_layer.asset_key_for_input(self.node_handle, input_name)

return (
upstream_asset_key is not None
and assets_def is not None
and asset_layer.partitions_def_for_asset(upstream_asset_key) is not None
)

Expand Down Expand Up @@ -965,8 +963,8 @@ def asset_partitions_subset_for_input(self, input_name: str) -> PartitionsSubset
if upstream_asset_key is not None:
upstream_asset_partitions_def = asset_layer.partitions_def_for_asset(upstream_asset_key)

if assets_def is not None and upstream_asset_partitions_def is not None:
partitions_def = assets_def.partitions_def
if upstream_asset_partitions_def is not None:
partitions_def = assets_def.partitions_def if assets_def else None
partitions_subset = (
partitions_def.empty_subset().with_partition_key_range(
self.asset_partition_key_range, dynamic_partitions_store=self.instance
Expand Down
Expand Up @@ -94,6 +94,34 @@ def job1():
assert io_manager.loaded_input


def test_non_partitioned_job_partitioned_source_asset():
partitions_def = StaticPartitionsDefinition(["foo", "bar"])
asset1 = SourceAsset("asset1", partitions_def=partitions_def)

class MyIOManager(IOManager):
def handle_output(self, context, obj):
...

def load_input(self, context):
self.loaded_input = True
assert context.asset_key == asset1.key
assert set(context.asset_partition_keys) == {"foo", "bar"}
return 5

@op
def op1(input1):
assert input1 == 5

io_manager = MyIOManager()

@job(resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(io_manager)})
def job1():
op1(asset1)

assert job1.execute_in_process().success
assert io_manager.loaded_input


def test_multiple_source_asset_inputs():
asset1 = SourceAsset("asset1", io_manager_key="iomanager1")
asset2 = SourceAsset("asset2", io_manager_key="iomanager2")
Expand Down

0 comments on commit a41fecf

Please sign in to comment.