output_asset_partition_key -> asset_partition_key_for_output (#8327)
sryza committed Jun 10, 2022
1 parent 81de3d9 commit 70558e1
Showing 5 changed files with 31 additions and 12 deletions.
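For quick reference, the before/after call sites as they appear in the hunks below; a hedged summary only, with the old names kept as deprecation shims until 0.16.0:

```python
# Inside an op or asset body, given an OpExecutionContext named `context`:

# Before (deprecated, slated for removal in 0.16.0):
partition_key = context.output_asset_partition_key()
time_window = context.output_asset_partitions_time_window()

# After:
partition_key = context.asset_partition_key_for_output()
time_window = context.asset_partitions_time_window_for_output()
```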
@@ -315,7 +315,7 @@ from dagster import DailyPartitionsDefinition, asset
 @asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
 def my_daily_partitioned_asset(context):
     context.log.info(
-        f"Processing asset partition '{context.output_asset_partition_key()}'"
+        f"Processing asset partition '{context.asset_partition_key_for_output()}'"
     )
 ```

@@ -4,5 +4,5 @@
 @asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
 def my_daily_partitioned_asset(context):
     context.log.info(
-        f"Processing asset partition '{context.output_asset_partition_key()}'"
+        f"Processing asset partition '{context.asset_partition_key_for_output()}'"
     )
@@ -96,7 +96,7 @@ def id_range_for_time(context) -> Output[Tuple[int, int]]:
     """
     For the configured time partition, searches for the range of ids that were created in that time.
     """
-    start, end = context.output_asset_partitions_time_window()
+    start, end = context.asset_partitions_time_window_for_output()
     id_range, metadata = _id_range_for_time(
         start.timestamp(), end.timestamp(), context.resources.hn_client
     )
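The hunk above unpacks the returned TimeWindow into start and end timestamps. Below is a minimal sketch of the renamed accessor in a daily-partitioned asset; the asset name is hypothetical, and it assumes the TimeWindow's start/end fields are datetimes, consistent with the .timestamp() calls above:

```python
from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def events_for_partition(context):
    # Renamed accessor: the time window covered by the partition being materialized.
    window = context.asset_partitions_time_window_for_output()
    context.log.info(f"Fetching events from {window.start} to {window.end}")
```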
19 changes: 19 additions & 0 deletions python_modules/dagster/dagster/core/execution/context/compute.py
@@ -23,6 +23,7 @@
 from dagster.core.instance import DagsterInstance
 from dagster.core.log_manager import DagsterLogManager
 from dagster.core.storage.pipeline_run import DagsterRun, PipelineRun
+from dagster.utils.backcompat import deprecation_warning
 from dagster.utils.forked_pdb import ForkedPdb

 from .system import StepExecutionContext
@@ -322,12 +323,30 @@ def asset_key_for_output(self, output_name: str = "result") -> AssetKey:
         return asset_output_info.key

     def output_asset_partition_key(self, output_name: str = "result") -> str:
+        deprecation_warning(
+            "OpExecutionContext.output_asset_partition_key",
+            "0.16.0",
+            additional_warn_txt="Use OpExecutionContext.asset_partition_key_for_output instead.",
+        )
+
+        return self.asset_partition_key_for_output(output_name)
+
+    def asset_partition_key_for_output(self, output_name: str = "result") -> str:
         """Returns the asset partition key for the given output. Defaults to "result", which is the
         name of the default output.
         """
         return self._step_execution_context.asset_partition_key_for_output(output_name)

     def output_asset_partitions_time_window(self, output_name: str = "result") -> TimeWindow:
+        deprecation_warning(
+            "OpExecutionContext.output_asset_partitions_time_window",
+            "0.16.0",
+            additional_warn_txt="Use OpExecutionContext.asset_partitions_time_window_for_output instead.",
+        )
+
+        return self.asset_partitions_time_window_for_output(output_name)
+
+    def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow:
         """The time window for the partitions of the output asset.

         Raises an error if either of the following are true:
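A standalone sketch of the back-compat pattern added above: each old method becomes a thin shim that warns and then forwards to the renamed method. The Context class and the deprecation_warning stand-in here are hypothetical, assuming dagster.utils.backcompat.deprecation_warning ultimately wraps warnings.warn with a DeprecationWarning:

```python
import warnings


def deprecation_warning(subject: str, breaking_version: str, additional_warn_txt: str = "") -> None:
    # Hypothetical stand-in for dagster.utils.backcompat.deprecation_warning.
    warnings.warn(
        f"{subject} is deprecated and will be removed in {breaking_version}. {additional_warn_txt}",
        category=DeprecationWarning,
        stacklevel=3,
    )


class Context:
    def asset_partition_key_for_output(self, output_name: str = "result") -> str:
        return "2022-01-01"  # placeholder partition key

    def output_asset_partition_key(self, output_name: str = "result") -> str:
        # Old name kept as a shim: warn, then delegate to the renamed method.
        deprecation_warning(
            "Context.output_asset_partition_key",
            "0.16.0",
            additional_warn_txt="Use Context.asset_partition_key_for_output instead.",
        )
        return self.asset_partition_key_for_output(output_name)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    assert Context().output_asset_partition_key() == "2022-01-01"
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```

Existing call sites keep working through 0.15.x while the warning nudges them toward the new names.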
@@ -214,14 +214,14 @@ def load_input(self, context):

     @asset(partitions_def=upstream_partitions_def)
     def upstream_asset(context):
-        assert context.output_asset_partition_key() == "2"
+        assert context.asset_partition_key_for_output() == "2"

     @asset(
         partitions_def=downstream_partitions_def,
         partition_mappings={"upstream_asset": TrailingWindowPartitionMapping()},
     )
     def downstream_asset(context, upstream_asset):
-        assert context.output_asset_partition_key() == "2"
+        assert context.asset_partition_key_for_output() == "2"
         assert upstream_asset is None

     my_job = build_assets_job(
@@ -257,7 +257,7 @@ def load_input(self, context):

     @asset(partitions_def=upstream_partitions_def)
     def upstream_asset(context):
-        assert context.output_asset_partition_key() == "b"
+        assert context.asset_partition_key_for_output() == "b"

     @asset
     def downstream_asset(upstream_asset):
@@ -614,24 +614,24 @@ def load_input(self, context):
         partitions_def=upstream_partitions_def,
     )
     def upstream_asset(context):
-        assert context.output_asset_partition_key("out1") == "2"
-        assert context.output_asset_partition_key("out2") == "2"
+        assert context.asset_partition_key_for_output("out1") == "2"
+        assert context.asset_partition_key_for_output("out2") == "2"
         return (Output(1, output_name="out1"), Output(2, output_name="out2"))

     @asset(
         partitions_def=downstream_partitions_def,
         partition_mappings={"upstream_asset_1": TrailingWindowPartitionMapping()},
     )
     def downstream_asset_1(context, upstream_asset_1):
-        assert context.output_asset_partition_key() == "2"
+        assert context.asset_partition_key_for_output() == "2"
         assert upstream_asset_1 is None

     @asset(
         partitions_def=downstream_partitions_def,
         partition_mappings={"upstream_asset_2": TrailingWindowPartitionMapping()},
     )
     def downstream_asset_2(context, upstream_asset_2):
-        assert context.output_asset_partition_key() == "2"
+        assert context.asset_partition_key_for_output() == "2"
         assert upstream_asset_2 is None

     my_job = build_assets_job(
@@ -657,15 +657,15 @@ def test_from_graph():

     @op
     def my_op(context):
-        assert context.output_asset_partition_key() == "a"
+        assert context.asset_partition_key_for_output() == "a"

     @graph
     def upstream_asset():
         return my_op()

     @op
     def my_op2(context, upstream_asset):
-        assert context.output_asset_partition_key() == "a"
+        assert context.asset_partition_key_for_output() == "a"
         return upstream_asset

     @graph
