Skip to content

Commit

Permalink
Restore sensor daemon yielding when evaluating sensors synchronously (#8756)
Browse files Browse the repository at this point in the history

* deflake

* drop synchronous executor

* fix tests
  • Loading branch information
prha committed Jul 6, 2022
1 parent 0779ab7 commit 5f92ca1
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 71 deletions.
Expand Up @@ -45,7 +45,7 @@ def _create_sensor_tick(graphql_context):
graphql_context.instance,
logger,
workspace,
executor=SingleThreadPoolExecutor(),
threadpool_executor=SingleThreadPoolExecutor(),
debug_futures=futures,
)
)
Expand Down
Expand Up @@ -512,7 +512,7 @@ def _create_tick(graphql_context):
graphql_context.instance,
logger,
workspace,
executor=SingleThreadPoolExecutor(),
threadpool_executor=SingleThreadPoolExecutor(),
debug_futures=futures,
)
)
Expand Down
141 changes: 82 additions & 59 deletions python_modules/dagster/dagster/daemon/sensor.py
Expand Up @@ -3,6 +3,7 @@
import sys
import time
from collections import defaultdict
from contextlib import ExitStack
from typing import Dict, NamedTuple, Optional

import pendulum
Expand Down Expand Up @@ -176,46 +177,6 @@ def _check_for_debug_crash(debug_crash_flags, key):
RELOAD_WORKSPACE = 60


class SynchronousExecutor:
"""
Executes functions in series without creating threads, creating a uniform execution interface
"""

def __init__(self, **kwargs):
pass

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
pass

def submit(self, fn, *args, **kwargs):
future = concurrent.futures.Future()

try:
result = fn(*args, **kwargs)
future.set_result(result)
except Exception as e:
future.set_exception(e)

return future

def shutdown(self, wait=True):
pass


def sensor_executor(instance):
settings = instance.get_settings("sensors")

if settings.get("use_threads"):
return concurrent.futures.ThreadPoolExecutor(
max_workers=settings.get("num_workers"),
thread_name_prefix="sensor_daemon_worker",
)
return SynchronousExecutor()


def execute_sensor_iteration_loop(instance, workspace, logger, until=None):
"""
Helper function that performs sensor evaluations on a tighter loop, while reusing grpc locations
Expand All @@ -225,7 +186,18 @@ def execute_sensor_iteration_loop(instance, workspace, logger, until=None):
"""
workspace_loaded_time = pendulum.now("UTC").timestamp()

with sensor_executor(instance) as executor:
with ExitStack() as stack:
settings = instance.get_settings("sensors")
if settings.get("use_threads"):
threadpool_executor = stack.enter_context(
concurrent.futures.ThreadPoolExecutor(
max_workers=settings.get("num_workers"),
thread_name_prefix="sensor_daemon_worker",
)
)
else:
threadpool_executor = None

workspace_iteration = 0
start_time = pendulum.now("UTC").timestamp()
while True:
Expand All @@ -245,7 +217,7 @@ def execute_sensor_iteration_loop(instance, workspace, logger, until=None):
instance,
logger,
workspace.get_workspace_copy_for_iteration(),
executor,
threadpool_executor,
log_verbose_checks=(workspace_iteration == 0),
)

Expand All @@ -260,7 +232,7 @@ def execute_sensor_iteration(
instance,
logger,
workspace,
executor,
threadpool_executor=None,
log_verbose_checks=True,
debug_crash_flags=None,
debug_futures=None,
Expand Down Expand Up @@ -359,8 +331,54 @@ def execute_sensor_iteration(
elif _is_under_min_interval(sensor_state, external_sensor, now):
continue

future = executor.submit(
_process_tick,
if threadpool_executor:
# add the sensor evaluations to a threadpool
future = threadpool_executor.submit(
_process_tick,
logger,
instance,
workspace,
now,
external_sensor,
sensor_state,
sensor_debug_crash_flags,
tick_retention_settings,
)

# for tests, add the futures to enable for waiting
if debug_futures is not None:
debug_futures[external_sensor.selector_id] = future
yield

else:
# evaluate the sensors in a loop, synchronously, yielding to allow the sensor daemon to
# heartbeat
yield from _process_tick_generator(
logger,
instance,
workspace,
now,
external_sensor,
sensor_state,
sensor_debug_crash_flags,
tick_retention_settings,
)


def _process_tick(
logger,
instance,
workspace,
now,
external_sensor,
sensor_state,
sensor_debug_crash_flags,
tick_retention_settings,
):
# evaluate the tick immediately, but from within a thread. The main thread should be able to
# heartbeat to keep the daemon alive
list(
_process_tick_generator(
logger,
instance,
workspace,
Expand All @@ -370,11 +388,10 @@ def execute_sensor_iteration(
sensor_debug_crash_flags,
tick_retention_settings,
)
if debug_futures is not None:
debug_futures[external_sensor.selector_id] = future
)


def _process_tick(
def _process_tick_generator(
logger,
instance,
workspace,
Expand All @@ -384,6 +401,7 @@ def _process_tick(
sensor_debug_crash_flags,
tick_retention_settings,
):
error_info = None
try:
tick = instance.create_tick(
TickData(
Expand All @@ -402,15 +420,13 @@ def _process_tick(
external_sensor, tick, instance, logger, tick_retention_settings
) as tick_context:
_check_for_debug_crash(sensor_debug_crash_flags, "TICK_HELD")
return list(
_evaluate_sensor(
tick_context,
instance,
workspace,
external_sensor,
sensor_state,
sensor_debug_crash_flags,
)
yield from _evaluate_sensor(
tick_context,
instance,
workspace,
external_sensor,
sensor_state,
sensor_debug_crash_flags,
)

except Exception:
Expand All @@ -419,6 +435,8 @@ def _process_tick(
f"Sensor daemon caught an error for sensor {external_sensor.name} : {error_info.to_string()}"
)

yield error_info


def _evaluate_sensor(
context,
Expand All @@ -445,6 +463,8 @@ def _evaluate_sensor(
state.instigator_data.cursor if state.instigator_data else None,
)

yield

assert isinstance(sensor_runtime_data, SensorExecutionData)
if not sensor_runtime_data.run_requests:
if sensor_runtime_data.pipeline_run_reactions:
Expand Down Expand Up @@ -500,6 +520,7 @@ def _evaluate_sensor(
context.logger.info(f"No run requests returned for {external_sensor.name}, skipping")
context.update_state(TickStatus.SKIPPED, cursor=sensor_runtime_data.cursor)

yield
return # Done with run status sensors

skipped_runs = []
Expand Down Expand Up @@ -531,12 +552,12 @@ def _evaluate_sensor(
if isinstance(run, SkippedSensorRun):
skipped_runs.append(run)
context.add_run_info(run_id=None, run_key=run_request.run_key)
yield
continue

_check_for_debug_crash(sensor_debug_crash_flags, "RUN_CREATED")

error_info = None

try:
context.logger.info(
"Launching run for {sensor_name}".format(sensor_name=external_sensor.name)
Expand All @@ -552,7 +573,7 @@ def _evaluate_sensor(
context.logger.error(
f"Run {run.run_id} created successfully but failed to launch: " f"{str(error_info)}"
)
yield error_info
yield error_info

_check_for_debug_crash(sensor_debug_crash_flags, "RUN_LAUNCHED")

Expand All @@ -571,6 +592,8 @@ def _evaluate_sensor(
else:
context.update_state(TickStatus.SKIPPED, cursor=sensor_runtime_data.cursor)

yield


def _is_under_min_interval(state, external_sensor, now):
if not state.instigator_data:
Expand Down
Expand Up @@ -44,7 +44,7 @@ def _test_launch_sensor_runs_in_subprocess(instance_ref, execution_datetime, deb
instance,
logger,
workspace,
executor=SingleThreadPoolExecutor(),
threadpool_executor=SingleThreadPoolExecutor(),
debug_crash_flags=debug_crash_flags,
debug_futures=futures,
)
Expand Down
Expand Up @@ -44,11 +44,7 @@
)
from dagster.core.workspace.load_target import PythonFileTarget
from dagster.daemon import get_default_daemon_logger
from dagster.daemon.sensor import (
SynchronousExecutor,
execute_sensor_iteration,
execute_sensor_iteration_loop,
)
from dagster.daemon.sensor import execute_sensor_iteration, execute_sensor_iteration_loop
from dagster.seven.compat.pendulum import create_pendulum_time, to_timezone


Expand Down Expand Up @@ -386,13 +382,13 @@ def workspace_load_target(attribute="the_repo"):
def get_sensor_executors():
return [
pytest.param(
SynchronousExecutor(),
marks=pytest.mark.skipif(sys.version_info.minor != 9, reason="multithreaded timeouts"),
None,
marks=pytest.mark.skipif(sys.version_info.minor != 9, reason="timeouts"),
id="synchronous",
),
pytest.param(
SingleThreadPoolExecutor(),
marks=pytest.mark.skipif(sys.version_info.minor != 9, reason="multithreaded timeouts"),
marks=pytest.mark.skipif(sys.version_info.minor != 9, reason="timeouts"),
id="threadpool",
),
]
Expand All @@ -406,7 +402,7 @@ def evaluate_sensors(instance, workspace, executor, timeout=75):
instance,
logger,
workspace,
executor=executor,
threadpool_executor=executor,
debug_futures=futures,
)
)
Expand Down

0 comments on commit 5f92ca1

Please sign in to comment.