Skip to content

Commit

Permalink
fix UnresolvedAssetJobDefinition.run_request_for_partition when confi… (
Browse files Browse the repository at this point in the history
#10238)

fix UnresolvedAssetJobDefinition.run_request_for_partition when config is hardcoded
  • Loading branch information
sryza committed Oct 31, 2022
1 parent ef678e6 commit 966bfb7
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,21 +176,7 @@ def __init__(
partitioned_config = None

if partitions_def:
check.invariant(
not isinstance(config, ConfigMapping),
"Can't supply a ConfigMapping for 'config' when 'partitions_def' is supplied.",
)

if isinstance(config, PartitionedConfig):
check.invariant(
config.partitions_def == partitions_def,
"Can't supply a PartitionedConfig for 'config' with a different "
"PartitionsDefinition than supplied for 'partitions_def'.",
)
partitioned_config = config
else:
hardcoded_config = config if config else {}
partitioned_config = PartitionedConfig(partitions_def, lambda _: hardcoded_config)
partitioned_config = PartitionedConfig.from_flexible_config(config, partitions_def)
else:
if isinstance(config, ConfigMapping):
config_mapping = config
Expand Down
23 changes: 23 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
user_code_error_boundary,
)
from ..storage.pipeline_run import PipelineRun
from .config import ConfigMapping
from .mode import DEFAULT_MODE_NAME
from .run_request import RunRequest, SkipReason
from .schedule_definition import (
Expand Down Expand Up @@ -869,6 +870,28 @@ def get_run_config_for_partition_key(self, partition_key: str) -> Mapping[str, A
raise DagsterInvalidInvocationError(f"No partition for partition key {partition_key}.")
return self.run_config_for_partition_fn(partition[0])

@classmethod
def from_flexible_config(
cls,
config: Optional[Union[ConfigMapping, Mapping[str, object], "PartitionedConfig"]],
partitions_def: PartitionsDefinition,
) -> "PartitionedConfig":
check.invariant(
not isinstance(config, ConfigMapping),
"Can't supply a ConfigMapping for 'config' when 'partitions_def' is supplied.",
)

if isinstance(config, PartitionedConfig):
check.invariant(
config.partitions_def == partitions_def,
"Can't supply a PartitionedConfig for 'config' with a different "
"PartitionsDefinition than supplied for 'partitions_def'.",
)
return config
else:
hardcoded_config = config if config else {}
return cls(partitions_def, lambda _: cast(Mapping, hardcoded_config))

def __call__(self, *args, **kwargs):
if self._decorated_fn is None:
raise DagsterInvalidInvocationError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ def get_partition_set_def(self) -> Optional["PartitionSetDefinition"]:
if self.partitions_def is None:
return None

partitioned_config = self.config if isinstance(self.config, PartitionedConfig) else None
partitioned_config = PartitionedConfig.from_flexible_config(
self.config, self.partitions_def
)

tags_fn = (
partitioned_config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
StaticPartitionsDefinition,
daily_partitioned_config,
define_asset_job,
hourly_partitioned_config,
materialize,
)
from dagster._check import CheckError
from dagster._core.definitions import asset, build_assets_job, multi_asset
from dagster._core.definitions.asset_partitions import (
get_downstream_partitions_for_partition_range,
Expand Down Expand Up @@ -447,3 +449,22 @@ def myconfig(start, _end):
the_job = define_asset_job("job", config=myconfig).resolve([asset1], [])

assert the_job.execute_in_process(partition_key="2020-01-01").success


def test_mismatched_job_partitioned_config_with_asset_partitions():
daily_partitions_def = DailyPartitionsDefinition(start_date="2020-01-01")

@asset(config_schema={"day_of_month": int}, partitions_def=daily_partitions_def)
def asset1(context):
assert context.op_config["day_of_month"] == 1
assert context.partition_key == "2020-01-01"

@hourly_partitioned_config(start_date="2020-01-01-00:00")
def myconfig(start, _end):
return {"ops": {"asset1": {"config": {"day_of_month": start.day}}}}

with pytest.raises(
CheckError,
match="Can't supply a PartitionedConfig for 'config' with a different PartitionsDefinition than supplied for 'partitions_def'.",
):
define_asset_job("job", config=myconfig).resolve([asset1], [])
Original file line number Diff line number Diff line change
Expand Up @@ -662,3 +662,13 @@ def my_asset():
assert run_request_with_tags.tags
assert run_request_with_tags.tags.get(PARTITION_NAME_TAG) == partition_key
assert run_request_with_tags.tags.get("foo") == "bar"

my_job_hardcoded_config = define_asset_job(
"my_job_hardcoded_config",
"*",
config={"ops": {"my_asset": {"config": {"partition": "blabla"}}}},
partitions_def=partitions_def,
)

run_request = my_job_hardcoded_config.run_request_for_partition(partition_key="a", run_key=None)
assert run_request.run_config == {"ops": {"my_asset": {"config": {"partition": "blabla"}}}}

0 comments on commit 966bfb7

Please sign in to comment.