Fix check_step_health with delayed op retries (#10458)
* Failing executor test

* fix
johannkm authored and yuhan committed Nov 10, 2022
1 parent 87c8389 commit 00e4491
Showing 2 changed files with 151 additions and 1 deletion.
@@ -222,10 +222,15 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut
         dagster_event.is_step_success
         or dagster_event.is_step_failure
         or dagster_event.is_resource_init_failure
+        or dagster_event.is_step_up_for_retry
     ):
         assert isinstance(dagster_event.step_key, str)
         del running_steps[dagster_event.step_key]
-        active_execution.verify_complete(plan_context, dagster_event.step_key)
+
+        if not dagster_event.is_step_up_for_retry:
+            active_execution.verify_complete(
+                plan_context, dagster_event.step_key
+            )
 
     # process skips from failures or uncovered inputs
     list(active_execution.plan_events_iterator(plan_context))
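For orientation (not part of the commit), here is a minimal, self-contained sketch of the bookkeeping the hunk above establishes. FakeEvent, handle_event, running_steps, and completed_steps are illustrative stand-ins, not dagster internals: a step reported as up-for-retry leaves the map of steps the executor health-checks, but it is not verified as complete, because it will be re-launched once its retry backoff elapses.

from dataclasses import dataclass


@dataclass
class FakeEvent:
    # Stand-in for the handful of DagsterEvent flags the executor inspects.
    step_key: str
    is_step_success: bool = False
    is_step_failure: bool = False
    is_step_up_for_retry: bool = False


def handle_event(event: FakeEvent, running_steps: dict, completed_steps: set) -> None:
    terminal = event.is_step_success or event.is_step_failure
    if terminal or event.is_step_up_for_retry:
        # In every case, stop health-checking this attempt by dropping it from
        # the map of steps the polling loop still considers to be running.
        del running_steps[event.step_key]
        # Only treat terminal outcomes as complete; a step that is up for retry
        # is still in flight and will be re-added when its next attempt launches.
        if terminal:
            completed_steps.add(event.step_key)


running, done = {"retry_op": object()}, set()
handle_event(FakeEvent("retry_op", is_step_up_for_retry=True), running, done)
assert "retry_op" not in running and "retry_op" not in done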
@@ -0,0 +1,145 @@
# pylint: disable=unused-argument
import subprocess

from dagster import OpExecutionContext, RetryRequested, executor, job, op, reconstructable
from dagster._config import Permissive
from dagster._core.definitions.executor_definition import multiple_process_executor_requirements
from dagster._core.execution.api import execute_pipeline
from dagster._core.execution.retries import RetryMode
from dagster._core.executor.step_delegating import (
CheckStepHealthResult,
StepDelegatingExecutor,
StepHandler,
)
from dagster._core.test_utils import instance_for_test
from dagster._utils import merge_dicts


# Records which attempts have been launched so that check_step_health can assert it
# is never called for an attempt that has not actually started (e.g. while the step
# is waiting out a retry backoff).
class TestStepHandler(StepHandler):
launched_first_attempt = False
launched_second_attempt = False
processes = [] # type: ignore

@property
def name(self):
return "TestStepHandler"

def launch_step(self, step_handler_context):

assert step_handler_context.execute_step_args.step_keys_to_execute == ["retry_op"]

attempt_count = (
step_handler_context.execute_step_args.known_state.get_retry_state().get_attempt_count(
"retry_op"
)
)
if attempt_count == 0:
assert TestStepHandler.launched_first_attempt is False
assert TestStepHandler.launched_second_attempt is False
TestStepHandler.launched_first_attempt = True
elif attempt_count == 1:
assert TestStepHandler.launched_first_attempt is True
assert TestStepHandler.launched_second_attempt is False
TestStepHandler.launched_second_attempt = True
else:
raise Exception("Unexpected attempt count")

print("TestStepHandler Launching Step!") # pylint: disable=print-call
TestStepHandler.processes.append(
subprocess.Popen(step_handler_context.execute_step_args.get_command_args())
)
return iter(())

def check_step_health(self, step_handler_context) -> CheckStepHealthResult:

assert step_handler_context.execute_step_args.step_keys_to_execute == ["retry_op"]

attempt_count = (
step_handler_context.execute_step_args.known_state.get_retry_state().get_attempt_count(
"retry_op"
)
)
if attempt_count == 0:
assert TestStepHandler.launched_first_attempt is True
assert TestStepHandler.launched_second_attempt is False
elif attempt_count == 1:
assert TestStepHandler.launched_first_attempt is True
assert (
TestStepHandler.launched_second_attempt
), "Second attempt not launched, shouldn't be checking on it"

return CheckStepHealthResult.healthy()

def terminate_step(self, step_handler_context):
raise NotImplementedError()

@classmethod
def reset(cls):
cls.launched_first_attempt = False
cls.launched_second_attempt = False

@classmethod
def wait_for_processes(cls):
for p in cls.processes:
p.wait(timeout=5)


@executor(
name="retry_assertion_executor",
requirements=multiple_process_executor_requirements(),
config_schema=Permissive(),
)
def retry_assertion_executor(exc_init):
return StepDelegatingExecutor(
TestStepHandler(),
**(merge_dicts({"retries": RetryMode.ENABLED}, exc_init.executor_config)),
        check_step_health_interval_seconds=0,  # poll step health on every tick so the tests exercise it immediately
)


@op(config_schema={"fails_before_pass": int})
def retry_op(context: OpExecutionContext):
if context.retry_number < context.op_config["fails_before_pass"]:
# enough for check_step_health to be called, since we set check_step_health_interval_seconds=0
raise RetryRequested(seconds_to_wait=5)


@job(executor_def=retry_assertion_executor)
def retry_job():
retry_op()


def test_retries_no_check_step_health_during_wait():
TestStepHandler.reset()
with instance_for_test() as instance:
result = execute_pipeline(
reconstructable(retry_job),
instance=instance,
run_config={
"execution": {"config": {}},
"ops": {"retry_op": {"config": {"fails_before_pass": 1}}},
},
)
TestStepHandler.wait_for_processes()
assert result.success


def test_retries_exhausted():
TestStepHandler.reset()
with instance_for_test() as instance:
result = execute_pipeline(
reconstructable(retry_job),
instance=instance,
run_config={
"execution": {"config": {}},
"ops": {"retry_op": {"config": {"fails_before_pass": 2}}},
},
)
TestStepHandler.wait_for_processes()
assert not result.success
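        # Before this fix, the executor called verify_complete on a step that was
        # merely up for retry, surfacing this engine error in the event log.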
assert not [
e
for e in result.event_list
if "Attempted to mark step retry_op as complete that was not known to be in flight"
in str(e)
]
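For context on the retry semantics the two tests rely on (a sketch, not part of the commit, assuming the public dagster APIs execute_in_process, OpExecutionContext.retry_number, and RetryRequested with its default max_retries=1): with a single allowed retry, fails_before_pass: 1 succeeds on the second attempt, while fails_before_pass: 2 exhausts retries and fails the run.

from dagster import OpExecutionContext, RetryRequested, job, op


@op
def flaky_op(context: OpExecutionContext):
    # Fail only on the first attempt; the default max_retries=1 permits one retry.
    if context.retry_number == 0:
        raise RetryRequested()
    return "ok"


@job
def flaky_job():
    flaky_op()


if __name__ == "__main__":
    # The in-process executor also honors RetryRequested, so the run succeeds
    # on the second attempt.
    assert flaky_job.execute_in_process().success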
