Skip to content

Commit

Permalink
Fix partitioned asset jobs with double downstream non-partitioned (#6986
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sryza committed Mar 11, 2022
1 parent fae347c commit b86b0f1
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def run_config_for_partition_fn(partition_key: str) -> Dict[str, Any]:

inputs_dict: Dict[str, Dict[str, Any]] = {}
for in_asset_key, input_def in assets_def.input_defs_by_asset_key.items():
upstream_partitions_def = partitions_defs_by_asset_key[in_asset_key]
upstream_partitions_def = partitions_defs_by_asset_key.get(in_asset_key)
if assets_def.partitions_def is not None and upstream_partitions_def is not None:
upstream_partition_key_range = get_upstream_partitions_for_partition_range(
assets_def, upstream_partitions_def, in_asset_key, asset_partition_key_range
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ class MyIOManager(IOManager):
def handle_output(self, context, obj):
if context.op_def.name == "upstream_asset":
assert context.asset_partition_key == "b"
elif context.op_def.name == "downstream_asset":
elif context.op_def.name in ["downstream_asset", "double_downstream_asset"]:
assert not context.has_asset_partitions
with pytest.raises(Exception): # TODO: better error message
assert context.asset_partition_key_range
Expand All @@ -258,9 +258,13 @@ def upstream_asset(context):
def downstream_asset(upstream_asset):
assert upstream_asset is None

@asset
def double_downstream_asset(downstream_asset):
assert downstream_asset is None

my_job = build_assets_job(
"my_job",
assets=[upstream_asset, downstream_asset],
assets=[upstream_asset, downstream_asset, double_downstream_asset],
resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())},
)
result = my_job.execute_in_process(partition_key="b")
Expand Down

0 comments on commit b86b0f1

Please sign in to comment.