Skip to content

Commit

Permalink
run_config argument on run_request_for_partition (#10279)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Nov 1, 2022
1 parent 9a3b5bc commit f7f1e13
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ def run_request_for_partition(
run_key: Optional[str] = None,
tags: Optional[Mapping[str, str]] = None,
asset_selection: Optional[Sequence[AssetKey]] = None,
run_config: Optional[Mapping[str, Any]] = None,
) -> RunRequest:
"""
Creates a RunRequest object for a run that processes the given partition.
Expand All @@ -573,6 +574,9 @@ def run_request_for_partition(
value means that a run will always be launched per evaluation.
tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach
to the launched run.
run_config (Optional[Mapping[str, Any]]: Configuration for the run. If the job has
a :py:class:`PartitionedConfig`, this value will override replace the config
provided by it.
Returns:
RunRequest: an object that requests a run to process the given partition.
Expand All @@ -582,7 +586,6 @@ def run_request_for_partition(
check.failed("Called run_request_for_partition on a non-partitioned job")

partition = partition_set.get_partition(partition_key)
run_config = partition_set.run_config_for_partition(partition)
run_request_tags = (
{**tags, **partition_set.tags_for_partition(partition)}
if tags
Expand All @@ -591,7 +594,9 @@ def run_request_for_partition(

return RunRequest(
run_key=run_key,
run_config=run_config,
run_config=run_config
if run_config is not None
else partition_set.run_config_for_partition(partition),
tags=run_request_tags,
job_name=self.name,
asset_selection=asset_selection,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import operator
from functools import reduce
from typing import TYPE_CHECKING, Any, Dict, NamedTuple, Optional, Sequence, Union, cast
from typing import TYPE_CHECKING, Any, Dict, Mapping, NamedTuple, Optional, Sequence, Union, cast

import dagster._check as check
from dagster._core.definitions import AssetKey
Expand Down Expand Up @@ -100,6 +100,7 @@ def run_request_for_partition(
run_key: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
asset_selection: Optional[Sequence[AssetKey]] = None,
run_config: Optional[Mapping[str, Any]] = None,
) -> RunRequest:
"""
Creates a RunRequest object for a run that processes the given partition.
Expand All @@ -112,6 +113,9 @@ def run_request_for_partition(
value means that a run will always be launched per evaluation.
tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach
to the launched run.
run_config (Optional[Mapping[str, Any]]: Configuration for the run. If the job has
a :py:class:`PartitionedConfig`, this value will override replace the config
provided by it.
Returns:
RunRequest: an object that requests a run to process the given partition.
Expand All @@ -121,7 +125,6 @@ def run_request_for_partition(
check.failed("Called run_request_for_partition on a non-partitioned job")

partition = partition_set.get_partition(partition_key)
run_config = partition_set.run_config_for_partition(partition)
run_request_tags = (
{**tags, **partition_set.tags_for_partition(partition)}
if tags
Expand All @@ -130,7 +133,9 @@ def run_request_for_partition(

return RunRequest(
run_key=run_key,
run_config=run_config,
run_config=run_config
if run_config is not None
else partition_set.run_config_for_partition(partition),
tags=run_request_tags,
asset_selection=asset_selection,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -672,3 +672,6 @@ def my_asset():

run_request = my_job_hardcoded_config.run_request_for_partition(partition_key="a", run_key=None)
assert run_request.run_config == {"ops": {"my_asset": {"config": {"partition": "blabla"}}}}
assert my_job_hardcoded_config.run_request_for_partition(
partition_key="a", run_config={"a": 5}
).run_config == {"a": 5}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ def my_job():
assert run_request_with_tags.tags.get(PARTITION_NAME_TAG) == partition_key
assert run_request_with_tags.tags.get("foo") == "bar"

assert my_job.run_request_for_partition(partition_key="a", run_config={"a": 5}).run_config == {
"a": 5
}


# Datetime is not serializable
@op
Expand Down

0 comments on commit f7f1e13

Please sign in to comment.