Skip to content

Commit

Permalink
allow combining memoization with step selection (#6431)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Apr 15, 2022
1 parent 920c759 commit a744f84
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 8 deletions.
16 changes: 10 additions & 6 deletions python_modules/dagster/dagster/core/execution/plan/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,15 @@ def build(self) -> "ExecutionPlan":
# Expects that if step_keys_to_execute was set, that the `plan` variable will have the
# reflected step_keys_to_execute
if pipeline_def.is_using_memoization(self._tags) and len(step_output_versions) == 0:
if self.step_keys_to_execute is not None:
raise DagsterInvariantViolationError(
"Cannot use both memoization and re-execution at this time."
)
if self._instance_ref is None:
raise DagsterInvariantViolationError(
"Attempted to build memoized execution plan without providing a persistent "
"DagsterInstance to create_execution_plan."
)
instance = DagsterInstance.from_ref(self._instance_ref)
plan = plan.build_memoized_plan(pipeline_def, self.resolved_run_config, instance)
plan = plan.build_memoized_plan(
pipeline_def, self.resolved_run_config, instance, self.step_keys_to_execute
)

return plan

Expand Down Expand Up @@ -826,6 +824,7 @@ def build_memoized_plan(
pipeline_def: PipelineDefinition,
resolved_run_config: ResolvedRunConfig,
instance: DagsterInstance,
selected_step_keys: Optional[List[str]],
) -> "ExecutionPlan":
"""
Returns:
Expand Down Expand Up @@ -907,8 +906,13 @@ def build_memoized_plan(
if not io_manager.has_output(context):
unmemoized_step_keys.add(step_output_handle.step_key)

if selected_step_keys is not None:
# Take the intersection unmemoized steps and selected steps
step_keys_to_execute = list(unmemoized_step_keys & set(selected_step_keys))
else:
step_keys_to_execute = list(unmemoized_step_keys)
return self.build_subset_plan(
list(unmemoized_step_keys),
step_keys_to_execute,
pipeline_def,
resolved_run_config,
step_output_versions=step_output_versions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def test_memoized_plan_memoized_results():
] = 4

memoized_plan = plan.build_memoized_plan(
versioned_pipeline, resolved_run_config, instance=None
versioned_pipeline, resolved_run_config, instance=None, selected_step_keys=None
)

assert memoized_plan.step_keys_to_execute == ["versioned_solid_takes_input"]
Expand Down Expand Up @@ -470,7 +470,10 @@ def wrap_pipeline():
(step_output_handle.step_key, step_output_handle.output_name, step_output_version)
] = 4
memoized_plan = unmemoized_plan.build_memoized_plan(
wrap_pipeline, ResolvedRunConfig.build(wrap_pipeline), instance=None
wrap_pipeline,
ResolvedRunConfig.build(wrap_pipeline),
instance=None,
selected_step_keys=None,
)
assert len(memoized_plan.step_keys_to_execute) == 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,60 @@ def get_resource_version(self, _):
assert len(recorder) == 1


def test_memoization_with_step_selection():
@op
def op1():
pass

@op
def op2(arg1):
del arg1

@graph
def my_graph():
op2(op1())

class MyVersionStrategy(VersionStrategy):
def get_op_version(self, context):
if context.op_def.name == op1.name:
return "foo"
else:
# op2 will not be memoized
import uuid

return str(uuid.uuid4()).replace("-", "_")

with tempfile.TemporaryDirectory() as temp_dir:
with instance_for_test(temp_dir=temp_dir) as instance:
my_job = my_graph.to_job(
version_strategy=MyVersionStrategy(),
resource_defs={
"io_manager": versioned_filesystem_io_manager.configured(
{"base_dir": temp_dir}
),
},
tags={MEMOIZED_RUN_TAG: "True"},
)
single_op_selected_plan = create_execution_plan(
my_job, instance_ref=instance.get_ref(), step_keys_to_execute=["op1"]
)
assert len(single_op_selected_plan.step_keys_to_execute) == 1
assert single_op_selected_plan.step_keys_to_execute == ["op1"]

result = my_job.execute_in_process(instance=instance)
assert result.success

assert (
create_execution_plan(
my_job, instance_ref=instance.get_ref(), step_keys_to_execute=["op1"]
).step_keys_to_execute
== []
)
assert create_execution_plan(
my_job, instance_ref=instance.get_ref(), step_keys_to_execute=["op2"]
).step_keys_to_execute == ["op2"]


def test_memoization_with_default_strategy_overriden():
version = ["foo"]

Expand Down

0 comments on commit a744f84

Please sign in to comment.