[bug] hacky fix for step launcher behavior (#6866)
OwenKephart committed Mar 14, 2022
1 parent 8e6bccd commit c7bfb9c
Showing 5 changed files with 101 additions and 28 deletions.
@@ -29,7 +29,6 @@ def model_func(x, a, b):
     "database": "postgres_replica",
 }
 
-
 AIRBYTE_CONFIG = {"host": "localhost", "port": "8000"}
 DBT_PROJECT_DIR = file_relative_path(__file__, "../mds_dbt")
 DBT_PROFILES_DIR = file_relative_path(__file__, "../mds_dbt/config")
15 changes: 11 additions & 4 deletions python_modules/dagster/dagster/core/definitions/step_launcher.py
@@ -1,12 +1,13 @@
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Dict, NamedTuple, Optional
+from typing import TYPE_CHECKING, Dict, NamedTuple, Optional, Sequence
 
 from dagster import check
 from dagster.core.definitions.reconstructable import ReconstructablePipeline
 from dagster.core.execution.retries import RetryMode
 from dagster.core.storage.pipeline_run import PipelineRun
 
 if TYPE_CHECKING:
+    from dagster.core.events.log import EventLogEntry
     from dagster.core.execution.plan.state import KnownExecutionState
 
 
@@ -22,7 +23,8 @@ class StepRunRef(
             ("recon_pipeline", ReconstructablePipeline),
             ("prior_attempts_count", int),
             ("known_state", Optional["KnownExecutionState"]),
-            ("parent_run", Optional[PipelineRun]),
+            ("run_group", Sequence[PipelineRun]),
+            ("upstream_output_events", Sequence["EventLogEntry"]),
         ],
     )
 ):
@@ -41,9 +43,11 @@ def __new__(
         recon_pipeline: ReconstructablePipeline,
         prior_attempts_count: int,
         known_state: Optional["KnownExecutionState"],
-        parent_run: Optional[PipelineRun],
+        run_group: Optional[Sequence[PipelineRun]],
+        upstream_output_events: Optional[Sequence["EventLogEntry"]],
     ):
         from dagster.core.execution.plan.state import KnownExecutionState
+        from dagster.core.storage.event_log import EventLogEntry
 
         return super(StepRunRef, cls).__new__(
             cls,
@@ -55,7 +59,10 @@ def __new__(
             check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline),
             check.int_param(prior_attempts_count, "prior_attempts_count"),
             check.opt_inst_param(known_state, "known_state", KnownExecutionState),
-            check.opt_inst_param(parent_run, "parent_run", PipelineRun),
+            check.opt_list_param(run_group, "run_group", of_type=PipelineRun),
+            check.opt_list_param(
+                upstream_output_events, "upstream_output_events", of_type=EventLogEntry
+            ),
         )
 
 
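To make the field change above concrete, here is a minimal, runnable sketch of the new shape. RunRecord and StepRunRefSketch are hypothetical stand-ins, not Dagster's actual PipelineRun and StepRunRef classes: the single optional parent_run gives way to the whole run group plus the serialized output events the remote instance will need.

from typing import NamedTuple, Optional, Sequence


class RunRecord(NamedTuple):
    # hypothetical stand-in for dagster's PipelineRun
    run_id: str
    parent_run_id: Optional[str]


class StepRunRefSketch(NamedTuple):
    # before this commit: parent_run: Optional[RunRecord]
    run_group: Sequence[RunRecord]  # every run in the retry chain
    upstream_output_events: Sequence[str]  # stand-in for EventLogEntry records


ref = StepRunRefSketch(
    run_group=[
        RunRecord(run_id="retry_run", parent_run_id="original_run"),
        RunRecord(run_id="original_run", parent_run_id=None),
    ],
    upstream_output_events=["STEP_OUTPUT for op_a.result"],
)
assert ref.run_group[-1].parent_run_id is None  # the chain ends at the root run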
@@ -12,7 +12,7 @@
 )
 from dagster.core.definitions.step_launcher import StepLauncher, StepRunRef
 from dagster.core.errors import raise_execution_interrupts
-from dagster.core.events import DagsterEvent
+from dagster.core.events import DagsterEvent, DagsterEventType
 from dagster.core.execution.api import create_execution_plan
 from dagster.core.execution.context.system import StepExecutionContext
 from dagster.core.execution.context_creation_pipeline import PlanExecutionContextManager
@@ -102,6 +102,46 @@ def _module_in_package_dir(file_path: str, package_dir: str) -> str:
     return ".".join(without_extension.split(os.sep))
 
 
+def _upstream_events_and_runs(step_context: StepExecutionContext):
+    """Grabs the minimal set of output events and runs to inform a remote instance of how to load each output."""
+    step_inputs = step_context.step.step_inputs
+    upstream_output_handles = set().union(
+        *(step_input.source.step_output_handle_dependencies for step_input in step_inputs)
+    )
+    current_run = step_context.pipeline_run
+    events = []
+    runs = []
+    while True:
+        runs.append(current_run)
+        # note: this costs N db calls, where N = the number of runs in the run group
+        step_output_records = step_context.instance.all_logs(
+            current_run.run_id, of_type=DagsterEventType.STEP_OUTPUT
+        )
+        # if the current run has yielded a StepOutput event for a required step output,
+        # record that event and the run that produced it
+        for r in step_output_records:
+            output_handle = r.dagster_event.step_output_data.step_output_handle
+            # if this output matches one of the required step outputs, add it to the list of
+            # required events
+            if output_handle in upstream_output_handles:
+                events.append(r)
+                upstream_output_handles.remove(output_handle)
+        # found all the necessary events
+        if not upstream_output_handles:
+            break
+
+        if current_run.parent_run_id is None:
+            step_context.log.warn(
+                f"Could not find outputs in the logs for output handles: {upstream_output_handles}"
+            )
+            break
+
+        # else, keep looking backwards
+        current_run = step_context.instance.get_run_by_id(current_run.parent_run_id)
+
+    return events, runs
+
+
 def step_context_to_step_run_ref(
     step_context: StepExecutionContext,
     prior_attempts_count: int,
@@ -149,8 +189,7 @@ def step_context_to_step_run_ref(
         solids_to_execute=recon_pipeline.solids_to_execute,
     )
 
-    parent_run_id = step_context.pipeline_run.parent_run_id
-    parent_run = step_context.instance.get_run_by_id(parent_run_id) if parent_run_id else None
+    upstream_output_events, run_group = _upstream_events_and_runs(step_context)
     return StepRunRef(
         run_config=step_context.run_config,
         pipeline_run=step_context.pipeline_run,
@@ -160,7 +199,8 @@ def step_context_to_step_run_ref(
         recon_pipeline=recon_pipeline,  # type: ignore
         prior_attempts_count=prior_attempts_count,
         known_state=step_context.execution_plan.known_state,
-        parent_run=parent_run,
+        run_group=run_group,
+        upstream_output_events=upstream_output_events,
     )
 
 
@@ -180,11 +220,14 @@ def external_instance_from_step_run_ref(
         DagsterInstance: A DagsterInstance that can be used to execute an external step.
     """
     instance = DagsterInstance.ephemeral()
-    # re-execution expects the parent run to be available on the instance, so add these
-    if step_run_ref.parent_run:
+    # re-execution expects the parent run(s) to be available on the instance, so add these
+    for run in step_run_ref.run_group:
         # remove the pipeline_snapshot_id, as this instance doesn't have any snapshots
-        instance.add_run(step_run_ref.pipeline_run._replace(pipeline_snapshot_id=None))
-        instance.add_run(step_run_ref.parent_run._replace(pipeline_snapshot_id=None))
+        instance.add_run(run._replace(pipeline_snapshot_id=None))
+    # the can_load() function on the step context currently depends on reading output events
+    # from the instance, so we make sure the remote instance has the relevant events
+    for entry in step_run_ref.upstream_output_events:
+        instance.handle_new_event(entry)
     if event_listener_fn:
         instance.add_event_listener(step_run_ref.run_id, event_listener_fn)
     return instance
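The backward walk in _upstream_events_and_runs is easiest to see with plain data. A rough, runnable simulation follows; PARENT and OUTPUT_EVENTS are hypothetical stand-ins for instance.get_run_by_id and instance.all_logs, and the string handles stand in for StepOutputHandle objects.

from typing import Dict, List, Optional, Set, Tuple

# hypothetical stand-ins for instance.get_run_by_id and instance.all_logs
PARENT: Dict[str, Optional[str]] = {"run_3": "run_2", "run_2": "run_1", "run_1": None}
OUTPUT_EVENTS: Dict[str, List[str]] = {
    "run_3": [],  # the retried run re-ran nothing upstream
    "run_2": ["op_b.result"],
    "run_1": ["op_a.result"],
}


def upstream_events_and_runs(
    run_id: str, required_handles: Set[str]
) -> Tuple[List[str], List[str]]:
    """Walk parent_run_id links, collecting runs plus required output events."""
    needed = set(required_handles)
    events: List[str] = []
    runs: List[str] = []
    current: Optional[str] = run_id
    while current is not None:
        runs.append(current)
        # one event-log query per run in the group, as the note in the diff says
        for handle in OUTPUT_EVENTS[current]:
            if handle in needed:
                events.append(handle)
                needed.remove(handle)
        if not needed:
            break  # found everything; stop walking
        current = PARENT[current]
    return events, runs


events, runs = upstream_events_and_runs("run_3", {"op_a.result", "op_b.result"})
assert runs == ["run_3", "run_2", "run_1"]
assert sorted(events) == ["op_a.result", "op_b.result"]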
@@ -527,6 +527,7 @@ def load_input_object(self, step_context):
         from dagster.core.events import DagsterEvent
 
         values = []
+
         # some upstream steps may have skipped and we allow fan-in to continue in their absence
         source_handles_to_skip = list(
             filter(lambda x: not step_context.can_load(x), self.step_output_handle_dependencies)
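The can_load() dependence called out in the external_step.py comment reduces, conceptually, to a membership test against the replayed STEP_OUTPUT events. A simplified sketch under that assumption; the real method lives on StepExecutionContext and does more than this.

from typing import Set


def can_load_sketch(recorded_output_handles: Set[str], handle: str) -> bool:
    # an input is loadable only if some run in the group actually yielded a
    # STEP_OUTPUT event for it; skipped upstream steps never do, which is how
    # fan-in tolerates their absence
    return handle in recorded_output_handles


recorded = {"op_a.result"}  # as replayed via instance.handle_new_event(...)
assert can_load_sketch(recorded, "op_a.result")
assert not can_load_sketch(recorded, "skipped_op.result")  # fan-in skips this one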
@@ -79,26 +79,33 @@ def request_retry_local_external_step_launcher(context):
     return RequestRetryLocalExternalStepLauncher(**context.resource_config)
 
 
-def define_dynamic_job():
+def _define_dynamic_job(launch_initial, launch_final):
     from typing import List
 
-    @op(required_resource_keys={"first_step_launcher"}, out=DynamicOut(int))
+    initial_launcher = (
+        local_external_step_launcher if launch_initial else ResourceDefinition.mock_resource()
+    )
+    final_launcher = (
+        local_external_step_launcher if launch_final else ResourceDefinition.mock_resource()
+    )
+
+    @op(required_resource_keys={"initial_launcher"}, out=DynamicOut(int))
     def dynamic_outs():
         for i in range(0, 3):
             yield DynamicOutput(value=i, mapping_key=f"num_{i}")
 
-    @op(required_resource_keys={"second_step_launcher"})
+    @op
     def increment(i):
         return i + 1
 
-    @op
+    @op(required_resource_keys={"final_launcher"})
     def total(ins: List[int]):
         return sum(ins)
 
     @job(
         resource_defs={
-            "first_step_launcher": local_external_step_launcher,
-            "second_step_launcher": local_external_step_launcher,
+            "initial_launcher": initial_launcher,
+            "final_launcher": final_launcher,
             "io_manager": fs_io_manager,
         }
     )
@@ -142,6 +149,18 @@ def my_job():
     return my_job
 
 
+def define_dynamic_job_all_launched():
+    return _define_dynamic_job(True, True)
+
+
+def define_dynamic_job_first_launched():
+    return _define_dynamic_job(True, False)
+
+
+def define_dynamic_job_last_launched():
+    return _define_dynamic_job(False, True)
+
+
 def define_basic_job_all_launched():
     return _define_basic_job(True, True)
 
@@ -300,31 +319,35 @@ def test_pipeline(mode):
         assert result.result_for_solid("add_one").output_value() == 3
 
 
-def test_dynamic_job():
+@pytest.mark.parametrize(
+    "job_fn",
+    [
+        define_dynamic_job_all_launched,
+        define_dynamic_job_first_launched,
+        define_dynamic_job_last_launched,
+    ],
+)
+def test_dynamic_job(job_fn):
     with tempfile.TemporaryDirectory() as tmpdir:
         with instance_for_test() as instance:
             result = execute_pipeline(
-                pipeline=reconstructable(define_dynamic_job),
+                pipeline=reconstructable(job_fn),
                 run_config={
                     "resources": {
-                        "first_step_launcher": {
+                        "initial_launcher": {
                             "config": {"scratch_dir": tmpdir},
                         },
-                        "second_step_launcher": {
+                        "final_launcher": {
                             "config": {"scratch_dir": tmpdir},
                         },
                         "io_manager": {"config": {"base_dir": tmpdir}},
                     }
                 },
                 instance=instance,
             )
-            assert result.result_for_solid("total").output_value() == 6
+            assert result.output_for_solid("total") == 6
 
 
-@pytest.mark.skip(
-    reason="Reexecution will fail with step launchers because it relies on querying event log "
-    "storage which is not present on the external step"
-)
 @pytest.mark.parametrize(
     "job_fn",
     [
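The new test coverage boils down to one factory exercised at three launcher placements. A condensed, hypothetical version of that pattern, with a stub factory standing in for the real job definitions.

import pytest


def _define_dynamic_job(launch_initial: bool, launch_final: bool) -> tuple:
    # hypothetical stand-in: reports which resources would be real external
    # step launchers versus mocks, mirroring the factory in the diff above
    initial = "step_launcher" if launch_initial else "mock_resource"
    final = "step_launcher" if launch_final else "mock_resource"
    return initial, final


@pytest.mark.parametrize(
    "launch_initial, launch_final",
    [(True, True), (True, False), (False, True)],
)
def test_launcher_placement(launch_initial: bool, launch_final: bool):
    initial, final = _define_dynamic_job(launch_initial, launch_final)
    # every variant exercises the external step launcher at least once
    assert "step_launcher" in (initial, final)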
