Skip to content

Commit

Permalink
Add metric triggerer_heartbeat (#33320)
Browse files Browse the repository at this point in the history
* Add metric `triggerer_heartbeat`

* Fix copy paste
  • Loading branch information
vincbeck committed Aug 11, 2023
1 parent 23d5424 commit 8c43fc5
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
7 changes: 6 additions & 1 deletion airflow/jobs/triggerer_job_runner.py
Expand Up @@ -31,6 +31,7 @@
from typing import TYPE_CHECKING

from sqlalchemy import func
from sqlalchemy.orm import Session

from airflow.configuration import conf
from airflow.jobs.base_job_runner import BaseJobRunner
Expand All @@ -54,7 +55,7 @@
ctx_trigger_id,
)
from airflow.utils.module_loading import import_string
from airflow.utils.session import provide_session
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from airflow.models import TaskInstance
Expand Down Expand Up @@ -280,6 +281,10 @@ def __init__(
# Set up runner async thread
self.trigger_runner = TriggerRunner()

@provide_session
def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
Stats.incr("triggerer_heartbeat", 1, 1)

def register_signals(self) -> None:
"""Register signals that stop child processes."""
signal.signal(signal.SIGINT, self._exit_gracefully)
Expand Down
Expand Up @@ -182,6 +182,7 @@ Name Descripti
``task_restored_to_dag.<dag_id>`` Number of tasks restored for a given dag (i.e. task instance which was
previously in REMOVED state in the DB is added to DAG file)
``task_instance_created_<operator_name>`` Number of tasks instances created for a given Operator
``triggerer_heartbeat`` Triggerer heartbeats
``triggers.blocked_main_thread`` Number of triggers that blocked the main thread (likely due to not being
fully asynchronous)
``triggers.failed`` Number of triggers that errored before they could fire an event
Expand Down

0 comments on commit 8c43fc5

Please sign in to comment.