Skip to content

Commit

Permalink
test dynamic_output output for node (#7718)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed May 4, 2022
1 parent 948499e commit ae55df0
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ def _filter_outputs_by_handle(

# For the mapped output case, where step keys are in the format
# "step_key[upstream_mapped_output_name]" within the step output handle.
if step_output_handle.step_key.startswith(f"{step_key}["):
if (
step_output_handle.step_key.startswith(f"{step_key}[")
and step_output_handle.output_name == output_name
):
output_found = True
key_start = step_output_handle.step_key.find("[")
key_end = step_output_handle.step_key.find("]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
DynamicOut,
DynamicOutput,
Out,
Output,
daily_partitioned_config,
job,
op,
Expand Down Expand Up @@ -306,3 +307,31 @@ def my_failure_job():
result = my_failure_job.execute_in_process(raise_on_error=False)
assert not result.success
assert not result.dagster_run.is_success


def test_dynamic_output_for_node():
@op(out=DynamicOut())
def fanout():
for i in range(3):
yield DynamicOutput(value=i, mapping_key=str(i))

@op(
out={
"output1": Out(int),
"output2": Out(int),
}
)
def return_as_tuple(x):
yield Output(value=x, output_name="output1")
yield Output(value=5, output_name="output2")

@job
def myjob():
fanout().map(return_as_tuple)

# get result
result = myjob.execute_in_process()

# assertions
assert result.output_for_node("return_as_tuple", "output1") == {"0": 0, "1": 1, "2": 2}
assert result.output_for_node("return_as_tuple", "output2") == {"0": 5, "1": 5, "2": 5}

0 comments on commit ae55df0

Please sign in to comment.