Skip to content

Commit

Permalink
Fixes to root input manager memoization and no context provided (#7316)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Apr 6, 2022
1 parent 504c65f commit 8dca877
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 7 deletions.
16 changes: 11 additions & 5 deletions python_modules/dagster/dagster/core/execution/plan/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster.core.definitions import InputDefinition, NodeHandle, PipelineDefinition
from dagster.core.definitions.events import AssetLineageInfo
from dagster.core.definitions.metadata import MetadataEntry
from dagster.core.definitions.version_strategy import ResourceVersionContext
from dagster.core.errors import (
DagsterExecutionLoadInputError,
DagsterInvariantViolationError,
Expand Down Expand Up @@ -183,9 +184,18 @@ def compute_version(self, step_versions, pipeline_def, resolved_run_config) -> O
root_manager_key
]

solid_config = resolved_run_config.solids.get(solid.name)
input_config = solid_config.inputs.get(self.input_name)
resource_config = resolved_run_config.resources.get(root_manager_key).config

version_context = ResourceVersionContext(
resource_def=root_manager_def,
resource_config=resource_config,
)

if pipeline_def.version_strategy is not None:
root_manager_def_version = pipeline_def.version_strategy.get_resource_version(
root_manager_def
version_context
)
else:
root_manager_def_version = root_manager_def.version
Expand All @@ -198,10 +208,6 @@ def compute_version(self, step_versions, pipeline_def, resolved_run_config) -> O
)

check_valid_version(root_manager_def_version)

solid_config = resolved_run_config.solids.get(solid.name)
input_config = solid_config.inputs.get(self.input_name)
resource_config = resolved_run_config.resources.get(root_manager_key).config
return join_and_hash(
resolve_config_version(input_config),
resolve_config_version(resource_config),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
from dagster.core.definitions.definition_config_schema import (
convert_user_facing_definition_config_schema,
)
from dagster.core.definitions.resource_definition import ResourceDefinition
from dagster.core.definitions.resource_definition import ResourceDefinition, is_context_provided
from dagster.core.storage.input_manager import InputManager
from dagster.utils.backcompat import experimental

from ..decorator_utils import get_function_params


class IInputManagerDefinition:
@property
Expand Down Expand Up @@ -159,7 +161,11 @@ def __init__(self, load_fn):
self._load_fn = load_fn

def load_input(self, context):
return self._load_fn(context)
return (
self._load_fn(context)
if is_context_provided(get_function_params(self._load_fn))
else self._load_fn()
)


class _InputManagerDecoratorCallable:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,3 +920,24 @@ def test_memoization_multiprocess_execution():
get_version_strategy_pipeline(), instance_ref=instance.get_ref()
)
assert len(memoized_plan.step_keys_to_execute) == 0


def test_source_hash_with_root_input_manager():
@root_input_manager
def my_input_manager():
return 5

@op(ins={"x": In(root_manager_key="manager")})
def the_op(x):
return x + 1

@job(version_strategy=SourceHashVersionStrategy(), resource_defs={"manager": my_input_manager})
def call_the_op():
the_op()

with instance_for_test() as instance:
result = call_the_op.execute_in_process(instance=instance)
assert result.success

memoized_plan = create_execution_plan(call_the_op, instance_ref=instance.get_ref())
assert len(memoized_plan.step_keys_to_execute) == 0

0 comments on commit 8dca877

Please sign in to comment.