Skip to content

Commit

Permalink
add run_request_for_partition for job definition (#6790)
Browse files Browse the repository at this point in the history
* add run_request_for_partition for job definition

* reorder args
  • Loading branch information
prha committed Feb 24, 2022
1 parent f17a1fa commit d90d1dd
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
11 changes: 11 additions & 0 deletions python_modules/dagster/dagster/core/definitions/job_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from .pipeline_definition import PipelineDefinition
from .preset import PresetDefinition
from .resource_definition import ResourceDefinition
from .run_request import RunRequest
from .version_strategy import VersionStrategy

if TYPE_CHECKING:
Expand Down Expand Up @@ -251,6 +252,16 @@ def get_partition_set_def(self) -> Optional["PartitionSetDefinition"]:

return self._cached_partition_set

def run_request_for_partition(self, partition_key: str, run_key: Optional[str]) -> RunRequest:
partition_set = self.get_partition_set_def()
if not partition_set:
check.failed("Called run_request_for_partition on a non-partitioned job")

partition = partition_set.get_partition(partition_key)
run_config = partition_set.run_config_for_partition(partition)
tags = partition_set.tags_for_partition(partition)
return RunRequest(run_key=run_key, run_config=run_config, tags=tags)

def with_hooks(self, hook_defs: AbstractSet[HookDefinition]) -> "JobDefinition":
"""Apply a set of hooks to all op instances within the job."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
job,
op,
reconstructable,
static_partitioned_config,
)
from dagster.core.storage.tags import PARTITION_NAME_TAG
from dagster.core.test_utils import environ, instance_for_test


Expand Down Expand Up @@ -125,3 +127,26 @@ def basic():

with environ({"SOME_ENV_VAR": "blah"}):
assert the_job.execute_in_process().success


def test_job_run_request():
def partition_fn(partition_key: str):
return {"ops": {"my_op": {"config": {"partition": partition_key}}}}

@static_partitioned_config(partition_keys=["a", "b", "c", "d"])
def my_partitioned_config(partition_key: str):
return partition_fn(partition_key)

@op
def my_op():
pass

@job(config=my_partitioned_config)
def my_job():
my_op()

for partition_key in ["a", "b", "c", "d"]:
run_request = my_job.run_request_for_partition(partition_key=partition_key, run_key=None)
assert run_request.run_config == partition_fn(partition_key)
assert run_request.tags
assert run_request.tags.get(PARTITION_NAME_TAG) == partition_key

0 comments on commit d90d1dd

Please sign in to comment.