Skip to content

Commit

Permalink
dynamic out bug fix (#6780)
Browse files Browse the repository at this point in the history
Fixes a bug where we were not properly handling dependency resolution in some more complex cases. 

## Test Plan

added test case that failed before the fix
  • Loading branch information
alangenfeld committed Mar 21, 2022
1 parent dfeff6c commit 4812f93
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
11 changes: 8 additions & 3 deletions python_modules/dagster/dagster/core/execution/plan/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,9 @@ def _get_steps_to_execute_by_level(


def _get_executable_step_deps(
step_dict, step_handles_to_execute, executable_map
step_dict,
step_handles_to_execute,
executable_map,
) -> Dict[str, Set[str]]:
"""
Returns:
Expand All @@ -1329,8 +1331,11 @@ def _get_executable_step_deps(
filtered_deps = []
depends_on_unresolved = False
for dep in step.get_execution_dependency_keys():
if dep in executable_map and dep not in unresolved_set:
filtered_deps.append(dep)
if dep in executable_map:
if dep not in unresolved_set:
filtered_deps.append(dep)
else:
depends_on_unresolved = True
elif dep in step_keys_to_execute:
depends_on_unresolved = True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,27 @@ def test_dealloc_prev_outputs():

# there may be 1 still referenced by outer iteration frames
assert result.output_for_solid("spawn", "refs") <= 1


def test_collect_and_map():
@op(out=DynamicOut())
def dyn_vals():
for i in range(3):
yield DynamicOutput(i, mapping_key=f"num_{i}")

@op
def echo(x):
return x

@op
def add_each(vals, x):
return [v + x for v in vals]

@graph
def both_w_echo():
d1 = dyn_vals()
r = d1.map(lambda x: add_each(echo(d1.collect()), x))
echo.alias("final")(r.collect())

result = both_w_echo.execute_in_process()
assert result.output_for_node("final") == [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,6 @@ def _bad():
):

@pipeline
def _bad():
def _bad_other():
x = dynamic_solid()
x.map(lambda y: add(x.collect(), y))

0 comments on commit 4812f93

Please sign in to comment.