Skip to content

Commit

Permalink
Configurable health check threshold for triggerer (#33089)
Browse files Browse the repository at this point in the history
Recently the Triggerer was forced to a health check threshold of
`trigger_heartbeat * 2.1`. With the default heartbeat of 5s, this
generates a threshold of 10.5s, whereas the previous threshold was 30s.
This leads to a very unstable situation where Triggerers are not given a
reasonable amount of time to heartbeat and their triggers are taken
from them.

This change allows the user to configure this threshold the same way we
do for the scheduler.
  • Loading branch information
o-nikolas committed Aug 6, 2023
1 parent bf41344 commit 6ec3b9a
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 72 deletions.
10 changes: 10 additions & 0 deletions airflow/config_templates/config.yml
Expand Up @@ -2258,6 +2258,16 @@ triggerer:
type: float
example: ~
default: "5"
triggerer_health_check_threshold:
description: |
If the last triggerer heartbeat happened more than triggerer_health_check_threshold
ago (in seconds), triggerer is considered unhealthy.
This is used by the health check in the "/health" endpoint and in `airflow jobs check` CLI
for TriggererJob.
version_added: 2.7.0
type: float
example: ~
default: "30"
kerberos:
description: ~
options:
Expand Down
2 changes: 2 additions & 0 deletions airflow/jobs/job.py
Expand Up @@ -129,6 +129,8 @@ def is_alive(self, grace_multiplier=2.1):
"""
if self.job_type == "SchedulerJob":
health_check_threshold: int = conf.getint("scheduler", "scheduler_health_check_threshold")
elif self.job_type == "TriggererJob":
health_check_threshold: int = conf.getint("triggerer", "triggerer_health_check_threshold")
else:
health_check_threshold: int = self.heartrate * grace_multiplier
return (
Expand Down
4 changes: 3 additions & 1 deletion airflow/jobs/triggerer_job_runner.py
Expand Up @@ -257,6 +257,8 @@ def __init__(
else:
raise ValueError(f"Capacity number {capacity} is invalid")

self.health_check_threshold = conf.getint("triggerer", "triggerer_health_check_threshold")

should_queue = True
if DISABLE_WRAPPER:
self.log.warning(
Expand Down Expand Up @@ -363,7 +365,7 @@ def _run_trigger_loop(self) -> None:

def load_triggers(self):
"""Query the database for the triggers we're supposed to be running and update the runner."""
Trigger.assign_unassigned(self.job.id, self.capacity, self.job.heartrate)
Trigger.assign_unassigned(self.job.id, self.capacity, self.health_check_threshold)
ids = Trigger.ids_for_triggerer(self.job.id)
self.trigger_runner.update_triggers(set(ids))

Expand Down
14 changes: 7 additions & 7 deletions airflow/models/trigger.py
Expand Up @@ -199,13 +199,15 @@ def ids_for_triggerer(cls, triggerer_id, session: Session = NEW_SESSION) -> list
@classmethod
@internal_api_call
@provide_session
def assign_unassigned(cls, triggerer_id, capacity, heartrate, session: Session = NEW_SESSION) -> None:
def assign_unassigned(
cls, triggerer_id, capacity, health_check_threshold, session: Session = NEW_SESSION
) -> None:
"""
Assign unassigned triggers based on a number of conditions.
Takes a triggerer_id, the capacity for that triggerer and the Triggerer job heartrate,
and assigns unassigned triggers until that capacity is reached, or there are no more
unassigned triggers.
Takes a triggerer_id, the capacity for that triggerer and the Triggerer job
health check threshold, and assigns unassigned triggers until that capacity is reached,
or there are no more unassigned triggers.
"""
from airflow.jobs.job import Job # To avoid circular import

Expand All @@ -214,9 +216,7 @@ def assign_unassigned(cls, triggerer_id, capacity, heartrate, session: Session =

if capacity <= 0:
return
# we multiply heartrate by a grace_multiplier to give the triggerer
# a chance to heartbeat before we consider it dead
health_check_threshold = heartrate * 2.1

alive_triggerer_ids = session.scalars(
select(Job.id).where(
Job.end_date.is_(None),
Expand Down
9 changes: 5 additions & 4 deletions tests/jobs/test_base_job.py
Expand Up @@ -21,7 +21,7 @@
import sys
from unittest.mock import ANY, Mock, patch

from pytest import raises
import pytest
from sqlalchemy.exc import OperationalError

from airflow.executors.sequential_executor import SequentialExecutor
Expand Down Expand Up @@ -84,7 +84,7 @@ def abort():

job = Job()
job_runner = MockJobRunner(job=job, func=abort)
with raises(RuntimeError):
with pytest.raises(RuntimeError):
run_job(job=job, execute_callable=job_runner._execute)

assert job.state == State.FAILED
Expand Down Expand Up @@ -148,8 +148,9 @@ def test_is_alive(self):
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
assert job.is_alive() is False, "Completed jobs even with recent heartbeat should not be alive"

def test_is_alive_scheduler(self):
job = Job(heartrate=10, state=State.RUNNING, job_type="SchedulerJob")
@pytest.mark.parametrize("job_type", ["SchedulerJob", "TriggererJob"])
def test_is_alive_scheduler(self, job_type):
job = Job(heartrate=10, state=State.RUNNING, job_type=job_type)
assert job.is_alive() is True

job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20)
Expand Down
86 changes: 26 additions & 60 deletions tests/models/test_trigger.py
Expand Up @@ -141,10 +141,11 @@ def test_assign_unassigned(session, create_task_instance):
"""
Tests that unassigned triggers of all appropriate states are assigned.
"""
time_now = timezone.utcnow()
triggerer_heartrate = 10
finished_triggerer = Job(heartrate=triggerer_heartrate, state=State.SUCCESS)
TriggererJobRunner(finished_triggerer)
finished_triggerer.end_date = timezone.utcnow() - datetime.timedelta(hours=1)
finished_triggerer.end_date = time_now - datetime.timedelta(hours=1)
session.add(finished_triggerer)
assert not finished_triggerer.is_alive()
healthy_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING)
Expand All @@ -155,22 +156,37 @@ def test_assign_unassigned(session, create_task_instance):
TriggererJobRunner(new_triggerer)
session.add(new_triggerer)
assert new_triggerer.is_alive()
# This triggerer's last heartbeat is older than the check threshold, expect
# its triggers to be taken by other healthy triggerers below
unhealthy_triggerer = Job(
heartrate=triggerer_heartrate,
state=State.RUNNING,
latest_heartbeat=time_now - datetime.timedelta(seconds=100),
)
TriggererJobRunner(unhealthy_triggerer)
session.add(unhealthy_triggerer)
# Triggerer is not healthy, its last heartbeat was too long ago
assert not unhealthy_triggerer.is_alive()
session.commit()
trigger_on_healthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
trigger_on_healthy_triggerer.id = 1
trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id
trigger_on_unhealthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
trigger_on_unhealthy_triggerer.id = 2
trigger_on_unhealthy_triggerer.triggerer_id = unhealthy_triggerer.id
trigger_on_killed_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
trigger_on_killed_triggerer.id = 2
trigger_on_killed_triggerer.id = 3
trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id
trigger_unassigned_to_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
trigger_unassigned_to_triggerer.id = 3
trigger_unassigned_to_triggerer.id = 4
assert trigger_unassigned_to_triggerer.triggerer_id is None
session.add(trigger_on_healthy_triggerer)
session.add(trigger_on_unhealthy_triggerer)
session.add(trigger_on_killed_triggerer)
session.add(trigger_unassigned_to_triggerer)
session.commit()
assert session.query(Trigger).count() == 3
Trigger.assign_unassigned(new_triggerer.id, 100, session=session, heartrate=triggerer_heartrate)
assert session.query(Trigger).count() == 4
Trigger.assign_unassigned(new_triggerer.id, 100, health_check_threshold=30)
session.expire_all()
# Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer
assert (
Expand All @@ -186,61 +202,11 @@ def test_assign_unassigned(session, create_task_instance):
session.query(Trigger).filter(Trigger.id == trigger_on_healthy_triggerer.id).one().triggerer_id
== healthy_triggerer.id
)


@pytest.mark.parametrize("check_triggerer_heartrate", [10, 60, 300])
def test_assign_unassigned_missing_heartbeat(session, create_task_instance, check_triggerer_heartrate):
"""
Tests that the triggers assigned to a dead triggerer are considered as unassigned
and they are assigned to an alive triggerer.
"""
import time_machine

block_triggerer_heartrate = 9999
with time_machine.travel(datetime.datetime.utcnow(), tick=False) as t:
first_triggerer = Job(heartrate=block_triggerer_heartrate, state=State.RUNNING)
TriggererJobRunner(first_triggerer)
session.add(first_triggerer)
assert first_triggerer.is_alive()
second_triggerer = Job(heartrate=block_triggerer_heartrate, state=State.RUNNING)
TriggererJobRunner(second_triggerer)
session.add(second_triggerer)
assert second_triggerer.is_alive()
session.commit()
trigger_on_first_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
trigger_on_first_triggerer.id = 1
trigger_on_first_triggerer.triggerer_id = first_triggerer.id
trigger_on_second_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
trigger_on_second_triggerer.id = 2
trigger_on_second_triggerer.triggerer_id = second_triggerer.id
session.add(trigger_on_first_triggerer)
session.add(trigger_on_second_triggerer)
session.commit()
assert session.query(Trigger).count() == 2
triggers_ids = [
(first_triggerer.id, second_triggerer.id),
(first_triggerer.id, second_triggerer.id),
(first_triggerer.id, second_triggerer.id),
# Check that after more than 2.1 heartrates, the first triggerer is considered dead
# and the first trigger is assigned to the second triggerer
(second_triggerer.id, second_triggerer.id),
]
for i in range(4):
Trigger.assign_unassigned(
second_triggerer.id, 100, session=session, heartrate=check_triggerer_heartrate
)
session.expire_all()
# Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer
assert (
session.query(Trigger).filter(Trigger.id == trigger_on_first_triggerer.id).one().triggerer_id
== triggers_ids[i][0]
)
assert (
session.query(Trigger).filter(Trigger.id == trigger_on_second_triggerer.id).one().triggerer_id
== triggers_ids[i][1]
)
t.shift(datetime.timedelta(seconds=check_triggerer_heartrate))
second_triggerer.latest_heartbeat += datetime.timedelta(seconds=check_triggerer_heartrate)
# Check that trigger on unhealthy triggerer is assigned to new triggerer
assert (
session.query(Trigger).filter(Trigger.id == trigger_on_unhealthy_triggerer.id).one().triggerer_id
== new_triggerer.id
)


def test_get_sorted_triggers(session, create_task_instance):
Expand Down

0 comments on commit 6ec3b9a

Please sign in to comment.