Skip to content

Commit

Permalink
Shut down the daemon faster when it is interrupted (#11657)
Browse files Browse the repository at this point in the history
Summary:
we shut down the daemon by setting an event that each daemon thread
checks to terminate cleanly. Sometimes a particular daemon is in the
middle of a long sleep when this happens, making the daemon take forever
to spin down. Pass the shut down event through to the daemons so that
instead of calling time.sleep, they can call shutdown_event.wait

### Summary & Motivation

### How I Tested These Changes
  • Loading branch information
gibsondan committed Jan 13, 2023
1 parent 71e060a commit cfabdb6
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_thread_die_daemon(monkeypatch):

iteration_ran = {"ran": False}

def run_loop_error(_, _ctx):
def run_loop_error(_, _ctx, _shutdown_event):
iteration_ran["ran"] = True
raise KeyboardInterrupt
yield # pylint: disable=unreachable
Expand Down Expand Up @@ -183,7 +183,7 @@ def test_error_daemon(monkeypatch):

error_count = {"count": 0}

def run_loop_error(_, _ctx):
def run_loop_error(_, _ctx, _shutdown_event):
if should_raise_errors:
time.sleep(0.5)
error_count["count"] = error_count["count"] + 1
Expand Down Expand Up @@ -311,7 +311,7 @@ def test_multiple_error_daemon(monkeypatch):
with instance_for_test() as instance:
from dagster._daemon.daemon import SensorDaemon

def run_loop_error(_, _ctx):
def run_loop_error(_, _ctx, _shutdown_event):
# ?message stack cls_name cause"
yield SerializableErrorInfo("foobar", None, None, None)
yield SerializableErrorInfo("bizbuz", None, None, None)
Expand Down
14 changes: 11 additions & 3 deletions python_modules/dagster/dagster/_daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def run_daemon_loop(
from dagster._core.telemetry_upload import uploading_logging_thread

with uploading_logging_thread():
daemon_generator = self.core_loop(workspace_process_context)
daemon_generator = self.core_loop(workspace_process_context, daemon_shutdown_event)

try:
while not daemon_shutdown_event.is_set():
Expand All @@ -97,7 +97,9 @@ def run_daemon_loop(
)
self._errors.appendleft((error_info, pendulum.now("UTC")))
daemon_generator.close()
daemon_generator = self.core_loop(workspace_process_context)
daemon_generator = self.core_loop(
workspace_process_context, daemon_shutdown_event
)
finally:
try:
self._check_add_heartbeat(
Expand Down Expand Up @@ -187,6 +189,7 @@ def _check_add_heartbeat(
def core_loop(
self,
workspace_process_context: TContext,
shutdown_event: Event,
) -> TDaemonGenerator:
"""
Execute the daemon loop, which should be a generator function that never finishes.
Expand All @@ -204,6 +207,7 @@ def __init__(self, interval_seconds):
def core_loop(
self,
workspace_process_context: TContext,
shutdown_event: Event,
) -> TDaemonGenerator:
while True:
start_time = time.time()
Expand All @@ -214,8 +218,8 @@ def core_loop(
self._logger.error("Caught error:\n%s", error_info)
yield error_info
while time.time() - start_time < self.interval_seconds:
shutdown_event.wait(0.5)
yield None
time.sleep(0.5)
yield None

@abstractmethod
Expand All @@ -231,6 +235,7 @@ def daemon_type(cls):
def core_loop(
self,
workspace_process_context: IWorkspaceProcessContext,
shutdown_event: Event,
) -> TDaemonGenerator:
scheduler = workspace_process_context.instance.scheduler
if not isinstance(scheduler, DagsterDaemonScheduler):
Expand All @@ -241,6 +246,7 @@ def core_loop(
self._logger,
scheduler.max_catchup_runs,
scheduler.max_tick_retries,
shutdown_event,
)


Expand All @@ -252,10 +258,12 @@ def daemon_type(cls):
def core_loop(
self,
workspace_process_context: IWorkspaceProcessContext,
shutdown_event: Event,
) -> TDaemonGenerator:
yield from execute_sensor_iteration_loop(
workspace_process_context,
self._logger,
shutdown_event,
)


Expand Down
5 changes: 4 additions & 1 deletion python_modules/dagster/dagster/_daemon/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def _check_for_debug_crash(debug_crash_flags, key):
def execute_sensor_iteration_loop(
workspace_process_context: IWorkspaceProcessContext,
logger: logging.Logger,
shutdown_event: threading.Event,
until=None,
) -> TDaemonGenerator:
"""
Expand Down Expand Up @@ -265,7 +266,9 @@ def execute_sensor_iteration_loop(

loop_duration = end_time - start_time
sleep_time = max(0, MIN_INTERVAL_LOOP_TIME - loop_duration)
time.sleep(sleep_time)
shutdown_event.wait(sleep_time)

yield None


def execute_sensor_iteration(
Expand Down
3 changes: 2 additions & 1 deletion python_modules/dagster/dagster/_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def execute_scheduler_iteration_loop(
logger: logging.Logger,
max_catchup_runs: int,
max_tick_retries: int,
shutdown_event: threading.Event,
):
schedule_state_lock = threading.Lock()
scheduler_run_futures: Dict[str, Future] = {}
Expand Down Expand Up @@ -160,7 +161,7 @@ def execute_scheduler_iteration_loop(
if next_minute_time > end_time:
# Sleep until the beginning of the next minute, plus a small epsilon to
# be sure that we're past the start of the minute
time.sleep(next_minute_time - end_time + 0.001)
shutdown_event.wait(next_minute_time - end_time + 0.001)
yield


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import tempfile
import time
from contextlib import ExitStack, contextmanager
from unittest import mock

import pendulum
import pytest
Expand Down Expand Up @@ -1505,6 +1506,9 @@ def fake_sleep(s):

monkeypatch.setattr(time, "sleep", fake_sleep)

shutdown_event = mock.MagicMock()
shutdown_event.wait.side_effect = fake_sleep

with pendulum.test(freeze_datetime):
# 60 second custom interval
external_sensor = external_repo.get_external_sensor("custom_interval_sensor")
Expand Down Expand Up @@ -1538,6 +1542,7 @@ def fake_sleep(s):
execute_sensor_iteration_loop(
workspace_context,
get_default_daemon_logger("dagster.daemon.SensorDaemon"),
shutdown_event=shutdown_event,
until=freeze_datetime.add(seconds=65).timestamp(),
)
)
Expand Down

0 comments on commit cfabdb6

Please sign in to comment.