Skip to content

Commit

Permalink
enable providing asset_key to build_output_context (#7696)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed May 3, 2022
1 parent d6af7e1 commit 857079d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 17 deletions.
46 changes: 29 additions & 17 deletions python_modules/dagster/dagster/core/execution/context/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class OutputContext:
resources (Optional[Resources]): The resources required by the output manager, specified by the
`required_resource_keys` parameter.
op_def (Optional[OpDefinition]): The definition of the op that produced the output.
asset_info: Optional[AssetOutputInfo]: (Experimental) Asset info corresponding to the
output.
"""

def __init__(
Expand All @@ -73,6 +75,7 @@ def __init__(
resources: Optional[Union["Resources", Dict[str, Any]]] = None,
step_context: Optional["StepExecutionContext"] = None,
op_def: Optional["OpDefinition"] = None,
asset_info: Optional[AssetOutputInfo] = None,
):
from dagster.core.definitions.resource_definition import IContainsGenerator, Resources
from dagster.core.execution.build_resources import build_resources
Expand All @@ -93,6 +96,7 @@ def __init__(
self._version = version
self._resource_config = resource_config
self._step_context = step_context
self._asset_info = asset_info

if isinstance(resources, Resources):
self._resources_cm = None
Expand Down Expand Up @@ -240,24 +244,21 @@ def resources(self) -> Any:

@property
def asset_info(self) -> Optional[AssetOutputInfo]:
if not self._name:
return None
# we cannot use self.step_context.solid_handle because when you create an InputContext
# using StepExecutionContext.for_input_manager, this will create an OutputContext object
# which has the same step_context as the source InputContext (meaning the solid_handle on
# that object will correspond to the step the output is loaded in, rather than the step
# in which it was created).
node_handle = self.step_context.execution_plan.get_step_by_key(self.step_key).solid_handle
return self.step_context.pipeline_def.asset_layer.asset_info_for_output(
node_handle=node_handle, output_name=self.name
)
return self._asset_info

@property
def has_asset_key(self) -> bool:
return self._asset_info is not None

@property
def asset_key(self) -> Optional[AssetKey]:
asset_info = self.asset_info
if asset_info is None:
return None
return asset_info.key
def asset_key(self) -> AssetKey:
if self._asset_info is None:
raise DagsterInvariantViolationError(
"Attempting to access asset_key, "
"but it was not provided when constructing the OutputContext"
)

return self._asset_info.key

@property
def step_context(self) -> "StepExecutionContext":
Expand Down Expand Up @@ -556,6 +557,11 @@ def get_output_context(
io_manager_key = output_def.io_manager_key
resource_config = resolved_run_config.resources[io_manager_key].config

node_handle = execution_plan.get_step_by_key(step.key).solid_handle
asset_info = pipeline_def.asset_layer.asset_info_for_output(
node_handle=node_handle, output_name=step_output.name
)

if step_context:
check.invariant(
not resources,
Expand All @@ -580,6 +586,7 @@ def get_output_context(
step_context=step_context,
resource_config=resource_config,
resources=resources,
asset_info=asset_info,
)


Expand Down Expand Up @@ -614,6 +621,7 @@ def build_output_context(
resources: Optional[Dict[str, Any]] = None,
solid_def: Optional[SolidDefinition] = None,
op_def: Optional[OpDefinition] = None,
asset_key: Optional[Union[AssetKey, str]] = None,
) -> "OutputContext":
"""Builds output context from provided parameters.
Expand All @@ -637,7 +645,9 @@ def build_output_context(
For a given key, you can provide either an actual instance of an object, or a resource
definition.
solid_def (Optional[SolidDefinition]): The definition of the solid that produced the output.
op_def (Optional[OpDefinition]): The definition of the solid that produced the output.
op_def (Optional[OpDefinition]): The definition of the op that produced the output.
asset_key: Optional[Union[AssetKey, Sequence[str], str]]: The asset key corresponding to the
output.
Examples:
Expand All @@ -663,6 +673,7 @@ def build_output_context(
resources = check.opt_dict_param(resources, "resources", key_type=str)
solid_def = check.opt_inst_param(solid_def, "solid_def", SolidDefinition)
op_def = check.opt_inst_param(op_def, "op_def", OpDefinition)
asset_key = AssetKey.from_coerceable(asset_key) if asset_key else None

return OutputContext(
step_key=step_key,
Expand All @@ -680,4 +691,5 @@ def build_output_context(
resources=resources,
step_context=None,
op_def=op_def,
asset_info=AssetOutputInfo(key=asset_key) if asset_key else None,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dagster import AssetKey, build_output_context


def test_build_output_context_asset_key():
assert build_output_context(asset_key="apple").asset_key == AssetKey("apple")
assert build_output_context(asset_key=["apple", "banana"]).asset_key == AssetKey(
["apple", "banana"]
)
assert build_output_context(asset_key=AssetKey("apple")).asset_key == AssetKey("apple")

0 comments on commit 857079d

Please sign in to comment.