Skip to content

Commit

Permalink
OpExecutionContext.output_asset_key (#7961)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed May 25, 2022
1 parent e7fca20 commit 4e1a53f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,15 @@ def selected_output_names(self) -> AbstractSet[str]:
selected_outputs.add(output_name)
return selected_outputs

def asset_key_for_output(self, output_name: str = "result") -> AssetKey:
asset_output_info = self.pipeline_def.asset_layer.asset_info_for_output(
node_handle=self.op_handle, output_name=output_name
)
if asset_output_info is None:
check.failed(f"Output '{output_name}' has no asset")
else:
return asset_output_info.key

def output_asset_partition_key(self, output_name: str = "result") -> str:
"""Returns the asset partition key for the given output. Defaults to "result", which is the
name of the default output.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def _asset_keys_for_node(result, node_name):

def test_single_asset_pipeline():
@asset
def asset1():
def asset1(context):
assert context.asset_key_for_output() == AssetKey(["asset1"])
return 1

job = build_assets_job("a", [asset1])
Expand Down Expand Up @@ -730,6 +731,25 @@ def my_graph(x, y):
assert assets_def.asset_keys_by_output_name["result"] == AssetKey("my_graph")


def test_execute_graph_asset():
@op(out={"x": Out(), "y": Out()})
def x_op(context):
assert context.asset_key_for_output("x") == AssetKey("x_asset")
return 1, 2

@graph(out={"x": GraphOut(), "y": GraphOut()})
def my_graph():
x, y = x_op()
return {"x": x, "y": y}

assets_def = AssetsDefinition.from_graph(
graph_def=my_graph,
asset_keys_by_output_name={"y": AssetKey("y_asset"), "x": AssetKey("x_asset")},
)

assert AssetGroup([assets_def]).build_job("abc").execute_in_process().success


def test_graph_asset_partitioned():
@op
def my_op(context):
Expand Down

0 comments on commit 4e1a53f

Please sign in to comment.