Skip to content

Commit

Permalink
chore(daemon): retrieve daemon heartbeats in batch (#6843)
Browse files Browse the repository at this point in the history
  • Loading branch information
rexledesma committed Mar 1, 2022
1 parent 0158c07 commit d98aa5c
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 111 deletions.
15 changes: 10 additions & 5 deletions python_modules/dagster-graphql/dagster_graphql/schema/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dagster import check
from dagster.core.instance import DagsterInstance, is_dagit_telemetry_enabled
from dagster.core.launcher.base import RunLauncher
from dagster.daemon.controller import get_daemon_status
from dagster.daemon.controller import get_daemon_statuses
from dagster.daemon.types import DaemonStatus

from .errors import GraphenePythonError
Expand Down Expand Up @@ -77,14 +77,19 @@ def resolve_id(self, _graphene_info):

def resolve_daemonStatus(self, _graphene_info, daemon_type):
check.str_param(daemon_type, "daemon_type")
return GrapheneDaemonStatus(
get_daemon_status(self._instance, daemon_type, ignore_errors=True)
status_by_type = get_daemon_statuses(
self._instance, daemon_types=[daemon_type], ignore_errors=True
)
return GrapheneDaemonStatus(status_by_type[daemon_type])

def resolve_allDaemonStatuses(self, _graphene_info):
return [
GrapheneDaemonStatus(get_daemon_status(self._instance, daemon_type, ignore_errors=True))
for daemon_type in self._instance.get_required_daemon_types()
GrapheneDaemonStatus(daemon_status)
for daemon_status in get_daemon_statuses(
self._instance,
daemon_types=self._instance.get_required_daemon_types(),
ignore_errors=True,
).values()
]


Expand Down
6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/daemon/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
all_daemons_live,
daemon_controller_from_instance,
debug_daemon_heartbeats,
get_daemon_status,
get_daemon_statuses,
)
from dagster.daemon.daemon import get_telemetry_daemon_session_id
from dagster.utils.interrupts import capture_interrupts, raise_interrupts_as
Expand Down Expand Up @@ -107,8 +107,8 @@ def debug_heartbeat_command():
)
def debug_heartbeat_dump_command():
with DagsterInstance.get() as instance:
for daemon_type in instance.get_required_daemon_types():
click.echo(get_daemon_status(instance, daemon_type))
for daemon_status in get_daemon_statuses(instance, instance.get_required_daemon_types()):
click.echo(daemon_status)


@click.group(
Expand Down
144 changes: 76 additions & 68 deletions python_modules/dagster/dagster/daemon/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,31 +179,39 @@ def _daemon_thread_healthy(self, daemon_type):
thread = self._daemon_threads[daemon_type]
return thread.is_alive()

def _daemon_heartbeat_healthy(self, daemon_type):
def _daemon_heartbeat_health(self):
now = time.time()
try:
is_healthy = get_daemon_status(
daemon_statuses_by_type = get_daemon_statuses(
self._instance,
daemon_type,
daemon_types=self._daemons.keys(),
heartbeat_interval_seconds=self._heartbeat_interval_seconds,
heartbeat_tolerance_seconds=self._heartbeat_tolerance_seconds,
ignore_errors=True,
).healthy
if is_healthy:
self._last_healthy_heartbeat_times[daemon_type] = now
return is_healthy
)
daemon_health_by_type = {
daemon_type: daemon_status.healthy
for (daemon_type, daemon_status) in daemon_statuses_by_type.items()
}

for daemon_type, is_daemon_healthy in daemon_health_by_type.items():
if is_daemon_healthy:
self._last_healthy_heartbeat_times[daemon_type] = now

return daemon_health_by_type
except Exception:
self._logger.warning(
"Error attempting to check {daemon_type} heartbeat:".format(
daemon_type=daemon_type,
),
"Error attempting to check daemon heartbeats",
exc_info=sys.exc_info,
)

return (
self._last_healthy_heartbeat_times[daemon_type]
> now - self._heartbeat_tolerance_seconds
)
return {
daemon_type: (
self._last_healthy_heartbeat_times[daemon_type]
> now - self._heartbeat_tolerance_seconds
)
for daemon_type in self._daemons.keys()
}

def check_daemon_threads(self):
failed_daemons = [
Expand All @@ -222,8 +230,8 @@ def check_daemon_threads(self):
def check_daemon_heartbeats(self):
failed_daemons = [
daemon_type
for daemon_type in self._daemon_threads
if not self._daemon_heartbeat_healthy(daemon_type)
for daemon_type, is_daemon_healthy in self._daemon_heartbeat_health().items()
if not is_daemon_healthy
]

if failed_daemons:
Expand Down Expand Up @@ -306,20 +314,17 @@ def all_daemons_healthy(
):
"""
True if all required daemons have had a recent heartbeat with no errors
"""

statuses = [
get_daemon_status(
instance,
daemon_type,
heartbeat_interval_seconds=heartbeat_interval_seconds,
heartbeat_tolerance_seconds=heartbeat_tolerance_seconds,
curr_time_seconds=curr_time_seconds,
)
for daemon_type in instance.get_required_daemon_types()
]
return all([status.healthy for status in statuses])
statuses_by_type = get_daemon_statuses(
instance,
daemon_types=instance.get_required_daemon_types(),
heartbeat_interval_seconds=heartbeat_interval_seconds,
heartbeat_tolerance_seconds=heartbeat_tolerance_seconds,
curr_time_seconds=curr_time_seconds,
)

return all(status.healthy for status in statuses_by_type.values())


def all_daemons_live(
Expand All @@ -332,23 +337,21 @@ def all_daemons_live(
True if all required daemons have had a recent heartbeat, regardless of if it contained errors.
"""

statuses = [
get_daemon_status(
instance,
daemon_type,
heartbeat_interval_seconds=heartbeat_interval_seconds,
heartbeat_tolerance_seconds=heartbeat_tolerance_seconds,
curr_time_seconds=curr_time_seconds,
ignore_errors=True,
)
for daemon_type in instance.get_required_daemon_types()
]
return all([status.healthy for status in statuses])
statuses_by_type = get_daemon_statuses(
instance,
daemon_types=instance.get_required_daemon_types(),
heartbeat_interval_seconds=heartbeat_interval_seconds,
heartbeat_tolerance_seconds=heartbeat_tolerance_seconds,
curr_time_seconds=curr_time_seconds,
ignore_errors=True,
)

return all(status.healthy for status in statuses_by_type.values())


def get_daemon_status(
def get_daemon_statuses(
instance,
daemon_type,
daemon_types,
curr_time_seconds=None,
ignore_errors=False,
heartbeat_interval_seconds=DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
Expand All @@ -358,36 +361,41 @@ def get_daemon_status(
curr_time_seconds, "curr_time_seconds", default=pendulum.now("UTC").float_timestamp
)

# check if daemon required
if daemon_type not in instance.get_required_daemon_types():
return DaemonStatus(
daemon_type=daemon_type, required=False, healthy=None, last_heartbeat=None
)
daemon_statuses_by_type = {}

# check if daemon present
heartbeats = instance.get_daemon_heartbeats()
if daemon_type not in heartbeats:
return DaemonStatus(
daemon_type=daemon_type, required=True, healthy=False, last_heartbeat=None
)
for daemon_type in daemon_types:
# check if daemon is not required
if daemon_type not in instance.get_required_daemon_types():
daemon_statuses_by_type[daemon_type] = DaemonStatus(
daemon_type=daemon_type, required=False, healthy=None, last_heartbeat=None
)
else:
# check if daemon has a heartbeat
heartbeats = instance.get_daemon_heartbeats()
if daemon_type not in heartbeats:
daemon_statuses_by_type[daemon_type] = DaemonStatus(
daemon_type=daemon_type, required=True, healthy=False, last_heartbeat=None
)
else:
# check if daemon has sent a recent heartbeat
latest_heartbeat = heartbeats[daemon_type]
hearbeat_timestamp = latest_heartbeat.timestamp
maximum_tolerated_time = (
hearbeat_timestamp + heartbeat_interval_seconds + heartbeat_tolerance_seconds
)
healthy = curr_time_seconds <= maximum_tolerated_time

# check if daemon has sent a recent heartbeat
latest_heartbeat = heartbeats[daemon_type]
hearbeat_timestamp = latest_heartbeat.timestamp
maximum_tolerated_time = (
hearbeat_timestamp + heartbeat_interval_seconds + heartbeat_tolerance_seconds
)
healthy = curr_time_seconds <= maximum_tolerated_time
if not ignore_errors and latest_heartbeat.errors:
healthy = False

if not ignore_errors and latest_heartbeat.errors:
healthy = False
daemon_statuses_by_type[daemon_type] = DaemonStatus(
daemon_type=daemon_type,
required=True,
healthy=healthy,
last_heartbeat=heartbeats[daemon_type],
)

return DaemonStatus(
daemon_type=daemon_type,
required=True,
healthy=healthy,
last_heartbeat=heartbeats[daemon_type],
)
return daemon_statuses_by_type


def debug_daemon_heartbeats(instance):
Expand Down

0 comments on commit d98aa5c

Please sign in to comment.