Skip to content

Commit

Permalink
Add run_id as execute_in_process arg (#7317)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Apr 6, 2022
1 parent 84550f6 commit 504c65f
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ def execute_in_process(
instance: Optional["DagsterInstance"] = None,
resources: Optional[Dict[str, Any]] = None,
raise_on_error: bool = True,
run_id: Optional[str] = None,
) -> "ExecuteInProcessResult":
if not isinstance(self.node_def, GraphDefinition):
raise DagsterInvalidInvocationError(
Expand Down Expand Up @@ -643,6 +644,7 @@ def execute_in_process(
instance=instance,
output_capturing_enabled=True,
raise_on_error=raise_on_error,
run_id=run_id,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ def execute_in_process(
resources: Optional[Dict[str, Any]] = None,
raise_on_error: bool = True,
op_selection: Optional[List[str]] = None,
run_id: Optional[str] = None,
) -> "ExecuteInProcessResult":
"""
Execute this graph in-process, collecting results in-memory.
Expand Down Expand Up @@ -677,6 +678,7 @@ def execute_in_process(
instance=instance,
output_capturing_enabled=True,
raise_on_error=raise_on_error,
run_id=run_id,
)

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def execute_in_process(
partition_key: Optional[str] = None,
raise_on_error: bool = True,
op_selection: Optional[List[str]] = None,
run_id: Optional[str] = None,
) -> "ExecuteInProcessResult":
"""
Execute the Job in-process, gathering results in-memory.
Expand Down Expand Up @@ -203,6 +204,7 @@ def execute_in_process(
output_capturing_enabled=True,
raise_on_error=raise_on_error,
run_tags=tags,
run_id=run_id,
)

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def core_execute_in_process(
output_capturing_enabled: bool,
raise_on_error: bool,
run_tags: Optional[Dict[str, Any]] = None,
run_id: Optional[str] = None,
) -> ExecuteInProcessResult:
pipeline_def = ephemeral_pipeline
mode_def = pipeline_def.get_mode_definition()
Expand All @@ -46,6 +47,7 @@ def core_execute_in_process(
run_config=run_config,
mode=mode_def.name,
tags={**pipeline_def.tags, **(run_tags or {})},
run_id=run_id,
)
run_id = pipeline_run.run_id

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
DagsterInvalidConfigError,
DagsterInvalidDefinitionError,
)
from dagster.core.test_utils import instance_for_test
from dagster.loggers import json_console_logger


Expand Down Expand Up @@ -1008,3 +1009,22 @@ def my_pipeline(sync_signal):
the_job = my_pipeline.to_job()
result = the_job.execute_in_process()
assert result.success


def test_run_id_execute_in_process():
@graph
def blank():
pass

with instance_for_test() as instance:
result = blank.execute_in_process(instance=instance, run_id="foo")
assert result.success
assert instance.get_run_by_id("foo")

result = blank.to_job().execute_in_process(instance=instance, run_id="bar")
assert result.success
assert instance.get_run_by_id("bar")

result = blank.alias("some_name").execute_in_process(instance=instance, run_id="baz")
assert result.success
assert instance.get_run_by_id("baz")

0 comments on commit 504c65f

Please sign in to comment.