Skip to content

Commit

Permalink
Toggle to enable auto run reexecution daemon (#8277)
Browse files Browse the repository at this point in the history
New instance config for auto run reexecution. Disabled by default, and raises if enabled with storages that don't support it
  • Loading branch information
johannkm committed Jun 13, 2022
1 parent 511e5ec commit 13f7f16
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def instance():
"module": "dagster.core.test_utils",
"class": "MockedRunCoordinator",
},
"run_retries": {"enabled": True},
},
) as instance:
yield instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
from dagster.core.snap import snapshot_from_execution_plan
from dagster.core.storage.pipeline_run import RunsFilter
from dagster.core.storage.tags import MAX_RETRIES_TAG
from dagster.core.test_utils import create_run_for_test
from dagster.core.test_utils import create_run_for_test, instance_for_test
from dagster.daemon.auto_run_reexecution.auto_run_reexecution import (
consume_new_runs_for_automatic_reexecution,
filter_runs_to_should_retry,
)
from dagster.daemon.auto_run_reexecution.event_log_consumer import EventLogConsumerDaemon

from .utils import foo, get_foo_pipeline_handle

Expand Down Expand Up @@ -235,3 +236,13 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace):
)
)
assert len(instance.run_coordinator.queue()) == 2


def test_daemon_enabled(instance):
with instance_for_test() as run_retries_disabled_instance:
assert (
EventLogConsumerDaemon.daemon_type()
not in run_retries_disabled_instance.get_required_daemon_types()
)

assert EventLogConsumerDaemon.daemon_type() in instance.get_required_daemon_types()
23 changes: 23 additions & 0 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,18 @@ def __init__(
"worker will be marked as failed, but will not be resumed.",
)

if self.run_retries_enabled:
check.invariant(
self.run_storage.supports_kvs(),
"Run retries are enabled, but the configured run storage does not support them. "
"Consider switching to Postgres or Mysql.",
)
check.invariant(
self.event_log_storage.supports_event_consumer_queries(),
"Run retries are enabled, but the configured event log storage does not support them. "
"Consider switching to Postgres or Mysql.",
)

# ctors

@staticmethod
Expand Down Expand Up @@ -667,6 +679,14 @@ def cancellation_thread_poll_interval_seconds(self) -> int:
"cancellation_thread_poll_interval_seconds", 10
)

@property
def run_retries_enabled(self) -> bool:
return self.get_settings("run_retries").get("enabled", False)

@property
def run_retries_max_retries(self) -> int:
return self.get_settings("run_retries").get("max_retries")

# python logs

@property
Expand Down Expand Up @@ -2005,6 +2025,7 @@ def wipe_daemon_heartbeats(self):
def get_required_daemon_types(self):
from dagster.core.run_coordinator import QueuedRunCoordinator
from dagster.core.scheduler import DagsterDaemonScheduler
from dagster.daemon.auto_run_reexecution.event_log_consumer import EventLogConsumerDaemon
from dagster.daemon.daemon import (
BackfillDaemon,
MonitoringDaemon,
Expand All @@ -2025,6 +2046,8 @@ def get_required_daemon_types(self):
daemons.append(QueuedRunCoordinatorDaemon.daemon_type())
if self.run_monitoring_enabled:
daemons.append(MonitoringDaemon.daemon_type())
if self.run_retries_enabled:
daemons.append(EventLogConsumerDaemon.daemon_type())
return daemons

# backfill
Expand Down
14 changes: 8 additions & 6 deletions python_modules/dagster/dagster/core/instance/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,20 @@ def dagster_instance_config_schema():
"python_logs": python_logs_config_schema(),
"run_monitoring": Field(
{
"enabled": Field(
Bool,
is_required=False,
),
"enabled": Field(Bool, is_required=False),
"start_timeout_seconds": Field(int, is_required=False),
"max_resume_run_attempts": Field(int, is_required=False),
"poll_interval_seconds": Field(int, is_required=False),
"cancellation_thread_poll_interval_seconds": Field(int, is_required=False),
},
),
"run_retries": Field(
{
"enabled": Field(bool, is_required=False, default_value=False),
"max_retries": Field(int, is_required=False, default_value=0),
}
),
"code_servers": Field(
{"local_startup_timeout": Field(int, is_required=False)},
is_required=False,
{"local_startup_timeout": Field(int, is_required=False)}, is_required=False
),
}
8 changes: 7 additions & 1 deletion python_modules/dagster/dagster/core/instance/ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,13 @@ def from_dir(base_dir, config_filename=DAGSTER_CONFIG_YAML_FILENAME, overrides=N
defaults["run_launcher"],
)

settings_keys = {"telemetry", "python_logs", "run_monitoring", "code_servers"}
settings_keys = {
"telemetry",
"python_logs",
"run_monitoring",
"run_retries",
"code_servers",
}
settings = {key: config_value.get(key) for key in settings_keys if config_value.get(key)}

return InstanceRef(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
from dagster.core.workspace.workspace import IWorkspace
from dagster.utils.error import serializable_error_info_from_exc_info

DEFAULT_MAX_RETRIES = 0


def filter_runs_to_should_retry(
runs: List[DagsterRun], instance: DagsterInstance, default_max_retries: int
Expand Down Expand Up @@ -147,7 +145,7 @@ def consume_new_runs_for_automatic_reexecution(
for run, retry_number in filter_runs_to_should_retry(
[cast(DagsterRun, run_record.pipeline_run) for run_record in run_records],
instance,
DEFAULT_MAX_RETRIES,
instance.run_retries_max_retries,
):

yield
Expand Down
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/daemon/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dagster.core.instance import DagsterInstance
from dagster.core.workspace import IWorkspace
from dagster.core.workspace.load_target import WorkspaceLoadTarget
from dagster.daemon.auto_run_reexecution.event_log_consumer import EventLogConsumerDaemon
from dagster.daemon.daemon import (
BackfillDaemon,
DagsterDaemon,
Expand Down Expand Up @@ -305,6 +306,8 @@ def create_daemon_of_type(daemon_type, instance):
return BackfillDaemon(interval_seconds=DEFAULT_DAEMON_INTERVAL_SECONDS)
elif daemon_type == MonitoringDaemon.daemon_type():
return MonitoringDaemon(interval_seconds=instance.run_monitoring_poll_interval_seconds)
elif daemon_type == EventLogConsumerDaemon.daemon_type():
return EventLogConsumerDaemon()
else:
raise Exception(f"Unexpected daemon type {daemon_type}")

Expand Down

0 comments on commit 13f7f16

Please sign in to comment.